Skip to main content

rsigma_eval/
field_observer.rs

//! Opt-in observer that records every field name seen at evaluation
//! time so consumers can report which event fields are not referenced
//! by any loaded rule (gap signal) and which rule fields have never
//! been seen in events (broken-coverage signal).
//!
//! Lives in `rsigma-eval` because the observer only depends on the
//! [`Event`] trait. The daemon (`rsigma-runtime` + `rsigma-cli`'s
//! `engine daemon`) and the one-shot evaluator (`rsigma-cli`'s
//! `engine eval`) both consume the same type so the report shape is
//! consistent across runtimes.
//!
//! # Design
//!
//! - Backed by a `std::sync::Mutex<HashMap<Arc<str>, u64>>`. The mutex
//!   is only held long enough to bump or insert a counter; the lock
//!   never wraps user code that could panic, so poisoning is
//!   effectively impossible in practice and the `lock().expect(..)`
//!   calls below treat poisoning as a programmer bug.
//! - A hard cap (`max_keys`) bounds memory. Once the cap is reached
//!   new keys are dropped and the `overflow_dropped` counter is
//!   incremented; existing counters keep updating so the observer
//!   keeps surfacing high-frequency keys even on a saturated set.
//! - The observer is opt-in: callers construct an `Arc<FieldObserver>`
//!   only when their `--observe-fields` flag is set. When unset the
//!   observation call sites stay unwired and the hot path is
//!   untouched.
//! - Keys are stored as `Arc<str>` so a snapshot only refcount-bumps
//!   each key rather than copying the string. Trade-off: one extra
//!   allocation per first-time insert (Arc header) in exchange for
//!   near-free repeated snapshots.
//! - Lifetime counters (`lifetime_events_observed`,
//!   `lifetime_overflow_dropped`) are monotonic across resets so
//!   Prometheus counter bridges don't desync when the daemon resets
//!   the observer via `DELETE /api/v1/fields/observer`.
//!
//! # Coordinates
//!
//! - Iterate [`Event::field_keys`] once per event before evaluation.
//!   For JSON events this is a recursive walk that allocates one
//!   `String` per leaf path (dot-joined paths are not substrings of
//!   the source value); for flat formats like `KvEvent` the override
//!   returns `Cow::Borrowed`. The cost is acceptable in the opt-in
//!   diagnostic mode but is not free.
//! - Render the gap and broken-coverage signals by joining a
//!   [`FieldObserver::snapshot`] against a [`RuleFieldSet`].
46
47use std::collections::{HashMap, HashSet};
48use std::sync::Arc;
49use std::sync::Mutex;
50use std::sync::atomic::{AtomicU64, Ordering};
51use std::time::Instant;
52
53use crate::event::Event;
54use crate::fields::{FieldOrigin, RuleFieldSet};
55
/// One field name together with its observation count, as handed out
/// by the snapshot API.
///
/// `field` is an `Arc<str>`, so producing an entry only bumps a
/// reference count instead of copying the key's bytes out of the
/// observer's internal map. Read it like a string slice:
/// `entry.field.as_ref()` or `&*entry.field`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldObservationEntry {
    /// Dot-joined field path, in the form `Event::field_keys` yields.
    pub field: Arc<str>,
    /// How many events carried this field since the last reset.
    pub count: u64,
}
69
70/// Immutable snapshot of an observer's state at one moment in time.
71///
72/// Returned by [`FieldObserver::snapshot`]; consumers (the daemon's
73/// HTTP handlers, the `engine eval` report writer) render coverage
74/// reports from this against a [`RuleFieldSet`].
75#[derive(Debug, Clone, Default)]
76pub struct FieldObservation {
77    /// Per-field counters, sorted by descending count then ascending name.
78    pub entries: Vec<FieldObservationEntry>,
79    /// Number of events evaluated by the observer since construction or
80    /// the last reset.
81    pub events_observed: u64,
82    /// Distinct field names tracked (saturates at `max_keys`).
83    pub unique_keys: usize,
84    /// Number of insert attempts dropped because the observer was at
85    /// capacity since the last reset.
86    pub overflow_dropped: u64,
87    /// Lifetime total of events evaluated since the observer was
88    /// constructed, ignoring resets. Drives Prometheus counters, which
89    /// must be monotonic.
90    pub lifetime_events_observed: u64,
91    /// Lifetime total of insert attempts dropped because the observer
92    /// was at capacity, ignoring resets. Drives Prometheus counters.
93    pub lifetime_overflow_dropped: u64,
94    /// Configured ceiling for distinct keys.
95    pub max_keys: usize,
96    /// Seconds since the observer was created (or last reset).
97    pub uptime_seconds: f64,
98}
99
/// Capped, opt-in field-name counter shared across producers (the
/// daemon's event task, the eval streaming loop) and consumers (the
/// daemon's HTTP handlers, the eval report writer).
///
/// Implements [`Debug`](std::fmt::Debug) so the observer can be embedded
/// in other `Debug` types and logged. The derived output includes the
/// tracked key map when its mutex can be acquired without blocking.
#[derive(Debug)]
pub struct FieldObserver {
    /// Per-field counters keyed by dot-joined field path. Guarded by a
    /// mutex because the observer is updated through `&self`.
    inner: Mutex<HashMap<Arc<str>, u64>>,
    /// Upper bound on the number of distinct keys held in `inner`.
    max_keys: usize,
    /// Resets to 0 on [`reset`](Self::reset). Drives the "since-last-reset"
    /// view exposed in [`FieldObservation::overflow_dropped`].
    overflow_dropped: AtomicU64,
    /// Resets to 0 on [`reset`](Self::reset). Drives the "since-last-reset"
    /// view exposed in [`FieldObservation::events_observed`].
    events_observed: AtomicU64,
    /// Monotonic. Never reset. Drives the Prometheus counter bridge so
    /// the lifetime metric stays consistent across observer resets.
    lifetime_events_observed: AtomicU64,
    /// Monotonic. Never reset. Drives the Prometheus counter bridge.
    lifetime_overflow_dropped: AtomicU64,
    /// Moment the observer was created or last reset; the source of
    /// [`FieldObservation::uptime_seconds`].
    start: Mutex<Instant>,
}
119
120impl FieldObservation {
121    /// Join the snapshot against a [`RuleFieldSet`] and return the
122    /// partitioned coverage view in a single pass.
123    ///
124    /// Returned references borrow from `self` (the entries) and the
125    /// supplied `rule_field_set` (the missing entries), so this is
126    /// allocation-light: one `Vec` for the unknown borrows, one `Vec`
127    /// for the missing borrows, one `HashSet` for the seen lookup.
128    ///
129    /// Centralises the logic shared between the daemon's
130    /// `GET /api/v1/fields*` handlers and the `engine eval` end-of-run
131    /// report so the two surfaces cannot drift on field semantics.
132    pub fn coverage<'a>(&'a self, rule_field_set: &'a RuleFieldSet) -> FieldCoverage<'a> {
133        let mut unknown: Vec<&'a FieldObservationEntry> = Vec::new();
134        let mut intersection_count: usize = 0;
135        let mut seen: HashSet<&'a str> = HashSet::with_capacity(self.entries.len());
136        for entry in &self.entries {
137            let field: &str = &entry.field;
138            seen.insert(field);
139            if rule_field_set.contains(field) {
140                intersection_count += 1;
141            } else {
142                unknown.push(entry);
143            }
144        }
145        let missing: Vec<(&'a str, &'a FieldOrigin)> = rule_field_set
146            .iter()
147            .filter(|(name, _)| !seen.contains(name))
148            .collect();
149        FieldCoverage {
150            unknown,
151            intersection_count,
152            missing,
153        }
154    }
155}
156
157/// Borrowed view over a [`FieldObservation`] joined against a
158/// [`RuleFieldSet`]. Produced by [`FieldObservation::coverage`].
159///
160/// Consumers (the daemon HTTP handlers, the eval report writer) own
161/// the JSON shape; this struct only provides the partitioned data.
162pub struct FieldCoverage<'a> {
163    /// Observed fields not referenced by any loaded rule (gap signal).
164    /// Ordered the same way as [`FieldObservation::entries`]: by
165    /// descending count, then ascending name.
166    pub unknown: Vec<&'a FieldObservationEntry>,
167    /// Count of observed fields that *are* rule-referenced.
168    pub intersection_count: usize,
169    /// Rule field names that have not appeared in any observed event
170    /// (broken-coverage signal), paired with their [`FieldOrigin`] so
171    /// consumers can render rule titles and source kinds.
172    pub missing: Vec<(&'a str, &'a FieldOrigin)>,
173}
174
175impl FieldObserver {
176    /// Create a new observer with the given upper bound on distinct keys.
177    ///
178    /// A `max_keys` of 0 is allowed and disables tracking entirely; every
179    /// observed field counts as overflow. Callers wanting "no cap" should
180    /// pick a large finite number (e.g. `usize::MAX / 2`).
181    pub fn new(max_keys: usize) -> Self {
182        Self {
183            inner: Mutex::new(HashMap::new()),
184            max_keys,
185            overflow_dropped: AtomicU64::new(0),
186            events_observed: AtomicU64::new(0),
187            lifetime_events_observed: AtomicU64::new(0),
188            lifetime_overflow_dropped: AtomicU64::new(0),
189            start: Mutex::new(Instant::now()),
190        }
191    }
192
193    /// Walk the event's field keys and update the per-field counters.
194    ///
195    /// Insertion of a new key is skipped once the observer is at capacity;
196    /// already-tracked keys keep counting. The method takes `&self`, so
197    /// the observer can be shared behind an `Arc` without locking from
198    /// the caller's side.
199    pub fn observe<E: Event + ?Sized>(&self, event: &E) {
200        self.events_observed.fetch_add(1, Ordering::Relaxed);
201        self.lifetime_events_observed
202            .fetch_add(1, Ordering::Relaxed);
203        let keys = event.field_keys();
204        if keys.is_empty() {
205            return;
206        }
207        let mut overflow_local = 0u64;
208        let mut counts = self.inner.lock().expect("field observer mutex poisoned");
209        for key in keys {
210            if let Some(slot) = counts.get_mut(key.as_ref()) {
211                *slot = slot.saturating_add(1);
212            } else if counts.len() < self.max_keys {
213                counts.insert(Arc::<str>::from(key.as_ref()), 1);
214            } else {
215                overflow_local = overflow_local.saturating_add(1);
216            }
217        }
218        drop(counts);
219        if overflow_local > 0 {
220            self.overflow_dropped
221                .fetch_add(overflow_local, Ordering::Relaxed);
222            self.lifetime_overflow_dropped
223                .fetch_add(overflow_local, Ordering::Relaxed);
224        }
225    }
226
227    /// Snapshot the current counts. Entries are sorted by descending
228    /// count, then by ascending name for deterministic output.
229    ///
230    /// Cheap relative to the cardinality of the observer: each entry
231    /// only refcount-clones the `Arc<str>` key rather than copying the
232    /// key bytes, so a 10 000-key snapshot costs ~10 000 atomic
233    /// increments plus one `Vec` allocation, not 10 000 `String`
234    /// allocations.
235    pub fn snapshot(&self) -> FieldObservation {
236        let counts = self.inner.lock().expect("field observer mutex poisoned");
237        let mut entries: Vec<FieldObservationEntry> = counts
238            .iter()
239            .map(|(k, v)| FieldObservationEntry {
240                field: Arc::clone(k),
241                count: *v,
242            })
243            .collect();
244        let unique_keys = entries.len();
245        drop(counts);
246        entries.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.field.cmp(&b.field)));
247        FieldObservation {
248            entries,
249            events_observed: self.events_observed.load(Ordering::Relaxed),
250            unique_keys,
251            overflow_dropped: self.overflow_dropped.load(Ordering::Relaxed),
252            lifetime_events_observed: self.lifetime_events_observed.load(Ordering::Relaxed),
253            lifetime_overflow_dropped: self.lifetime_overflow_dropped.load(Ordering::Relaxed),
254            max_keys: self.max_keys,
255            uptime_seconds: self
256                .start
257                .lock()
258                .expect("field observer start mutex poisoned")
259                .elapsed()
260                .as_secs_f64(),
261        }
262    }
263
264    /// Reset every counter and the overflow tally. Returns the previous
265    /// `(unique_keys, events_observed)` pair so the API endpoint can
266    /// report what was cleared.
267    pub fn reset(&self) -> (usize, u64) {
268        let mut counts = self.inner.lock().expect("field observer mutex poisoned");
269        let previous_keys = counts.len();
270        counts.clear();
271        drop(counts);
272        let previous_events = self.events_observed.swap(0, Ordering::Relaxed);
273        self.overflow_dropped.store(0, Ordering::Relaxed);
274        *self
275            .start
276            .lock()
277            .expect("field observer start mutex poisoned") = Instant::now();
278        (previous_keys, previous_events)
279    }
280
281    /// Total events observed since the observer was created or last reset.
282    pub fn events_observed(&self) -> u64 {
283        self.events_observed.load(Ordering::Relaxed)
284    }
285
286    /// Lifetime total of events observed since the observer was
287    /// constructed, ignoring resets. Monotonic; suitable for driving
288    /// Prometheus counters.
289    pub fn lifetime_events_observed(&self) -> u64 {
290        self.lifetime_events_observed.load(Ordering::Relaxed)
291    }
292
293    /// Distinct keys currently tracked (does not include overflow drops).
294    pub fn unique_keys(&self) -> usize {
295        self.inner
296            .lock()
297            .expect("field observer mutex poisoned")
298            .len()
299    }
300
301    /// Insert attempts dropped because the observer was at capacity
302    /// since the last reset.
303    pub fn overflow_dropped(&self) -> u64 {
304        self.overflow_dropped.load(Ordering::Relaxed)
305    }
306
307    /// Lifetime total of insert attempts dropped because the observer
308    /// was at capacity, ignoring resets. Monotonic.
309    pub fn lifetime_overflow_dropped(&self) -> u64 {
310        self.lifetime_overflow_dropped.load(Ordering::Relaxed)
311    }
312
313    /// Configured per-observer ceiling for distinct keys.
314    pub fn max_keys(&self) -> usize {
315        self.max_keys
316    }
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::JsonEvent;
    use serde_json::json;

    /// Feed one JSON value through the observer as a borrowed event.
    fn observe_json(observer: &FieldObserver, value: &serde_json::Value) {
        observer.observe(&JsonEvent::borrow(value));
    }

    /// Field names of a snapshot, in snapshot order.
    fn field_names(snapshot: &FieldObservation) -> Vec<&str> {
        snapshot.entries.iter().map(|entry| &*entry.field).collect()
    }

    #[test]
    fn observes_flat_json_fields() {
        let observer = FieldObserver::new(100);
        observe_json(
            &observer,
            &json!({"CommandLine": "whoami", "User": "admin"}),
        );

        let snap = observer.snapshot();
        assert_eq!(snap.events_observed, 1);
        assert_eq!(snap.unique_keys, 2);
        assert_eq!(snap.overflow_dropped, 0);

        let names = field_names(&snap);
        assert!(names.contains(&"CommandLine"));
        assert!(names.contains(&"User"));
    }

    #[test]
    fn observes_nested_json_with_dotted_leaves() {
        let observer = FieldObserver::new(100);
        observe_json(&observer, &json!({"actor": {"id": "u1"}}));

        let snap = observer.snapshot();
        let names = field_names(&snap);
        // Only leaves are recorded, never the intermediate `actor`
        // object, so the gap signal does not raise false positives on
        // objects whose children are rule-referenced.
        assert!(names.contains(&"actor.id"));
        assert!(!names.contains(&"actor"));
    }

    #[test]
    fn counts_accumulate_across_observations() {
        let observer = FieldObserver::new(100);
        for _ in 0..5 {
            observe_json(&observer, &json!({"CommandLine": "whoami"}));
        }

        let snap = observer.snapshot();
        assert_eq!(snap.events_observed, 5);
        let entry = snap
            .entries
            .iter()
            .find(|candidate| &*candidate.field == "CommandLine")
            .expect("CommandLine tracked");
        assert_eq!(entry.count, 5);
    }

    #[test]
    fn cap_enforced_and_overflow_recorded() {
        let observer = FieldObserver::new(2);
        let event = json!({"a": 1, "b": 2, "c": 3, "d": 4});

        observe_json(&observer, &event);
        let first = observer.snapshot();
        assert_eq!(first.unique_keys, 2);
        assert_eq!(first.overflow_dropped, 2);

        // Keys that made it in before the cap keep counting afterwards.
        observe_json(&observer, &event);
        let second = observer.snapshot();
        assert_eq!(second.unique_keys, 2);
        assert_eq!(second.overflow_dropped, 4);
        for entry in &second.entries {
            assert_eq!(entry.count, 2, "tracked key counter advanced");
        }
    }

    #[test]
    fn snapshot_sorts_by_count_desc_then_name() {
        let observer = FieldObserver::new(100);
        for _ in 0..3 {
            observe_json(&observer, &json!({"hot": 1}));
        }
        observe_json(&observer, &json!({"warm": 1}));
        observe_json(&observer, &json!({"chill": 1}));

        let snap = observer.snapshot();
        assert_eq!(field_names(&snap), vec!["hot", "chill", "warm"]);
    }

    #[test]
    fn reset_clears_state_and_returns_previous_counts() {
        let observer = FieldObserver::new(100);
        observe_json(&observer, &json!({"a": 1, "b": 2}));
        observe_json(&observer, &json!({"a": 1}));

        let (prev_keys, prev_events) = observer.reset();
        assert_eq!(prev_keys, 2);
        assert_eq!(prev_events, 2);

        let snap = observer.snapshot();
        assert_eq!(snap.events_observed, 0);
        assert_eq!(snap.unique_keys, 0);
        assert_eq!(snap.overflow_dropped, 0);
        assert!(snap.entries.is_empty());
    }

    #[test]
    fn lifetime_counters_survive_reset() {
        // Regression: the Prometheus counter bridge depends on lifetime
        // totals being monotonic. A reset must not discard data points
        // the next /metrics scrape needs to see.
        let observer = FieldObserver::new(2);
        let event = json!({"a": 1, "b": 2, "c": 3, "d": 4});

        // 3 events with 4 unique fields each: 2 fit, 2 overflow per event.
        for _ in 0..3 {
            observe_json(&observer, &event);
        }
        let before = observer.snapshot();
        assert_eq!(before.events_observed, 3);
        assert_eq!(before.lifetime_events_observed, 3);
        assert_eq!(before.overflow_dropped, 6);
        assert_eq!(before.lifetime_overflow_dropped, 6);

        observer.reset();
        let after_reset = observer.snapshot();
        assert_eq!(after_reset.events_observed, 0);
        assert_eq!(after_reset.overflow_dropped, 0);
        // Lifetime totals MUST NOT reset:
        assert_eq!(after_reset.lifetime_events_observed, 3);
        assert_eq!(after_reset.lifetime_overflow_dropped, 6);

        // Observing again makes the lifetime totals climb from there.
        observe_json(&observer, &event);
        let after = observer.snapshot();
        assert_eq!(after.events_observed, 1);
        assert_eq!(after.lifetime_events_observed, 4);
        assert_eq!(after.overflow_dropped, 2);
        assert_eq!(after.lifetime_overflow_dropped, 8);
    }

    #[test]
    fn plain_event_observation_is_a_noop_for_counters() {
        let observer = FieldObserver::new(100);
        let plain = crate::event::PlainEvent::new("disk full".into());
        observer.observe(&plain);

        let snap = observer.snapshot();
        // The event itself is still counted; it just carried no
        // structured fields to record.
        assert_eq!(snap.events_observed, 1);
        assert_eq!(snap.unique_keys, 0);
        assert_eq!(snap.overflow_dropped, 0);
    }

    #[test]
    fn coverage_partitions_observed_against_rule_set() {
        // Two rules: the first references CommandLine (present in the
        // event), the second references ProcessGuid (absent from the
        // event). The event carries CommandLine plus two unrelated
        // fields.
        let yaml = r#"
title: Whoami
status: test
logsource:
    category: test
detection:
    selection:
        CommandLine|contains: whoami
    condition: selection
---
title: Process Tampering
status: test
logsource:
    category: test
detection:
    selection:
        ProcessGuid: "{abc}"
    condition: selection
"#;
        let collection = rsigma_parser::parse_sigma_yaml(yaml).expect("parse");
        let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true);
        let observer = FieldObserver::new(100);
        observe_json(
            &observer,
            &json!({"CommandLine":"whoami","User":"alice","src_ip":"10.0.0.1"}),
        );

        let snap = observer.snapshot();
        let coverage = snap.coverage(&rule_field_set);

        assert_eq!(coverage.intersection_count, 1, "CommandLine intersects");

        let unknown: Vec<&str> = coverage
            .unknown
            .iter()
            .map(|entry| &*entry.field)
            .collect();
        assert!(unknown.contains(&"User"));
        assert!(unknown.contains(&"src_ip"));
        assert!(!unknown.contains(&"CommandLine"));

        let missing: Vec<&str> = coverage.missing.iter().map(|(name, _)| *name).collect();
        assert!(
            missing.contains(&"ProcessGuid"),
            "ProcessGuid was rule-referenced but never observed"
        );
        assert!(!missing.contains(&"CommandLine"));
    }

    #[test]
    fn coverage_empty_observer_yields_only_missing() {
        let yaml = r#"
title: A
status: test
logsource:
    category: test
detection:
    selection:
        FieldA: x
    condition: selection
"#;
        let collection = rsigma_parser::parse_sigma_yaml(yaml).expect("parse");
        let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true);
        let observer = FieldObserver::new(100);

        let snap = observer.snapshot();
        let coverage = snap.coverage(&rule_field_set);
        assert_eq!(coverage.intersection_count, 0);
        assert!(coverage.unknown.is_empty());
        assert_eq!(coverage.missing.len(), 1);
        assert_eq!(coverage.missing[0].0, "FieldA");
    }

    #[test]
    fn coverage_unknown_preserves_snapshot_ordering() {
        let observer = FieldObserver::new(100);
        for _ in 0..3 {
            observe_json(&observer, &json!({"hot": 1}));
        }
        observe_json(&observer, &json!({"warm": 1}));
        let empty_rule_set = crate::fields::RuleFieldSet::default();

        let snap = observer.snapshot();
        let coverage = snap.coverage(&empty_rule_set);
        let order: Vec<&str> = coverage
            .unknown
            .iter()
            .map(|entry| &*entry.field)
            .collect();
        // The snapshot is already sorted (descending count, then
        // ascending name); coverage only filters, so that order must
        // survive.
        assert_eq!(order, vec!["hot", "warm"]);
    }
}