rsigma_runtime/alert_pipeline/
dedup.rs1use std::collections::HashMap;
16use std::time::Duration;
17
18use rsigma_eval::EvaluationResult;
19use serde::{Deserialize, Serialize};
20use serde_json::Value;
21
22use super::strip_event_payloads;
23use crate::selector::Selector;
24
25#[derive(Debug, Clone)]
27pub struct DedupConfig {
28 pub fingerprint: Vec<Selector>,
30 pub repeat_interval: Duration,
33 pub resolve_timeout: Duration,
35 pub max_active_alerts: usize,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
48pub(crate) struct ActiveAlert {
49 first_seen: i64,
50 last_seen: i64,
51 last_emitted: i64,
52 fire_count: u64,
53 emitted_count: u64,
56 sample: Value,
59 fields: Vec<(String, Value)>,
61}
62
63#[derive(Debug, Default)]
65pub struct DedupStore {
66 alerts: HashMap<String, ActiveAlert>,
67}
68
69pub(crate) struct DedupRecord {
72 pub state: &'static str,
74 pub json: Value,
76}
77
78impl DedupStore {
79 pub fn len(&self) -> usize {
81 self.alerts.len()
82 }
83
84 pub fn is_empty(&self) -> bool {
86 self.alerts.is_empty()
87 }
88
89 pub(crate) fn contains(&self, fingerprint: &str) -> bool {
91 self.alerts.contains_key(fingerprint)
92 }
93
94 pub(crate) fn fold(&mut self, fingerprint: &str, now: i64) {
96 if let Some(alert) = self.alerts.get_mut(fingerprint) {
97 alert.fire_count += 1;
98 alert.last_seen = now;
99 }
100 }
101
102 pub(crate) fn insert(
104 &mut self,
105 fingerprint: String,
106 now: i64,
107 sample: Value,
108 fields: Vec<(String, Value)>,
109 ) {
110 self.alerts.insert(
111 fingerprint,
112 ActiveAlert {
113 first_seen: now,
114 last_seen: now,
115 last_emitted: now,
116 fire_count: 1,
117 emitted_count: 1,
118 sample,
119 fields,
120 },
121 );
122 }
123
124 pub(crate) fn tick(&mut self, cfg: &DedupConfig, now: i64) -> Vec<DedupRecord> {
127 let resolve_secs = cfg.resolve_timeout.as_secs() as i64;
128 let repeat_secs = cfg.repeat_interval.as_secs() as i64;
129 let mut out = Vec::new();
130 let mut resolved = Vec::new();
131
132 for (fingerprint, alert) in self.alerts.iter_mut() {
133 if now - alert.last_seen >= resolve_secs {
134 out.push(DedupRecord {
135 state: "resolved",
136 json: build_record(alert, fingerprint, "resolved"),
137 });
138 resolved.push(fingerprint.clone());
139 } else if repeat_secs > 0
140 && now - alert.last_emitted >= repeat_secs
141 && alert.fire_count > alert.emitted_count
142 {
143 out.push(DedupRecord {
144 state: "repeat",
145 json: build_record(alert, fingerprint, "repeat"),
146 });
147 alert.last_emitted = now;
148 alert.emitted_count = alert.fire_count;
149 }
150 }
151
152 for key in resolved {
153 self.alerts.remove(&key);
154 }
155 out
156 }
157
158 pub(crate) fn snapshot(&self) -> Vec<(String, ActiveAlert)> {
160 self.alerts
161 .iter()
162 .map(|(k, v)| (k.clone(), v.clone()))
163 .collect()
164 }
165
166 pub(crate) fn restore(
169 &mut self,
170 alerts: Vec<(String, ActiveAlert)>,
171 now: i64,
172 resolve_secs: i64,
173 ) {
174 for (fingerprint, alert) in alerts {
175 if now - alert.last_seen < resolve_secs {
176 self.alerts.insert(fingerprint, alert);
177 }
178 }
179 }
180}
181
182pub(crate) fn fingerprint(selectors: &[Selector], result: &EvaluationResult) -> String {
189 let rule = result
190 .header
191 .rule_id
192 .as_deref()
193 .unwrap_or(result.header.rule_title.as_str());
194
195 let mut buf = String::with_capacity(64);
196 buf.push_str("rule=");
197 buf.push_str(rule);
198 for sel in selectors {
199 buf.push('\u{1f}');
200 buf.push_str(&sel.as_str());
201 buf.push('=');
202 match sel.resolve(result) {
203 Some(value) => buf.push_str(&canonical(&value)),
204 None => buf.push_str("\u{0}null"),
205 }
206 }
207 format!("{:016x}", fnv1a64(buf.as_bytes()))
208}
209
210pub(crate) fn resolve_fields(
213 selectors: &[Selector],
214 result: &EvaluationResult,
215) -> Vec<(String, Value)> {
216 selectors
217 .iter()
218 .map(|sel| (sel.as_str(), sel.resolve(result).unwrap_or(Value::Null)))
219 .collect()
220}
221
222fn canonical(value: &Value) -> String {
225 match value {
226 Value::String(s) => s.clone(),
227 other => other.to_string(),
228 }
229}
230
231fn build_record(alert: &ActiveAlert, fingerprint: &str, state: &'static str) -> Value {
234 let mut result = alert.sample.clone();
235 if !result.is_object() {
236 result = Value::Object(serde_json::Map::new());
237 }
238 let obj = result.as_object_mut().expect("result is an object");
239 let enrichments = obj
240 .entry("enrichments")
241 .or_insert_with(|| Value::Object(serde_json::Map::new()));
242 if !enrichments.is_object() {
243 *enrichments = Value::Object(serde_json::Map::new());
244 }
245 let map = enrichments
246 .as_object_mut()
247 .expect("enrichments is an object");
248 map.insert("dedup_state".to_string(), Value::String(state.to_string()));
249 map.insert(
250 "dedup_fingerprint".to_string(),
251 Value::String(fingerprint.to_string()),
252 );
253 map.insert(
254 "dedup_fire_count".to_string(),
255 Value::from(alert.fire_count),
256 );
257 map.insert(
258 "dedup_first_seen".to_string(),
259 Value::from(alert.first_seen),
260 );
261 map.insert("dedup_last_seen".to_string(), Value::from(alert.last_seen));
262 let fields: serde_json::Map<String, Value> = alert.fields.iter().cloned().collect();
263 map.insert("dedup_fields".to_string(), Value::Object(fields));
264 result
265}
266
267pub(crate) fn sample_of(result: &EvaluationResult) -> Value {
270 let mut sample = result.clone();
271 strip_event_payloads(&mut sample);
272 serde_json::to_value(&sample).unwrap_or(Value::Null)
273}
274
/// 64-bit FNV-1a hash of `bytes`.
///
/// For each byte, XOR it into the state, then multiply by the FNV prime
/// (wrapping). This is not cryptographic; it is used here for compact,
/// stable fingerprints.
pub(crate) fn fnv1a64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    bytes
        .iter()
        .fold(OFFSET_BASIS, |acc, &b| (acc ^ u64::from(b)).wrapping_mul(PRIME))
}
285
#[cfg(test)]
mod tests {
    use super::*;
    use rsigma_eval::{DetectionBody, EvaluationResult, FieldMatch, ResultBody, RuleHeader};
    use rsigma_parser::Level;
    use serde_json::json;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// A "Brute force" detection whose only matched field is `SourceIp = ip`,
    /// carrying an event payload that samples are expected to strip.
    fn result(ip: &str) -> EvaluationResult {
        let header = RuleHeader {
            rule_title: "Brute force".to_string(),
            rule_id: Some("rule-1".to_string()),
            level: Some(Level::High),
            tags: vec![],
            custom_attributes: Arc::new(HashMap::new()),
            enrichments: None,
        };
        let body = ResultBody::Detection(DetectionBody {
            matched_selections: vec![],
            matched_fields: vec![FieldMatch::new("SourceIp", json!(ip))],
            event: Some(json!({"big": "payload"})),
        });
        EvaluationResult { header, body }
    }

    /// Config fingerprinting on `match.SourceIp` with the given intervals
    /// (in seconds).
    fn cfg(repeat: u64, resolve: u64) -> DedupConfig {
        DedupConfig {
            fingerprint: vec![Selector::parse("match.SourceIp").unwrap()],
            repeat_interval: Duration::from_secs(repeat),
            resolve_timeout: Duration::from_secs(resolve),
            max_active_alerts: 100_000,
        }
    }

    /// Opens an alert for `r` at time `at` and returns its fingerprint.
    fn open_alert(store: &mut DedupStore, c: &DedupConfig, r: &EvaluationResult, at: i64) -> String {
        let fp = fingerprint(&c.fingerprint, r);
        store.insert(fp.clone(), at, sample_of(r), resolve_fields(&c.fingerprint, r));
        fp
    }

    #[test]
    fn fingerprint_is_stable_and_value_sensitive() {
        let c = cfg(0, 60);
        let hash = |ip: &str| fingerprint(&c.fingerprint, &result(ip));
        let first = hash("10.0.0.1");
        let again = hash("10.0.0.1");
        let other = hash("10.0.0.2");
        assert_eq!(first, again, "same inputs must hash identically");
        assert_ne!(first, other, "different selector values must differ");
    }

    #[test]
    fn first_fire_opens_alert_then_folds() {
        let c = cfg(0, 60);
        let mut store = DedupStore::default();
        let r = result("10.0.0.1");
        let fp = fingerprint(&c.fingerprint, &r);

        assert!(!store.contains(&fp));
        store.insert(fp.clone(), 100, sample_of(&r), resolve_fields(&c.fingerprint, &r));
        assert_eq!(store.len(), 1);

        // Two further fires fold into the open alert instead of opening new ones.
        for at in [105, 110] {
            store.fold(&fp, at);
        }

        // 200 - 110 = 90s of silence exceeds the 60s resolve timeout.
        let records = store.tick(&c, 200);
        assert_eq!(records.len(), 1);
        let record = &records[0];
        assert_eq!(record.state, "resolved");
        let enrichments = &record.json["enrichments"];
        assert_eq!(enrichments["dedup_state"], json!("resolved"));
        assert_eq!(enrichments["dedup_fire_count"], json!(3));
        assert!(record.json.get("event").is_none());
        assert!(store.is_empty(), "resolved alert is evicted");
    }

    #[test]
    fn repeat_emits_only_when_new_fires_accumulate() {
        let c = cfg(10, 600);
        let mut store = DedupStore::default();
        let fp = open_alert(&mut store, &c, &result("10.0.0.1"), 0);

        // The repeat interval has elapsed, but nothing new fired: stay quiet.
        assert!(store.tick(&c, 20).is_empty());

        store.fold(&fp, 25);
        let records = store.tick(&c, 40);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].state, "repeat");
        assert_eq!(records[0].json["enrichments"]["dedup_fire_count"], json!(2));

        // The interval elapsed again, but there were no fires since the last repeat.
        assert!(store.tick(&c, 55).is_empty());
        assert_eq!(store.len(), 1, "still active, not resolved");
    }

    #[test]
    fn repeat_interval_zero_is_pure_suppression() {
        let c = cfg(0, 100);
        let mut store = DedupStore::default();
        let fp = open_alert(&mut store, &c, &result("10.0.0.1"), 0);
        store.fold(&fp, 10);

        // A zero repeat interval never re-emits, even with accumulated fires.
        assert!(store.tick(&c, 50).is_empty());

        // 200 - 10 = 190s of silence exceeds the 100s resolve timeout.
        let records = store.tick(&c, 200);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].state, "resolved");
    }
}