Skip to main content

rsigma_runtime/risk/
mod.rs

1//! Post-engine risk-based alerting layer.
2//!
3//! An optional stage in the daemon sink path, between post-evaluation
4//! enrichment and the alert pipeline, modeled on Splunk RBA and Entity Risk
5//! Scoring. It runs in two stages:
6//!
//! - **Stage one (risk annotation):** each firing detection is assigned a risk
//!   score and one or more risk objects (entities such as `user`, `host`,
//!   `src_ip`). The score and objects are injected into `header.enrichments`
//!   under the reserved `risk.score` / `risk.objects` keys, and, when
//!   `emit_risk_events` is set, a compact risk event is emitted per
//!   `(detection, risk object)` pair.
//! - **Stage two (risk accumulation, optional):** when an incident config is
//!   present, each annotated detection's score is recorded against every one
//!   of its risk objects in a per-entity accumulator ([`RiskState`]). A risk
//!   incident is emitted when an entity crosses a threshold.
//!
//! The layer is strictly post-engine. It consumes [`EvaluationResult`]s and
//! emits annotated [`EvaluationResult`]s, plus additive risk events and risk
//! incidents, so the evaluation hot path is untouched. The immutable, validated
//! config ([`RiskLayer`]) is built from a YAML file and swapped atomically on
//! hot-reload.
19
20mod accumulator;
21mod config;
22mod incident;
23mod object;
24mod score;
25mod snapshot;
26mod tactics;
27
28pub use accumulator::{IncidentConfig, RiskCaps, RiskState};
29pub use config::{
30    IncidentFile, IncludeLabel, ObjectFile, ReducerLabel, RiskCapsFile, RiskConfigError, RiskFile,
31    ScopeConfig, ScoreFile, build_risk_layer, load_risk_file, parse_risk_config,
32};
33pub use incident::{IncludeMode, RiskEntityView, RiskIncidentResult, RiskRef};
34pub use object::RiskObject;
35pub use score::DEFAULT_SCORE_ATTRIBUTE;
36pub use snapshot::{EntitySnapshot, RiskStateSnapshot, SNAPSHOT_VERSION};
37
38use rsigma_eval::{EvaluationResult, ProcessResult};
39use serde_json::Value;
40
41use crate::{MetricsHook, Scope};
42
43use accumulator::Contribution;
44use object::ObjectSelector;
45use score::ScoreConfig;
46
/// Reserved enrichment key carrying the resolved risk score, written by the
/// risk layer as an integer JSON value.
const RISK_SCORE_KEY: &str = "risk.score";
/// Reserved enrichment key carrying the extracted risk objects, written by the
/// risk layer as a JSON array of serialized risk objects when at least one
/// object was extracted.
const RISK_OBJECTS_KEY: &str = "risk.objects";
51
/// Output of [`RiskLayer::process`]: the annotated pass-through results, the
/// additive risk events (opt-in), and any risk incidents raised by the
/// per-entity accumulator.
#[derive(Debug, Default)]
pub struct RiskOutput {
    /// Annotated detections, flowing on to the alert pipeline and the sinks.
    /// Out-of-scope results are included here unmodified, in input order.
    pub kept: ProcessResult,
    /// Compact risk events, one per `(detection, risk object)` pair, dispatched
    /// as additive NDJSON (optionally to a dedicated subject). Empty unless
    /// `emit_risk_events` is set.
    pub risk_events: Vec<Value>,
    /// High-fidelity risk incidents emitted when an entity crossed a threshold,
    /// dispatched via the incident path (optionally to a dedicated subject).
    /// Always empty when the layer has no incident config.
    pub incidents: Vec<RiskIncidentResult>,
}
66
/// A validated, runnable risk layer.
///
/// Immutable after construction and cheap to hold behind an `Arc`, so it can be
/// swapped atomically on hot-reload while the sink task keeps a live snapshot
/// for the duration of a batch. Mutable per-entity accumulation lives in a
/// separate [`RiskState`] that callers pass into [`RiskLayer::process`].
#[derive(Debug)]
pub struct RiskLayer {
    /// Selects which results are annotated; non-matching results pass through.
    scope: Scope,
    /// Drop raw event payloads from delivered results after object extraction.
    strip_event: bool,
    /// Resolves the risk score for each in-scope result.
    score: ScoreConfig,
    /// Selectors used to extract risk objects from each in-scope result.
    objects: Vec<ObjectSelector>,
    /// Emit one compact risk event per `(detection, risk object)` pair.
    emit_risk_events: bool,
    /// Optional NATS subject override for emitted risk events.
    nats_subject: Option<String>,
    /// Accumulator / incident settings; `None` disables accumulation entirely.
    incident: Option<IncidentConfig>,
}
82
83impl RiskLayer {
84    /// Construct from validated parts. Prefer [`build_risk_layer`].
85    #[allow(clippy::too_many_arguments)]
86    pub(crate) fn new(
87        scope: Scope,
88        strip_event: bool,
89        score: ScoreConfig,
90        objects: Vec<ObjectSelector>,
91        emit_risk_events: bool,
92        nats_subject: Option<String>,
93        incident: Option<IncidentConfig>,
94    ) -> Self {
95        RiskLayer {
96            scope,
97            strip_event,
98            score,
99            objects,
100            emit_risk_events,
101            nats_subject,
102            incident,
103        }
104    }
105
106    /// The optional NATS subject override for emitted risk events.
107    pub fn risk_event_nats_subject(&self) -> Option<&str> {
108        self.nats_subject.as_deref()
109    }
110
111    /// The validated incident config, if the accumulator is enabled.
112    pub fn incident_config(&self) -> Option<&IncidentConfig> {
113        self.incident.as_ref()
114    }
115
116    /// The optional NATS subject override for emitted risk incidents.
117    pub fn incident_nats_subject(&self) -> Option<&str> {
118        self.incident
119            .as_ref()
120            .and_then(|c| c.nats_subject.as_deref())
121    }
122
123    /// Annotate each in-scope detection with its risk score and risk objects,
124    /// accumulate per-entity risk (emitting incidents on threshold crossings),
125    /// and (opt-in) emit a compact risk event per `(detection, risk object)`
126    /// pair. Out-of-scope results pass through untouched.
127    pub fn process(
128        &self,
129        results: ProcessResult,
130        state: &mut RiskState,
131        now: i64,
132        metrics: &dyn MetricsHook,
133    ) -> RiskOutput {
134        let start = std::time::Instant::now();
135        let mut out = RiskOutput {
136            kept: Vec::with_capacity(results.len()),
137            risk_events: Vec::new(),
138            incidents: Vec::new(),
139        };
140
141        for mut result in results {
142            if !self.scope.matches(&result) {
143                metrics.on_risk_annotation("skipped");
144                out.kept.push(result);
145                continue;
146            }
147
148            let score = self.score.resolve(&result);
149            let objects = object::extract(&result, &self.objects);
150            metrics.observe_risk_annotation_score(score as f64);
151            if objects.is_empty() {
152                metrics.on_risk_annotation("no_entity");
153            } else {
154                metrics.on_risk_annotation("scored");
155                metrics.on_risk_objects(objects.len() as u64);
156            }
157
158            annotate(&mut result, score, &objects);
159
160            if self.emit_risk_events {
161                for object in &objects {
162                    out.risk_events
163                        .push(risk_event(&result, score, object, now));
164                }
165            }
166
167            if let Some(cfg) = self.incident.as_ref()
168                && !objects.is_empty()
169            {
170                self.accumulate(cfg, &result, score, &objects, state, now, metrics, &mut out);
171            }
172
173            if self.strip_event {
174                strip_event_payloads(&mut result);
175            }
176            out.kept.push(result);
177        }
178
179        if self.incident.is_some() {
180            metrics.set_risk_entities_open(state.len() as i64);
181            metrics.set_risk_state_entries(state.total_entries() as i64);
182        }
183        metrics.observe_risk_layer_duration(start.elapsed().as_secs_f64());
184        out
185    }
186
187    /// Feed one annotated detection's risk objects into the accumulator.
188    #[allow(clippy::too_many_arguments)]
189    fn accumulate(
190        &self,
191        cfg: &IncidentConfig,
192        result: &EvaluationResult,
193        score: i64,
194        objects: &[RiskObject],
195        state: &mut RiskState,
196        now: i64,
197        metrics: &dyn MetricsHook,
198        out: &mut RiskOutput,
199    ) {
200        let tactics = tactics::extract(&result.header.tags);
201        let rule = result
202            .header
203            .rule_id
204            .clone()
205            .unwrap_or_else(|| result.header.rule_title.clone());
206        let level = result.header.level.map(|l| l.as_str().to_string());
207        let stored_result = if matches!(cfg.include, IncludeMode::Results) {
208            let mut stripped = result.clone();
209            strip_event_payloads(&mut stripped);
210            serde_json::to_value(&stripped).ok()
211        } else {
212            None
213        };
214
215        for object in objects {
216            let contribution = Contribution {
217                ts: now,
218                score,
219                tactics: tactics.clone(),
220                rule: rule.clone(),
221                level: level.clone(),
222                result: stored_result.clone(),
223            };
224            let outcome = state.record(cfg, &object.object_type, &object.value, contribution, now);
225            if outcome.evicted {
226                metrics.on_risk_eviction();
227            }
228            if let Some(incident) = outcome.incident {
229                metrics.on_risk_incident_emitted(incident.trigger);
230                out.incidents.push(incident);
231            }
232        }
233    }
234
235    /// Advance time: prune entities whose windows have aged out and refresh the
236    /// open-entity and state-entry gauges. A no-op when the accumulator is off.
237    pub fn tick(&self, state: &mut RiskState, now: i64, metrics: &dyn MetricsHook) {
238        let Some(cfg) = self.incident.as_ref() else {
239            return;
240        };
241        let evicted = state.tick(cfg, now);
242        for _ in 0..evicted {
243            metrics.on_risk_eviction();
244        }
245        metrics.set_risk_entities_open(state.len() as i64);
246        metrics.set_risk_state_entries(state.total_entries() as i64);
247    }
248
249    /// Capture the accumulator into a versioned persistence snapshot.
250    pub fn snapshot(&self, state: &RiskState) -> RiskStateSnapshot {
251        state.snapshot()
252    }
253
254    /// Restore a snapshot into `state`, pruning entries past the window at
255    /// `now`. Returns `false` on a version mismatch, or when no accumulator is
256    /// configured to restore into (the caller starts fresh).
257    pub fn restore(&self, state: &mut RiskState, snap: RiskStateSnapshot, now: i64) -> bool {
258        let Some(cfg) = self.incident.as_ref() else {
259            return false;
260        };
261        state.restore(
262            snap,
263            cfg.window.as_secs() as i64,
264            cfg.caps.max_open_entities,
265            now,
266        )
267    }
268}
269
270/// Inject the reserved `risk.score` / `risk.objects` keys into a result's
271/// enrichments. The layer wins on a collision with a user enricher.
272fn annotate(result: &mut EvaluationResult, score: i64, objects: &[RiskObject]) {
273    let map = result
274        .header
275        .enrichments
276        .get_or_insert_with(serde_json::Map::new);
277    if map.contains_key(RISK_SCORE_KEY) || map.contains_key(RISK_OBJECTS_KEY) {
278        // Debug, not warn: an upstream enricher setting these on every result
279        // would otherwise emit one log line per result.
280        tracing::debug!("risk layer: overwriting a user-set `risk.*` enrichment key");
281    }
282    map.insert(RISK_SCORE_KEY.to_string(), Value::from(score));
283    if !objects.is_empty() {
284        let value = serde_json::to_value(objects).unwrap_or(Value::Null);
285        map.insert(RISK_OBJECTS_KEY.to_string(), value);
286    }
287}
288
289/// Remove raw event payloads from a result, so the layer can extract on
290/// `event.*` without emitting full events when `strip_event` is set.
291fn strip_event_payloads(result: &mut EvaluationResult) {
292    if let Some(detection) = result.as_detection_mut() {
293        detection.event = None;
294    }
295    if let Some(correlation) = result.as_correlation_mut() {
296        correlation.events = None;
297        correlation.event_refs = None;
298    }
299}
300
301/// Build one compact risk event for a `(detection, risk object)` pair. The
302/// `risk_event` marker key disambiguates it on the wire.
303fn risk_event(result: &EvaluationResult, score: i64, object: &RiskObject, now: i64) -> Value {
304    let mut map = serde_json::Map::new();
305    map.insert("risk_event".to_string(), Value::Bool(true));
306    map.insert("timestamp".to_string(), Value::from(now));
307    map.insert(
308        "rule".to_string(),
309        Value::String(
310            result
311                .header
312                .rule_id
313                .clone()
314                .unwrap_or_else(|| result.header.rule_title.clone()),
315        ),
316    );
317    map.insert(
318        "rule_title".to_string(),
319        Value::String(result.header.rule_title.clone()),
320    );
321    if let Some(level) = result.header.level {
322        map.insert(
323            "level".to_string(),
324            Value::String(level.as_str().to_string()),
325        );
326    }
327    map.insert("risk_score".to_string(), Value::from(score));
328    map.insert(
329        "risk_object".to_string(),
330        serde_json::to_value(object).unwrap_or(Value::Null),
331    );
332    Value::Object(map)
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use crate::NoopMetrics;
    use rsigma_eval::{DetectionBody, FieldMatch, ResultBody, RuleHeader};
    use rsigma_parser::Level;
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Build a risk layer from inline YAML; panics if the config is invalid.
    fn layer(yaml: &str) -> RiskLayer {
        parse_risk_config(yaml).unwrap()
    }

    /// A detection whose matched `SourceIp` is `ip`, carrying the small raw
    /// event `{"raw": "event"}` and no enrichments.
    fn detection(ip: &str, level: Level, tags: Vec<&str>) -> EvaluationResult {
        let header = RuleHeader {
            rule_title: "Suspicious activity".to_string(),
            rule_id: Some("rule-1".to_string()),
            level: Some(level),
            tags: tags.iter().map(|t| t.to_string()).collect(),
            custom_attributes: Arc::new(HashMap::new()),
            enrichments: None,
        };
        let body = DetectionBody {
            matched_selections: Vec::new(),
            matched_fields: vec![FieldMatch::new("SourceIp", serde_json::json!(ip))],
            event: Some(serde_json::json!({"raw": "event"})),
        };
        EvaluationResult {
            header,
            body: ResultBody::Detection(body),
        }
    }

    /// Run `risk` over a single high-level, untagged detection for `10.0.0.1`
    /// with a fresh accumulator and no-op metrics.
    fn process_one(risk: &RiskLayer, now: i64) -> RiskOutput {
        let mut state = RiskState::default();
        risk.process(
            vec![detection("10.0.0.1", Level::High, vec![])],
            &mut state,
            now,
            &NoopMetrics,
        )
    }

    #[test]
    fn annotates_score_and_objects() {
        let risk = layer(
            "score:\n  level_scores:\n    high: 40\nobjects:\n  - type: src_ip\n    selector: match.SourceIp\n",
        );
        let out = process_one(&risk, 0);
        assert_eq!(out.kept.len(), 1);
        let enrichments = out.kept[0].header.enrichments.as_ref().unwrap();
        assert_eq!(enrichments["risk.score"], serde_json::json!(40));
        assert_eq!(
            enrichments["risk.objects"],
            serde_json::json!([{"type": "src_ip", "value": "10.0.0.1"}])
        );
    }

    #[test]
    fn out_of_scope_passes_through_unannotated() {
        let risk = layer(
            "scope:\n  levels: [critical]\nobjects:\n  - type: src_ip\n    selector: match.SourceIp\n",
        );
        let out = process_one(&risk, 0);
        assert_eq!(out.kept.len(), 1);
        assert!(out.kept[0].header.enrichments.is_none());
    }

    #[test]
    fn no_entity_still_annotates_score_only() {
        let risk = layer(
            "score:\n  default_score: 7\nobjects:\n  - type: user\n    selector: enrichment.user\n",
        );
        let out = process_one(&risk, 0);
        let enrichments = out.kept[0].header.enrichments.as_ref().unwrap();
        assert_eq!(enrichments["risk.score"], serde_json::json!(7));
        assert!(!enrichments.contains_key("risk.objects"));
    }

    #[test]
    fn emits_risk_event_per_object_when_opted_in() {
        let risk = layer(
            "emit_risk_events: true\nscore:\n  default_score: 5\nobjects:\n  - type: src_ip\n    selector: match.SourceIp\n",
        );
        let out = process_one(&risk, 1234);
        assert_eq!(out.risk_events.len(), 1);
        let event = &out.risk_events[0];
        assert_eq!(event["risk_event"], serde_json::json!(true));
        assert_eq!(event["risk_score"], serde_json::json!(5));
        assert_eq!(event["timestamp"], serde_json::json!(1234));
        assert_eq!(event["risk_object"]["value"], serde_json::json!("10.0.0.1"));
    }

    #[test]
    fn strip_event_drops_payload_after_extraction() {
        let risk = layer("strip_event: true\nobjects:\n  - type: host\n    selector: event.raw\n");
        let out = process_one(&risk, 0);
        let delivered = &out.kept[0];
        // The event fed extraction, then was stripped from the delivered result.
        assert!(delivered.as_detection().unwrap().event.is_none());
        assert_eq!(
            delivered.header.enrichments.as_ref().unwrap()["risk.objects"],
            serde_json::json!([{"type": "host", "value": "event"}])
        );
    }
}