Skip to main content

rsigma_eval/
field_observer.rs

1//! Opt-in observer that records every field name seen at evaluation
2//! time so consumers can report which event fields are not referenced
3//! by any loaded rule (gap signal) and which rule fields have never
4//! been seen in events (broken-coverage signal).
5//!
6//! Lives in `rsigma-eval` because the observer only depends on the
7//! [`Event`] trait. The daemon (`rsigma-runtime` + `rsigma-cli`'s
8//! `engine daemon`) and the one-shot evaluator (`rsigma-cli`'s
9//! `engine eval`) both consume the same type so the report shape is
10//! consistent across runtimes.
11//!
12//! # Design
13//!
//! - Backed by a `std::sync::Mutex<HashMap<Arc<str>, u64>>`. The mutex
//!   is only held long enough to bump or insert a counter; the lock
//!   never wraps user code that could panic, so poisoning is
//!   effectively impossible in practice and the `lock().expect(..)`
//!   calls below treat poisoning as a programmer bug.
19//! - A hard cap (`max_keys`) bounds memory. Once the cap is reached
20//!   new keys are dropped and the `overflow_dropped` counter is
21//!   incremented; existing counters keep updating so the observer
22//!   keeps surfacing high-frequency keys even on a saturated set.
23//! - The observer is opt-in: callers construct an `Arc<FieldObserver>`
24//!   only when their `--observe-fields` flag is set. When unset the
25//!   observation call sites stay unwired and the hot path is
26//!   untouched.
27//! - Keys are stored as `Arc<str>` so a snapshot only refcount-bumps
28//!   each key rather than copying the string. Trade: one extra
29//!   allocation per first-time-insert (Arc header) in exchange for
30//!   near-free repeated snapshots.
31//! - Lifetime counters (`lifetime_events_observed`,
32//!   `lifetime_overflow_dropped`) are monotonic across resets so
33//!   Prometheus counter bridges don't desync when the daemon resets
34//!   the observer via `DELETE /api/v1/fields/observer`.
35//!
36//! # Coordinates
37//!
38//! - Iterate [`Event::field_keys`](crate::Event::field_keys) once per
39//!   event before evaluation. For JSON events this is a recursive
40//!   walk that allocates one `String` per leaf path (dot-joined paths
41//!   are not substrings of the source value); for flat formats like
42//!   `KvEvent` the override returns `Cow::Borrowed`. The cost is
43//!   acceptable in the opt-in diagnostic mode but is not free.
44//! - Render the gap and broken-coverage signals by joining a
45//!   [`snapshot`](FieldObserver::snapshot) against a
46//!   [`RuleFieldSet`](crate::RuleFieldSet).
47
48use std::collections::{HashMap, HashSet};
49use std::sync::Arc;
50use std::sync::Mutex;
51use std::sync::atomic::{AtomicU64, Ordering};
52use std::time::Instant;
53
54use crate::event::Event;
55use crate::fields::{FieldOrigin, RuleFieldSet};
56
/// One field name together with its counter, in the form handed out by
/// the snapshot API.
///
/// The name is an `Arc<str>`: building an entry from the observer's
/// internal map only bumps a reference count instead of copying the key
/// bytes. For read access treat it as a string slice, e.g.
/// `entry.field.as_ref()` or `&*entry.field`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldObservationEntry {
    /// Field path in dot-joined form (the same shape that
    /// `Event::field_keys` returns).
    pub field: Arc<str>,
    /// How many events contained this field since the last reset.
    pub count: u64,
}
70
71/// Immutable snapshot of an observer's state at one moment in time.
72///
73/// Returned by [`FieldObserver::snapshot`]; consumers (the daemon's
74/// HTTP handlers, the `engine eval` report writer) render coverage
75/// reports from this against a [`RuleFieldSet`](crate::RuleFieldSet).
76#[derive(Debug, Clone, Default)]
77pub struct FieldObservation {
78    /// Per-field counters, sorted by descending count then ascending name.
79    pub entries: Vec<FieldObservationEntry>,
80    /// Number of events evaluated by the observer since construction or
81    /// the last reset.
82    pub events_observed: u64,
83    /// Distinct field names tracked (saturates at `max_keys`).
84    pub unique_keys: usize,
85    /// Number of insert attempts dropped because the observer was at
86    /// capacity since the last reset.
87    pub overflow_dropped: u64,
88    /// Lifetime total of events evaluated since the observer was
89    /// constructed, ignoring resets. Drives Prometheus counters, which
90    /// must be monotonic.
91    pub lifetime_events_observed: u64,
92    /// Lifetime total of insert attempts dropped because the observer
93    /// was at capacity, ignoring resets. Drives Prometheus counters.
94    pub lifetime_overflow_dropped: u64,
95    /// Configured ceiling for distinct keys.
96    pub max_keys: usize,
97    /// Seconds since the observer was created (or last reset).
98    pub uptime_seconds: f64,
99}
100
/// Capped, opt-in field-name counter shared across producers (the
/// daemon's event task, the eval streaming loop) and consumers (the
/// daemon's HTTP handlers, the eval report writer).
///
/// All state lives behind a mutex or in atomics, so every method takes
/// `&self`. `Debug` is derived so the observer can be logged or embedded
/// in other `#[derive(Debug)]` types.
#[derive(Debug)]
pub struct FieldObserver {
    /// Per-field counters keyed by dot-joined field path. Keys are
    /// `Arc<str>` so snapshots clone a refcount, not the string.
    inner: Mutex<HashMap<Arc<str>, u64>>,
    /// Upper bound on the number of distinct keys held in `inner`.
    max_keys: usize,
    /// Resets to 0 on [`reset`](Self::reset). Drives the "since-last-reset"
    /// view exposed in [`FieldObservation::overflow_dropped`].
    overflow_dropped: AtomicU64,
    /// Resets to 0 on [`reset`](Self::reset). Drives the "since-last-reset"
    /// view exposed in [`FieldObservation::events_observed`].
    events_observed: AtomicU64,
    /// Monotonic. Never reset. Drives the Prometheus counter bridge so
    /// the lifetime metric stays consistent across observer resets.
    lifetime_events_observed: AtomicU64,
    /// Monotonic. Never reset. Drives the Prometheus counter bridge.
    lifetime_overflow_dropped: AtomicU64,
    /// Instant of construction or of the most recent reset; the basis
    /// for [`FieldObservation::uptime_seconds`].
    start: Mutex<Instant>,
}
120
121impl FieldObservation {
122    /// Join the snapshot against a [`RuleFieldSet`] and return the
123    /// partitioned coverage view in a single pass.
124    ///
125    /// Returned references borrow from `self` (the entries) and the
126    /// supplied `rule_field_set` (the missing entries), so this is
127    /// allocation-light: one `Vec` for the unknown borrows, one `Vec`
128    /// for the missing borrows, one `HashSet` for the seen lookup.
129    ///
130    /// Centralises the logic shared between the daemon's
131    /// `GET /api/v1/fields*` handlers and the `engine eval` end-of-run
132    /// report so the two surfaces cannot drift on field semantics.
133    pub fn coverage<'a>(&'a self, rule_field_set: &'a RuleFieldSet) -> FieldCoverage<'a> {
134        let mut unknown: Vec<&'a FieldObservationEntry> = Vec::new();
135        let mut intersection_count: usize = 0;
136        let mut seen: HashSet<&'a str> = HashSet::with_capacity(self.entries.len());
137        for entry in &self.entries {
138            let field: &str = &entry.field;
139            seen.insert(field);
140            if rule_field_set.contains(field) {
141                intersection_count += 1;
142            } else {
143                unknown.push(entry);
144            }
145        }
146        let missing: Vec<(&'a str, &'a FieldOrigin)> = rule_field_set
147            .iter()
148            .filter(|(name, _)| !seen.contains(name))
149            .collect();
150        FieldCoverage {
151            unknown,
152            intersection_count,
153            missing,
154        }
155    }
156}
157
158/// Borrowed view over a [`FieldObservation`] joined against a
159/// [`RuleFieldSet`]. Produced by [`FieldObservation::coverage`].
160///
161/// Consumers (the daemon HTTP handlers, the eval report writer) own
162/// the JSON shape; this struct only provides the partitioned data.
163pub struct FieldCoverage<'a> {
164    /// Observed fields not referenced by any loaded rule (gap signal).
165    /// Ordered the same way as [`FieldObservation::entries`]: by
166    /// descending count, then ascending name.
167    pub unknown: Vec<&'a FieldObservationEntry>,
168    /// Count of observed fields that *are* rule-referenced.
169    pub intersection_count: usize,
170    /// Rule field names that have not appeared in any observed event
171    /// (broken-coverage signal), paired with their [`FieldOrigin`] so
172    /// consumers can render rule titles and source kinds.
173    pub missing: Vec<(&'a str, &'a FieldOrigin)>,
174}
175
176impl FieldObserver {
177    /// Create a new observer with the given upper bound on distinct keys.
178    ///
179    /// A `max_keys` of 0 is allowed and disables tracking entirely; every
180    /// observed field counts as overflow. Callers wanting "no cap" should
181    /// pick a large finite number (e.g. `usize::MAX / 2`).
182    pub fn new(max_keys: usize) -> Self {
183        Self {
184            inner: Mutex::new(HashMap::new()),
185            max_keys,
186            overflow_dropped: AtomicU64::new(0),
187            events_observed: AtomicU64::new(0),
188            lifetime_events_observed: AtomicU64::new(0),
189            lifetime_overflow_dropped: AtomicU64::new(0),
190            start: Mutex::new(Instant::now()),
191        }
192    }
193
194    /// Walk the event's field keys and update the per-field counters.
195    ///
196    /// Insertion of a new key is skipped once the observer is at capacity;
197    /// already-tracked keys keep counting. The method takes `&self`, so
198    /// the observer can be shared behind an `Arc` without locking from
199    /// the caller's side.
200    pub fn observe<E: Event + ?Sized>(&self, event: &E) {
201        self.events_observed.fetch_add(1, Ordering::Relaxed);
202        self.lifetime_events_observed
203            .fetch_add(1, Ordering::Relaxed);
204        let keys = event.field_keys();
205        if keys.is_empty() {
206            return;
207        }
208        let mut overflow_local = 0u64;
209        let mut counts = self.inner.lock().expect("field observer mutex poisoned");
210        for key in keys {
211            if let Some(slot) = counts.get_mut(key.as_ref()) {
212                *slot = slot.saturating_add(1);
213            } else if counts.len() < self.max_keys {
214                counts.insert(Arc::<str>::from(key.as_ref()), 1);
215            } else {
216                overflow_local = overflow_local.saturating_add(1);
217            }
218        }
219        drop(counts);
220        if overflow_local > 0 {
221            self.overflow_dropped
222                .fetch_add(overflow_local, Ordering::Relaxed);
223            self.lifetime_overflow_dropped
224                .fetch_add(overflow_local, Ordering::Relaxed);
225        }
226    }
227
228    /// Snapshot the current counts. Entries are sorted by descending
229    /// count, then by ascending name for deterministic output.
230    ///
231    /// Cheap relative to the cardinality of the observer: each entry
232    /// only refcount-clones the `Arc<str>` key rather than copying the
233    /// key bytes, so a 10 000-key snapshot costs ~10 000 atomic
234    /// increments plus one `Vec` allocation, not 10 000 `String`
235    /// allocations.
236    pub fn snapshot(&self) -> FieldObservation {
237        let counts = self.inner.lock().expect("field observer mutex poisoned");
238        let mut entries: Vec<FieldObservationEntry> = counts
239            .iter()
240            .map(|(k, v)| FieldObservationEntry {
241                field: Arc::clone(k),
242                count: *v,
243            })
244            .collect();
245        let unique_keys = entries.len();
246        drop(counts);
247        entries.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.field.cmp(&b.field)));
248        FieldObservation {
249            entries,
250            events_observed: self.events_observed.load(Ordering::Relaxed),
251            unique_keys,
252            overflow_dropped: self.overflow_dropped.load(Ordering::Relaxed),
253            lifetime_events_observed: self.lifetime_events_observed.load(Ordering::Relaxed),
254            lifetime_overflow_dropped: self.lifetime_overflow_dropped.load(Ordering::Relaxed),
255            max_keys: self.max_keys,
256            uptime_seconds: self
257                .start
258                .lock()
259                .expect("field observer start mutex poisoned")
260                .elapsed()
261                .as_secs_f64(),
262        }
263    }
264
265    /// Reset every counter and the overflow tally. Returns the previous
266    /// `(unique_keys, events_observed)` pair so the API endpoint can
267    /// report what was cleared.
268    pub fn reset(&self) -> (usize, u64) {
269        let mut counts = self.inner.lock().expect("field observer mutex poisoned");
270        let previous_keys = counts.len();
271        counts.clear();
272        drop(counts);
273        let previous_events = self.events_observed.swap(0, Ordering::Relaxed);
274        self.overflow_dropped.store(0, Ordering::Relaxed);
275        *self
276            .start
277            .lock()
278            .expect("field observer start mutex poisoned") = Instant::now();
279        (previous_keys, previous_events)
280    }
281
282    /// Total events observed since the observer was created or last reset.
283    pub fn events_observed(&self) -> u64 {
284        self.events_observed.load(Ordering::Relaxed)
285    }
286
287    /// Lifetime total of events observed since the observer was
288    /// constructed, ignoring resets. Monotonic; suitable for driving
289    /// Prometheus counters.
290    pub fn lifetime_events_observed(&self) -> u64 {
291        self.lifetime_events_observed.load(Ordering::Relaxed)
292    }
293
294    /// Distinct keys currently tracked (does not include overflow drops).
295    pub fn unique_keys(&self) -> usize {
296        self.inner
297            .lock()
298            .expect("field observer mutex poisoned")
299            .len()
300    }
301
302    /// Insert attempts dropped because the observer was at capacity
303    /// since the last reset.
304    pub fn overflow_dropped(&self) -> u64 {
305        self.overflow_dropped.load(Ordering::Relaxed)
306    }
307
308    /// Lifetime total of insert attempts dropped because the observer
309    /// was at capacity, ignoring resets. Monotonic.
310    pub fn lifetime_overflow_dropped(&self) -> u64 {
311        self.lifetime_overflow_dropped.load(Ordering::Relaxed)
312    }
313
314    /// Configured per-observer ceiling for distinct keys.
315    pub fn max_keys(&self) -> usize {
316        self.max_keys
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::JsonEvent;
    use serde_json::json;

    /// Feed one JSON value to the observer as a borrowed `JsonEvent`.
    fn feed(observer: &FieldObserver, value: serde_json::Value) {
        observer.observe(&JsonEvent::borrow(&value));
    }

    /// Field names of a snapshot, in snapshot order.
    fn field_names(snapshot: &FieldObservation) -> Vec<&str> {
        snapshot.entries.iter().map(|entry| &*entry.field).collect()
    }

    #[test]
    fn observes_flat_json_fields() {
        let observer = FieldObserver::new(100);
        feed(&observer, json!({"CommandLine": "whoami", "User": "admin"}));

        let snapshot = observer.snapshot();
        assert_eq!(snapshot.events_observed, 1);
        assert_eq!(snapshot.unique_keys, 2);
        assert_eq!(snapshot.overflow_dropped, 0);

        let seen = field_names(&snapshot);
        assert!(seen.contains(&"CommandLine"));
        assert!(seen.contains(&"User"));
    }

    #[test]
    fn observes_nested_json_with_dotted_leaves() {
        let observer = FieldObserver::new(100);
        feed(&observer, json!({"actor": {"id": "u1"}}));

        let snapshot = observer.snapshot();
        let seen = field_names(&snapshot);
        // Leaves only: the intermediate `actor` object must not show up,
        // otherwise objects whose children are rule-referenced would
        // produce false positives in the gap signal.
        assert!(seen.contains(&"actor.id"));
        assert!(!seen.contains(&"actor"));
    }

    #[test]
    fn counts_accumulate_across_observations() {
        let observer = FieldObserver::new(100);
        (0..5).for_each(|_| feed(&observer, json!({"CommandLine": "whoami"})));

        let snapshot = observer.snapshot();
        assert_eq!(snapshot.events_observed, 5);
        let tracked = snapshot
            .entries
            .iter()
            .find(|candidate| &*candidate.field == "CommandLine")
            .expect("CommandLine tracked");
        assert_eq!(tracked.count, 5);
    }

    #[test]
    fn cap_enforced_and_overflow_recorded() {
        let observer = FieldObserver::new(2);
        let payload = json!({"a": 1, "b": 2, "c": 3, "d": 4});

        observer.observe(&JsonEvent::borrow(&payload));
        let first = observer.snapshot();
        assert_eq!(first.unique_keys, 2);
        assert_eq!(first.overflow_dropped, 2);

        // A saturated observer still advances the keys it already holds.
        observer.observe(&JsonEvent::borrow(&payload));
        let second = observer.snapshot();
        assert_eq!(second.unique_keys, 2);
        assert_eq!(second.overflow_dropped, 4);
        for entry in &second.entries {
            assert_eq!(entry.count, 2, "tracked key counter advanced");
        }
    }

    #[test]
    fn snapshot_sorts_by_count_desc_then_name() {
        let observer = FieldObserver::new(100);
        for _ in 0..3 {
            feed(&observer, json!({"hot": 1}));
        }
        feed(&observer, json!({"warm": 1}));
        feed(&observer, json!({"chill": 1}));

        let snapshot = observer.snapshot();
        assert_eq!(field_names(&snapshot), vec!["hot", "chill", "warm"]);
    }

    #[test]
    fn reset_clears_state_and_returns_previous_counts() {
        let observer = FieldObserver::new(100);
        feed(&observer, json!({"a": 1, "b": 2}));
        feed(&observer, json!({"a": 1}));

        let (cleared_keys, cleared_events) = observer.reset();
        assert_eq!(cleared_keys, 2);
        assert_eq!(cleared_events, 2);

        let snapshot = observer.snapshot();
        assert_eq!(snapshot.events_observed, 0);
        assert_eq!(snapshot.unique_keys, 0);
        assert_eq!(snapshot.overflow_dropped, 0);
        assert!(snapshot.entries.is_empty());
    }

    #[test]
    fn lifetime_counters_survive_reset() {
        // Regression guard: the Prometheus counter bridge needs lifetime
        // totals that never go backwards, so a reset must not discard
        // what the next /metrics scrape is going to read.
        let observer = FieldObserver::new(2);
        // Cap of 2 with 4 fields per event: 2 tracked, 2 overflow each.
        for _ in 0..3 {
            feed(&observer, json!({"a": 1, "b": 2, "c": 3, "d": 4}));
        }
        let before = observer.snapshot();
        assert_eq!(before.events_observed, 3);
        assert_eq!(before.lifetime_events_observed, 3);
        assert_eq!(before.overflow_dropped, 6);
        assert_eq!(before.lifetime_overflow_dropped, 6);

        observer.reset();
        let after_reset = observer.snapshot();
        assert_eq!(after_reset.events_observed, 0);
        assert_eq!(after_reset.overflow_dropped, 0);
        // The lifetime view is unaffected by the reset.
        assert_eq!(after_reset.lifetime_events_observed, 3);
        assert_eq!(after_reset.lifetime_overflow_dropped, 6);

        // Further observations continue from the pre-reset lifetime totals.
        feed(&observer, json!({"a": 1, "b": 2, "c": 3, "d": 4}));
        let after = observer.snapshot();
        assert_eq!(after.events_observed, 1);
        assert_eq!(after.lifetime_events_observed, 4);
        assert_eq!(after.overflow_dropped, 2);
        assert_eq!(after.lifetime_overflow_dropped, 8);
    }

    #[test]
    fn plain_event_observation_is_a_noop_for_counters() {
        let observer = FieldObserver::new(100);
        let plain = crate::event::PlainEvent::new("disk full".into());
        observer.observe(&plain);

        let snapshot = observer.snapshot();
        // The event itself is counted, but it carries no structured
        // fields, so no key is recorded.
        assert_eq!(snapshot.events_observed, 1);
        assert_eq!(snapshot.unique_keys, 0);
        assert_eq!(snapshot.overflow_dropped, 0);
    }

    #[test]
    fn coverage_partitions_observed_against_rule_set() {
        // Rule 1 references CommandLine, which the event carries.
        // Rule 2 references ProcessGuid, which the event never carries.
        // The event additionally has two fields no rule mentions.
        let rules_yaml = r#"
title: Whoami
status: test
logsource:
    category: test
detection:
    selection:
        CommandLine|contains: whoami
    condition: selection
---
title: Process Tampering
status: test
logsource:
    category: test
detection:
    selection:
        ProcessGuid: "{abc}"
    condition: selection
"#;
        let collection = rsigma_parser::parse_sigma_yaml(rules_yaml).expect("parse");
        let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true);

        let observer = FieldObserver::new(100);
        feed(
            &observer,
            json!({"CommandLine":"whoami","User":"alice","src_ip":"10.0.0.1"}),
        );

        let snapshot = observer.snapshot();
        let coverage = snapshot.coverage(&rule_field_set);

        assert_eq!(coverage.intersection_count, 1, "CommandLine intersects");

        let unknown: Vec<&str> = coverage
            .unknown
            .iter()
            .map(|entry| &*entry.field)
            .collect();
        assert!(unknown.contains(&"User"));
        assert!(unknown.contains(&"src_ip"));
        assert!(!unknown.contains(&"CommandLine"));

        let missing: Vec<&str> = coverage.missing.iter().map(|pair| pair.0).collect();
        assert!(
            missing.contains(&"ProcessGuid"),
            "ProcessGuid was rule-referenced but never observed"
        );
        assert!(!missing.contains(&"CommandLine"));
    }

    #[test]
    fn coverage_empty_observer_yields_only_missing() {
        let rules_yaml = r#"
title: A
status: test
logsource:
    category: test
detection:
    selection:
        FieldA: x
    condition: selection
"#;
        let collection = rsigma_parser::parse_sigma_yaml(rules_yaml).expect("parse");
        let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true);
        let observer = FieldObserver::new(100);

        let snapshot = observer.snapshot();
        let coverage = snapshot.coverage(&rule_field_set);
        assert_eq!(coverage.intersection_count, 0);
        assert!(coverage.unknown.is_empty());
        assert_eq!(coverage.missing.len(), 1);
        assert_eq!(coverage.missing[0].0, "FieldA");
    }

    #[test]
    fn coverage_unknown_preserves_snapshot_ordering() {
        let observer = FieldObserver::new(100);
        for _ in 0..3 {
            feed(&observer, json!({"hot": 1}));
        }
        feed(&observer, json!({"warm": 1}));
        let empty_rule_set = crate::fields::RuleFieldSet::default();

        let snapshot = observer.snapshot();
        let coverage = snapshot.coverage(&empty_rule_set);
        let order: Vec<&str> = coverage
            .unknown
            .iter()
            .map(|entry| &*entry.field)
            .collect();
        // The snapshot arrives sorted (count descending, name ascending)
        // and coverage only filters, so that order has to survive.
        assert_eq!(order, vec!["hot", "warm"]);
    }
}