1use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use regex::Regex;
7use serde::Serialize;
8
9use crate::error::{Error, Result};
10use crate::rules::{CorrelationRule, RuleCondition, RuleSeverity};
11use hunt_query::timeline::{NormalizedVerdict, TimelineEvent};
12
13#[derive(Debug, Clone, Serialize)]
15pub struct Alert {
16 pub rule_name: String,
18 pub severity: RuleSeverity,
20 pub title: String,
22 pub triggered_at: DateTime<Utc>,
24 pub evidence: Vec<TimelineEvent>,
26 pub description: String,
28}
29
30#[derive(Debug, Clone)]
32struct WindowState {
33 bound_events: HashMap<String, Vec<TimelineEvent>>,
35 window_start: DateTime<Utc>,
37}
38
39#[derive(Debug, Clone)]
41struct CompiledPatterns {
42 target: Option<Regex>,
43 not_target: Option<Regex>,
44}
45
46#[derive(Debug)]
48pub struct CorrelationEngine {
49 rules: Vec<CorrelationRule>,
51 patterns: HashMap<(usize, usize), CompiledPatterns>,
53 windows: HashMap<usize, Vec<WindowState>>,
55}
56
57impl CorrelationEngine {
58 pub fn new(rules: Vec<CorrelationRule>) -> Result<Self> {
60 let mut patterns = HashMap::new();
61
62 for (ri, rule) in rules.iter().enumerate() {
63 for (ci, cond) in rule.conditions.iter().enumerate() {
64 let target = match &cond.target_pattern {
65 Some(pat) => Some(Regex::new(pat).map_err(|e| {
66 Error::Regex(format!("rule '{}' condition {ci}: {e}", rule.name))
67 })?),
68 None => None,
69 };
70 let not_target = match &cond.not_target_pattern {
71 Some(pat) => Some(Regex::new(pat).map_err(|e| {
72 Error::Regex(format!(
73 "rule '{}' condition {ci} not_target: {e}",
74 rule.name
75 ))
76 })?),
77 None => None,
78 };
79 patterns.insert((ri, ci), CompiledPatterns { target, not_target });
80 }
81 }
82
83 Ok(Self {
84 rules,
85 patterns,
86 windows: HashMap::new(),
87 })
88 }
89
90 pub fn process_event(&mut self, event: &TimelineEvent) -> Vec<Alert> {
94 self.evict_expired_at(event.timestamp);
95
96 let mut alerts = Vec::new();
97
98 for ri in 0..self.rules.len() {
99 let rule_alerts = self.evaluate_rule(ri, event);
100 alerts.extend(rule_alerts);
101 }
102
103 alerts
104 }
105
106 pub fn evict_expired_at(&mut self, now: DateTime<Utc>) {
109 for (ri, windows) in &mut self.windows {
110 let window_dur = self.rules[*ri].window;
111 windows.retain(|ws| {
112 let elapsed = now.signed_duration_since(ws.window_start);
113 elapsed <= window_dur
114 });
115 }
116
117 self.windows.retain(|_, v| !v.is_empty());
119 }
120
121 pub fn evict_expired(&mut self) {
124 self.evict_expired_at(Utc::now());
125 }
126
127 pub fn evict_expired_capped(&mut self, max_window: chrono::Duration) {
130 let now = Utc::now();
131 for (ri, windows) in &mut self.windows {
132 let rule_dur = self.rules[*ri].window;
133 let effective = if max_window < rule_dur {
134 max_window
135 } else {
136 rule_dur
137 };
138 windows.retain(|ws| {
139 let elapsed = now.signed_duration_since(ws.window_start);
140 elapsed <= effective
141 });
142 }
143 self.windows.retain(|_, v| !v.is_empty());
144 }
145
146 pub fn rules(&self) -> &[CorrelationRule] {
148 &self.rules
149 }
150
151 pub fn flush(&mut self) -> Vec<Alert> {
153 self.evict_expired();
154
155 let mut alerts = Vec::new();
156
157 for (ri, windows) in self.windows.drain() {
158 let rule = &self.rules[ri];
159 for ws in windows {
160 if all_conditions_met(rule, &ws) {
161 alerts.push(build_alert(rule, &ws));
162 }
163 }
164 }
165
166 alerts
167 }
168
169 fn evaluate_rule(&mut self, ri: usize, event: &TimelineEvent) -> Vec<Alert> {
171 let mut alerts = Vec::new();
172 let rule = &self.rules[ri];
173
174 let pre_existing_count = self.windows.get(&ri).map_or(0, |w| w.len());
179 let mut dependent_advanced = vec![false; pre_existing_count];
182
183 for (ci, cond) in rule.conditions.iter().enumerate() {
185 let cp = match self.patterns.get(&(ri, ci)) {
186 Some(cp) => cp,
187 None => continue,
188 };
189
190 if !condition_matches(cond, cp, event) {
191 continue;
192 }
193
194 if cond.after.is_none() {
195 let mut bound = HashMap::new();
197 bound.insert(cond.bind.clone(), vec![event.clone()]);
198 let ws = WindowState {
199 bound_events: bound,
200 window_start: event.timestamp,
201 };
202
203 self.windows.entry(ri).or_default().push(ws);
204 } else {
205 let after_bind = cond.after.as_deref();
207
208 if pre_existing_count == 0 {
209 continue;
210 }
211
212 let windows = match self.windows.get_mut(&ri) {
213 Some(w) => w,
214 None => continue,
215 };
216
217 for (wi, ws) in windows.iter_mut().take(pre_existing_count).enumerate() {
218 if dependent_advanced.get(wi).copied().unwrap_or(false) {
220 continue;
221 }
222
223 if ws.bound_events.contains_key(&cond.bind) {
225 continue;
226 }
227
228 let after_ok = match after_bind {
230 Some(ab) => ws.bound_events.contains_key(ab),
231 None => true,
232 };
233 if !after_ok {
234 continue;
235 }
236
237 if let Some(ab) = after_bind {
240 if let Some(after_events) = ws.bound_events.get(ab) {
241 if let Some(latest_after) =
242 after_events.iter().map(|e| e.timestamp).max()
243 {
244 let elapsed = event.timestamp.signed_duration_since(latest_after);
245 if elapsed < chrono::Duration::zero() {
246 continue;
247 }
248 if let Some(within_dur) = cond.within {
249 if elapsed > within_dur {
250 continue;
251 }
252 }
253 }
254 }
255 }
256
257 ws.bound_events
259 .entry(cond.bind.clone())
260 .or_default()
261 .push(event.clone());
262 if let Some(slot) = dependent_advanced.get_mut(wi) {
263 *slot = true;
264 }
265 }
266 }
267 }
268
269 if let Some(windows) = self.windows.get_mut(&ri) {
271 let rule = &self.rules[ri];
272 let mut completed_indices = Vec::new();
273
274 for (wi, ws) in windows.iter().enumerate() {
275 if all_conditions_met(rule, ws) {
276 alerts.push(build_alert(rule, ws));
277 completed_indices.push(wi);
278 }
279 }
280
281 for wi in completed_indices.into_iter().rev() {
283 windows.remove(wi);
284 }
285 }
286
287 alerts
288 }
289}
290
291fn all_conditions_met(rule: &CorrelationRule, ws: &WindowState) -> bool {
293 rule.conditions
294 .iter()
295 .all(|cond| ws.bound_events.contains_key(&cond.bind))
296}
297
298fn condition_matches(
300 cond: &RuleCondition,
301 compiled: &CompiledPatterns,
302 event: &TimelineEvent,
303) -> bool {
304 let event_source_str = event.source.to_string().to_lowercase();
306 let source_ok = cond
307 .source
308 .iter()
309 .any(|s| s.to_lowercase() == event_source_str);
310 if !source_ok {
311 return false;
312 }
313
314 if let Some(ref at) = cond.action_type {
316 match &event.action_type {
317 Some(eat) => {
318 if !eat.eq_ignore_ascii_case(at) {
319 return false;
320 }
321 }
322 None => return false,
323 }
324 }
325
326 if let Some(ref v) = cond.verdict {
328 let expected = match v.to_lowercase().as_str() {
329 "allow" => NormalizedVerdict::Allow,
330 "deny" => NormalizedVerdict::Deny,
331 "warn" => NormalizedVerdict::Warn,
332 "forwarded" => NormalizedVerdict::Forwarded,
333 "dropped" => NormalizedVerdict::Dropped,
334 "none" => NormalizedVerdict::None,
335 _ => return false,
336 };
337 if event.verdict != expected {
338 return false;
339 }
340 }
341
342 if let Some(ref re) = compiled.target {
344 if !re.is_match(&event.summary) {
345 return false;
346 }
347 }
348
349 if let Some(ref re) = compiled.not_target {
351 if re.is_match(&event.summary) {
352 return false;
353 }
354 }
355
356 true
357}
358
359fn build_alert(rule: &CorrelationRule, ws: &WindowState) -> Alert {
361 let mut evidence = Vec::new();
363 for bind_name in &rule.output.evidence {
364 if let Some(events) = ws.bound_events.get(bind_name) {
365 evidence.extend(events.iter().cloned());
366 }
367 }
368
369 let triggered_at = evidence
371 .iter()
372 .map(|e| e.timestamp)
373 .max()
374 .unwrap_or_else(Utc::now);
375
376 Alert {
377 rule_name: rule.name.clone(),
378 severity: rule.severity,
379 title: rule.output.title.clone(),
380 triggered_at,
381 evidence,
382 description: rule.description.clone(),
383 }
384}
385
386#[cfg(test)]
387mod tests {
388 use super::*;
389 use crate::rules::parse_rule;
390 use chrono::TimeZone;
391 use hunt_query::query::EventSource;
392 use hunt_query::timeline::{NormalizedVerdict, TimelineEvent, TimelineEventKind};
393
394 fn make_event(
395 source: EventSource,
396 action_type: &str,
397 verdict: NormalizedVerdict,
398 summary: &str,
399 ts: DateTime<Utc>,
400 ) -> TimelineEvent {
401 TimelineEvent {
402 timestamp: ts,
403 source,
404 kind: TimelineEventKind::GuardDecision,
405 verdict,
406 severity: None,
407 summary: summary.to_string(),
408 process: None,
409 namespace: None,
410 pod: None,
411 action_type: Some(action_type.to_string()),
412 signature_valid: None,
413 raw: None,
414 }
415 }
416
417 fn exfil_rule() -> CorrelationRule {
418 parse_rule(
419 r#"
420schema: clawdstrike.hunt.correlation.v1
421name: "MCP Tool Exfiltration Attempt"
422severity: high
423description: >
424 Detects an MCP tool reading sensitive files followed by
425 network egress to an external domain within 30 seconds.
426window: 30s
427conditions:
428 - source: receipt
429 action_type: file
430 verdict: allow
431 target_pattern: "/etc/passwd|/etc/shadow|\\.ssh/|\\.(env|pem|key)$"
432 bind: file_access
433 - source: [receipt, hubble]
434 action_type: egress
435 not_target_pattern: "->\\s*(localhost|127\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])|10\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])|172\\.(1[6-9]|2[0-9]|3[01])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])|192\\.168\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9]))(?::[0-9]{1,5})?(?:$|[^A-Za-z0-9.:-])"
436 after: file_access
437 within: 30s
438 bind: egress_event
439output:
440 title: "Potential data exfiltration via MCP tool"
441 evidence:
442 - file_access
443 - egress_event
444"#,
445 )
446 .unwrap()
447 }
448
449 fn single_condition_rule() -> CorrelationRule {
450 parse_rule(
451 r#"
452schema: clawdstrike.hunt.correlation.v1
453name: "Forbidden Path Access"
454severity: critical
455description: "Detects any denied file access"
456window: 5m
457conditions:
458 - source: receipt
459 action_type: file
460 verdict: deny
461 bind: denied_access
462output:
463 title: "File access denied"
464 evidence:
465 - denied_access
466"#,
467 )
468 .unwrap()
469 }
470
471 #[test]
472 fn engine_new_compiles_regex() {
473 let rule = exfil_rule();
474 let engine = CorrelationEngine::new(vec![rule]).unwrap();
475 assert_eq!(engine.rules().len(), 1);
476 }
477
478 #[test]
479 fn engine_new_rejects_bad_regex() {
480 let mut rule = exfil_rule();
481 rule.conditions[0].target_pattern = Some("[invalid".to_string());
482 let result = CorrelationEngine::new(vec![rule]);
483 assert!(result.is_err());
484 let msg = result.unwrap_err().to_string();
485 assert!(msg.contains("regex"), "got: {msg}");
486 }
487
488 #[test]
489 fn single_condition_rule_fires_immediately() {
490 let rule = single_condition_rule();
491 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
492
493 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
494 let event = make_event(
495 EventSource::Receipt,
496 "file",
497 NormalizedVerdict::Deny,
498 "/etc/passwd",
499 ts,
500 );
501
502 let alerts = engine.process_event(&event);
503 assert_eq!(alerts.len(), 1);
504 assert_eq!(alerts[0].rule_name, "Forbidden Path Access");
505 assert_eq!(alerts[0].severity, RuleSeverity::Critical);
506 assert_eq!(alerts[0].evidence.len(), 1);
507 }
508
509 #[test]
510 fn two_condition_sequence_generates_alert() {
511 let rule = exfil_rule();
512 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
513
514 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
515 let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
516
517 let e1 = make_event(
519 EventSource::Receipt,
520 "file",
521 NormalizedVerdict::Allow,
522 "read /etc/passwd",
523 ts1,
524 );
525 let alerts = engine.process_event(&e1);
526 assert!(
527 alerts.is_empty(),
528 "should not alert on first condition only"
529 );
530
531 let e2 = make_event(
533 EventSource::Receipt,
534 "egress",
535 NormalizedVerdict::Allow,
536 "egress TCP 10.0.0.1:8080 -> 93.184.216.34:443",
537 ts2,
538 );
539 let alerts = engine.process_event(&e2);
540 assert_eq!(alerts.len(), 1);
541 assert_eq!(alerts[0].title, "Potential data exfiltration via MCP tool");
542 assert_eq!(alerts[0].evidence.len(), 2);
543 }
544
545 #[test]
546 fn egress_to_internal_excluded_by_not_target() {
547 let rule = exfil_rule();
548 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
549
550 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
551 let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
552
553 let e1 = make_event(
554 EventSource::Receipt,
555 "file",
556 NormalizedVerdict::Allow,
557 "read /etc/passwd",
558 ts1,
559 );
560 engine.process_event(&e1);
561
562 let e2 = make_event(
564 EventSource::Receipt,
565 "egress",
566 NormalizedVerdict::Allow,
567 "egress TCP 10.0.0.1:8080 -> 192.168.1.1:8080",
568 ts2,
569 );
570 let alerts = engine.process_event(&e2);
571 assert!(
572 alerts.is_empty(),
573 "internal egress should not trigger alert"
574 );
575 }
576
577 #[test]
578 fn egress_to_localhost_subdomain_still_alerts() {
579 let rule = exfil_rule();
581 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
582
583 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
584 let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
585
586 let e1 = make_event(
587 EventSource::Receipt,
588 "file",
589 NormalizedVerdict::Allow,
590 "read /etc/passwd",
591 ts1,
592 );
593 engine.process_event(&e1);
594
595 let e2 = make_event(
596 EventSource::Receipt,
597 "egress",
598 NormalizedVerdict::Allow,
599 "egress TCP 10.0.0.1:8080 -> localhost.evil.com:443",
600 ts2,
601 );
602 let alerts = engine.process_event(&e2);
603 assert_eq!(
604 alerts.len(),
605 1,
606 "localhost subdomains are external and should not be excluded"
607 );
608 }
609
610 #[test]
611 fn egress_to_172_20_range_excluded_as_private() {
612 let rule = exfil_rule();
614 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
615
616 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
617 let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
618
619 let e1 = make_event(
620 EventSource::Receipt,
621 "file",
622 NormalizedVerdict::Allow,
623 "read /etc/passwd",
624 ts1,
625 );
626 engine.process_event(&e1);
627
628 let e2 = make_event(
630 EventSource::Receipt,
631 "egress",
632 NormalizedVerdict::Allow,
633 "egress TCP 10.0.0.1:8080 -> 172.25.0.1:8080",
634 ts2,
635 );
636 let alerts = engine.process_event(&e2);
637 assert!(
638 alerts.is_empty(),
639 "172.25.x.x is RFC 1918 private and should be excluded"
640 );
641 }
642
643 #[test]
644 fn egress_to_172_2_not_excluded_as_public() {
645 let rule = exfil_rule();
647 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
648
649 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
650 let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
651
652 let e1 = make_event(
653 EventSource::Receipt,
654 "file",
655 NormalizedVerdict::Allow,
656 "read /etc/passwd",
657 ts1,
658 );
659 engine.process_event(&e1);
660
661 let e2 = make_event(
663 EventSource::Receipt,
664 "egress",
665 NormalizedVerdict::Allow,
666 "egress TCP 10.0.0.1:8080 -> 172.2.0.1:8080",
667 ts2,
668 );
669 let alerts = engine.process_event(&e2);
670 assert_eq!(
671 alerts.len(),
672 1,
673 "172.2.x.x is a public IP and should trigger exfiltration alert"
674 );
675 }
676
677 #[test]
678 fn egress_to_100_not_excluded_as_public() {
679 let rule = exfil_rule();
681 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
682
683 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
684 let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
685
686 let e1 = make_event(
687 EventSource::Receipt,
688 "file",
689 NormalizedVerdict::Allow,
690 "read /etc/passwd",
691 ts1,
692 );
693 engine.process_event(&e1);
694
695 let e2 = make_event(
696 EventSource::Receipt,
697 "egress",
698 NormalizedVerdict::Allow,
699 "egress TCP 10.0.0.1:8080 -> 100.1.2.3:8080",
700 ts2,
701 );
702 let alerts = engine.process_event(&e2);
703 assert_eq!(
704 alerts.len(),
705 1,
706 "100.x.x.x is a public IP and should trigger exfiltration alert"
707 );
708 }
709
710 #[test]
711 fn egress_without_direction_prefix_private_source_public_dest_still_alerts() {
712 let rule = exfil_rule();
715 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
716
717 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
718 let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
719
720 let e1 = make_event(
721 EventSource::Receipt,
722 "file",
723 NormalizedVerdict::Allow,
724 "read /etc/passwd",
725 ts1,
726 );
727 engine.process_event(&e1);
728
729 let e2 = make_event(
730 EventSource::Receipt,
731 "egress",
732 NormalizedVerdict::Allow,
733 "10.0.0.1 -> 93.184.216.34:443",
734 ts2,
735 );
736 let alerts = engine.process_event(&e2);
737 assert_eq!(
738 alerts.len(),
739 1,
740 "private source at summary start must not suppress external destination alerts"
741 );
742 }
743
744 #[test]
745 fn within_constraint_rejects_late_event() {
746 let rule = exfil_rule();
747 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
748
749 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
750 let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 31).unwrap();
752
753 let e1 = make_event(
754 EventSource::Receipt,
755 "file",
756 NormalizedVerdict::Allow,
757 "read /home/user/.ssh/id_rsa",
758 ts1,
759 );
760 engine.process_event(&e1);
761
762 let e2 = make_event(
763 EventSource::Receipt,
764 "egress",
765 NormalizedVerdict::Allow,
766 "evil.com:443",
767 ts2,
768 );
769 let alerts = engine.process_event(&e2);
770 assert!(
771 alerts.is_empty(),
772 "event outside within window should not trigger"
773 );
774 }
775
776 #[test]
777 fn after_without_within_rejects_out_of_order_event() {
778 let rule = parse_rule(
779 r#"
780schema: clawdstrike.hunt.correlation.v1
781name: "Ordered Dependent Sequence"
782severity: medium
783description: "Dependent events must occur after their prerequisite"
784window: 5m
785conditions:
786 - source: receipt
787 action_type: file
788 bind: first
789 - source: receipt
790 action_type: egress
791 after: first
792 bind: second
793output:
794 title: "Ordered sequence matched"
795 evidence:
796 - first
797 - second
798"#,
799 )
800 .unwrap();
801 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
802
803 let ts_first = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
804 let ts_older = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 5).unwrap();
805 let ts_newer = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 20).unwrap();
806
807 let first = make_event(
808 EventSource::Receipt,
809 "file",
810 NormalizedVerdict::Allow,
811 "read /etc/passwd",
812 ts_first,
813 );
814 engine.process_event(&first);
815
816 let out_of_order = make_event(
817 EventSource::Receipt,
818 "egress",
819 NormalizedVerdict::Allow,
820 "egress TCP 10.0.0.1:8080 -> 93.184.216.34:443",
821 ts_older,
822 );
823 let alerts = engine.process_event(&out_of_order);
824 assert!(
825 alerts.is_empty(),
826 "dependent event older than prerequisite must not match"
827 );
828
829 let ordered = make_event(
830 EventSource::Receipt,
831 "egress",
832 NormalizedVerdict::Allow,
833 "egress TCP 10.0.0.1:8080 -> 93.184.216.34:443",
834 ts_newer,
835 );
836 let alerts = engine.process_event(&ordered);
837 assert_eq!(
838 alerts.len(),
839 1,
840 "dependent event after prerequisite should still match"
841 );
842 }
843
844 #[test]
845 fn event_matching_no_rules() {
846 let rule = exfil_rule();
847 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
848
849 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
850 let event = make_event(
852 EventSource::Tetragon,
853 "process",
854 NormalizedVerdict::None,
855 "process_exec /usr/bin/ls",
856 ts,
857 );
858
859 let alerts = engine.process_event(&event);
860 assert!(alerts.is_empty());
861 }
862
863 #[test]
864 fn multiple_rules_same_event() {
865 let rule1 = single_condition_rule();
866 let rule2 = parse_rule(
867 r#"
868schema: clawdstrike.hunt.correlation.v1
869name: "Any File Deny"
870severity: medium
871description: "Any file denial"
872window: 1m
873conditions:
874 - source: receipt
875 action_type: file
876 verdict: deny
877 bind: evt
878output:
879 title: "File denial observed"
880 evidence:
881 - evt
882"#,
883 )
884 .unwrap();
885
886 let mut engine = CorrelationEngine::new(vec![rule1, rule2]).unwrap();
887
888 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
889 let event = make_event(
890 EventSource::Receipt,
891 "file",
892 NormalizedVerdict::Deny,
893 "/secret",
894 ts,
895 );
896
897 let alerts = engine.process_event(&event);
898 assert_eq!(alerts.len(), 2, "both rules should fire");
899
900 let names: Vec<&str> = alerts.iter().map(|a| a.rule_name.as_str()).collect();
901 assert!(names.contains(&"Forbidden Path Access"));
902 assert!(names.contains(&"Any File Deny"));
903 }
904
905 #[test]
906 fn single_event_cannot_satisfy_root_and_dependent_condition() {
907 let rule = parse_rule(
912 r#"
913schema: clawdstrike.hunt.correlation.v1
914name: "Self-match guard"
915severity: high
916description: "Should require two distinct events"
917window: 30s
918conditions:
919 - source: receipt
920 action_type: egress
921 bind: first
922 - source: receipt
923 action_type: egress
924 after: first
925 within: 30s
926 bind: second
927output:
928 title: "Two egress events"
929 evidence:
930 - first
931 - second
932"#,
933 )
934 .unwrap();
935 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
936
937 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
938 let event = make_event(
939 EventSource::Receipt,
940 "egress",
941 NormalizedVerdict::Allow,
942 "evil.com:443",
943 ts,
944 );
945
946 let alerts = engine.process_event(&event);
948 assert!(
949 alerts.is_empty(),
950 "a single event must not satisfy both root and dependent conditions"
951 );
952
953 let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 5).unwrap();
955 let event2 = make_event(
956 EventSource::Receipt,
957 "egress",
958 NormalizedVerdict::Allow,
959 "other.com:443",
960 ts2,
961 );
962 let alerts = engine.process_event(&event2);
963 assert_eq!(
964 alerts.len(),
965 1,
966 "two distinct events should complete the sequence"
967 );
968 }
969
970 #[test]
971 fn single_event_cannot_satisfy_chained_dependent_conditions() {
972 let rule = parse_rule(
973 r#"
974schema: clawdstrike.hunt.correlation.v1
975name: "Dependent chain"
976severity: high
977description: "Should require three distinct events"
978window: 30s
979conditions:
980 - source: receipt
981 action_type: file
982 bind: first
983 - source: receipt
984 action_type: egress
985 after: first
986 within: 30s
987 bind: second
988 - source: receipt
989 action_type: egress
990 after: second
991 within: 30s
992 bind: third
993output:
994 title: "Three-step sequence"
995 evidence:
996 - first
997 - second
998 - third
999"#,
1000 )
1001 .unwrap();
1002
1003 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1004 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1005 let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 5).unwrap();
1006 let ts3 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
1007
1008 let first = make_event(
1009 EventSource::Receipt,
1010 "file",
1011 NormalizedVerdict::Allow,
1012 "read /tmp/data",
1013 ts1,
1014 );
1015 assert!(engine.process_event(&first).is_empty());
1016
1017 let second = make_event(
1020 EventSource::Receipt,
1021 "egress",
1022 NormalizedVerdict::Allow,
1023 "evil.com:443",
1024 ts2,
1025 );
1026 assert!(
1027 engine.process_event(&second).is_empty(),
1028 "a single dependent event must not satisfy an entire chain"
1029 );
1030
1031 let third = make_event(
1032 EventSource::Receipt,
1033 "egress",
1034 NormalizedVerdict::Allow,
1035 "other.com:443",
1036 ts3,
1037 );
1038 assert_eq!(engine.process_event(&third).len(), 1);
1039 }
1040
1041 #[test]
1042 fn condition_matches_source_check() {
1043 let cond = RuleCondition {
1044 source: vec!["receipt".to_string()],
1045 action_type: None,
1046 verdict: None,
1047 target_pattern: None,
1048 not_target_pattern: None,
1049 after: None,
1050 within: None,
1051 bind: "test".to_string(),
1052 };
1053 let cp = CompiledPatterns {
1054 target: None,
1055 not_target: None,
1056 };
1057
1058 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1059
1060 let receipt_event = make_event(
1061 EventSource::Receipt,
1062 "file",
1063 NormalizedVerdict::Allow,
1064 "test",
1065 ts,
1066 );
1067 assert!(condition_matches(&cond, &cp, &receipt_event));
1068
1069 let tetragon_event = make_event(
1070 EventSource::Tetragon,
1071 "process",
1072 NormalizedVerdict::None,
1073 "test",
1074 ts,
1075 );
1076 assert!(!condition_matches(&cond, &cp, &tetragon_event));
1077 }
1078
1079 #[test]
1080 fn condition_matches_target_pattern() {
1081 let cond = RuleCondition {
1082 source: vec!["receipt".to_string()],
1083 action_type: None,
1084 verdict: None,
1085 target_pattern: Some(r"\.env$".to_string()),
1086 not_target_pattern: None,
1087 after: None,
1088 within: None,
1089 bind: "test".to_string(),
1090 };
1091 let cp = CompiledPatterns {
1092 target: Some(Regex::new(r"\.env$").unwrap()),
1093 not_target: None,
1094 };
1095
1096 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1097
1098 let matching = make_event(
1099 EventSource::Receipt,
1100 "file",
1101 NormalizedVerdict::Allow,
1102 "read /app/.env",
1103 ts,
1104 );
1105 assert!(condition_matches(&cond, &cp, &matching));
1106
1107 let non_matching = make_event(
1108 EventSource::Receipt,
1109 "file",
1110 NormalizedVerdict::Allow,
1111 "read /app/config.toml",
1112 ts,
1113 );
1114 assert!(!condition_matches(&cond, &cp, &non_matching));
1115 }
1116
1117 #[test]
1118 fn condition_matches_not_target_pattern() {
1119 let cond = RuleCondition {
1120 source: vec!["receipt".to_string()],
1121 action_type: None,
1122 verdict: None,
1123 target_pattern: None,
1124 not_target_pattern: Some(r"^localhost".to_string()),
1125 after: None,
1126 within: None,
1127 bind: "test".to_string(),
1128 };
1129 let cp = CompiledPatterns {
1130 target: None,
1131 not_target: Some(Regex::new(r"^localhost").unwrap()),
1132 };
1133
1134 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1135
1136 let excluded = make_event(
1137 EventSource::Receipt,
1138 "egress",
1139 NormalizedVerdict::Allow,
1140 "localhost:8080",
1141 ts,
1142 );
1143 assert!(!condition_matches(&cond, &cp, &excluded));
1144
1145 let allowed = make_event(
1146 EventSource::Receipt,
1147 "egress",
1148 NormalizedVerdict::Allow,
1149 "evil.com:443",
1150 ts,
1151 );
1152 assert!(condition_matches(&cond, &cp, &allowed));
1153 }
1154
1155 #[test]
1156 fn condition_matches_verdict_filter() {
1157 let cond = RuleCondition {
1158 source: vec!["receipt".to_string()],
1159 action_type: None,
1160 verdict: Some("deny".to_string()),
1161 target_pattern: None,
1162 not_target_pattern: None,
1163 after: None,
1164 within: None,
1165 bind: "test".to_string(),
1166 };
1167 let cp = CompiledPatterns {
1168 target: None,
1169 not_target: None,
1170 };
1171
1172 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1173
1174 let deny_event = make_event(
1175 EventSource::Receipt,
1176 "file",
1177 NormalizedVerdict::Deny,
1178 "test",
1179 ts,
1180 );
1181 assert!(condition_matches(&cond, &cp, &deny_event));
1182
1183 let allow_event = make_event(
1184 EventSource::Receipt,
1185 "file",
1186 NormalizedVerdict::Allow,
1187 "test",
1188 ts,
1189 );
1190 assert!(!condition_matches(&cond, &cp, &allow_event));
1191 }
1192
1193 #[test]
1194 fn flush_emits_partially_complete_windows() {
1195 let rule = exfil_rule();
1197 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1198
1199 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1200 let e1 = make_event(
1201 EventSource::Receipt,
1202 "file",
1203 NormalizedVerdict::Allow,
1204 "read /etc/passwd",
1205 ts1,
1206 );
1207 engine.process_event(&e1);
1208
1209 let alerts = engine.flush();
1211 assert!(
1212 alerts.is_empty(),
1213 "incomplete window should not produce alert on flush"
1214 );
1215 }
1216
1217 #[test]
1218 fn flush_does_not_emit_alerts_from_expired_windows() {
1219 let rule = single_condition_rule();
1220 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1221 let stale_ts = Utc::now() - chrono::Duration::minutes(10);
1222 let stale_event = make_event(
1223 EventSource::Receipt,
1224 "file",
1225 NormalizedVerdict::Deny,
1226 "/etc/passwd",
1227 stale_ts,
1228 );
1229
1230 let mut bound_events = std::collections::HashMap::new();
1231 bound_events.insert("denied_access".to_string(), vec![stale_event]);
1232 engine.windows.insert(
1233 0,
1234 vec![WindowState {
1235 bound_events,
1236 window_start: stale_ts,
1237 }],
1238 );
1239
1240 let alerts = engine.flush();
1241 assert!(
1242 alerts.is_empty(),
1243 "expired windows should be evicted before flush emits alerts"
1244 );
1245 }
1246
1247 #[test]
1248 fn condition_matches_verdict_forwarded() {
1249 let cond = RuleCondition {
1250 source: vec!["hubble".to_string()],
1251 action_type: None,
1252 verdict: Some("forwarded".to_string()),
1253 target_pattern: None,
1254 not_target_pattern: None,
1255 after: None,
1256 within: None,
1257 bind: "test".to_string(),
1258 };
1259 let cp = CompiledPatterns {
1260 target: None,
1261 not_target: None,
1262 };
1263
1264 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1265
1266 let forwarded_event = make_event(
1267 EventSource::Hubble,
1268 "egress",
1269 NormalizedVerdict::Forwarded,
1270 "evil.com:443",
1271 ts,
1272 );
1273 assert!(condition_matches(&cond, &cp, &forwarded_event));
1274
1275 let allow_event = make_event(
1276 EventSource::Hubble,
1277 "egress",
1278 NormalizedVerdict::Allow,
1279 "evil.com:443",
1280 ts,
1281 );
1282 assert!(!condition_matches(&cond, &cp, &allow_event));
1283 }
1284
1285 #[test]
1286 fn condition_matches_verdict_dropped() {
1287 let cond = RuleCondition {
1288 source: vec!["hubble".to_string()],
1289 action_type: None,
1290 verdict: Some("dropped".to_string()),
1291 target_pattern: None,
1292 not_target_pattern: None,
1293 after: None,
1294 within: None,
1295 bind: "test".to_string(),
1296 };
1297 let cp = CompiledPatterns {
1298 target: None,
1299 not_target: None,
1300 };
1301
1302 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1303
1304 let dropped_event = make_event(
1305 EventSource::Hubble,
1306 "egress",
1307 NormalizedVerdict::Dropped,
1308 "evil.com:443",
1309 ts,
1310 );
1311 assert!(condition_matches(&cond, &cp, &dropped_event));
1312
1313 let forwarded_event = make_event(
1314 EventSource::Hubble,
1315 "egress",
1316 NormalizedVerdict::Forwarded,
1317 "evil.com:443",
1318 ts,
1319 );
1320 assert!(!condition_matches(&cond, &cp, &forwarded_event));
1321 }
1322
1323 #[test]
1324 fn condition_matches_verdict_none() {
1325 let cond = RuleCondition {
1326 source: vec!["tetragon".to_string()],
1327 action_type: None,
1328 verdict: Some("none".to_string()),
1329 target_pattern: None,
1330 not_target_pattern: None,
1331 after: None,
1332 within: None,
1333 bind: "test".to_string(),
1334 };
1335 let cp = CompiledPatterns {
1336 target: None,
1337 not_target: None,
1338 };
1339
1340 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1341
1342 let none_event = make_event(
1343 EventSource::Tetragon,
1344 "process",
1345 NormalizedVerdict::None,
1346 "process_exec /bin/sh",
1347 ts,
1348 );
1349 assert!(condition_matches(&cond, &cp, &none_event));
1350
1351 let allow_event = make_event(
1352 EventSource::Tetragon,
1353 "process",
1354 NormalizedVerdict::Allow,
1355 "process_exec /bin/sh",
1356 ts,
1357 );
1358 assert!(!condition_matches(&cond, &cp, &allow_event));
1359 }
1360
1361 #[test]
1362 fn condition_matches_verdict_unknown_rejects() {
1363 let cond = RuleCondition {
1364 source: vec!["receipt".to_string()],
1365 action_type: None,
1366 verdict: Some("invalid_verdict".to_string()),
1367 target_pattern: None,
1368 not_target_pattern: None,
1369 after: None,
1370 within: None,
1371 bind: "test".to_string(),
1372 };
1373 let cp = CompiledPatterns {
1374 target: None,
1375 not_target: None,
1376 };
1377
1378 let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1379
1380 let event = make_event(
1381 EventSource::Receipt,
1382 "file",
1383 NormalizedVerdict::Allow,
1384 "test",
1385 ts,
1386 );
1387 assert!(
1388 !condition_matches(&cond, &cp, &event),
1389 "unknown verdict string should never match"
1390 );
1391 }
1392
1393 #[test]
1394 fn hubble_source_matches_egress_condition() {
1395 let rule = exfil_rule();
1396 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1397
1398 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1399 let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 5).unwrap();
1400
1401 let e1 = make_event(
1402 EventSource::Receipt,
1403 "file",
1404 NormalizedVerdict::Allow,
1405 "read /home/user/.env",
1406 ts1,
1407 );
1408 engine.process_event(&e1);
1409
1410 let e2 = make_event(
1412 EventSource::Hubble,
1413 "egress",
1414 NormalizedVerdict::Allow,
1415 "evil.com:443",
1416 ts2,
1417 );
1418 let alerts = engine.process_event(&e2);
1419 assert_eq!(alerts.len(), 1);
1420 }
1421
1422 #[test]
1423 fn evict_expired_capped_uses_shorter_window() {
1424 let rule = exfil_rule();
1427 let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1428
1429 let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1430
1431 let e1 = make_event(
1433 EventSource::Receipt,
1434 "file",
1435 NormalizedVerdict::Allow,
1436 "read /etc/passwd",
1437 ts1,
1438 );
1439 engine.process_event(&e1);
1440
1441 assert_eq!(engine.windows.len(), 1);
1443
1444 engine.evict_expired_capped(chrono::Duration::zero());
1446 assert!(
1447 engine.windows.is_empty(),
1448 "zero max_window should evict all windows"
1449 );
1450 }
1451
1452 #[test]
1453 fn evict_expired_capped_preserves_when_cap_larger_than_rule_window() {
1454 let rule = exfil_rule(); let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1458
1459 let ts = Utc::now();
1460 let root_only = make_event(
1463 EventSource::Receipt,
1464 "file",
1465 NormalizedVerdict::Allow,
1466 "read /etc/passwd",
1467 ts,
1468 );
1469 let alerts = engine.process_event(&root_only);
1470 assert!(
1471 alerts.is_empty(),
1472 "root-only event should not complete rule"
1473 );
1474 let before = engine.windows.get(&0).map_or(0, Vec::len);
1475 assert_eq!(before, 1, "expected one active correlation window");
1476
1477 engine.evict_expired_capped(chrono::Duration::hours(24));
1479 let after = engine.windows.get(&0).map_or(0, Vec::len);
1480 assert_eq!(
1481 after, 1,
1482 "cap larger than rule window should preserve a fresh window"
1483 );
1484 }
1485}