1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
10use std::time::Duration;
11
12use super::record::{Disposition, Verdict};
13use super::snapshot::{DispositionSnapshot, RuleBucketsSnapshot};
14
/// Default length of the rolling window dispositions are counted over (30 days).
pub const DEFAULT_WINDOW: Duration = Duration::from_secs(60 * 60 * 24 * 30);
/// Default width of a single aggregation bucket (1 day).
pub const DEFAULT_BUCKET: Duration = Duration::from_secs(60 * 60 * 24);
/// Default minimum number of dispositions a rule needs before a ratio is reported.
pub const DEFAULT_MIN_SAMPLE: u64 = 5;
/// Default cap on the number of dedup keys remembered at once.
pub const DEFAULT_MAX_SEEN_IDS: usize = 100_000;
23
/// Selects which verdicts count toward the numerator of a rule's false-positive ratio.
///
/// The denominator is always the rule's total number of recorded verdicts.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Numerator {
    /// Count only `false_positive` verdicts (the default).
    #[default]
    FpOnly,
    /// Count `false_positive` and `benign_true_positive` verdicts together.
    FpAndBtp,
}
34
35impl Numerator {
36 pub fn parse(s: &str) -> Result<Self, String> {
38 match s {
39 "fp_only" => Ok(Self::FpOnly),
40 "fp_and_btp" => Ok(Self::FpAndBtp),
41 other => Err(format!(
42 "unknown numerator '{other}' (expected 'fp_only' or 'fp_and_btp')"
43 )),
44 }
45 }
46
47 pub fn as_str(self) -> &'static str {
49 match self {
50 Self::FpOnly => "fp_only",
51 Self::FpAndBtp => "fp_and_btp",
52 }
53 }
54}
55
56#[derive(Debug, Clone)]
58pub struct DispositionConfig {
59 pub window: Duration,
61 pub bucket: Duration,
63 pub numerator: Numerator,
65 pub min_sample: u64,
67 pub max_seen_ids: usize,
69}
70
71impl Default for DispositionConfig {
72 fn default() -> Self {
73 Self {
74 window: DEFAULT_WINDOW,
75 bucket: DEFAULT_BUCKET,
76 numerator: Numerator::FpOnly,
77 min_sample: DEFAULT_MIN_SAMPLE,
78 max_seen_ids: DEFAULT_MAX_SEEN_IDS,
79 }
80 }
81}
82
83impl DispositionConfig {
84 fn bucket_secs(&self) -> i64 {
85 (self.bucket.as_secs() as i64).max(1)
86 }
87
88 fn window_secs(&self) -> i64 {
89 self.window.as_secs() as i64
90 }
91}
92
/// Per-verdict tallies, either for one bucket or summed across several buckets.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct VerdictCounts {
    /// Number of `TruePositive` verdicts.
    pub true_positive: u64,
    /// Number of `FalsePositive` verdicts.
    pub false_positive: u64,
    /// Number of `BenignTruePositive` verdicts.
    pub benign_true_positive: u64,
}
100
101impl VerdictCounts {
102 fn add(&mut self, verdict: Verdict) {
103 match verdict {
104 Verdict::TruePositive => self.true_positive += 1,
105 Verdict::FalsePositive => self.false_positive += 1,
106 Verdict::BenignTruePositive => self.benign_true_positive += 1,
107 }
108 }
109
110 fn total(&self) -> u64 {
111 self.true_positive + self.false_positive + self.benign_true_positive
112 }
113
114 fn merge(&mut self, other: &VerdictCounts) {
115 self.true_positive += other.true_positive;
116 self.false_positive += other.false_positive;
117 self.benign_true_positive += other.benign_true_positive;
118 }
119}
120
/// Result of handing one disposition to [`DispositionStore::apply`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum IngestOutcome {
    /// The disposition was not a known duplicate and was ingested.
    Accepted,
    /// The disposition's dedup key was already remembered; the store was left unchanged.
    Duplicate,
    /// The disposition could not be ingested; the message explains why.
    Rejected(String),
}
131
/// Point-in-time totals for one rule over the buckets the store currently holds.
#[derive(Clone, Debug, PartialEq)]
pub struct RuleSummary {
    /// Rule these totals belong to.
    pub rule_id: String,
    /// Number of `TruePositive` verdicts.
    pub true_positives: u64,
    /// Number of `FalsePositive` verdicts.
    pub false_positives: u64,
    /// Number of `BenignTruePositive` verdicts.
    pub benign_true_positives: u64,
    /// Sum of the three tallies above.
    pub total: u64,
    /// Ratio under the store's configured numerator; `None` while `total` is below
    /// the configured minimum sample size.
    pub fp_ratio: Option<f64>,
}
143
144#[derive(Debug)]
149pub struct DispositionStore {
150 config: DispositionConfig,
151 rules: HashMap<String, BTreeMap<i64, VerdictCounts>>,
153 seen: HashSet<String>,
155 seen_order: VecDeque<(i64, String)>,
156}
157
158impl DispositionStore {
159 pub fn new(config: DispositionConfig) -> Self {
161 Self {
162 config,
163 rules: HashMap::new(),
164 seen: HashSet::new(),
165 seen_order: VecDeque::new(),
166 }
167 }
168
169 pub fn config(&self) -> &DispositionConfig {
171 &self.config
172 }
173
174 fn bucket_index(&self, ts: i64) -> i64 {
175 ts.div_euclid(self.config.bucket_secs())
176 }
177
178 pub fn apply(&mut self, disposition: &Disposition, now: i64) -> IngestOutcome {
185 let Some(rule_id) = disposition.rule_id.clone() else {
186 return IngestOutcome::Rejected(
187 "incident-scoped disposition could not be resolved to a rule_id".to_string(),
188 );
189 };
190
191 let key = disposition.dedup_key();
192 if self.seen.contains(&key) {
193 return IngestOutcome::Duplicate;
194 }
195 self.remember(key, disposition.timestamp, now);
201
202 let idx = self.bucket_index(disposition.timestamp);
203 let cutoff = self.bucket_index(now - self.config.window_secs());
204 let buckets = self.rules.entry(rule_id).or_default();
205 buckets.entry(idx).or_default().add(disposition.verdict);
206 buckets.retain(|&i, _| i >= cutoff);
207
208 IngestOutcome::Accepted
209 }
210
211 fn remember(&mut self, key: String, ts: i64, now: i64) {
212 self.seen.insert(key.clone());
213 self.seen_order.push_back((ts, key));
214 self.prune_seen(now);
215 }
216
217 fn prune_seen(&mut self, now: i64) {
218 let cutoff = now - self.config.window_secs();
219 while let Some((ts, _)) = self.seen_order.front() {
220 let over_capacity = self.seen_order.len() > self.config.max_seen_ids;
221 if *ts < cutoff || over_capacity {
222 if let Some((_, key)) = self.seen_order.pop_front() {
223 self.seen.remove(&key);
224 }
225 } else {
226 break;
227 }
228 }
229 }
230
231 pub fn prune(&mut self, now: i64) {
234 let cutoff = self.bucket_index(now - self.config.window_secs());
235 self.rules.retain(|_, buckets| {
236 buckets.retain(|&i, _| i >= cutoff);
237 !buckets.is_empty()
238 });
239 self.prune_seen(now);
240 }
241
242 fn aggregate(&self, rule_id: &str) -> VerdictCounts {
243 let mut total = VerdictCounts::default();
244 if let Some(buckets) = self.rules.get(rule_id) {
245 for counts in buckets.values() {
246 total.merge(counts);
247 }
248 }
249 total
250 }
251
252 fn ratio_of(&self, counts: &VerdictCounts) -> Option<f64> {
253 let total = counts.total();
254 if total < self.config.min_sample || total == 0 {
255 return None;
256 }
257 let numerator = match self.config.numerator {
258 Numerator::FpOnly => counts.false_positive,
259 Numerator::FpAndBtp => counts.false_positive + counts.benign_true_positive,
260 };
261 Some(numerator as f64 / total as f64)
262 }
263
264 pub fn ratio(&self, rule_id: &str) -> Option<f64> {
267 self.ratio_of(&self.aggregate(rule_id))
268 }
269
270 pub fn summaries(&self) -> Vec<RuleSummary> {
273 let mut out: Vec<RuleSummary> = self
274 .rules
275 .keys()
276 .map(|rule_id| {
277 let counts = self.aggregate(rule_id);
278 RuleSummary {
279 rule_id: rule_id.clone(),
280 true_positives: counts.true_positive,
281 false_positives: counts.false_positive,
282 benign_true_positives: counts.benign_true_positive,
283 total: counts.total(),
284 fp_ratio: self.ratio_of(&counts),
285 }
286 })
287 .collect();
288 out.sort_by(|a, b| a.rule_id.cmp(&b.rule_id));
289 out
290 }
291
292 pub fn rule_count(&self) -> usize {
294 self.rules.len()
295 }
296
297 pub fn snapshot(&self) -> DispositionSnapshot {
299 DispositionSnapshot {
300 version: super::snapshot::SNAPSHOT_VERSION,
301 numerator: self.config.numerator.as_str().to_string(),
302 rules: self
303 .rules
304 .iter()
305 .map(|(rule_id, buckets)| RuleBucketsSnapshot {
306 rule_id: rule_id.clone(),
307 buckets: buckets
308 .iter()
309 .map(|(&idx, c)| {
310 (
311 idx,
312 c.true_positive,
313 c.false_positive,
314 c.benign_true_positive,
315 )
316 })
317 .collect(),
318 })
319 .collect(),
320 seen: self.seen_order.iter().cloned().collect(),
321 }
322 }
323
324 pub fn restore(&mut self, snapshot: DispositionSnapshot, now: i64) -> bool {
327 if snapshot.version != super::snapshot::SNAPSHOT_VERSION {
328 return false;
329 }
330 let cutoff = self.bucket_index(now - self.config.window_secs());
331 for rule in snapshot.rules {
332 let mut buckets = BTreeMap::new();
333 for (idx, tp, fp, btp) in rule.buckets {
334 if idx < cutoff {
335 continue;
336 }
337 buckets.insert(
338 idx,
339 VerdictCounts {
340 true_positive: tp,
341 false_positive: fp,
342 benign_true_positive: btp,
343 },
344 );
345 }
346 if !buckets.is_empty() {
347 self.rules.insert(rule.rule_id, buckets);
348 }
349 }
350 let seen_cutoff = now - self.config.window_secs();
351 for (ts, key) in snapshot.seen {
352 if ts < seen_cutoff {
353 continue;
354 }
355 if self.seen.insert(key.clone()) {
356 self.seen_order.push_back((ts, key));
357 }
358 }
359 self.prune_seen(now);
360 true
361 }
362}
363
// Unit tests for `DispositionStore`: min-sample suppression, numerator selection,
// dedup of redeliveries, incident attribution, window pruning and snapshot restore.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dispositions::record::RawDisposition;

    /// Builds a rule-scoped disposition for `rule` with the given verdict string,
    /// timestamped at `ts`.
    fn disp(rule: &str, verdict: &str, ts: i64) -> Disposition {
        let raw: RawDisposition = serde_json::from_str(&format!(
            r#"{{"rule_id": "{rule}", "verdict": "{verdict}"}}"#
        ))
        .unwrap();
        let mut d = Disposition::from_raw(raw, ts).unwrap();
        // Pin the timestamp explicitly. NOTE(review): presumably `from_raw` may not
        // always use its second argument as the timestamp — confirm.
        d.timestamp = ts;
        d
    }

    /// Default configuration except for the minimum sample size.
    fn cfg(min_sample: u64) -> DispositionConfig {
        DispositionConfig {
            min_sample,
            ..Default::default()
        }
    }

    #[test]
    fn ratio_is_suppressed_below_min_sample() {
        let mut store = DispositionStore::new(cfg(5));
        store.apply(&disp("r", "false_positive", 1000), 1000);
        // A single disposition is below min_sample = 5, so no ratio is reported yet.
        assert_eq!(store.ratio("r"), None);

        for i in 1..5 {
            // Four more dispositions at distinct timestamps; the final assertion relies
            // on none of them being collapsed as duplicates.
            store.apply(&disp("r", "true_positive", 1000 + i), 1000 + i);
        }
        // 1 false positive out of 5 total.
        assert_eq!(store.ratio("r"), Some(1.0 / 5.0));
    }

    #[test]
    fn numerator_fp_and_btp_counts_benign() {
        let mut store = DispositionStore::new(DispositionConfig {
            min_sample: 1,
            numerator: Numerator::FpAndBtp,
            ..Default::default()
        });
        store.apply(&disp("r", "false_positive", 10), 10);
        store.apply(&disp("r", "benign_true_positive", 11), 11);
        store.apply(&disp("r", "true_positive", 12), 12);
        // FP and BTP both count toward the numerator: (1 + 1) / 3.
        assert_eq!(store.ratio("r"), Some(2.0 / 3.0));
    }

    #[test]
    fn idempotency_collapses_redelivery() {
        let mut store = DispositionStore::new(cfg(1));
        let raw: RawDisposition = serde_json::from_str(
            r#"{"rule_id": "r", "verdict": "false_positive", "fingerprint": "fp1"}"#,
        )
        .unwrap();
        let d = Disposition::from_raw(raw, 100).unwrap();
        assert_eq!(store.apply(&d, 100), IngestOutcome::Accepted);
        // Applying the identical disposition again hits the dedup set.
        assert_eq!(store.apply(&d, 101), IngestOutcome::Duplicate);
        // Only the first delivery was counted.
        assert_eq!(store.summaries()[0].total, 1);
    }

    /// Builds an incident-scoped false-positive disposition for `incident`, already
    /// attributed to `rule`, timestamped at `ts`.
    fn incident_disp(rule: &str, incident: &str, ts: i64) -> Disposition {
        let raw: RawDisposition = serde_json::from_str(&format!(
            r#"{{"rule_id":"{rule}","scope":"incident","incident_id":"{incident}","verdict":"false_positive"}}"#
        ))
        .unwrap();
        let mut d = Disposition::from_raw(raw, ts).unwrap();
        d.timestamp = ts;
        d
    }

    #[test]
    fn incident_expansion_counts_every_contributing_rule() {
        let mut store = DispositionStore::new(cfg(1));
        // One incident attributed to three rules is accepted once per rule. These
        // dispositions differ only in rule_id, so the rule takes part in dedup.
        for rule in ["r1", "r2", "r3"] {
            assert_eq!(
                store.apply(&incident_disp(rule, "inc1", 100), 100),
                IngestOutcome::Accepted
            );
        }
        assert_eq!(store.summaries().len(), 3);

        for rule in ["r1", "r2", "r3"] {
            // Redelivering each per-rule disposition is recognised as a duplicate.
            assert_eq!(
                store.apply(&incident_disp(rule, "inc1", 100), 100),
                IngestOutcome::Duplicate
            );
        }
    }

    #[test]
    fn unresolved_incident_is_rejected() {
        // Incident-scoped payload with no rule_id, so `apply` cannot attribute it.
        let raw: RawDisposition = serde_json::from_str(
            r#"{"verdict": "true_positive", "scope": "incident", "incident_id": "i1"}"#,
        )
        .unwrap();
        let d = Disposition::from_raw(raw, 1).unwrap();
        let mut store = DispositionStore::new(cfg(1));
        assert!(matches!(store.apply(&d, 1), IngestOutcome::Rejected(_)));
    }

    #[test]
    fn window_pruning_drops_old_buckets() {
        let mut store = DispositionStore::new(cfg(1));
        let day = 24 * 60 * 60;
        store.apply(&disp("r", "false_positive", 1000), 1000);
        // 40 days later is beyond the default 30-day window. Applying to the same rule
        // prunes the earlier false-positive bucket.
        let now = 1000 + 40 * day;
        store.apply(&disp("r", "true_positive", now), now);
        let summary = &store.summaries()[0];
        // Only the recent true positive remains.
        assert_eq!(summary.false_positives, 0);
        assert_eq!(summary.true_positives, 1);
    }

    #[test]
    fn snapshot_round_trips() {
        let mut store = DispositionStore::new(cfg(1));
        store.apply(&disp("r", "false_positive", 1000), 1000);
        store.apply(&disp("r", "true_positive", 1001), 1001);
        store.apply(&disp("s", "true_positive", 1002), 1002);
        let snap = store.snapshot();

        // Restoring at the last ingest time keeps every bucket, so summaries match.
        let mut restored = DispositionStore::new(cfg(1));
        assert!(restored.restore(snap, 1002));
        assert_eq!(restored.summaries(), store.summaries());
    }

    #[test]
    fn restore_rejects_version_mismatch() {
        // Corrupt the version of an otherwise valid (empty) snapshot.
        let mut snap = DispositionStore::new(cfg(1)).snapshot();
        snap.version = 9999;
        let mut store = DispositionStore::new(cfg(1));
        assert!(!store.restore(snap, 1));
    }
}