Skip to main content

rsigma_runtime/dispositions/
store.rs

1//! The rolling per-rule disposition store and the false-positive ratio.
2//!
3//! Verdict counts are kept in fixed-width time buckets (daily by default)
4//! across a rolling window, so memory is bounded at rules times buckets rather
5//! than growing with disposition volume. The per-rule false-positive ratio is
6//! recomputed from the retained buckets, suppressed until a rule reaches a
7//! minimum sample so a single false positive cannot publish a misleading 100%.
8
9use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
10use std::time::Duration;
11
12use super::record::{Disposition, Verdict};
13use super::snapshot::{DispositionSnapshot, RuleBucketsSnapshot};
14
15/// Default rolling window over which dispositions are counted.
16pub const DEFAULT_WINDOW: Duration = Duration::from_secs(30 * 24 * 60 * 60);
17/// Default bucket width (one day).
18pub const DEFAULT_BUCKET: Duration = Duration::from_secs(24 * 60 * 60);
19/// Default minimum dispositions before a rule's ratio is published.
20pub const DEFAULT_MIN_SAMPLE: u64 = 5;
21/// Default ceiling on the idempotency seen-id set.
22pub const DEFAULT_MAX_SEEN_IDS: usize = 100_000;
23
24/// Whether the false-positive ratio numerator counts false positives only or
25/// also benign true positives.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
27pub enum Numerator {
28    /// `false_positive` only (the default).
29    #[default]
30    FpOnly,
31    /// `false_positive` + `benign_true_positive`.
32    FpAndBtp,
33}
34
35impl Numerator {
36    /// Parse the config string form.
37    pub fn parse(s: &str) -> Result<Self, String> {
38        match s {
39            "fp_only" => Ok(Self::FpOnly),
40            "fp_and_btp" => Ok(Self::FpAndBtp),
41            other => Err(format!(
42                "unknown numerator '{other}' (expected 'fp_only' or 'fp_and_btp')"
43            )),
44        }
45    }
46
47    /// The config string form.
48    pub fn as_str(self) -> &'static str {
49        match self {
50            Self::FpOnly => "fp_only",
51            Self::FpAndBtp => "fp_and_btp",
52        }
53    }
54}
55
56/// Configuration for the rolling disposition store.
57#[derive(Debug, Clone)]
58pub struct DispositionConfig {
59    /// The rolling window over which dispositions are counted.
60    pub window: Duration,
61    /// The bucket width (counts are aggregated per bucket).
62    pub bucket: Duration,
63    /// Whether benign true positives count toward the ratio numerator.
64    pub numerator: Numerator,
65    /// Minimum dispositions in the window before the ratio is published.
66    pub min_sample: u64,
67    /// Ceiling on the idempotency seen-id set.
68    pub max_seen_ids: usize,
69}
70
71impl Default for DispositionConfig {
72    fn default() -> Self {
73        Self {
74            window: DEFAULT_WINDOW,
75            bucket: DEFAULT_BUCKET,
76            numerator: Numerator::FpOnly,
77            min_sample: DEFAULT_MIN_SAMPLE,
78            max_seen_ids: DEFAULT_MAX_SEEN_IDS,
79        }
80    }
81}
82
83impl DispositionConfig {
84    fn bucket_secs(&self) -> i64 {
85        (self.bucket.as_secs() as i64).max(1)
86    }
87
88    fn window_secs(&self) -> i64 {
89        self.window.as_secs() as i64
90    }
91}
92
93/// Verdict counts within a single bucket.
94#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
95pub struct VerdictCounts {
96    pub true_positive: u64,
97    pub false_positive: u64,
98    pub benign_true_positive: u64,
99}
100
101impl VerdictCounts {
102    fn add(&mut self, verdict: Verdict) {
103        match verdict {
104            Verdict::TruePositive => self.true_positive += 1,
105            Verdict::FalsePositive => self.false_positive += 1,
106            Verdict::BenignTruePositive => self.benign_true_positive += 1,
107        }
108    }
109
110    fn total(&self) -> u64 {
111        self.true_positive + self.false_positive + self.benign_true_positive
112    }
113
114    fn merge(&mut self, other: &VerdictCounts) {
115        self.true_positive += other.true_positive;
116        self.false_positive += other.false_positive;
117        self.benign_true_positive += other.benign_true_positive;
118    }
119}
120
121/// The outcome of applying one disposition.
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub enum IngestOutcome {
124    /// The disposition was counted.
125    Accepted,
126    /// A record with the same idempotency key was already counted.
127    Duplicate,
128    /// The disposition could not be counted; carries a reason.
129    Rejected(String),
130}
131
132/// A per-rule rolling summary for the `GET` view and the scorecard feed.
133#[derive(Debug, Clone, PartialEq)]
134pub struct RuleSummary {
135    pub rule_id: String,
136    pub true_positives: u64,
137    pub false_positives: u64,
138    pub benign_true_positives: u64,
139    pub total: u64,
140    /// `None` when the rule has fewer than `min_sample` dispositions.
141    pub fp_ratio: Option<f64>,
142}
143
144/// The rolling per-rule disposition store.
145///
146/// Updated only by the ingestion paths (it never sits in the eval or sink
147/// path), so it is strictly additive and cannot affect detection throughput.
148#[derive(Debug)]
149pub struct DispositionStore {
150    config: DispositionConfig,
151    /// Per-rule bucketed counts, keyed by `rule_id` then bucket index.
152    rules: HashMap<String, BTreeMap<i64, VerdictCounts>>,
153    /// Idempotency set: keys seen, with their timestamp for window pruning.
154    seen: HashSet<String>,
155    seen_order: VecDeque<(i64, String)>,
156}
157
158impl DispositionStore {
159    /// Create an empty store with the given config.
160    pub fn new(config: DispositionConfig) -> Self {
161        Self {
162            config,
163            rules: HashMap::new(),
164            seen: HashSet::new(),
165            seen_order: VecDeque::new(),
166        }
167    }
168
169    /// The store's configuration.
170    pub fn config(&self) -> &DispositionConfig {
171        &self.config
172    }
173
174    fn bucket_index(&self, ts: i64) -> i64 {
175        ts.div_euclid(self.config.bucket_secs())
176    }
177
178    /// Apply one disposition at `now` (epoch seconds), enforcing idempotency.
179    ///
180    /// Returns [`IngestOutcome::Rejected`] for an unresolved incident-scoped
181    /// record (no `rule_id`), [`IngestOutcome::Duplicate`] for a redelivery, and
182    /// [`IngestOutcome::Accepted`] otherwise. The affected rule's bucket map is
183    /// pruned to the window on each apply.
184    pub fn apply(&mut self, disposition: &Disposition, now: i64) -> IngestOutcome {
185        let Some(rule_id) = disposition.rule_id.clone() else {
186            return IngestOutcome::Rejected(
187                "incident-scoped disposition could not be resolved to a rule_id".to_string(),
188            );
189        };
190
191        let key = disposition.dedup_key();
192        if self.seen.contains(&key) {
193            return IngestOutcome::Duplicate;
194        }
195        // Order the seen entry by the disposition's own timestamp so the
196        // idempotency key lives exactly as long as its bucket can contribute to
197        // the window: once a record ages out of the window its bucket is pruned
198        // on apply, so a later redelivery lands in an already-pruned bucket and
199        // cannot double count.
200        self.remember(key, disposition.timestamp, now);
201
202        let idx = self.bucket_index(disposition.timestamp);
203        let cutoff = self.bucket_index(now - self.config.window_secs());
204        let buckets = self.rules.entry(rule_id).or_default();
205        buckets.entry(idx).or_default().add(disposition.verdict);
206        buckets.retain(|&i, _| i >= cutoff);
207
208        IngestOutcome::Accepted
209    }
210
211    fn remember(&mut self, key: String, ts: i64, now: i64) {
212        self.seen.insert(key.clone());
213        self.seen_order.push_back((ts, key));
214        self.prune_seen(now);
215    }
216
217    fn prune_seen(&mut self, now: i64) {
218        let cutoff = now - self.config.window_secs();
219        while let Some((ts, _)) = self.seen_order.front() {
220            let over_capacity = self.seen_order.len() > self.config.max_seen_ids;
221            if *ts < cutoff || over_capacity {
222                if let Some((_, key)) = self.seen_order.pop_front() {
223                    self.seen.remove(&key);
224                }
225            } else {
226                break;
227            }
228        }
229    }
230
231    /// Drop buckets older than the window across every rule (called on a
232    /// periodic tick). Rules left with no buckets are removed.
233    pub fn prune(&mut self, now: i64) {
234        let cutoff = self.bucket_index(now - self.config.window_secs());
235        self.rules.retain(|_, buckets| {
236            buckets.retain(|&i, _| i >= cutoff);
237            !buckets.is_empty()
238        });
239        self.prune_seen(now);
240    }
241
242    fn aggregate(&self, rule_id: &str) -> VerdictCounts {
243        let mut total = VerdictCounts::default();
244        if let Some(buckets) = self.rules.get(rule_id) {
245            for counts in buckets.values() {
246                total.merge(counts);
247            }
248        }
249        total
250    }
251
252    fn ratio_of(&self, counts: &VerdictCounts) -> Option<f64> {
253        let total = counts.total();
254        if total < self.config.min_sample || total == 0 {
255            return None;
256        }
257        let numerator = match self.config.numerator {
258            Numerator::FpOnly => counts.false_positive,
259            Numerator::FpAndBtp => counts.false_positive + counts.benign_true_positive,
260        };
261        Some(numerator as f64 / total as f64)
262    }
263
264    /// The false-positive ratio for one rule over the window, or `None` when it
265    /// has fewer than `min_sample` dispositions.
266    pub fn ratio(&self, rule_id: &str) -> Option<f64> {
267        self.ratio_of(&self.aggregate(rule_id))
268    }
269
270    /// A per-rule summary for every rule with at least one disposition, sorted
271    /// by `rule_id` for stable output.
272    pub fn summaries(&self) -> Vec<RuleSummary> {
273        let mut out: Vec<RuleSummary> = self
274            .rules
275            .keys()
276            .map(|rule_id| {
277                let counts = self.aggregate(rule_id);
278                RuleSummary {
279                    rule_id: rule_id.clone(),
280                    true_positives: counts.true_positive,
281                    false_positives: counts.false_positive,
282                    benign_true_positives: counts.benign_true_positive,
283                    total: counts.total(),
284                    fp_ratio: self.ratio_of(&counts),
285                }
286            })
287            .collect();
288        out.sort_by(|a, b| a.rule_id.cmp(&b.rule_id));
289        out
290    }
291
292    /// Number of rules currently tracked.
293    pub fn rule_count(&self) -> usize {
294        self.rules.len()
295    }
296
297    /// Capture the store into a versioned snapshot for persistence.
298    pub fn snapshot(&self) -> DispositionSnapshot {
299        DispositionSnapshot {
300            version: super::snapshot::SNAPSHOT_VERSION,
301            numerator: self.config.numerator.as_str().to_string(),
302            rules: self
303                .rules
304                .iter()
305                .map(|(rule_id, buckets)| RuleBucketsSnapshot {
306                    rule_id: rule_id.clone(),
307                    buckets: buckets
308                        .iter()
309                        .map(|(&idx, c)| {
310                            (
311                                idx,
312                                c.true_positive,
313                                c.false_positive,
314                                c.benign_true_positive,
315                            )
316                        })
317                        .collect(),
318                })
319                .collect(),
320            seen: self.seen_order.iter().cloned().collect(),
321        }
322    }
323
324    /// Restore a snapshot at `now`, pruning buckets and seen ids past the
325    /// window. Returns `false` on a version mismatch (caller starts fresh).
326    pub fn restore(&mut self, snapshot: DispositionSnapshot, now: i64) -> bool {
327        if snapshot.version != super::snapshot::SNAPSHOT_VERSION {
328            return false;
329        }
330        let cutoff = self.bucket_index(now - self.config.window_secs());
331        for rule in snapshot.rules {
332            let mut buckets = BTreeMap::new();
333            for (idx, tp, fp, btp) in rule.buckets {
334                if idx < cutoff {
335                    continue;
336                }
337                buckets.insert(
338                    idx,
339                    VerdictCounts {
340                        true_positive: tp,
341                        false_positive: fp,
342                        benign_true_positive: btp,
343                    },
344                );
345            }
346            if !buckets.is_empty() {
347                self.rules.insert(rule.rule_id, buckets);
348            }
349        }
350        let seen_cutoff = now - self.config.window_secs();
351        for (ts, key) in snapshot.seen {
352            if ts < seen_cutoff {
353                continue;
354            }
355            if self.seen.insert(key.clone()) {
356                self.seen_order.push_back((ts, key));
357            }
358        }
359        self.prune_seen(now);
360        true
361    }
362}
363
364#[cfg(test)]
365mod tests {
366    use super::*;
367    use crate::dispositions::record::RawDisposition;
368
369    fn disp(rule: &str, verdict: &str, ts: i64) -> Disposition {
370        let raw: RawDisposition = serde_json::from_str(&format!(
371            r#"{{"rule_id": "{rule}", "verdict": "{verdict}"}}"#
372        ))
373        .unwrap();
374        let mut d = Disposition::from_raw(raw, ts).unwrap();
375        d.timestamp = ts;
376        d
377    }
378
379    fn cfg(min_sample: u64) -> DispositionConfig {
380        DispositionConfig {
381            min_sample,
382            ..Default::default()
383        }
384    }
385
386    #[test]
387    fn ratio_is_suppressed_below_min_sample() {
388        let mut store = DispositionStore::new(cfg(5));
389        store.apply(&disp("r", "false_positive", 1000), 1000);
390        assert_eq!(store.ratio("r"), None);
391
392        // Distinct timestamps so the no-identity fallback dedup key
393        // (rule_id, timestamp, analyst) does not collapse them.
394        for i in 1..5 {
395            store.apply(&disp("r", "true_positive", 1000 + i), 1000 + i);
396        }
397        // Now total == 5 (1 fp + 4 tp).
398        assert_eq!(store.ratio("r"), Some(1.0 / 5.0));
399    }
400
401    #[test]
402    fn numerator_fp_and_btp_counts_benign() {
403        let mut store = DispositionStore::new(DispositionConfig {
404            min_sample: 1,
405            numerator: Numerator::FpAndBtp,
406            ..Default::default()
407        });
408        store.apply(&disp("r", "false_positive", 10), 10);
409        store.apply(&disp("r", "benign_true_positive", 11), 11);
410        store.apply(&disp("r", "true_positive", 12), 12);
411        // (1 fp + 1 btp) / 3 total.
412        assert_eq!(store.ratio("r"), Some(2.0 / 3.0));
413    }
414
415    #[test]
416    fn idempotency_collapses_redelivery() {
417        let mut store = DispositionStore::new(cfg(1));
418        let raw: RawDisposition = serde_json::from_str(
419            r#"{"rule_id": "r", "verdict": "false_positive", "fingerprint": "fp1"}"#,
420        )
421        .unwrap();
422        let d = Disposition::from_raw(raw, 100).unwrap();
423        assert_eq!(store.apply(&d, 100), IngestOutcome::Accepted);
424        assert_eq!(store.apply(&d, 101), IngestOutcome::Duplicate);
425        // Only counted once.
426        assert_eq!(store.summaries()[0].total, 1);
427    }
428
429    fn incident_disp(rule: &str, incident: &str, ts: i64) -> Disposition {
430        let raw: RawDisposition = serde_json::from_str(&format!(
431            r#"{{"rule_id":"{rule}","scope":"incident","incident_id":"{incident}","verdict":"false_positive"}}"#
432        ))
433        .unwrap();
434        let mut d = Disposition::from_raw(raw, ts).unwrap();
435        d.timestamp = ts;
436        d
437    }
438
439    #[test]
440    fn incident_expansion_counts_every_contributing_rule() {
441        // The per-rule records an incident expands into share the incident id
442        // and verdict but differ by rule_id, so all of them must count rather
443        // than collapsing to the first.
444        let mut store = DispositionStore::new(cfg(1));
445        for rule in ["r1", "r2", "r3"] {
446            assert_eq!(
447                store.apply(&incident_disp(rule, "inc1", 100), 100),
448                IngestOutcome::Accepted
449            );
450        }
451        assert_eq!(store.summaries().len(), 3);
452
453        // Re-expanding the same incident (a redelivery) dedups every rule.
454        for rule in ["r1", "r2", "r3"] {
455            assert_eq!(
456                store.apply(&incident_disp(rule, "inc1", 100), 100),
457                IngestOutcome::Duplicate
458            );
459        }
460    }
461
462    #[test]
463    fn unresolved_incident_is_rejected() {
464        let raw: RawDisposition = serde_json::from_str(
465            r#"{"verdict": "true_positive", "scope": "incident", "incident_id": "i1"}"#,
466        )
467        .unwrap();
468        let d = Disposition::from_raw(raw, 1).unwrap();
469        let mut store = DispositionStore::new(cfg(1));
470        assert!(matches!(store.apply(&d, 1), IngestOutcome::Rejected(_)));
471    }
472
473    #[test]
474    fn window_pruning_drops_old_buckets() {
475        let mut store = DispositionStore::new(cfg(1));
476        let day = 24 * 60 * 60;
477        // An old false positive 40 days ago, then a recent true positive.
478        store.apply(&disp("r", "false_positive", 1000), 1000);
479        let now = 1000 + 40 * day;
480        store.apply(&disp("r", "true_positive", now), now);
481        // The 40-day-old bucket is outside the 30-day window.
482        let summary = &store.summaries()[0];
483        assert_eq!(summary.false_positives, 0);
484        assert_eq!(summary.true_positives, 1);
485    }
486
487    #[test]
488    fn snapshot_round_trips() {
489        let mut store = DispositionStore::new(cfg(1));
490        store.apply(&disp("r", "false_positive", 1000), 1000);
491        store.apply(&disp("r", "true_positive", 1001), 1001);
492        store.apply(&disp("s", "true_positive", 1002), 1002);
493        let snap = store.snapshot();
494
495        let mut restored = DispositionStore::new(cfg(1));
496        assert!(restored.restore(snap, 1002));
497        assert_eq!(restored.summaries(), store.summaries());
498    }
499
500    #[test]
501    fn restore_rejects_version_mismatch() {
502        let mut snap = DispositionStore::new(cfg(1)).snapshot();
503        snap.version = 9999;
504        let mut store = DispositionStore::new(cfg(1));
505        assert!(!store.restore(snap, 1));
506    }
507}