1use std::collections::{HashMap, HashSet};
49use std::sync::Arc;
50use std::sync::Mutex;
51use std::sync::atomic::{AtomicU64, Ordering};
52use std::time::Instant;
53
54use crate::event::Event;
55use crate::fields::{FieldOrigin, RuleFieldSet};
56
/// One tracked field name together with how often it has been observed.
///
/// Instances are produced by [`FieldObserver::snapshot`], which copies them out
/// of the observer's internal name → count map.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldObservationEntry {
    /// Field name exactly as reported by the event's `field_keys()`.
    pub field: Arc<str>,
    /// Number of observed occurrences of this name in the current window
    /// (since construction or the last `reset`). The observer increments it
    /// with saturating arithmetic.
    pub count: u64,
}
70
/// Point-in-time copy of a [`FieldObserver`]'s state.
///
/// "Window" counters cover the period since the observer was created or last
/// reset; "lifetime" counters are never cleared by `reset`. `Default` yields an
/// empty observation with every counter at zero.
#[derive(Debug, Clone, Default)]
pub struct FieldObservation {
    /// Tracked field names. When built by `FieldObserver::snapshot`, sorted by
    /// descending count, with ties broken by ascending name.
    pub entries: Vec<FieldObservationEntry>,
    /// Events passed to `observe` in the current window, including events that
    /// exposed no field keys.
    pub events_observed: u64,
    /// Number of distinct field names tracked at snapshot time
    /// (equal to `entries.len()` when built by `snapshot`).
    pub unique_keys: usize,
    /// Key occurrences dropped in the current window because the observer was
    /// already tracking `max_keys` distinct names.
    pub overflow_dropped: u64,
    /// Events observed since the observer was constructed; survives `reset`.
    pub lifetime_events_observed: u64,
    /// Key occurrences dropped to the cap since construction; survives `reset`.
    pub lifetime_overflow_dropped: u64,
    /// Maximum number of distinct names the observer will track.
    pub max_keys: usize,
    /// Seconds elapsed since the observer was created or last reset.
    pub uptime_seconds: f64,
}
100
/// Counts the field names seen across observed events.
///
/// At most `max_keys` distinct names are tracked. Occurrences of new names
/// beyond that cap are dropped and counted as overflow. All state sits behind
/// a `Mutex` or atomics, so a shared reference can be used from several
/// threads.
///
/// Windowed counters are cleared by `reset`. The `lifetime_*` counters are
/// never cleared.
#[derive(Debug)]
pub struct FieldObserver {
    /// Per-name occurrence counts for the current window; never holds more
    /// than `max_keys` entries.
    inner: Mutex<HashMap<Arc<str>, u64>>,
    /// Maximum number of distinct names tracked at once.
    max_keys: usize,
    /// Key occurrences dropped to the cap in the current window.
    overflow_dropped: AtomicU64,
    /// Events observed in the current window.
    events_observed: AtomicU64,
    /// Events observed since construction; not cleared by `reset`.
    lifetime_events_observed: AtomicU64,
    /// Key occurrences dropped to the cap since construction; not cleared by
    /// `reset`.
    lifetime_overflow_dropped: AtomicU64,
    /// Start of the current window, used to report `uptime_seconds`.
    start: Mutex<Instant>,
}
120
121impl FieldObservation {
122 pub fn coverage<'a>(&'a self, rule_field_set: &'a RuleFieldSet) -> FieldCoverage<'a> {
134 let mut unknown: Vec<&'a FieldObservationEntry> = Vec::new();
135 let mut intersection_count: usize = 0;
136 let mut seen: HashSet<&'a str> = HashSet::with_capacity(self.entries.len());
137 for entry in &self.entries {
138 let field: &str = &entry.field;
139 seen.insert(field);
140 if rule_field_set.contains(field) {
141 intersection_count += 1;
142 } else {
143 unknown.push(entry);
144 }
145 }
146 let missing: Vec<(&'a str, &'a FieldOrigin)> = rule_field_set
147 .iter()
148 .filter(|(name, _)| !seen.contains(name))
149 .collect();
150 FieldCoverage {
151 unknown,
152 intersection_count,
153 missing,
154 }
155 }
156}
157
/// Result of [`FieldObservation::coverage`]: observed field names compared
/// against the fields referenced by a rule set.
///
/// All references borrow from the observation and the rule set that were
/// passed to `coverage`.
pub struct FieldCoverage<'a> {
    /// Observed entries whose name the rule set does not contain, in the same
    /// order as the observation's `entries`.
    pub unknown: Vec<&'a FieldObservationEntry>,
    /// Number of observed entries whose name the rule set contains.
    pub intersection_count: usize,
    /// Rule-set fields, with their origin, that have no matching observed
    /// entry. Order follows `RuleFieldSet::iter()` (NOTE(review): confirm
    /// whether that order is deterministic).
    pub missing: Vec<(&'a str, &'a FieldOrigin)>,
}
175
176impl FieldObserver {
177 pub fn new(max_keys: usize) -> Self {
183 Self {
184 inner: Mutex::new(HashMap::new()),
185 max_keys,
186 overflow_dropped: AtomicU64::new(0),
187 events_observed: AtomicU64::new(0),
188 lifetime_events_observed: AtomicU64::new(0),
189 lifetime_overflow_dropped: AtomicU64::new(0),
190 start: Mutex::new(Instant::now()),
191 }
192 }
193
194 pub fn observe<E: Event + ?Sized>(&self, event: &E) {
201 self.events_observed.fetch_add(1, Ordering::Relaxed);
202 self.lifetime_events_observed
203 .fetch_add(1, Ordering::Relaxed);
204 let keys = event.field_keys();
205 if keys.is_empty() {
206 return;
207 }
208 let mut overflow_local = 0u64;
209 let mut counts = self.inner.lock().expect("field observer mutex poisoned");
210 for key in keys {
211 if let Some(slot) = counts.get_mut(key.as_ref()) {
212 *slot = slot.saturating_add(1);
213 } else if counts.len() < self.max_keys {
214 counts.insert(Arc::<str>::from(key.as_ref()), 1);
215 } else {
216 overflow_local = overflow_local.saturating_add(1);
217 }
218 }
219 drop(counts);
220 if overflow_local > 0 {
221 self.overflow_dropped
222 .fetch_add(overflow_local, Ordering::Relaxed);
223 self.lifetime_overflow_dropped
224 .fetch_add(overflow_local, Ordering::Relaxed);
225 }
226 }
227
228 pub fn snapshot(&self) -> FieldObservation {
237 let counts = self.inner.lock().expect("field observer mutex poisoned");
238 let mut entries: Vec<FieldObservationEntry> = counts
239 .iter()
240 .map(|(k, v)| FieldObservationEntry {
241 field: Arc::clone(k),
242 count: *v,
243 })
244 .collect();
245 let unique_keys = entries.len();
246 drop(counts);
247 entries.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.field.cmp(&b.field)));
248 FieldObservation {
249 entries,
250 events_observed: self.events_observed.load(Ordering::Relaxed),
251 unique_keys,
252 overflow_dropped: self.overflow_dropped.load(Ordering::Relaxed),
253 lifetime_events_observed: self.lifetime_events_observed.load(Ordering::Relaxed),
254 lifetime_overflow_dropped: self.lifetime_overflow_dropped.load(Ordering::Relaxed),
255 max_keys: self.max_keys,
256 uptime_seconds: self
257 .start
258 .lock()
259 .expect("field observer start mutex poisoned")
260 .elapsed()
261 .as_secs_f64(),
262 }
263 }
264
265 pub fn reset(&self) -> (usize, u64) {
269 let mut counts = self.inner.lock().expect("field observer mutex poisoned");
270 let previous_keys = counts.len();
271 counts.clear();
272 drop(counts);
273 let previous_events = self.events_observed.swap(0, Ordering::Relaxed);
274 self.overflow_dropped.store(0, Ordering::Relaxed);
275 *self
276 .start
277 .lock()
278 .expect("field observer start mutex poisoned") = Instant::now();
279 (previous_keys, previous_events)
280 }
281
282 pub fn events_observed(&self) -> u64 {
284 self.events_observed.load(Ordering::Relaxed)
285 }
286
287 pub fn lifetime_events_observed(&self) -> u64 {
291 self.lifetime_events_observed.load(Ordering::Relaxed)
292 }
293
294 pub fn unique_keys(&self) -> usize {
296 self.inner
297 .lock()
298 .expect("field observer mutex poisoned")
299 .len()
300 }
301
302 pub fn overflow_dropped(&self) -> u64 {
305 self.overflow_dropped.load(Ordering::Relaxed)
306 }
307
308 pub fn lifetime_overflow_dropped(&self) -> u64 {
311 self.lifetime_overflow_dropped.load(Ordering::Relaxed)
312 }
313
314 pub fn max_keys(&self) -> usize {
316 self.max_keys
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::JsonEvent;
    use serde_json::json;

    // Each top-level key of a flat JSON object is tracked once for one event.
    #[test]
    fn observes_flat_json_fields() {
        let observer = FieldObserver::new(100);
        let v = json!({"CommandLine": "whoami", "User": "admin"});
        observer.observe(&JsonEvent::borrow(&v));
        let snap = observer.snapshot();
        assert_eq!(snap.events_observed, 1);
        assert_eq!(snap.unique_keys, 2);
        assert_eq!(snap.overflow_dropped, 0);
        let names: Vec<&str> = snap.entries.iter().map(|e| -> &str { &e.field }).collect();
        assert!(names.contains(&"CommandLine"));
        assert!(names.contains(&"User"));
    }

    // Nested objects are expected to surface as dotted leaf paths (`actor.id`)
    // as produced by `JsonEvent::field_keys`.
    #[test]
    fn observes_nested_json_with_dotted_leaves() {
        let observer = FieldObserver::new(100);
        let v = json!({"actor": {"id": "u1"}});
        observer.observe(&JsonEvent::borrow(&v));
        let snap = observer.snapshot();
        let names: Vec<&str> = snap.entries.iter().map(|e| -> &str { &e.field }).collect();
        assert!(names.contains(&"actor.id"));
        // The intermediate object key must not be reported as a field itself.
        assert!(!names.contains(&"actor"));
    }

    // Repeated observations of the same key increment a single counter.
    #[test]
    fn counts_accumulate_across_observations() {
        let observer = FieldObserver::new(100);
        for _ in 0..5 {
            let v = json!({"CommandLine": "whoami"});
            observer.observe(&JsonEvent::borrow(&v));
        }
        let snap = observer.snapshot();
        assert_eq!(snap.events_observed, 5);
        let entry = snap
            .entries
            .iter()
            .find(|e| &*e.field == "CommandLine")
            .expect("CommandLine tracked");
        assert_eq!(entry.count, 5);
    }

    // With a cap of 2, two of the four keys are tracked and the other two are
    // counted as overflow. Which two are tracked depends on key order, so the
    // assertions only check counts.
    #[test]
    fn cap_enforced_and_overflow_recorded() {
        let observer = FieldObserver::new(2);
        let v = json!({"a": 1, "b": 2, "c": 3, "d": 4});
        observer.observe(&JsonEvent::borrow(&v));
        let snap = observer.snapshot();
        assert_eq!(snap.unique_keys, 2);
        assert_eq!(snap.overflow_dropped, 2);
        // Second identical event: tracked keys advance, the rest overflow again.
        observer.observe(&JsonEvent::borrow(&v));
        let snap2 = observer.snapshot();
        assert_eq!(snap2.unique_keys, 2);
        assert_eq!(snap2.overflow_dropped, 4);
        for entry in &snap2.entries {
            assert_eq!(entry.count, 2, "tracked key counter advanced");
        }
    }

    // `hot` (3 occurrences) sorts first; `chill` and `warm` tie at 1 and fall
    // back to ascending name order.
    #[test]
    fn snapshot_sorts_by_count_desc_then_name() {
        let observer = FieldObserver::new(100);
        for _ in 0..3 {
            observer.observe(&JsonEvent::borrow(&json!({"hot": 1})));
        }
        observer.observe(&JsonEvent::borrow(&json!({"warm": 1})));
        observer.observe(&JsonEvent::borrow(&json!({"chill": 1})));
        let snap = observer.snapshot();
        let order: Vec<&str> = snap.entries.iter().map(|e| -> &str { &e.field }).collect();
        assert_eq!(order, vec!["hot", "chill", "warm"]);
    }

    // `reset` reports the window it discarded and leaves an empty window.
    #[test]
    fn reset_clears_state_and_returns_previous_counts() {
        let observer = FieldObserver::new(100);
        observer.observe(&JsonEvent::borrow(&json!({"a": 1, "b": 2})));
        observer.observe(&JsonEvent::borrow(&json!({"a": 1})));
        let (prev_keys, prev_events) = observer.reset();
        assert_eq!(prev_keys, 2);
        assert_eq!(prev_events, 2);
        let snap = observer.snapshot();
        assert_eq!(snap.events_observed, 0);
        assert_eq!(snap.unique_keys, 0);
        assert_eq!(snap.overflow_dropped, 0);
        assert!(snap.entries.is_empty());
    }

    // Windowed counters restart at zero on `reset`; lifetime counters keep
    // accumulating across it.
    #[test]
    fn lifetime_counters_survive_reset() {
        let observer = FieldObserver::new(2);
        // Three events, each with two keys over the cap: 6 overflow drops.
        for _ in 0..3 {
            observer.observe(&JsonEvent::borrow(&json!({"a": 1, "b": 2, "c": 3, "d": 4})));
        }
        let before = observer.snapshot();
        assert_eq!(before.events_observed, 3);
        assert_eq!(before.lifetime_events_observed, 3);
        assert_eq!(before.overflow_dropped, 6);
        assert_eq!(before.lifetime_overflow_dropped, 6);

        observer.reset();
        let after_reset = observer.snapshot();
        assert_eq!(after_reset.events_observed, 0);
        assert_eq!(after_reset.overflow_dropped, 0);
        assert_eq!(after_reset.lifetime_events_observed, 3);
        assert_eq!(after_reset.lifetime_overflow_dropped, 6);

        // One more over-cap event lands in both the new window and the lifetime totals.
        observer.observe(&JsonEvent::borrow(&json!({"a": 1, "b": 2, "c": 3, "d": 4})));
        let after = observer.snapshot();
        assert_eq!(after.events_observed, 1);
        assert_eq!(after.lifetime_events_observed, 4);
        assert_eq!(after.overflow_dropped, 2);
        assert_eq!(after.lifetime_overflow_dropped, 8);
    }

    // A plain-text event still counts as an observed event but contributes no
    // field keys and no overflow.
    #[test]
    fn plain_event_observation_is_a_noop_for_counters() {
        let observer = FieldObserver::new(100);
        let plain = crate::event::PlainEvent::new("disk full".into());
        observer.observe(&plain);
        let snap = observer.snapshot();
        assert_eq!(snap.events_observed, 1);
        assert_eq!(snap.unique_keys, 0);
        assert_eq!(snap.overflow_dropped, 0);
    }

    // Two rules reference `CommandLine` and `ProcessGuid`; the event carries
    // `CommandLine`, `User` and `src_ip`. Expect one intersecting field, two
    // unknown fields, and `ProcessGuid` reported as missing.
    #[test]
    fn coverage_partitions_observed_against_rule_set() {
        let yaml = r#"
title: Whoami
status: test
logsource:
    category: test
detection:
    selection:
        CommandLine|contains: whoami
    condition: selection
---
title: Process Tampering
status: test
logsource:
    category: test
detection:
    selection:
        ProcessGuid: "{abc}"
    condition: selection
"#;
        let collection = rsigma_parser::parse_sigma_yaml(yaml).expect("parse");
        let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true);
        let observer = FieldObserver::new(100);
        observer.observe(&JsonEvent::borrow(
            &json!({"CommandLine":"whoami","User":"alice","src_ip":"10.0.0.1"}),
        ));

        let snap = observer.snapshot();
        let coverage = snap.coverage(&rule_field_set);

        assert_eq!(coverage.intersection_count, 1, "CommandLine intersects");
        let unknown: Vec<&str> = coverage
            .unknown
            .iter()
            .map(|e| -> &str { &e.field })
            .collect();
        assert!(unknown.contains(&"User"));
        assert!(unknown.contains(&"src_ip"));
        assert!(!unknown.contains(&"CommandLine"));
        let missing: Vec<&str> = coverage.missing.iter().map(|(n, _)| *n).collect();
        assert!(
            missing.contains(&"ProcessGuid"),
            "ProcessGuid was rule-referenced but never observed"
        );
        assert!(!missing.contains(&"CommandLine"));
    }

    // With nothing observed, every rule-referenced field is missing and
    // nothing is unknown or intersecting.
    #[test]
    fn coverage_empty_observer_yields_only_missing() {
        let yaml = r#"
title: A
status: test
logsource:
    category: test
detection:
    selection:
        FieldA: x
    condition: selection
"#;
        let collection = rsigma_parser::parse_sigma_yaml(yaml).expect("parse");
        let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true);
        let observer = FieldObserver::new(100);

        let snap = observer.snapshot();
        let coverage = snap.coverage(&rule_field_set);
        assert_eq!(coverage.intersection_count, 0);
        assert!(coverage.unknown.is_empty());
        assert_eq!(coverage.missing.len(), 1);
        assert_eq!(coverage.missing[0].0, "FieldA");
    }

    // Against an empty rule set every entry is unknown, and `unknown` keeps
    // the snapshot's count-descending order.
    #[test]
    fn coverage_unknown_preserves_snapshot_ordering() {
        let observer = FieldObserver::new(100);
        for _ in 0..3 {
            observer.observe(&JsonEvent::borrow(&json!({"hot": 1})));
        }
        observer.observe(&JsonEvent::borrow(&json!({"warm": 1})));
        let empty_rule_set = crate::fields::RuleFieldSet::default();

        let snap = observer.snapshot();
        let coverage = snap.coverage(&empty_rule_set);
        let order: Vec<&str> = coverage
            .unknown
            .iter()
            .map(|e| -> &str { &e.field })
            .collect();
        assert_eq!(order, vec!["hot", "warm"]);
    }
}