use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use crate::event::Event;
use crate::fields::{FieldOrigin, RuleFieldSet};
/// One observed field name together with the number of times it was counted.
///
/// [`FieldObserver::snapshot`] produces one entry per key held in the
/// observer's map at the moment the snapshot is taken.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldObservationEntry {
/// The field name (shared, cheaply clonable string).
pub field: Arc<str>,
/// Number of times this field name was counted.
pub count: u64,
}
/// Point-in-time copy of a [`FieldObserver`]'s state, as returned by
/// [`FieldObserver::snapshot`].
#[derive(Debug, Clone, Default)]
pub struct FieldObservation {
/// Per-field counts. `snapshot` orders them by count (highest first) and
/// breaks ties by field name in ascending order.
pub entries: Vec<FieldObservationEntry>,
/// Events observed since construction or the most recent `reset`.
pub events_observed: u64,
/// Number of distinct field names in `entries` when the snapshot was taken.
pub unique_keys: usize,
/// Key occurrences that were not recorded because the observer already held
/// `max_keys` distinct names; zeroed by `reset`.
pub overflow_dropped: u64,
/// Events observed since construction; `reset` does not touch this counter.
pub lifetime_events_observed: u64,
/// Overflowed key occurrences since construction; `reset` does not touch
/// this counter.
pub lifetime_overflow_dropped: u64,
/// The observer's cap on distinct field names.
pub max_keys: usize,
/// Seconds elapsed since the observer was constructed or last reset.
pub uptime_seconds: f64,
}
/// Counts how often each field name appears across observed events, keeping
/// at most `max_keys` distinct names.
///
/// All methods take `&self`: the map and the start instant sit behind
/// mutexes and the counters are atomics.
pub struct FieldObserver {
// Field name -> number of times it was counted since the last reset.
inner: Mutex<HashMap<Arc<str>, u64>>,
// Upper bound on the number of distinct keys stored in `inner`.
max_keys: usize,
// Key occurrences skipped because `inner` was already at `max_keys`;
// zeroed by `reset`.
overflow_dropped: AtomicU64,
// Calls to `observe` since construction or the last `reset`.
events_observed: AtomicU64,
// Calls to `observe` since construction; not modified by `reset`.
lifetime_events_observed: AtomicU64,
// Overflowed key occurrences since construction; not modified by `reset`.
lifetime_overflow_dropped: AtomicU64,
// Instant of construction or of the last `reset`; basis for uptime.
start: Mutex<Instant>,
}
impl FieldObservation {
    /// Compares the observed field names against the fields referenced by rules.
    ///
    /// The returned [`FieldCoverage`] holds:
    /// - `unknown`: observed entries whose name is not in `rule_field_set`,
    ///   in the same order as `self.entries`;
    /// - `intersection_count`: how many observed entries are in `rule_field_set`;
    /// - `missing`: items yielded by `rule_field_set.iter()` whose name was
    ///   never observed.
    pub fn coverage<'a>(&'a self, rule_field_set: &'a RuleFieldSet) -> FieldCoverage<'a> {
        // Every observed name, for the "missing" lookup below.
        let seen: HashSet<&'a str> = self
            .entries
            .iter()
            .map(|entry| -> &'a str { &entry.field })
            .collect();

        // Observed entries that no rule references.
        let unknown: Vec<&'a FieldObservationEntry> = self
            .entries
            .iter()
            .filter(|entry| !rule_field_set.contains(&*entry.field))
            .collect();

        // Each entry is either unknown or part of the intersection.
        let intersection_count = self.entries.len() - unknown.len();

        // Rule-referenced fields that never showed up in an event.
        let mut missing: Vec<(&'a str, &'a FieldOrigin)> = Vec::new();
        for (name, origin) in rule_field_set.iter() {
            if !seen.contains(name) {
                missing.push((name, origin));
            }
        }

        FieldCoverage {
            unknown,
            intersection_count,
            missing,
        }
    }
}
/// Result of [`FieldObservation::coverage`]: how the observed field names
/// relate to the fields referenced by a rule set.
pub struct FieldCoverage<'a> {
/// Observed entries whose name the rule set does not contain, in the same
/// order as the observation's `entries`.
pub unknown: Vec<&'a FieldObservationEntry>,
/// Number of observed entries whose name the rule set contains.
pub intersection_count: usize,
/// Rule-set fields (name and origin) that were never observed.
pub missing: Vec<(&'a str, &'a FieldOrigin)>,
}
impl FieldObserver {
pub fn new(max_keys: usize) -> Self {
Self {
inner: Mutex::new(HashMap::new()),
max_keys,
overflow_dropped: AtomicU64::new(0),
events_observed: AtomicU64::new(0),
lifetime_events_observed: AtomicU64::new(0),
lifetime_overflow_dropped: AtomicU64::new(0),
start: Mutex::new(Instant::now()),
}
}
pub fn observe<E: Event + ?Sized>(&self, event: &E) {
self.events_observed.fetch_add(1, Ordering::Relaxed);
self.lifetime_events_observed
.fetch_add(1, Ordering::Relaxed);
let keys = event.field_keys();
if keys.is_empty() {
return;
}
let mut overflow_local = 0u64;
let mut counts = self.inner.lock().expect("field observer mutex poisoned");
for key in keys {
if let Some(slot) = counts.get_mut(key.as_ref()) {
*slot = slot.saturating_add(1);
} else if counts.len() < self.max_keys {
counts.insert(Arc::<str>::from(key.as_ref()), 1);
} else {
overflow_local = overflow_local.saturating_add(1);
}
}
drop(counts);
if overflow_local > 0 {
self.overflow_dropped
.fetch_add(overflow_local, Ordering::Relaxed);
self.lifetime_overflow_dropped
.fetch_add(overflow_local, Ordering::Relaxed);
}
}
pub fn snapshot(&self) -> FieldObservation {
let counts = self.inner.lock().expect("field observer mutex poisoned");
let mut entries: Vec<FieldObservationEntry> = counts
.iter()
.map(|(k, v)| FieldObservationEntry {
field: Arc::clone(k),
count: *v,
})
.collect();
let unique_keys = entries.len();
drop(counts);
entries.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.field.cmp(&b.field)));
FieldObservation {
entries,
events_observed: self.events_observed.load(Ordering::Relaxed),
unique_keys,
overflow_dropped: self.overflow_dropped.load(Ordering::Relaxed),
lifetime_events_observed: self.lifetime_events_observed.load(Ordering::Relaxed),
lifetime_overflow_dropped: self.lifetime_overflow_dropped.load(Ordering::Relaxed),
max_keys: self.max_keys,
uptime_seconds: self
.start
.lock()
.expect("field observer start mutex poisoned")
.elapsed()
.as_secs_f64(),
}
}
pub fn reset(&self) -> (usize, u64) {
let mut counts = self.inner.lock().expect("field observer mutex poisoned");
let previous_keys = counts.len();
counts.clear();
drop(counts);
let previous_events = self.events_observed.swap(0, Ordering::Relaxed);
self.overflow_dropped.store(0, Ordering::Relaxed);
*self
.start
.lock()
.expect("field observer start mutex poisoned") = Instant::now();
(previous_keys, previous_events)
}
pub fn events_observed(&self) -> u64 {
self.events_observed.load(Ordering::Relaxed)
}
pub fn lifetime_events_observed(&self) -> u64 {
self.lifetime_events_observed.load(Ordering::Relaxed)
}
pub fn unique_keys(&self) -> usize {
self.inner
.lock()
.expect("field observer mutex poisoned")
.len()
}
pub fn overflow_dropped(&self) -> u64 {
self.overflow_dropped.load(Ordering::Relaxed)
}
pub fn lifetime_overflow_dropped(&self) -> u64 {
self.lifetime_overflow_dropped.load(Ordering::Relaxed)
}
pub fn max_keys(&self) -> usize {
self.max_keys
}
}
// Unit tests for `FieldObserver` (counting, capping, ordering, reset) and
// `FieldObservation::coverage`.
#[cfg(test)]
mod tests {
use super::*;
use crate::event::JsonEvent;
use serde_json::json;
// A flat JSON object contributes one tracked key per top-level field.
#[test]
fn observes_flat_json_fields() {
let observer = FieldObserver::new(100);
let v = json!({"CommandLine": "whoami", "User": "admin"});
observer.observe(&JsonEvent::borrow(&v));
let snap = observer.snapshot();
assert_eq!(snap.events_observed, 1);
assert_eq!(snap.unique_keys, 2);
assert_eq!(snap.overflow_dropped, 0);
let names: Vec<&str> = snap.entries.iter().map(|e| -> &str { &e.field }).collect();
assert!(names.contains(&"CommandLine"));
assert!(names.contains(&"User"));
}
// A nested object is expected to surface as a dotted leaf path ("actor.id"),
// not as the parent key ("actor").
#[test]
fn observes_nested_json_with_dotted_leaves() {
let observer = FieldObserver::new(100);
let v = json!({"actor": {"id": "u1"}});
observer.observe(&JsonEvent::borrow(&v));
let snap = observer.snapshot();
let names: Vec<&str> = snap.entries.iter().map(|e| -> &str { &e.field }).collect();
assert!(names.contains(&"actor.id"));
assert!(!names.contains(&"actor"));
}
// Observing the same field in five events yields a count of five.
#[test]
fn counts_accumulate_across_observations() {
let observer = FieldObserver::new(100);
for _ in 0..5 {
let v = json!({"CommandLine": "whoami"});
observer.observe(&JsonEvent::borrow(&v));
}
let snap = observer.snapshot();
assert_eq!(snap.events_observed, 5);
let entry = snap
.entries
.iter()
.find(|e| &*e.field == "CommandLine")
.expect("CommandLine tracked");
assert_eq!(entry.count, 5);
}
// With a cap of two, only two keys are stored; the other two occurrences per
// event are counted as overflow while the stored keys keep counting.
#[test]
fn cap_enforced_and_overflow_recorded() {
let observer = FieldObserver::new(2);
let v = json!({"a": 1, "b": 2, "c": 3, "d": 4});
observer.observe(&JsonEvent::borrow(&v));
let snap = observer.snapshot();
assert_eq!(snap.unique_keys, 2);
assert_eq!(snap.overflow_dropped, 2);
observer.observe(&JsonEvent::borrow(&v));
let snap2 = observer.snapshot();
assert_eq!(snap2.unique_keys, 2);
assert_eq!(snap2.overflow_dropped, 4);
for entry in &snap2.entries {
assert_eq!(entry.count, 2, "tracked key counter advanced");
}
}
// Snapshot ordering: highest count first, ties broken by ascending name.
#[test]
fn snapshot_sorts_by_count_desc_then_name() {
let observer = FieldObserver::new(100);
for _ in 0..3 {
observer.observe(&JsonEvent::borrow(&json!({"hot": 1})));
}
observer.observe(&JsonEvent::borrow(&json!({"warm": 1})));
observer.observe(&JsonEvent::borrow(&json!({"chill": 1})));
let snap = observer.snapshot();
let order: Vec<&str> = snap.entries.iter().map(|e| -> &str { &e.field }).collect();
assert_eq!(order, vec!["hot", "chill", "warm"]);
}
// `reset` returns the pre-reset (key count, event count) and leaves the
// per-window state empty.
#[test]
fn reset_clears_state_and_returns_previous_counts() {
let observer = FieldObserver::new(100);
observer.observe(&JsonEvent::borrow(&json!({"a": 1, "b": 2})));
observer.observe(&JsonEvent::borrow(&json!({"a": 1})));
let (prev_keys, prev_events) = observer.reset();
assert_eq!(prev_keys, 2);
assert_eq!(prev_events, 2);
let snap = observer.snapshot();
assert_eq!(snap.events_observed, 0);
assert_eq!(snap.unique_keys, 0);
assert_eq!(snap.overflow_dropped, 0);
assert!(snap.entries.is_empty());
}
// The lifetime counters keep accumulating across `reset`, while the
// per-window counters start again from zero.
#[test]
fn lifetime_counters_survive_reset() {
let observer = FieldObserver::new(2);
for _ in 0..3 {
observer.observe(&JsonEvent::borrow(&json!({"a": 1, "b": 2, "c": 3, "d": 4})));
}
let before = observer.snapshot();
assert_eq!(before.events_observed, 3);
assert_eq!(before.lifetime_events_observed, 3);
assert_eq!(before.overflow_dropped, 6);
assert_eq!(before.lifetime_overflow_dropped, 6);
observer.reset();
let after_reset = observer.snapshot();
assert_eq!(after_reset.events_observed, 0);
assert_eq!(after_reset.overflow_dropped, 0);
assert_eq!(after_reset.lifetime_events_observed, 3);
assert_eq!(after_reset.lifetime_overflow_dropped, 6);
observer.observe(&JsonEvent::borrow(&json!({"a": 1, "b": 2, "c": 3, "d": 4})));
let after = observer.snapshot();
assert_eq!(after.events_observed, 1);
assert_eq!(after.lifetime_events_observed, 4);
assert_eq!(after.overflow_dropped, 2);
assert_eq!(after.lifetime_overflow_dropped, 8);
}
// A plain-text event still counts as an observed event but contributes no
// field keys.
#[test]
fn plain_event_observation_is_a_noop_for_counters() {
let observer = FieldObserver::new(100);
let plain = crate::event::PlainEvent::new("disk full".into());
observer.observe(&plain);
let snap = observer.snapshot();
assert_eq!(snap.events_observed, 1);
assert_eq!(snap.unique_keys, 0);
assert_eq!(snap.overflow_dropped, 0);
}
// Two rules reference CommandLine and ProcessGuid; the event carries
// CommandLine, User and src_ip. Expect one intersecting field, two unknown
// fields and ProcessGuid reported as missing.
#[test]
fn coverage_partitions_observed_against_rule_set() {
let yaml = r#"
title: Whoami
status: test
logsource:
category: test
detection:
selection:
CommandLine|contains: whoami
condition: selection
---
title: Process Tampering
status: test
logsource:
category: test
detection:
selection:
ProcessGuid: "{abc}"
condition: selection
"#;
let collection = rsigma_parser::parse_sigma_yaml(yaml).expect("parse");
let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true);
let observer = FieldObserver::new(100);
observer.observe(&JsonEvent::borrow(
&json!({"CommandLine":"whoami","User":"alice","src_ip":"10.0.0.1"}),
));
let snap = observer.snapshot();
let coverage = snap.coverage(&rule_field_set);
assert_eq!(coverage.intersection_count, 1, "CommandLine intersects");
let unknown: Vec<&str> = coverage
.unknown
.iter()
.map(|e| -> &str { &e.field })
.collect();
assert!(unknown.contains(&"User"));
assert!(unknown.contains(&"src_ip"));
assert!(!unknown.contains(&"CommandLine"));
let missing: Vec<&str> = coverage.missing.iter().map(|(n, _)| *n).collect();
assert!(
missing.contains(&"ProcessGuid"),
"ProcessGuid was rule-referenced but never observed"
);
assert!(!missing.contains(&"CommandLine"));
}
// With nothing observed, every rule-referenced field is reported as missing
// and there are no unknown or intersecting fields.
#[test]
fn coverage_empty_observer_yields_only_missing() {
let yaml = r#"
title: A
status: test
logsource:
category: test
detection:
selection:
FieldA: x
condition: selection
"#;
let collection = rsigma_parser::parse_sigma_yaml(yaml).expect("parse");
let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true);
let observer = FieldObserver::new(100);
let snap = observer.snapshot();
let coverage = snap.coverage(&rule_field_set);
assert_eq!(coverage.intersection_count, 0);
assert!(coverage.unknown.is_empty());
assert_eq!(coverage.missing.len(), 1);
assert_eq!(coverage.missing[0].0, "FieldA");
}
// Against an empty rule set every observed field is unknown, and `unknown`
// keeps the snapshot's count-descending order.
#[test]
fn coverage_unknown_preserves_snapshot_ordering() {
let observer = FieldObserver::new(100);
for _ in 0..3 {
observer.observe(&JsonEvent::borrow(&json!({"hot": 1})));
}
observer.observe(&JsonEvent::borrow(&json!({"warm": 1})));
let empty_rule_set = crate::fields::RuleFieldSet::default();
let snap = observer.snapshot();
let coverage = snap.coverage(&empty_rule_set);
let order: Vec<&str> = coverage
.unknown
.iter()
.map(|e| -> &str { &e.field })
.collect();
assert_eq!(order, vec!["hot", "warm"]);
}
}