rsigma_eval/correlation_engine/
introspect.rs1use serde::Serialize;
13
14use rsigma_parser::{ConditionOperator, CorrelationType};
15
16use super::CorrelationEngine;
17use crate::correlation::{CompiledCorrelation, GroupKey, WindowState};
18
19#[derive(Debug, Clone, Serialize)]
22pub struct CorrelationStateSnapshot {
23 pub correlations: Vec<CorrelationInfo>,
26 pub groups: Vec<GroupStateInfo>,
28}
29
30#[derive(Debug, Clone, Serialize)]
32pub struct CorrelationInfo {
33 pub index: usize,
34 #[serde(skip_serializing_if = "Option::is_none")]
35 pub id: Option<String>,
36 #[serde(skip_serializing_if = "Option::is_none")]
37 pub name: Option<String>,
38 pub title: String,
39 #[serde(rename = "type")]
40 pub correlation_type: CorrelationType,
41 pub timespan_secs: u64,
42 pub group_by: Vec<String>,
43 pub rule_refs: Vec<String>,
44 pub threshold: String,
46 pub active_groups: usize,
48}
49
50#[derive(Debug, Clone, Serialize)]
52pub struct GroupKeyPart {
53 pub field: String,
54 #[serde(skip_serializing_if = "Option::is_none")]
55 pub value: Option<String>,
56}
57
58#[derive(Debug, Clone, Serialize)]
60pub struct GroupStateInfo {
61 pub correlation_index: usize,
62 #[serde(skip_serializing_if = "Option::is_none")]
63 pub correlation_id: Option<String>,
64 #[serde(skip_serializing_if = "Option::is_none")]
65 pub correlation_name: Option<String>,
66 pub correlation_title: String,
67 #[serde(rename = "type")]
68 pub correlation_type: CorrelationType,
69 pub group_key: Vec<GroupKeyPart>,
71 pub group_key_display: String,
74 #[serde(skip_serializing_if = "Option::is_none")]
76 pub got: Option<f64>,
77 pub threshold: String,
79 pub met: bool,
81 pub entries: usize,
83 #[serde(skip_serializing_if = "Option::is_none")]
84 pub earliest: Option<i64>,
85 #[serde(skip_serializing_if = "Option::is_none")]
86 pub latest: Option<i64>,
87 pub timespan_secs: u64,
88 #[serde(skip_serializing_if = "Option::is_none")]
92 pub seconds_to_eviction: Option<i64>,
93 #[serde(skip_serializing_if = "Option::is_none")]
94 pub last_alert: Option<i64>,
95 #[serde(skip_serializing_if = "Option::is_none")]
96 pub suppress_secs: Option<u64>,
97 #[serde(skip_serializing_if = "Option::is_none")]
100 pub suppression_remaining: Option<i64>,
101 pub window: WindowState,
103}
104
105impl CorrelationEngine {
106 pub fn introspect(&self) -> CorrelationStateSnapshot {
108 self.introspect_filtered(None, None)
109 }
110
111 pub fn introspect_filtered(
115 &self,
116 id_filter: Option<&str>,
117 group_filter: Option<&str>,
118 ) -> CorrelationStateSnapshot {
119 let mut active_per_corr = vec![0usize; self.correlations.len()];
120 for (ci, _) in self.state.keys() {
121 active_per_corr[*ci] += 1;
122 }
123
124 let correlations = self
125 .correlations
126 .iter()
127 .enumerate()
128 .filter(|(_, c)| matches_id(c, id_filter))
129 .map(|(i, c)| CorrelationInfo {
130 index: i,
131 id: c.id.clone(),
132 name: c.name.clone(),
133 title: c.title.clone(),
134 correlation_type: c.correlation_type,
135 timespan_secs: c.timespan_secs,
136 group_by: c.group_by.iter().map(|g| g.name().to_string()).collect(),
137 rule_refs: c.rule_refs.clone(),
138 threshold: render_threshold(c),
139 active_groups: active_per_corr[i],
140 })
141 .collect();
142
143 let mut groups = Vec::new();
144 for ((ci, gk), ws) in &self.state {
145 let corr = &self.correlations[*ci];
146 if !matches_id(corr, id_filter) {
147 continue;
148 }
149 let parts = group_key_parts(corr, gk);
150 let display = render_group_key(&parts);
151 if let Some(filter) = group_filter
152 && !display.contains(filter)
153 {
154 continue;
155 }
156
157 let got = ws.current_value(
158 corr.correlation_type,
159 &corr.rule_refs,
160 corr.condition.percentile,
161 );
162 let met = ws
163 .check_condition(
164 &corr.condition,
165 corr.correlation_type,
166 &corr.rule_refs,
167 corr.extended_expr.as_ref(),
168 )
169 .is_some();
170 let earliest = ws.earliest_timestamp();
171 let latest = ws.latest_timestamp();
172 let seconds_to_eviction = match (earliest, latest) {
173 (Some(e), Some(l)) => Some((e + corr.timespan_secs as i64 - l).max(0)),
174 _ => None,
175 };
176 let last_alert = self.last_alert.get(&(*ci, gk.clone())).copied();
177 let suppress_secs = corr.suppress_secs.or(self.config.suppress);
178 let suppression_remaining = match (last_alert, suppress_secs, latest) {
179 (Some(la), Some(s), Some(l)) => Some((la + s as i64 - l).max(0)),
180 _ => None,
181 };
182
183 groups.push(GroupStateInfo {
184 correlation_index: *ci,
185 correlation_id: corr.id.clone(),
186 correlation_name: corr.name.clone(),
187 correlation_title: corr.title.clone(),
188 correlation_type: corr.correlation_type,
189 group_key: parts,
190 group_key_display: display,
191 got,
192 threshold: render_threshold(corr),
193 met,
194 entries: ws.entry_count(),
195 earliest,
196 latest,
197 timespan_secs: corr.timespan_secs,
198 seconds_to_eviction,
199 last_alert,
200 suppress_secs,
201 suppression_remaining,
202 window: ws.clone(),
203 });
204 }
205
206 groups.sort_by(|a, b| {
208 a.correlation_index
209 .cmp(&b.correlation_index)
210 .then_with(|| a.group_key_display.cmp(&b.group_key_display))
211 });
212
213 CorrelationStateSnapshot {
214 correlations,
215 groups,
216 }
217 }
218}
219
220fn matches_id(corr: &CompiledCorrelation, id_filter: Option<&str>) -> bool {
221 match id_filter {
222 None => true,
223 Some(f) => {
224 corr.id.as_deref() == Some(f) || corr.name.as_deref() == Some(f) || corr.title == f
225 }
226 }
227}
228
229fn group_key_parts(corr: &CompiledCorrelation, key: &GroupKey) -> Vec<GroupKeyPart> {
230 corr.group_by
231 .iter()
232 .enumerate()
233 .map(|(i, field)| GroupKeyPart {
234 field: field.name().to_string(),
235 value: key.0.get(i).and_then(|v| v.clone()),
236 })
237 .collect()
238}
239
240fn render_group_key(parts: &[GroupKeyPart]) -> String {
241 if parts.is_empty() {
242 return "(no group-by)".to_string();
243 }
244 parts
245 .iter()
246 .map(|p| format!("{}={}", p.field, p.value.as_deref().unwrap_or("<none>")))
247 .collect::<Vec<_>>()
248 .join(", ")
249}
250
251fn render_threshold(corr: &CompiledCorrelation) -> String {
252 let preds: Vec<String> = corr
253 .condition
254 .predicates
255 .iter()
256 .map(|(op, v)| format!("{} {}", op_symbol(*op), v))
257 .collect();
258 if preds.is_empty() {
259 "(none)".to_string()
260 } else {
261 preds.join(", ")
262 }
263}
264
265fn op_symbol(op: ConditionOperator) -> &'static str {
266 match op {
267 ConditionOperator::Lt => "<",
268 ConditionOperator::Lte => "<=",
269 ConditionOperator::Gt => ">",
270 ConditionOperator::Gte => ">=",
271 ConditionOperator::Eq => "==",
272 ConditionOperator::Neq => "!=",
273 }
274}
275
276#[cfg(test)]
277mod tests {
278 use crate::event::JsonEvent;
279 use crate::{CorrelationConfig, CorrelationEngine};
280 use rsigma_parser::parse_sigma_yaml;
281 use serde_json::json;
282
283 const RULES: &str = r#"
284title: Login
285id: login-rule
286logsource:
287 category: auth
288detection:
289 selection:
290 EventType: login
291 condition: selection
292---
293title: Many Logins
294correlation:
295 type: event_count
296 rules:
297 - login-rule
298 group-by:
299 - User
300 timespan: 60s
301 condition:
302 gte: 3
303level: high
304"#;
305
306 fn engine() -> CorrelationEngine {
307 let coll = parse_sigma_yaml(RULES).unwrap();
308 let mut e = CorrelationEngine::new(CorrelationConfig::default());
309 e.add_collection(&coll).unwrap();
310 e
311 }
312
313 #[test]
314 fn introspect_reports_gap_below_threshold() {
315 let mut e = engine();
316 for i in 0..2 {
318 let v = json!({"EventType": "login", "User": "admin"});
319 e.process_event_at(&JsonEvent::borrow(&v), 1000 + i);
320 }
321 let snap = e.introspect();
322 assert_eq!(snap.correlations.len(), 1);
323 assert_eq!(snap.correlations[0].threshold, ">= 3");
324 assert_eq!(snap.correlations[0].active_groups, 1);
325
326 let g = snap
327 .groups
328 .iter()
329 .find(|g| g.group_key_display.contains("admin"))
330 .expect("admin group present");
331 assert_eq!(g.got, Some(2.0));
332 assert!(!g.met);
333 assert_eq!(g.entries, 2);
334 assert_eq!(g.threshold, ">= 3");
335 assert_eq!(g.group_key_display, "User=admin");
336 }
337
338 #[test]
339 fn introspect_marks_met_when_threshold_reached() {
340 let mut e = engine();
341 for i in 0..3 {
342 let v = json!({"EventType": "login", "User": "admin"});
343 e.process_event_at(&JsonEvent::borrow(&v), 1000 + i);
344 }
345 let snap = e.introspect();
346 let g = &snap.groups[0];
347 assert_eq!(g.got, Some(3.0));
348 assert!(g.met);
349 assert_eq!(g.seconds_to_eviction, Some(1000 + 60 - 1002));
352 }
353
354 #[test]
355 fn introspect_filter_by_group_substring() {
356 let mut e = engine();
357 for u in ["admin", "alice"] {
358 let v = json!({"EventType": "login", "User": u});
359 e.process_event_at(&JsonEvent::borrow(&v), 1000);
360 }
361 let snap = e.introspect_filtered(None, Some("alice"));
362 assert_eq!(snap.groups.len(), 1);
363 assert!(snap.groups[0].group_key_display.contains("alice"));
364 }
365
366 #[test]
367 fn introspect_filter_by_unknown_id_is_empty() {
368 let mut e = engine();
369 let v = json!({"EventType": "login", "User": "admin"});
370 e.process_event_at(&JsonEvent::borrow(&v), 1000);
371 let snap = e.introspect_filtered(Some("nope"), None);
372 assert!(snap.correlations.is_empty());
373 assert!(snap.groups.is_empty());
374 }
375}