1use std::collections::{HashMap, HashSet};
48use std::sync::Arc;
49use std::sync::Mutex;
50use std::sync::atomic::{AtomicU64, Ordering};
51use std::time::Instant;
52
53use crate::event::Event;
54use crate::fields::{FieldOrigin, RuleFieldSet};
55
/// A single observed field name together with its occurrence count.
///
/// Produced by [`FieldObserver::snapshot`], one entry per tracked key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldObservationEntry {
    /// Field name, shared with the observer's internal map (cheap to clone).
    pub field: Arc<str>,
    /// How many times the field was counted; increments saturate at `u64::MAX`.
    pub count: u64,
}
69
70#[derive(Debug, Clone, Default)]
76pub struct FieldObservation {
77 pub entries: Vec<FieldObservationEntry>,
79 pub events_observed: u64,
82 pub unique_keys: usize,
84 pub overflow_dropped: u64,
87 pub lifetime_events_observed: u64,
91 pub lifetime_overflow_dropped: u64,
94 pub max_keys: usize,
96 pub uptime_seconds: f64,
98}
99
/// Thread-safe counter of the field names seen on observed events.
///
/// Per-field counts live behind a mutex; the aggregate counters are atomics so
/// they can be read without taking the map lock. `Debug` is derived so the
/// observer can be embedded in other `Debug` types and logged while diagnosing.
#[derive(Debug)]
pub struct FieldObserver {
    /// Occurrence count per tracked field name.
    inner: Mutex<HashMap<Arc<str>, u64>>,
    /// Upper bound on the number of distinct field names kept in `inner`.
    max_keys: usize,
    /// Key occurrences ignored since the last reset because `inner` was full.
    overflow_dropped: AtomicU64,
    /// Events observed since the last reset.
    events_observed: AtomicU64,
    /// Events observed since construction; not cleared by a reset.
    lifetime_events_observed: AtomicU64,
    /// Key occurrences ignored since construction; not cleared by a reset.
    lifetime_overflow_dropped: AtomicU64,
    /// Start of the current window (construction time or last reset).
    start: Mutex<Instant>,
}
119
120impl FieldObservation {
121 pub fn coverage<'a>(&'a self, rule_field_set: &'a RuleFieldSet) -> FieldCoverage<'a> {
133 let mut unknown: Vec<&'a FieldObservationEntry> = Vec::new();
134 let mut intersection_count: usize = 0;
135 let mut seen: HashSet<&'a str> = HashSet::with_capacity(self.entries.len());
136 for entry in &self.entries {
137 let field: &str = &entry.field;
138 seen.insert(field);
139 if rule_field_set.contains(field) {
140 intersection_count += 1;
141 } else {
142 unknown.push(entry);
143 }
144 }
145 let missing: Vec<(&'a str, &'a FieldOrigin)> = rule_field_set
146 .iter()
147 .filter(|(name, _)| !seen.contains(name))
148 .collect();
149 FieldCoverage {
150 unknown,
151 intersection_count,
152 missing,
153 }
154 }
155}
156
157pub struct FieldCoverage<'a> {
163 pub unknown: Vec<&'a FieldObservationEntry>,
167 pub intersection_count: usize,
169 pub missing: Vec<(&'a str, &'a FieldOrigin)>,
173}
174
175impl FieldObserver {
176 pub fn new(max_keys: usize) -> Self {
182 Self {
183 inner: Mutex::new(HashMap::new()),
184 max_keys,
185 overflow_dropped: AtomicU64::new(0),
186 events_observed: AtomicU64::new(0),
187 lifetime_events_observed: AtomicU64::new(0),
188 lifetime_overflow_dropped: AtomicU64::new(0),
189 start: Mutex::new(Instant::now()),
190 }
191 }
192
193 pub fn observe<E: Event + ?Sized>(&self, event: &E) {
200 self.events_observed.fetch_add(1, Ordering::Relaxed);
201 self.lifetime_events_observed
202 .fetch_add(1, Ordering::Relaxed);
203 let keys = event.field_keys();
204 if keys.is_empty() {
205 return;
206 }
207 let mut overflow_local = 0u64;
208 let mut counts = self.inner.lock().expect("field observer mutex poisoned");
209 for key in keys {
210 if let Some(slot) = counts.get_mut(key.as_ref()) {
211 *slot = slot.saturating_add(1);
212 } else if counts.len() < self.max_keys {
213 counts.insert(Arc::<str>::from(key.as_ref()), 1);
214 } else {
215 overflow_local = overflow_local.saturating_add(1);
216 }
217 }
218 drop(counts);
219 if overflow_local > 0 {
220 self.overflow_dropped
221 .fetch_add(overflow_local, Ordering::Relaxed);
222 self.lifetime_overflow_dropped
223 .fetch_add(overflow_local, Ordering::Relaxed);
224 }
225 }
226
227 pub fn snapshot(&self) -> FieldObservation {
236 let counts = self.inner.lock().expect("field observer mutex poisoned");
237 let mut entries: Vec<FieldObservationEntry> = counts
238 .iter()
239 .map(|(k, v)| FieldObservationEntry {
240 field: Arc::clone(k),
241 count: *v,
242 })
243 .collect();
244 let unique_keys = entries.len();
245 drop(counts);
246 entries.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.field.cmp(&b.field)));
247 FieldObservation {
248 entries,
249 events_observed: self.events_observed.load(Ordering::Relaxed),
250 unique_keys,
251 overflow_dropped: self.overflow_dropped.load(Ordering::Relaxed),
252 lifetime_events_observed: self.lifetime_events_observed.load(Ordering::Relaxed),
253 lifetime_overflow_dropped: self.lifetime_overflow_dropped.load(Ordering::Relaxed),
254 max_keys: self.max_keys,
255 uptime_seconds: self
256 .start
257 .lock()
258 .expect("field observer start mutex poisoned")
259 .elapsed()
260 .as_secs_f64(),
261 }
262 }
263
264 pub fn reset(&self) -> (usize, u64) {
268 let mut counts = self.inner.lock().expect("field observer mutex poisoned");
269 let previous_keys = counts.len();
270 counts.clear();
271 drop(counts);
272 let previous_events = self.events_observed.swap(0, Ordering::Relaxed);
273 self.overflow_dropped.store(0, Ordering::Relaxed);
274 *self
275 .start
276 .lock()
277 .expect("field observer start mutex poisoned") = Instant::now();
278 (previous_keys, previous_events)
279 }
280
281 pub fn events_observed(&self) -> u64 {
283 self.events_observed.load(Ordering::Relaxed)
284 }
285
286 pub fn lifetime_events_observed(&self) -> u64 {
290 self.lifetime_events_observed.load(Ordering::Relaxed)
291 }
292
293 pub fn unique_keys(&self) -> usize {
295 self.inner
296 .lock()
297 .expect("field observer mutex poisoned")
298 .len()
299 }
300
301 pub fn overflow_dropped(&self) -> u64 {
304 self.overflow_dropped.load(Ordering::Relaxed)
305 }
306
307 pub fn lifetime_overflow_dropped(&self) -> u64 {
310 self.lifetime_overflow_dropped.load(Ordering::Relaxed)
311 }
312
313 pub fn max_keys(&self) -> usize {
315 self.max_keys
316 }
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::JsonEvent;
    use serde_json::json;

    /// Field names of a snapshot, in snapshot order.
    fn field_names(snapshot: &FieldObservation) -> Vec<&str> {
        snapshot
            .entries
            .iter()
            .map(|entry| -> &str { &entry.field })
            .collect()
    }

    #[test]
    fn observes_flat_json_fields() {
        let observer = FieldObserver::new(100);
        let v = json!({"CommandLine": "whoami", "User": "admin"});
        observer.observe(&JsonEvent::borrow(&v));
        let snap = observer.snapshot();
        assert_eq!(snap.events_observed, 1);
        assert_eq!(snap.unique_keys, 2);
        assert_eq!(snap.overflow_dropped, 0);
        let names = field_names(&snap);
        assert!(names.contains(&"CommandLine"));
        assert!(names.contains(&"User"));
    }

    #[test]
    fn observes_nested_json_with_dotted_leaves() {
        let observer = FieldObserver::new(100);
        let v = json!({"actor": {"id": "u1"}});
        observer.observe(&JsonEvent::borrow(&v));
        let snap = observer.snapshot();
        let names = field_names(&snap);
        // Only the dotted leaf path is expected, not the intermediate object key.
        assert!(names.contains(&"actor.id"));
        assert!(!names.contains(&"actor"));
    }

    #[test]
    fn counts_accumulate_across_observations() {
        let observer = FieldObserver::new(100);
        for _ in 0..5 {
            let v = json!({"CommandLine": "whoami"});
            observer.observe(&JsonEvent::borrow(&v));
        }
        let snap = observer.snapshot();
        assert_eq!(snap.events_observed, 5);
        let entry = snap
            .entries
            .iter()
            .find(|e| &*e.field == "CommandLine")
            .expect("CommandLine tracked");
        assert_eq!(entry.count, 5);
    }

    #[test]
    fn cap_enforced_and_overflow_recorded() {
        let observer = FieldObserver::new(2);
        let v = json!({"a": 1, "b": 2, "c": 3, "d": 4});
        observer.observe(&JsonEvent::borrow(&v));
        let snap = observer.snapshot();
        assert_eq!(snap.unique_keys, 2);
        assert_eq!(snap.overflow_dropped, 2);
        // A second pass must keep counting tracked keys while still rejecting the rest.
        observer.observe(&JsonEvent::borrow(&v));
        let snap2 = observer.snapshot();
        assert_eq!(snap2.unique_keys, 2);
        assert_eq!(snap2.overflow_dropped, 4);
        for entry in &snap2.entries {
            assert_eq!(entry.count, 2, "tracked key counter advanced");
        }
    }

    #[test]
    fn snapshot_sorts_by_count_desc_then_name() {
        let observer = FieldObserver::new(100);
        for _ in 0..3 {
            observer.observe(&JsonEvent::borrow(&json!({"hot": 1})));
        }
        observer.observe(&JsonEvent::borrow(&json!({"warm": 1})));
        observer.observe(&JsonEvent::borrow(&json!({"chill": 1})));
        let snap = observer.snapshot();
        assert_eq!(field_names(&snap), vec!["hot", "chill", "warm"]);
    }

    #[test]
    fn reset_clears_state_and_returns_previous_counts() {
        let observer = FieldObserver::new(100);
        observer.observe(&JsonEvent::borrow(&json!({"a": 1, "b": 2})));
        observer.observe(&JsonEvent::borrow(&json!({"a": 1})));
        let (prev_keys, prev_events) = observer.reset();
        assert_eq!(prev_keys, 2);
        assert_eq!(prev_events, 2);
        let snap = observer.snapshot();
        assert_eq!(snap.events_observed, 0);
        assert_eq!(snap.unique_keys, 0);
        assert_eq!(snap.overflow_dropped, 0);
        assert!(snap.entries.is_empty());
    }

    #[test]
    fn lifetime_counters_survive_reset() {
        let observer = FieldObserver::new(2);
        // Four keys against a cap of two: every event overflows by two.
        for _ in 0..3 {
            observer.observe(&JsonEvent::borrow(
                &json!({"a": 1, "b": 2, "c": 3, "d": 4}),
            ));
        }
        let before = observer.snapshot();
        assert_eq!(before.events_observed, 3);
        assert_eq!(before.lifetime_events_observed, 3);
        assert_eq!(before.overflow_dropped, 6);
        assert_eq!(before.lifetime_overflow_dropped, 6);

        observer.reset();
        let after_reset = observer.snapshot();
        assert_eq!(after_reset.events_observed, 0);
        assert_eq!(after_reset.overflow_dropped, 0);
        assert_eq!(after_reset.lifetime_events_observed, 3);
        assert_eq!(after_reset.lifetime_overflow_dropped, 6);

        observer.observe(&JsonEvent::borrow(
            &json!({"a": 1, "b": 2, "c": 3, "d": 4}),
        ));
        let after = observer.snapshot();
        assert_eq!(after.events_observed, 1);
        assert_eq!(after.lifetime_events_observed, 4);
        assert_eq!(after.overflow_dropped, 2);
        assert_eq!(after.lifetime_overflow_dropped, 8);
    }

    #[test]
    fn plain_event_observation_is_a_noop_for_counters() {
        let observer = FieldObserver::new(100);
        let plain = crate::event::PlainEvent::new("disk full".into());
        observer.observe(&plain);
        let snap = observer.snapshot();
        // The event itself is counted; only the per-field state stays empty.
        assert_eq!(snap.events_observed, 1);
        assert_eq!(snap.unique_keys, 0);
        assert_eq!(snap.overflow_dropped, 0);
    }

    #[test]
    fn coverage_partitions_observed_against_rule_set() {
        let yaml = r#"
title: Whoami
status: test
logsource:
    category: test
detection:
    selection:
        CommandLine|contains: whoami
    condition: selection
---
title: Process Tampering
status: test
logsource:
    category: test
detection:
    selection:
        ProcessGuid: "{abc}"
    condition: selection
"#;
        let collection = rsigma_parser::parse_sigma_yaml(yaml).expect("parse");
        let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true);
        let observer = FieldObserver::new(100);
        observer.observe(&JsonEvent::borrow(
            &json!({"CommandLine":"whoami","User":"alice","src_ip":"10.0.0.1"}),
        ));

        let snap = observer.snapshot();
        let coverage = snap.coverage(&rule_field_set);

        assert_eq!(coverage.intersection_count, 1, "CommandLine intersects");
        let unknown: Vec<&str> = coverage
            .unknown
            .iter()
            .map(|e| -> &str { &e.field })
            .collect();
        assert!(unknown.contains(&"User"));
        assert!(unknown.contains(&"src_ip"));
        assert!(!unknown.contains(&"CommandLine"));
        let missing: Vec<&str> = coverage.missing.iter().map(|(n, _)| *n).collect();
        assert!(
            missing.contains(&"ProcessGuid"),
            "ProcessGuid was rule-referenced but never observed"
        );
        assert!(!missing.contains(&"CommandLine"));
    }

    #[test]
    fn coverage_empty_observer_yields_only_missing() {
        let yaml = r#"
title: A
status: test
logsource:
    category: test
detection:
    selection:
        FieldA: x
    condition: selection
"#;
        let collection = rsigma_parser::parse_sigma_yaml(yaml).expect("parse");
        let rule_field_set = crate::fields::RuleFieldSet::collect(&collection, &[], true);
        let observer = FieldObserver::new(100);

        let snap = observer.snapshot();
        let coverage = snap.coverage(&rule_field_set);
        assert_eq!(coverage.intersection_count, 0);
        assert!(coverage.unknown.is_empty());
        assert_eq!(coverage.missing.len(), 1);
        assert_eq!(coverage.missing[0].0, "FieldA");
    }

    #[test]
    fn coverage_unknown_preserves_snapshot_ordering() {
        let observer = FieldObserver::new(100);
        for _ in 0..3 {
            observer.observe(&JsonEvent::borrow(&json!({"hot": 1})));
        }
        observer.observe(&JsonEvent::borrow(&json!({"warm": 1})));
        let empty_rule_set = crate::fields::RuleFieldSet::default();

        let snap = observer.snapshot();
        let coverage = snap.coverage(&empty_rule_set);
        let order: Vec<&str> = coverage
            .unknown
            .iter()
            .map(|e| -> &str { &e.field })
            .collect();
        // With an empty rule set every entry is unknown, still in snapshot order.
        assert_eq!(order, vec!["hot", "warm"]);
    }
}