Skip to main content

rsigma_eval/correlation_engine/
introspect.rs

1//! Read-only introspection of correlation window state.
2//!
3//! The only built-in visibility into correlation state is the aggregate
4//! `correlation_state_entries` count. That answers "is there state?" but not
5//! the operator's real question: "why did this correlation not fire?"
6//! [`CorrelationEngine::introspect`] projects, per correlation and per group,
7//! the current aggregate versus the threshold (the gap made explicit), the
8//! window contents, the last alert and remaining suppression, and the seconds
9//! until the next eviction. It is a read-only projection over the existing
10//! state with no hot-path cost.
11
12use serde::Serialize;
13
14use rsigma_parser::{ConditionOperator, CorrelationType};
15
16use super::CorrelationEngine;
17use crate::correlation::{CompiledCorrelation, GroupKey, WindowState};
18
/// A snapshot of every compiled correlation and its live per-group window
/// state at the moment [`CorrelationEngine::introspect`] was called.
///
/// All data is owned (no borrows into the engine), so the snapshot can be
/// serialized or inspected after the engine has moved on.
#[derive(Debug, Clone, Serialize)]
pub struct CorrelationStateSnapshot {
    /// One entry per compiled correlation (independent of whether it has
    /// active state), in the engine's correlation index order. When built by
    /// [`CorrelationEngine::introspect_filtered`], only correlations matching
    /// the id filter are kept.
    pub correlations: Vec<CorrelationInfo>,
    /// One entry per live `(correlation, group_key)` window, ordered by
    /// correlation index and then by the rendered group key.
    pub groups: Vec<GroupStateInfo>,
}
29
/// Static description of one compiled correlation.
#[derive(Debug, Clone, Serialize)]
pub struct CorrelationInfo {
    /// Position of this correlation in the engine's correlation list; the
    /// same value appears as `correlation_index` on its groups.
    pub index: usize,
    /// The correlation's `id`, if any (omitted from serialized output when
    /// absent).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    /// The correlation's `name`, if any (omitted from serialized output when
    /// absent).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// The correlation's title.
    pub title: String,
    /// The correlation type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub correlation_type: CorrelationType,
    /// Window length in seconds.
    pub timespan_secs: u64,
    /// Names of the `group-by` fields, in the correlation's declared order.
    pub group_by: Vec<String>,
    /// The rule references this correlation aggregates over.
    pub rule_refs: Vec<String>,
    /// The threshold predicates rendered for display, e.g. `>= 5`.
    pub threshold: String,
    /// Number of live group windows for this correlation.
    pub active_groups: usize,
}
49
/// One field of a resolved group key.
#[derive(Debug, Clone, Serialize)]
pub struct GroupKeyPart {
    /// The `group-by` field name.
    pub field: String,
    /// The value this group holds for `field`, or `None` when the key has no
    /// value for it (omitted from serialized output when `None`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<String>,
}
57
/// Live state of one `(correlation, group_key)` window.
#[derive(Debug, Clone, Serialize)]
pub struct GroupStateInfo {
    /// Index of the owning correlation in the engine's correlation list.
    pub correlation_index: usize,
    /// The owning correlation's `id`, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    /// The owning correlation's `name`, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub correlation_name: Option<String>,
    /// The owning correlation's title.
    pub correlation_title: String,
    /// The owning correlation's type; serialized under the key `type`.
    #[serde(rename = "type")]
    pub correlation_type: CorrelationType,
    /// The resolved group key, field by field.
    pub group_key: Vec<GroupKeyPart>,
    /// A flat `field=value` rendering of the group key for display and
    /// substring filtering.
    pub group_key_display: String,
    /// The current aggregate (count / sum / avg / distinct / fired-rule count).
    /// `None` when the window cannot produce a value for this correlation
    /// type (exact conditions depend on `WindowState::current_value`).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub got: Option<f64>,
    /// The threshold predicates rendered for display.
    pub threshold: String,
    /// Whether the threshold condition is currently satisfied.
    pub met: bool,
    /// Number of retained entries in the window.
    pub entries: usize,
    /// Timestamp of the oldest retained entry, if any.
    /// NOTE(review): presumably seconds, matching `timespan_secs` — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub earliest: Option<i64>,
    /// Timestamp of the newest retained entry, if any. The `*_remaining` /
    /// `seconds_to_eviction` fields are measured relative to this value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latest: Option<i64>,
    /// Window length of the owning correlation, in seconds.
    pub timespan_secs: u64,
    /// Seconds until the oldest entry leaves the window, relative to the
    /// latest observed event time (`earliest + timespan - latest`, clamped at
    /// zero).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub seconds_to_eviction: Option<i64>,
    /// Timestamp of the last alert emitted for this group, if one was recorded.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_alert: Option<i64>,
    /// Effective suppression period in seconds: the correlation's own
    /// setting, falling back to the engine-wide configuration.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub suppress_secs: Option<u64>,
    /// Seconds remaining in the suppression window after the last alert,
    /// relative to the latest observed event time.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub suppression_remaining: Option<i64>,
    /// The raw window state (timestamps / values), serialized as-is.
    pub window: WindowState,
}
104
105impl CorrelationEngine {
106    /// Snapshot every compiled correlation and its live per-group window state.
107    pub fn introspect(&self) -> CorrelationStateSnapshot {
108        self.introspect_filtered(None, None)
109    }
110
111    /// Like [`introspect`](CorrelationEngine::introspect) but keeps only the
112    /// correlations whose id, name, or title equals `id_filter` and the groups
113    /// whose rendered key contains `group_filter`.
114    pub fn introspect_filtered(
115        &self,
116        id_filter: Option<&str>,
117        group_filter: Option<&str>,
118    ) -> CorrelationStateSnapshot {
119        let mut active_per_corr = vec![0usize; self.correlations.len()];
120        for (ci, _) in self.state.keys() {
121            active_per_corr[*ci] += 1;
122        }
123
124        let correlations = self
125            .correlations
126            .iter()
127            .enumerate()
128            .filter(|(_, c)| matches_id(c, id_filter))
129            .map(|(i, c)| CorrelationInfo {
130                index: i,
131                id: c.id.clone(),
132                name: c.name.clone(),
133                title: c.title.clone(),
134                correlation_type: c.correlation_type,
135                timespan_secs: c.timespan_secs,
136                group_by: c.group_by.iter().map(|g| g.name().to_string()).collect(),
137                rule_refs: c.rule_refs.clone(),
138                threshold: render_threshold(c),
139                active_groups: active_per_corr[i],
140            })
141            .collect();
142
143        let mut groups = Vec::new();
144        for ((ci, gk), ws) in &self.state {
145            let corr = &self.correlations[*ci];
146            if !matches_id(corr, id_filter) {
147                continue;
148            }
149            let parts = group_key_parts(corr, gk);
150            let display = render_group_key(&parts);
151            if let Some(filter) = group_filter
152                && !display.contains(filter)
153            {
154                continue;
155            }
156
157            let got = ws.current_value(
158                corr.correlation_type,
159                &corr.rule_refs,
160                corr.condition.percentile,
161            );
162            let met = ws
163                .check_condition(
164                    &corr.condition,
165                    corr.correlation_type,
166                    &corr.rule_refs,
167                    corr.extended_expr.as_ref(),
168                )
169                .is_some();
170            let earliest = ws.earliest_timestamp();
171            let latest = ws.latest_timestamp();
172            let seconds_to_eviction = match (earliest, latest) {
173                (Some(e), Some(l)) => Some((e + corr.timespan_secs as i64 - l).max(0)),
174                _ => None,
175            };
176            let last_alert = self.last_alert.get(&(*ci, gk.clone())).copied();
177            let suppress_secs = corr.suppress_secs.or(self.config.suppress);
178            let suppression_remaining = match (last_alert, suppress_secs, latest) {
179                (Some(la), Some(s), Some(l)) => Some((la + s as i64 - l).max(0)),
180                _ => None,
181            };
182
183            groups.push(GroupStateInfo {
184                correlation_index: *ci,
185                correlation_id: corr.id.clone(),
186                correlation_name: corr.name.clone(),
187                correlation_title: corr.title.clone(),
188                correlation_type: corr.correlation_type,
189                group_key: parts,
190                group_key_display: display,
191                got,
192                threshold: render_threshold(corr),
193                met,
194                entries: ws.entry_count(),
195                earliest,
196                latest,
197                timespan_secs: corr.timespan_secs,
198                seconds_to_eviction,
199                last_alert,
200                suppress_secs,
201                suppression_remaining,
202                window: ws.clone(),
203            });
204        }
205
206        // Deterministic order for stable output and golden tests.
207        groups.sort_by(|a, b| {
208            a.correlation_index
209                .cmp(&b.correlation_index)
210                .then_with(|| a.group_key_display.cmp(&b.group_key_display))
211        });
212
213        CorrelationStateSnapshot {
214            correlations,
215            groups,
216        }
217    }
218}
219
220fn matches_id(corr: &CompiledCorrelation, id_filter: Option<&str>) -> bool {
221    match id_filter {
222        None => true,
223        Some(f) => {
224            corr.id.as_deref() == Some(f) || corr.name.as_deref() == Some(f) || corr.title == f
225        }
226    }
227}
228
229fn group_key_parts(corr: &CompiledCorrelation, key: &GroupKey) -> Vec<GroupKeyPart> {
230    corr.group_by
231        .iter()
232        .enumerate()
233        .map(|(i, field)| GroupKeyPart {
234            field: field.name().to_string(),
235            value: key.0.get(i).and_then(|v| v.clone()),
236        })
237        .collect()
238}
239
240fn render_group_key(parts: &[GroupKeyPart]) -> String {
241    if parts.is_empty() {
242        return "(no group-by)".to_string();
243    }
244    parts
245        .iter()
246        .map(|p| format!("{}={}", p.field, p.value.as_deref().unwrap_or("<none>")))
247        .collect::<Vec<_>>()
248        .join(", ")
249}
250
251fn render_threshold(corr: &CompiledCorrelation) -> String {
252    let preds: Vec<String> = corr
253        .condition
254        .predicates
255        .iter()
256        .map(|(op, v)| format!("{} {}", op_symbol(*op), v))
257        .collect();
258    if preds.is_empty() {
259        "(none)".to_string()
260    } else {
261        preds.join(", ")
262    }
263}
264
265fn op_symbol(op: ConditionOperator) -> &'static str {
266    match op {
267        ConditionOperator::Lt => "<",
268        ConditionOperator::Lte => "<=",
269        ConditionOperator::Gt => ">",
270        ConditionOperator::Gte => ">=",
271        ConditionOperator::Eq => "==",
272        ConditionOperator::Neq => "!=",
273    }
274}
275
#[cfg(test)]
mod tests {
    use crate::event::JsonEvent;
    use crate::{CorrelationConfig, CorrelationEngine};
    use rsigma_parser::parse_sigma_yaml;
    use serde_json::json;

    // A `login-rule` detection plus an `event_count` correlation over it,
    // grouped by `User`, that needs at least 3 logins within 60s.
    const RULES: &str = r#"
title: Login
id: login-rule
logsource:
    category: auth
detection:
    selection:
        EventType: login
    condition: selection
---
title: Many Logins
correlation:
    type: event_count
    rules:
        - login-rule
    group-by:
        - User
    timespan: 60s
    condition:
        gte: 3
level: high
"#;

    /// A fresh engine with the default config, loaded with `RULES`.
    fn engine() -> CorrelationEngine {
        let collection = parse_sigma_yaml(RULES).unwrap();
        let mut eng = CorrelationEngine::new(CorrelationConfig::default());
        eng.add_collection(&collection).unwrap();
        eng
    }

    #[test]
    fn introspect_reports_gap_below_threshold() {
        let mut eng = engine();
        // Only two logins for admin: one short of the gte:3 threshold.
        let login = json!({"EventType": "login", "User": "admin"});
        for ts in 1000..1002 {
            eng.process_event_at(&JsonEvent::borrow(&login), ts);
        }

        let snapshot = eng.introspect();
        assert_eq!(snapshot.correlations.len(), 1);
        let corr = &snapshot.correlations[0];
        assert_eq!(corr.threshold, ">= 3");
        assert_eq!(corr.active_groups, 1);

        let admin = snapshot
            .groups
            .iter()
            .find(|group| group.group_key_display.contains("admin"))
            .expect("admin group present");
        assert_eq!(admin.group_key_display, "User=admin");
        assert_eq!(admin.threshold, ">= 3");
        assert_eq!(admin.entries, 2);
        assert_eq!(admin.got, Some(2.0));
        assert!(!admin.met);
    }

    #[test]
    fn introspect_marks_met_when_threshold_reached() {
        let mut eng = engine();
        let login = json!({"EventType": "login", "User": "admin"});
        for ts in 1000..1003 {
            eng.process_event_at(&JsonEvent::borrow(&login), ts);
        }

        let snapshot = eng.introspect();
        let group = &snapshot.groups[0];
        assert_eq!(group.got, Some(3.0));
        assert!(group.met);
        // Entries sit at ts 1000..=1002 under a 60s timespan, so the oldest
        // (1000) leaves the window at 1060: 58s after the latest event (1002).
        assert_eq!(group.seconds_to_eviction, Some(1000 + 60 - 1002));
    }

    #[test]
    fn introspect_filter_by_group_substring() {
        let mut eng = engine();
        for user in ["admin", "alice"] {
            let login = json!({"EventType": "login", "User": user});
            eng.process_event_at(&JsonEvent::borrow(&login), 1000);
        }

        let snapshot = eng.introspect_filtered(None, Some("alice"));
        assert_eq!(snapshot.groups.len(), 1);
        assert!(snapshot.groups[0].group_key_display.contains("alice"));
    }

    #[test]
    fn introspect_filter_by_unknown_id_is_empty() {
        let mut eng = engine();
        let login = json!({"EventType": "login", "User": "admin"});
        eng.process_event_at(&JsonEvent::borrow(&login), 1000);

        let snapshot = eng.introspect_filtered(Some("nope"), None);
        assert!(snapshot.correlations.is_empty());
        assert!(snapshot.groups.is_empty());
    }
}