Skip to main content

hunt_correlate/
engine.rs

1//! Correlation engine — evaluates correlation rules against event streams using sliding windows.
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use regex::Regex;
7use serde::Serialize;
8
9use crate::error::{Error, Result};
10use crate::rules::{CorrelationRule, RuleCondition, RuleSeverity};
11use hunt_query::timeline::{NormalizedVerdict, TimelineEvent};
12
13/// An alert generated when a correlation rule fires.
14#[derive(Debug, Clone, Serialize)]
15pub struct Alert {
16    /// Rule name that generated this alert.
17    pub rule_name: String,
18    /// Severity level.
19    pub severity: RuleSeverity,
20    /// Alert title from rule output.
21    pub title: String,
22    /// When the alert was triggered (timestamp of the final matching event).
23    pub triggered_at: DateTime<Utc>,
24    /// Evidence: the timeline events that contributed to this alert.
25    pub evidence: Vec<TimelineEvent>,
26    /// Alert description from rule.
27    pub description: String,
28}
29
30/// Tracks in-flight event bindings for a single detection sequence of a rule.
31#[derive(Debug, Clone)]
32struct WindowState {
33    /// Events bound by condition bind name.
34    bound_events: HashMap<String, Vec<TimelineEvent>>,
35    /// Timestamp of the first bound event (window start).
36    window_start: DateTime<Utc>,
37}
38
39/// Pre-compiled regex pair for a single condition.
40#[derive(Debug, Clone)]
41struct CompiledPatterns {
42    target: Option<Regex>,
43    not_target: Option<Regex>,
44}
45
46/// The correlation engine evaluates events against loaded rules using sliding windows.
47#[derive(Debug)]
48pub struct CorrelationEngine {
49    /// Parsed rules.
50    rules: Vec<CorrelationRule>,
51    /// Pre-compiled regex patterns, keyed by (rule index, condition index).
52    patterns: HashMap<(usize, usize), CompiledPatterns>,
53    /// In-flight window states: keyed by rule index, each rule can have multiple concurrent windows.
54    windows: HashMap<usize, Vec<WindowState>>,
55}
56
57impl CorrelationEngine {
58    /// Create a new correlation engine, pre-compiling all regex patterns.
59    pub fn new(rules: Vec<CorrelationRule>) -> Result<Self> {
60        let mut patterns = HashMap::new();
61
62        for (ri, rule) in rules.iter().enumerate() {
63            for (ci, cond) in rule.conditions.iter().enumerate() {
64                let target = match &cond.target_pattern {
65                    Some(pat) => Some(Regex::new(pat).map_err(|e| {
66                        Error::Regex(format!("rule '{}' condition {ci}: {e}", rule.name))
67                    })?),
68                    None => None,
69                };
70                let not_target = match &cond.not_target_pattern {
71                    Some(pat) => Some(Regex::new(pat).map_err(|e| {
72                        Error::Regex(format!(
73                            "rule '{}' condition {ci} not_target: {e}",
74                            rule.name
75                        ))
76                    })?),
77                    None => None,
78                };
79                patterns.insert((ri, ci), CompiledPatterns { target, not_target });
80            }
81        }
82
83        Ok(Self {
84            rules,
85            patterns,
86            windows: HashMap::new(),
87        })
88    }
89
90    /// Process a single event against all loaded rules.
91    /// Evicts expired windows first, then evaluates conditions.
92    /// Returns any alerts that were generated.
93    pub fn process_event(&mut self, event: &TimelineEvent) -> Vec<Alert> {
94        self.evict_expired_at(event.timestamp);
95
96        let mut alerts = Vec::new();
97
98        for ri in 0..self.rules.len() {
99            let rule_alerts = self.evaluate_rule(ri, event);
100            alerts.extend(rule_alerts);
101        }
102
103        alerts
104    }
105
106    /// Remove window states that have exceeded their rule's window duration,
107    /// using the given reference time (typically the current event's timestamp).
108    pub fn evict_expired_at(&mut self, now: DateTime<Utc>) {
109        for (ri, windows) in &mut self.windows {
110            let window_dur = self.rules[*ri].window;
111            windows.retain(|ws| {
112                let elapsed = now.signed_duration_since(ws.window_start);
113                elapsed <= window_dur
114            });
115        }
116
117        // Remove empty entries.
118        self.windows.retain(|_, v| !v.is_empty());
119    }
120
121    /// Remove window states that have exceeded their rule's window duration
122    /// using wall-clock time.
123    pub fn evict_expired(&mut self) {
124        self.evict_expired_at(Utc::now());
125    }
126
127    /// Remove window states that have exceeded the shorter of the rule's own
128    /// window duration and the provided `max_window` cap, using wall-clock time.
129    pub fn evict_expired_capped(&mut self, max_window: chrono::Duration) {
130        let now = Utc::now();
131        for (ri, windows) in &mut self.windows {
132            let rule_dur = self.rules[*ri].window;
133            let effective = if max_window < rule_dur {
134                max_window
135            } else {
136                rule_dur
137            };
138            windows.retain(|ws| {
139                let elapsed = now.signed_duration_since(ws.window_start);
140                elapsed <= effective
141            });
142        }
143        self.windows.retain(|_, v| !v.is_empty());
144    }
145
146    /// Return a reference to the loaded rules.
147    pub fn rules(&self) -> &[CorrelationRule] {
148        &self.rules
149    }
150
151    /// Flush all windows and return alerts for any fully-matched sequences.
152    pub fn flush(&mut self) -> Vec<Alert> {
153        self.evict_expired();
154
155        let mut alerts = Vec::new();
156
157        for (ri, windows) in self.windows.drain() {
158            let rule = &self.rules[ri];
159            for ws in windows {
160                if all_conditions_met(rule, &ws) {
161                    alerts.push(build_alert(rule, &ws));
162                }
163            }
164        }
165
166        alerts
167    }
168
169    /// Evaluate a single rule against an event. May create new windows or advance existing ones.
170    fn evaluate_rule(&mut self, ri: usize, event: &TimelineEvent) -> Vec<Alert> {
171        let mut alerts = Vec::new();
172        let rule = &self.rules[ri];
173
174        // Snapshot the number of windows that existed before processing this event.
175        // Dependent conditions (`after` is Some) must only iterate windows that
176        // existed before this event was processed, preventing a single event from
177        // binding to both a newly-created root window and its dependent condition.
178        let pre_existing_count = self.windows.get(&ri).map_or(0, |w| w.len());
179        // Track whether this event already advanced each pre-existing window
180        // through a dependent condition in this pass.
181        let mut dependent_advanced = vec![false; pre_existing_count];
182
183        // Check each condition to see if this event matches it.
184        for (ci, cond) in rule.conditions.iter().enumerate() {
185            let cp = match self.patterns.get(&(ri, ci)) {
186                Some(cp) => cp,
187                None => continue,
188            };
189
190            if !condition_matches(cond, cp, event) {
191                continue;
192            }
193
194            if cond.after.is_none() {
195                // This is a root condition (no dependency). Start a new window.
196                let mut bound = HashMap::new();
197                bound.insert(cond.bind.clone(), vec![event.clone()]);
198                let ws = WindowState {
199                    bound_events: bound,
200                    window_start: event.timestamp,
201                };
202
203                self.windows.entry(ri).or_default().push(ws);
204            } else {
205                // This condition depends on a prior bind. Try to advance existing windows.
206                let after_bind = cond.after.as_deref();
207
208                if pre_existing_count == 0 {
209                    continue;
210                }
211
212                let windows = match self.windows.get_mut(&ri) {
213                    Some(w) => w,
214                    None => continue,
215                };
216
217                for (wi, ws) in windows.iter_mut().take(pre_existing_count).enumerate() {
218                    // A single event may advance at most one dependent bind per window.
219                    if dependent_advanced.get(wi).copied().unwrap_or(false) {
220                        continue;
221                    }
222
223                    // Skip windows that already have this bind matched.
224                    if ws.bound_events.contains_key(&cond.bind) {
225                        continue;
226                    }
227
228                    // Check that the `after` bind exists in this window.
229                    let after_ok = match after_bind {
230                        Some(ab) => ws.bound_events.contains_key(ab),
231                        None => true,
232                    };
233                    if !after_ok {
234                        continue;
235                    }
236
237                    // Dependent events must never be earlier than the latest
238                    // prerequisite event, even when `within` is not set.
239                    if let Some(ab) = after_bind {
240                        if let Some(after_events) = ws.bound_events.get(ab) {
241                            if let Some(latest_after) =
242                                after_events.iter().map(|e| e.timestamp).max()
243                            {
244                                let elapsed = event.timestamp.signed_duration_since(latest_after);
245                                if elapsed < chrono::Duration::zero() {
246                                    continue;
247                                }
248                                if let Some(within_dur) = cond.within {
249                                    if elapsed > within_dur {
250                                        continue;
251                                    }
252                                }
253                            }
254                        }
255                    }
256
257                    // Bind this event.
258                    ws.bound_events
259                        .entry(cond.bind.clone())
260                        .or_default()
261                        .push(event.clone());
262                    if let Some(slot) = dependent_advanced.get_mut(wi) {
263                        *slot = true;
264                    }
265                }
266            }
267        }
268
269        // Check if any windows for this rule are now fully matched.
270        if let Some(windows) = self.windows.get_mut(&ri) {
271            let rule = &self.rules[ri];
272            let mut completed_indices = Vec::new();
273
274            for (wi, ws) in windows.iter().enumerate() {
275                if all_conditions_met(rule, ws) {
276                    alerts.push(build_alert(rule, ws));
277                    completed_indices.push(wi);
278                }
279            }
280
281            // Remove completed windows in reverse order to preserve indices.
282            for wi in completed_indices.into_iter().rev() {
283                windows.remove(wi);
284            }
285        }
286
287        alerts
288    }
289}
290
291/// Check if all conditions in a rule have at least one bound event.
292fn all_conditions_met(rule: &CorrelationRule, ws: &WindowState) -> bool {
293    rule.conditions
294        .iter()
295        .all(|cond| ws.bound_events.contains_key(&cond.bind))
296}
297
298/// Check if a single condition matches a timeline event.
299fn condition_matches(
300    cond: &RuleCondition,
301    compiled: &CompiledPatterns,
302    event: &TimelineEvent,
303) -> bool {
304    // Source check: condition.source must contain event.source (case-insensitive).
305    let event_source_str = event.source.to_string().to_lowercase();
306    let source_ok = cond
307        .source
308        .iter()
309        .any(|s| s.to_lowercase() == event_source_str);
310    if !source_ok {
311        return false;
312    }
313
314    // Action type check (case-insensitive).
315    if let Some(ref at) = cond.action_type {
316        match &event.action_type {
317            Some(eat) => {
318                if !eat.eq_ignore_ascii_case(at) {
319                    return false;
320                }
321            }
322            None => return false,
323        }
324    }
325
326    // Verdict check.
327    if let Some(ref v) = cond.verdict {
328        let expected = match v.to_lowercase().as_str() {
329            "allow" => NormalizedVerdict::Allow,
330            "deny" => NormalizedVerdict::Deny,
331            "warn" => NormalizedVerdict::Warn,
332            "forwarded" => NormalizedVerdict::Forwarded,
333            "dropped" => NormalizedVerdict::Dropped,
334            "none" => NormalizedVerdict::None,
335            _ => return false,
336        };
337        if event.verdict != expected {
338            return false;
339        }
340    }
341
342    // Target pattern: regex must match event.summary.
343    if let Some(ref re) = compiled.target {
344        if !re.is_match(&event.summary) {
345            return false;
346        }
347    }
348
349    // Not-target pattern: regex must NOT match event.summary.
350    if let Some(ref re) = compiled.not_target {
351        if re.is_match(&event.summary) {
352            return false;
353        }
354    }
355
356    true
357}
358
359/// Build an alert from a completed window state.
360fn build_alert(rule: &CorrelationRule, ws: &WindowState) -> Alert {
361    // Collect evidence events in the order specified by output.evidence.
362    let mut evidence = Vec::new();
363    for bind_name in &rule.output.evidence {
364        if let Some(events) = ws.bound_events.get(bind_name) {
365            evidence.extend(events.iter().cloned());
366        }
367    }
368
369    // Triggered at = timestamp of the latest evidence event.
370    let triggered_at = evidence
371        .iter()
372        .map(|e| e.timestamp)
373        .max()
374        .unwrap_or_else(Utc::now);
375
376    Alert {
377        rule_name: rule.name.clone(),
378        severity: rule.severity,
379        title: rule.output.title.clone(),
380        triggered_at,
381        evidence,
382        description: rule.description.clone(),
383    }
384}
385
386#[cfg(test)]
387mod tests {
388    use super::*;
389    use crate::rules::parse_rule;
390    use chrono::TimeZone;
391    use hunt_query::query::EventSource;
392    use hunt_query::timeline::{NormalizedVerdict, TimelineEvent, TimelineEventKind};
393
394    fn make_event(
395        source: EventSource,
396        action_type: &str,
397        verdict: NormalizedVerdict,
398        summary: &str,
399        ts: DateTime<Utc>,
400    ) -> TimelineEvent {
401        TimelineEvent {
402            timestamp: ts,
403            source,
404            kind: TimelineEventKind::GuardDecision,
405            verdict,
406            severity: None,
407            summary: summary.to_string(),
408            process: None,
409            namespace: None,
410            pod: None,
411            action_type: Some(action_type.to_string()),
412            signature_valid: None,
413            raw: None,
414        }
415    }
416
417    fn exfil_rule() -> CorrelationRule {
418        parse_rule(
419            r#"
420schema: clawdstrike.hunt.correlation.v1
421name: "MCP Tool Exfiltration Attempt"
422severity: high
423description: >
424  Detects an MCP tool reading sensitive files followed by
425  network egress to an external domain within 30 seconds.
426window: 30s
427conditions:
428  - source: receipt
429    action_type: file
430    verdict: allow
431    target_pattern: "/etc/passwd|/etc/shadow|\\.ssh/|\\.(env|pem|key)$"
432    bind: file_access
433  - source: [receipt, hubble]
434    action_type: egress
435    not_target_pattern: "->\\s*(localhost|127\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])|10\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])|172\\.(1[6-9]|2[0-9]|3[01])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])|192\\.168\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9]))(?::[0-9]{1,5})?(?:$|[^A-Za-z0-9.:-])"
436    after: file_access
437    within: 30s
438    bind: egress_event
439output:
440  title: "Potential data exfiltration via MCP tool"
441  evidence:
442    - file_access
443    - egress_event
444"#,
445        )
446        .unwrap()
447    }
448
449    fn single_condition_rule() -> CorrelationRule {
450        parse_rule(
451            r#"
452schema: clawdstrike.hunt.correlation.v1
453name: "Forbidden Path Access"
454severity: critical
455description: "Detects any denied file access"
456window: 5m
457conditions:
458  - source: receipt
459    action_type: file
460    verdict: deny
461    bind: denied_access
462output:
463  title: "File access denied"
464  evidence:
465    - denied_access
466"#,
467        )
468        .unwrap()
469    }
470
471    #[test]
472    fn engine_new_compiles_regex() {
473        let rule = exfil_rule();
474        let engine = CorrelationEngine::new(vec![rule]).unwrap();
475        assert_eq!(engine.rules().len(), 1);
476    }
477
478    #[test]
479    fn engine_new_rejects_bad_regex() {
480        let mut rule = exfil_rule();
481        rule.conditions[0].target_pattern = Some("[invalid".to_string());
482        let result = CorrelationEngine::new(vec![rule]);
483        assert!(result.is_err());
484        let msg = result.unwrap_err().to_string();
485        assert!(msg.contains("regex"), "got: {msg}");
486    }
487
488    #[test]
489    fn single_condition_rule_fires_immediately() {
490        let rule = single_condition_rule();
491        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
492
493        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
494        let event = make_event(
495            EventSource::Receipt,
496            "file",
497            NormalizedVerdict::Deny,
498            "/etc/passwd",
499            ts,
500        );
501
502        let alerts = engine.process_event(&event);
503        assert_eq!(alerts.len(), 1);
504        assert_eq!(alerts[0].rule_name, "Forbidden Path Access");
505        assert_eq!(alerts[0].severity, RuleSeverity::Critical);
506        assert_eq!(alerts[0].evidence.len(), 1);
507    }
508
509    #[test]
510    fn two_condition_sequence_generates_alert() {
511        let rule = exfil_rule();
512        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
513
514        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
515        let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
516
517        // First event: file access to sensitive path
518        let e1 = make_event(
519            EventSource::Receipt,
520            "file",
521            NormalizedVerdict::Allow,
522            "read /etc/passwd",
523            ts1,
524        );
525        let alerts = engine.process_event(&e1);
526        assert!(
527            alerts.is_empty(),
528            "should not alert on first condition only"
529        );
530
531        // Second event: egress to external domain
532        let e2 = make_event(
533            EventSource::Receipt,
534            "egress",
535            NormalizedVerdict::Allow,
536            "egress TCP 10.0.0.1:8080 -> 93.184.216.34:443",
537            ts2,
538        );
539        let alerts = engine.process_event(&e2);
540        assert_eq!(alerts.len(), 1);
541        assert_eq!(alerts[0].title, "Potential data exfiltration via MCP tool");
542        assert_eq!(alerts[0].evidence.len(), 2);
543    }
544
545    #[test]
546    fn egress_to_internal_excluded_by_not_target() {
547        let rule = exfil_rule();
548        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
549
550        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
551        let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
552
553        let e1 = make_event(
554            EventSource::Receipt,
555            "file",
556            NormalizedVerdict::Allow,
557            "read /etc/passwd",
558            ts1,
559        );
560        engine.process_event(&e1);
561
562        // Egress to internal IP — should be excluded by not_target_pattern
563        let e2 = make_event(
564            EventSource::Receipt,
565            "egress",
566            NormalizedVerdict::Allow,
567            "egress TCP 10.0.0.1:8080 -> 192.168.1.1:8080",
568            ts2,
569        );
570        let alerts = engine.process_event(&e2);
571        assert!(
572            alerts.is_empty(),
573            "internal egress should not trigger alert"
574        );
575    }
576
577    #[test]
578    fn egress_to_localhost_subdomain_still_alerts() {
579        // localhost.evil.com must not be treated as plain localhost.
580        let rule = exfil_rule();
581        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
582
583        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
584        let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
585
586        let e1 = make_event(
587            EventSource::Receipt,
588            "file",
589            NormalizedVerdict::Allow,
590            "read /etc/passwd",
591            ts1,
592        );
593        engine.process_event(&e1);
594
595        let e2 = make_event(
596            EventSource::Receipt,
597            "egress",
598            NormalizedVerdict::Allow,
599            "egress TCP 10.0.0.1:8080 -> localhost.evil.com:443",
600            ts2,
601        );
602        let alerts = engine.process_event(&e2);
603        assert_eq!(
604            alerts.len(),
605            1,
606            "localhost subdomains are external and should not be excluded"
607        );
608    }
609
610    #[test]
611    fn egress_to_172_20_range_excluded_as_private() {
612        // 172.20.x.x is RFC 1918 private (172.16.0.0/12) and must be excluded.
613        let rule = exfil_rule();
614        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
615
616        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
617        let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
618
619        let e1 = make_event(
620            EventSource::Receipt,
621            "file",
622            NormalizedVerdict::Allow,
623            "read /etc/passwd",
624            ts1,
625        );
626        engine.process_event(&e1);
627
628        // 172.25.0.1 is private — should NOT trigger alert
629        let e2 = make_event(
630            EventSource::Receipt,
631            "egress",
632            NormalizedVerdict::Allow,
633            "egress TCP 10.0.0.1:8080 -> 172.25.0.1:8080",
634            ts2,
635        );
636        let alerts = engine.process_event(&e2);
637        assert!(
638            alerts.is_empty(),
639            "172.25.x.x is RFC 1918 private and should be excluded"
640        );
641    }
642
643    #[test]
644    fn egress_to_172_2_not_excluded_as_public() {
645        // 172.2.x.x is NOT RFC 1918 private — it should trigger an alert.
646        let rule = exfil_rule();
647        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
648
649        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
650        let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
651
652        let e1 = make_event(
653            EventSource::Receipt,
654            "file",
655            NormalizedVerdict::Allow,
656            "read /etc/passwd",
657            ts1,
658        );
659        engine.process_event(&e1);
660
661        // 172.2.0.1 is public — SHOULD trigger alert
662        let e2 = make_event(
663            EventSource::Receipt,
664            "egress",
665            NormalizedVerdict::Allow,
666            "egress TCP 10.0.0.1:8080 -> 172.2.0.1:8080",
667            ts2,
668        );
669        let alerts = engine.process_event(&e2);
670        assert_eq!(
671            alerts.len(),
672            1,
673            "172.2.x.x is a public IP and should trigger exfiltration alert"
674        );
675    }
676
677    #[test]
678    fn egress_to_100_not_excluded_as_public() {
679        // 100.x.x.x is public and must not be excluded by a 10.x prefix.
680        let rule = exfil_rule();
681        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
682
683        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
684        let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
685
686        let e1 = make_event(
687            EventSource::Receipt,
688            "file",
689            NormalizedVerdict::Allow,
690            "read /etc/passwd",
691            ts1,
692        );
693        engine.process_event(&e1);
694
695        let e2 = make_event(
696            EventSource::Receipt,
697            "egress",
698            NormalizedVerdict::Allow,
699            "egress TCP 10.0.0.1:8080 -> 100.1.2.3:8080",
700            ts2,
701        );
702        let alerts = engine.process_event(&e2);
703        assert_eq!(
704            alerts.len(),
705            1,
706            "100.x.x.x is a public IP and should trigger exfiltration alert"
707        );
708    }
709
710    #[test]
711    fn egress_without_direction_prefix_private_source_public_dest_still_alerts() {
712        // Some summaries may omit the direction prefix and start with source IP.
713        // The not_target_pattern must not exclude based on private source.
714        let rule = exfil_rule();
715        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
716
717        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
718        let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
719
720        let e1 = make_event(
721            EventSource::Receipt,
722            "file",
723            NormalizedVerdict::Allow,
724            "read /etc/passwd",
725            ts1,
726        );
727        engine.process_event(&e1);
728
729        let e2 = make_event(
730            EventSource::Receipt,
731            "egress",
732            NormalizedVerdict::Allow,
733            "10.0.0.1 -> 93.184.216.34:443",
734            ts2,
735        );
736        let alerts = engine.process_event(&e2);
737        assert_eq!(
738            alerts.len(),
739            1,
740            "private source at summary start must not suppress external destination alerts"
741        );
742    }
743
744    #[test]
745    fn within_constraint_rejects_late_event() {
746        let rule = exfil_rule();
747        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
748
749        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
750        // 31 seconds later — exceeds 30s within constraint
751        let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 31).unwrap();
752
753        let e1 = make_event(
754            EventSource::Receipt,
755            "file",
756            NormalizedVerdict::Allow,
757            "read /home/user/.ssh/id_rsa",
758            ts1,
759        );
760        engine.process_event(&e1);
761
762        let e2 = make_event(
763            EventSource::Receipt,
764            "egress",
765            NormalizedVerdict::Allow,
766            "evil.com:443",
767            ts2,
768        );
769        let alerts = engine.process_event(&e2);
770        assert!(
771            alerts.is_empty(),
772            "event outside within window should not trigger"
773        );
774    }
775
776    #[test]
777    fn after_without_within_rejects_out_of_order_event() {
778        let rule = parse_rule(
779            r#"
780schema: clawdstrike.hunt.correlation.v1
781name: "Ordered Dependent Sequence"
782severity: medium
783description: "Dependent events must occur after their prerequisite"
784window: 5m
785conditions:
786  - source: receipt
787    action_type: file
788    bind: first
789  - source: receipt
790    action_type: egress
791    after: first
792    bind: second
793output:
794  title: "Ordered sequence matched"
795  evidence:
796    - first
797    - second
798"#,
799        )
800        .unwrap();
801        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
802
803        let ts_first = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
804        let ts_older = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 5).unwrap();
805        let ts_newer = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 20).unwrap();
806
807        let first = make_event(
808            EventSource::Receipt,
809            "file",
810            NormalizedVerdict::Allow,
811            "read /etc/passwd",
812            ts_first,
813        );
814        engine.process_event(&first);
815
816        let out_of_order = make_event(
817            EventSource::Receipt,
818            "egress",
819            NormalizedVerdict::Allow,
820            "egress TCP 10.0.0.1:8080 -> 93.184.216.34:443",
821            ts_older,
822        );
823        let alerts = engine.process_event(&out_of_order);
824        assert!(
825            alerts.is_empty(),
826            "dependent event older than prerequisite must not match"
827        );
828
829        let ordered = make_event(
830            EventSource::Receipt,
831            "egress",
832            NormalizedVerdict::Allow,
833            "egress TCP 10.0.0.1:8080 -> 93.184.216.34:443",
834            ts_newer,
835        );
836        let alerts = engine.process_event(&ordered);
837        assert_eq!(
838            alerts.len(),
839            1,
840            "dependent event after prerequisite should still match"
841        );
842    }
843
844    #[test]
845    fn event_matching_no_rules() {
846        let rule = exfil_rule();
847        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
848
849        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
850        // Tetragon process event — does not match any condition in the exfil rule
851        let event = make_event(
852            EventSource::Tetragon,
853            "process",
854            NormalizedVerdict::None,
855            "process_exec /usr/bin/ls",
856            ts,
857        );
858
859        let alerts = engine.process_event(&event);
860        assert!(alerts.is_empty());
861    }
862
863    #[test]
864    fn multiple_rules_same_event() {
865        let rule1 = single_condition_rule();
866        let rule2 = parse_rule(
867            r#"
868schema: clawdstrike.hunt.correlation.v1
869name: "Any File Deny"
870severity: medium
871description: "Any file denial"
872window: 1m
873conditions:
874  - source: receipt
875    action_type: file
876    verdict: deny
877    bind: evt
878output:
879  title: "File denial observed"
880  evidence:
881    - evt
882"#,
883        )
884        .unwrap();
885
886        let mut engine = CorrelationEngine::new(vec![rule1, rule2]).unwrap();
887
888        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
889        let event = make_event(
890            EventSource::Receipt,
891            "file",
892            NormalizedVerdict::Deny,
893            "/secret",
894            ts,
895        );
896
897        let alerts = engine.process_event(&event);
898        assert_eq!(alerts.len(), 2, "both rules should fire");
899
900        let names: Vec<&str> = alerts.iter().map(|a| a.rule_name.as_str()).collect();
901        assert!(names.contains(&"Forbidden Path Access"));
902        assert!(names.contains(&"Any File Deny"));
903    }
904
905    #[test]
906    fn single_event_cannot_satisfy_root_and_dependent_condition() {
907        // Regression: a single event that matches both a root condition and
908        // a dependent condition (with `after` pointing to the root) must NOT
909        // bind to both in the same pass. The dependent condition should only
910        // match against windows that existed *before* this event was processed.
911        let rule = parse_rule(
912            r#"
913schema: clawdstrike.hunt.correlation.v1
914name: "Self-match guard"
915severity: high
916description: "Should require two distinct events"
917window: 30s
918conditions:
919  - source: receipt
920    action_type: egress
921    bind: first
922  - source: receipt
923    action_type: egress
924    after: first
925    within: 30s
926    bind: second
927output:
928  title: "Two egress events"
929  evidence:
930    - first
931    - second
932"#,
933        )
934        .unwrap();
935        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
936
937        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
938        let event = make_event(
939            EventSource::Receipt,
940            "egress",
941            NormalizedVerdict::Allow,
942            "evil.com:443",
943            ts,
944        );
945
946        // A single event should open a window but NOT complete it.
947        let alerts = engine.process_event(&event);
948        assert!(
949            alerts.is_empty(),
950            "a single event must not satisfy both root and dependent conditions"
951        );
952
953        // A second distinct event should now complete the window.
954        let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 5).unwrap();
955        let event2 = make_event(
956            EventSource::Receipt,
957            "egress",
958            NormalizedVerdict::Allow,
959            "other.com:443",
960            ts2,
961        );
962        let alerts = engine.process_event(&event2);
963        assert_eq!(
964            alerts.len(),
965            1,
966            "two distinct events should complete the sequence"
967        );
968    }
969
970    #[test]
971    fn single_event_cannot_satisfy_chained_dependent_conditions() {
972        let rule = parse_rule(
973            r#"
974schema: clawdstrike.hunt.correlation.v1
975name: "Dependent chain"
976severity: high
977description: "Should require three distinct events"
978window: 30s
979conditions:
980  - source: receipt
981    action_type: file
982    bind: first
983  - source: receipt
984    action_type: egress
985    after: first
986    within: 30s
987    bind: second
988  - source: receipt
989    action_type: egress
990    after: second
991    within: 30s
992    bind: third
993output:
994  title: "Three-step sequence"
995  evidence:
996    - first
997    - second
998    - third
999"#,
1000        )
1001        .unwrap();
1002
1003        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1004        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1005        let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 5).unwrap();
1006        let ts3 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 10).unwrap();
1007
1008        let first = make_event(
1009            EventSource::Receipt,
1010            "file",
1011            NormalizedVerdict::Allow,
1012            "read /tmp/data",
1013            ts1,
1014        );
1015        assert!(engine.process_event(&first).is_empty());
1016
1017        // This event matches both dependent conditions by predicate; it must
1018        // only satisfy the first dependent bind in this pass.
1019        let second = make_event(
1020            EventSource::Receipt,
1021            "egress",
1022            NormalizedVerdict::Allow,
1023            "evil.com:443",
1024            ts2,
1025        );
1026        assert!(
1027            engine.process_event(&second).is_empty(),
1028            "a single dependent event must not satisfy an entire chain"
1029        );
1030
1031        let third = make_event(
1032            EventSource::Receipt,
1033            "egress",
1034            NormalizedVerdict::Allow,
1035            "other.com:443",
1036            ts3,
1037        );
1038        assert_eq!(engine.process_event(&third).len(), 1);
1039    }
1040
1041    #[test]
1042    fn condition_matches_source_check() {
1043        let cond = RuleCondition {
1044            source: vec!["receipt".to_string()],
1045            action_type: None,
1046            verdict: None,
1047            target_pattern: None,
1048            not_target_pattern: None,
1049            after: None,
1050            within: None,
1051            bind: "test".to_string(),
1052        };
1053        let cp = CompiledPatterns {
1054            target: None,
1055            not_target: None,
1056        };
1057
1058        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1059
1060        let receipt_event = make_event(
1061            EventSource::Receipt,
1062            "file",
1063            NormalizedVerdict::Allow,
1064            "test",
1065            ts,
1066        );
1067        assert!(condition_matches(&cond, &cp, &receipt_event));
1068
1069        let tetragon_event = make_event(
1070            EventSource::Tetragon,
1071            "process",
1072            NormalizedVerdict::None,
1073            "test",
1074            ts,
1075        );
1076        assert!(!condition_matches(&cond, &cp, &tetragon_event));
1077    }
1078
1079    #[test]
1080    fn condition_matches_target_pattern() {
1081        let cond = RuleCondition {
1082            source: vec!["receipt".to_string()],
1083            action_type: None,
1084            verdict: None,
1085            target_pattern: Some(r"\.env$".to_string()),
1086            not_target_pattern: None,
1087            after: None,
1088            within: None,
1089            bind: "test".to_string(),
1090        };
1091        let cp = CompiledPatterns {
1092            target: Some(Regex::new(r"\.env$").unwrap()),
1093            not_target: None,
1094        };
1095
1096        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1097
1098        let matching = make_event(
1099            EventSource::Receipt,
1100            "file",
1101            NormalizedVerdict::Allow,
1102            "read /app/.env",
1103            ts,
1104        );
1105        assert!(condition_matches(&cond, &cp, &matching));
1106
1107        let non_matching = make_event(
1108            EventSource::Receipt,
1109            "file",
1110            NormalizedVerdict::Allow,
1111            "read /app/config.toml",
1112            ts,
1113        );
1114        assert!(!condition_matches(&cond, &cp, &non_matching));
1115    }
1116
1117    #[test]
1118    fn condition_matches_not_target_pattern() {
1119        let cond = RuleCondition {
1120            source: vec!["receipt".to_string()],
1121            action_type: None,
1122            verdict: None,
1123            target_pattern: None,
1124            not_target_pattern: Some(r"^localhost".to_string()),
1125            after: None,
1126            within: None,
1127            bind: "test".to_string(),
1128        };
1129        let cp = CompiledPatterns {
1130            target: None,
1131            not_target: Some(Regex::new(r"^localhost").unwrap()),
1132        };
1133
1134        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1135
1136        let excluded = make_event(
1137            EventSource::Receipt,
1138            "egress",
1139            NormalizedVerdict::Allow,
1140            "localhost:8080",
1141            ts,
1142        );
1143        assert!(!condition_matches(&cond, &cp, &excluded));
1144
1145        let allowed = make_event(
1146            EventSource::Receipt,
1147            "egress",
1148            NormalizedVerdict::Allow,
1149            "evil.com:443",
1150            ts,
1151        );
1152        assert!(condition_matches(&cond, &cp, &allowed));
1153    }
1154
1155    #[test]
1156    fn condition_matches_verdict_filter() {
1157        let cond = RuleCondition {
1158            source: vec!["receipt".to_string()],
1159            action_type: None,
1160            verdict: Some("deny".to_string()),
1161            target_pattern: None,
1162            not_target_pattern: None,
1163            after: None,
1164            within: None,
1165            bind: "test".to_string(),
1166        };
1167        let cp = CompiledPatterns {
1168            target: None,
1169            not_target: None,
1170        };
1171
1172        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1173
1174        let deny_event = make_event(
1175            EventSource::Receipt,
1176            "file",
1177            NormalizedVerdict::Deny,
1178            "test",
1179            ts,
1180        );
1181        assert!(condition_matches(&cond, &cp, &deny_event));
1182
1183        let allow_event = make_event(
1184            EventSource::Receipt,
1185            "file",
1186            NormalizedVerdict::Allow,
1187            "test",
1188            ts,
1189        );
1190        assert!(!condition_matches(&cond, &cp, &allow_event));
1191    }
1192
1193    #[test]
1194    fn flush_emits_partially_complete_windows() {
1195        // Flush should emit alerts for fully-matched windows only
1196        let rule = exfil_rule();
1197        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1198
1199        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1200        let e1 = make_event(
1201            EventSource::Receipt,
1202            "file",
1203            NormalizedVerdict::Allow,
1204            "read /etc/passwd",
1205            ts1,
1206        );
1207        engine.process_event(&e1);
1208
1209        // Only first condition matched — flush should NOT produce an alert
1210        let alerts = engine.flush();
1211        assert!(
1212            alerts.is_empty(),
1213            "incomplete window should not produce alert on flush"
1214        );
1215    }
1216
1217    #[test]
1218    fn flush_does_not_emit_alerts_from_expired_windows() {
1219        let rule = single_condition_rule();
1220        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1221        let stale_ts = Utc::now() - chrono::Duration::minutes(10);
1222        let stale_event = make_event(
1223            EventSource::Receipt,
1224            "file",
1225            NormalizedVerdict::Deny,
1226            "/etc/passwd",
1227            stale_ts,
1228        );
1229
1230        let mut bound_events = std::collections::HashMap::new();
1231        bound_events.insert("denied_access".to_string(), vec![stale_event]);
1232        engine.windows.insert(
1233            0,
1234            vec![WindowState {
1235                bound_events,
1236                window_start: stale_ts,
1237            }],
1238        );
1239
1240        let alerts = engine.flush();
1241        assert!(
1242            alerts.is_empty(),
1243            "expired windows should be evicted before flush emits alerts"
1244        );
1245    }
1246
1247    #[test]
1248    fn condition_matches_verdict_forwarded() {
1249        let cond = RuleCondition {
1250            source: vec!["hubble".to_string()],
1251            action_type: None,
1252            verdict: Some("forwarded".to_string()),
1253            target_pattern: None,
1254            not_target_pattern: None,
1255            after: None,
1256            within: None,
1257            bind: "test".to_string(),
1258        };
1259        let cp = CompiledPatterns {
1260            target: None,
1261            not_target: None,
1262        };
1263
1264        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1265
1266        let forwarded_event = make_event(
1267            EventSource::Hubble,
1268            "egress",
1269            NormalizedVerdict::Forwarded,
1270            "evil.com:443",
1271            ts,
1272        );
1273        assert!(condition_matches(&cond, &cp, &forwarded_event));
1274
1275        let allow_event = make_event(
1276            EventSource::Hubble,
1277            "egress",
1278            NormalizedVerdict::Allow,
1279            "evil.com:443",
1280            ts,
1281        );
1282        assert!(!condition_matches(&cond, &cp, &allow_event));
1283    }
1284
1285    #[test]
1286    fn condition_matches_verdict_dropped() {
1287        let cond = RuleCondition {
1288            source: vec!["hubble".to_string()],
1289            action_type: None,
1290            verdict: Some("dropped".to_string()),
1291            target_pattern: None,
1292            not_target_pattern: None,
1293            after: None,
1294            within: None,
1295            bind: "test".to_string(),
1296        };
1297        let cp = CompiledPatterns {
1298            target: None,
1299            not_target: None,
1300        };
1301
1302        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1303
1304        let dropped_event = make_event(
1305            EventSource::Hubble,
1306            "egress",
1307            NormalizedVerdict::Dropped,
1308            "evil.com:443",
1309            ts,
1310        );
1311        assert!(condition_matches(&cond, &cp, &dropped_event));
1312
1313        let forwarded_event = make_event(
1314            EventSource::Hubble,
1315            "egress",
1316            NormalizedVerdict::Forwarded,
1317            "evil.com:443",
1318            ts,
1319        );
1320        assert!(!condition_matches(&cond, &cp, &forwarded_event));
1321    }
1322
1323    #[test]
1324    fn condition_matches_verdict_none() {
1325        let cond = RuleCondition {
1326            source: vec!["tetragon".to_string()],
1327            action_type: None,
1328            verdict: Some("none".to_string()),
1329            target_pattern: None,
1330            not_target_pattern: None,
1331            after: None,
1332            within: None,
1333            bind: "test".to_string(),
1334        };
1335        let cp = CompiledPatterns {
1336            target: None,
1337            not_target: None,
1338        };
1339
1340        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1341
1342        let none_event = make_event(
1343            EventSource::Tetragon,
1344            "process",
1345            NormalizedVerdict::None,
1346            "process_exec /bin/sh",
1347            ts,
1348        );
1349        assert!(condition_matches(&cond, &cp, &none_event));
1350
1351        let allow_event = make_event(
1352            EventSource::Tetragon,
1353            "process",
1354            NormalizedVerdict::Allow,
1355            "process_exec /bin/sh",
1356            ts,
1357        );
1358        assert!(!condition_matches(&cond, &cp, &allow_event));
1359    }
1360
1361    #[test]
1362    fn condition_matches_verdict_unknown_rejects() {
1363        let cond = RuleCondition {
1364            source: vec!["receipt".to_string()],
1365            action_type: None,
1366            verdict: Some("invalid_verdict".to_string()),
1367            target_pattern: None,
1368            not_target_pattern: None,
1369            after: None,
1370            within: None,
1371            bind: "test".to_string(),
1372        };
1373        let cp = CompiledPatterns {
1374            target: None,
1375            not_target: None,
1376        };
1377
1378        let ts = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1379
1380        let event = make_event(
1381            EventSource::Receipt,
1382            "file",
1383            NormalizedVerdict::Allow,
1384            "test",
1385            ts,
1386        );
1387        assert!(
1388            !condition_matches(&cond, &cp, &event),
1389            "unknown verdict string should never match"
1390        );
1391    }
1392
1393    #[test]
1394    fn hubble_source_matches_egress_condition() {
1395        let rule = exfil_rule();
1396        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1397
1398        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1399        let ts2 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 5).unwrap();
1400
1401        let e1 = make_event(
1402            EventSource::Receipt,
1403            "file",
1404            NormalizedVerdict::Allow,
1405            "read /home/user/.env",
1406            ts1,
1407        );
1408        engine.process_event(&e1);
1409
1410        // Hubble source should also match the second condition (source: [receipt, hubble])
1411        let e2 = make_event(
1412            EventSource::Hubble,
1413            "egress",
1414            NormalizedVerdict::Allow,
1415            "evil.com:443",
1416            ts2,
1417        );
1418        let alerts = engine.process_event(&e2);
1419        assert_eq!(alerts.len(), 1);
1420    }
1421
1422    #[test]
1423    fn evict_expired_capped_uses_shorter_window() {
1424        // The exfil rule has a 30s window. Use a max_window of 10s so that
1425        // windows older than 10s are evicted even though the rule allows 30s.
1426        let rule = exfil_rule();
1427        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1428
1429        let ts1 = Utc.with_ymd_and_hms(2025, 6, 15, 12, 0, 0).unwrap();
1430
1431        // Inject a file-access event to start a window.
1432        let e1 = make_event(
1433            EventSource::Receipt,
1434            "file",
1435            NormalizedVerdict::Allow,
1436            "read /etc/passwd",
1437            ts1,
1438        );
1439        engine.process_event(&e1);
1440
1441        // Verify there is one active window.
1442        assert_eq!(engine.windows.len(), 1);
1443
1444        // Evict with max_window = 0s — should immediately evict everything.
1445        engine.evict_expired_capped(chrono::Duration::zero());
1446        assert!(
1447            engine.windows.is_empty(),
1448            "zero max_window should evict all windows"
1449        );
1450    }
1451
1452    #[test]
1453    fn evict_expired_capped_preserves_when_cap_larger_than_rule_window() {
1454        // With a cap larger than the rule window, eviction should
1455        // behave identically to the uncapped variant.
1456        let rule = exfil_rule(); // 30s window
1457        let mut engine = CorrelationEngine::new(vec![rule]).unwrap();
1458
1459        let ts = Utc::now();
1460        // Feed a root-matching event that opens a window but does not complete
1461        // the rule sequence (no matching egress event yet).
1462        let root_only = make_event(
1463            EventSource::Receipt,
1464            "file",
1465            NormalizedVerdict::Allow,
1466            "read /etc/passwd",
1467            ts,
1468        );
1469        let alerts = engine.process_event(&root_only);
1470        assert!(
1471            alerts.is_empty(),
1472            "root-only event should not complete rule"
1473        );
1474        let before = engine.windows.get(&0).map_or(0, Vec::len);
1475        assert_eq!(before, 1, "expected one active correlation window");
1476
1477        // A huge cap should not evict a just-created window.
1478        engine.evict_expired_capped(chrono::Duration::hours(24));
1479        let after = engine.windows.get(&0).map_or(0, Vec::len);
1480        assert_eq!(
1481            after, 1,
1482            "cap larger than rule window should preserve a fresh window"
1483        );
1484    }
1485}