1mod accumulator;
21mod config;
22mod incident;
23mod object;
24mod score;
25mod snapshot;
26mod tactics;
27
28pub use accumulator::{IncidentConfig, RiskCaps, RiskState};
29pub use config::{
30 IncidentFile, IncludeLabel, ObjectFile, ReducerLabel, RiskCapsFile, RiskConfigError, RiskFile,
31 ScopeConfig, ScoreFile, build_risk_layer, load_risk_file, parse_risk_config,
32};
33pub use incident::{IncludeMode, RiskEntityView, RiskIncidentResult, RiskRef};
34pub use object::RiskObject;
35pub use score::DEFAULT_SCORE_ATTRIBUTE;
36pub use snapshot::{EntitySnapshot, RiskStateSnapshot, SNAPSHOT_VERSION};
37
38use rsigma_eval::{EvaluationResult, ProcessResult};
39use serde_json::Value;
40
41use crate::{MetricsHook, Scope};
42
43use accumulator::Contribution;
44use object::ObjectSelector;
45use score::ScoreConfig;
46
47const RISK_SCORE_KEY: &str = "risk.score";
49const RISK_OBJECTS_KEY: &str = "risk.objects";
51
52#[derive(Debug, Default)]
55pub struct RiskOutput {
56 pub kept: ProcessResult,
58 pub risk_events: Vec<Value>,
62 pub incidents: Vec<RiskIncidentResult>,
65}
66
67#[derive(Debug)]
73pub struct RiskLayer {
74 scope: Scope,
75 strip_event: bool,
76 score: ScoreConfig,
77 objects: Vec<ObjectSelector>,
78 emit_risk_events: bool,
79 nats_subject: Option<String>,
80 incident: Option<IncidentConfig>,
81}
82
83impl RiskLayer {
84 #[allow(clippy::too_many_arguments)]
86 pub(crate) fn new(
87 scope: Scope,
88 strip_event: bool,
89 score: ScoreConfig,
90 objects: Vec<ObjectSelector>,
91 emit_risk_events: bool,
92 nats_subject: Option<String>,
93 incident: Option<IncidentConfig>,
94 ) -> Self {
95 RiskLayer {
96 scope,
97 strip_event,
98 score,
99 objects,
100 emit_risk_events,
101 nats_subject,
102 incident,
103 }
104 }
105
106 pub fn risk_event_nats_subject(&self) -> Option<&str> {
108 self.nats_subject.as_deref()
109 }
110
111 pub fn incident_config(&self) -> Option<&IncidentConfig> {
113 self.incident.as_ref()
114 }
115
116 pub fn incident_nats_subject(&self) -> Option<&str> {
118 self.incident
119 .as_ref()
120 .and_then(|c| c.nats_subject.as_deref())
121 }
122
123 pub fn process(
128 &self,
129 results: ProcessResult,
130 state: &mut RiskState,
131 now: i64,
132 metrics: &dyn MetricsHook,
133 ) -> RiskOutput {
134 let start = std::time::Instant::now();
135 let mut out = RiskOutput {
136 kept: Vec::with_capacity(results.len()),
137 risk_events: Vec::new(),
138 incidents: Vec::new(),
139 };
140
141 for mut result in results {
142 if !self.scope.matches(&result) {
143 metrics.on_risk_annotation("skipped");
144 out.kept.push(result);
145 continue;
146 }
147
148 let score = self.score.resolve(&result);
149 let objects = object::extract(&result, &self.objects);
150 metrics.observe_risk_annotation_score(score as f64);
151 if objects.is_empty() {
152 metrics.on_risk_annotation("no_entity");
153 } else {
154 metrics.on_risk_annotation("scored");
155 metrics.on_risk_objects(objects.len() as u64);
156 }
157
158 annotate(&mut result, score, &objects);
159
160 if self.emit_risk_events {
161 for object in &objects {
162 out.risk_events
163 .push(risk_event(&result, score, object, now));
164 }
165 }
166
167 if let Some(cfg) = self.incident.as_ref()
168 && !objects.is_empty()
169 {
170 self.accumulate(cfg, &result, score, &objects, state, now, metrics, &mut out);
171 }
172
173 if self.strip_event {
174 strip_event_payloads(&mut result);
175 }
176 out.kept.push(result);
177 }
178
179 if self.incident.is_some() {
180 metrics.set_risk_entities_open(state.len() as i64);
181 metrics.set_risk_state_entries(state.total_entries() as i64);
182 }
183 metrics.observe_risk_layer_duration(start.elapsed().as_secs_f64());
184 out
185 }
186
187 #[allow(clippy::too_many_arguments)]
189 fn accumulate(
190 &self,
191 cfg: &IncidentConfig,
192 result: &EvaluationResult,
193 score: i64,
194 objects: &[RiskObject],
195 state: &mut RiskState,
196 now: i64,
197 metrics: &dyn MetricsHook,
198 out: &mut RiskOutput,
199 ) {
200 let tactics = tactics::extract(&result.header.tags);
201 let rule = result
202 .header
203 .rule_id
204 .clone()
205 .unwrap_or_else(|| result.header.rule_title.clone());
206 let level = result.header.level.map(|l| l.as_str().to_string());
207 let stored_result = if matches!(cfg.include, IncludeMode::Results) {
208 let mut stripped = result.clone();
209 strip_event_payloads(&mut stripped);
210 serde_json::to_value(&stripped).ok()
211 } else {
212 None
213 };
214
215 for object in objects {
216 let contribution = Contribution {
217 ts: now,
218 score,
219 tactics: tactics.clone(),
220 rule: rule.clone(),
221 level: level.clone(),
222 result: stored_result.clone(),
223 };
224 let outcome = state.record(cfg, &object.object_type, &object.value, contribution, now);
225 if outcome.evicted {
226 metrics.on_risk_eviction();
227 }
228 if let Some(incident) = outcome.incident {
229 metrics.on_risk_incident_emitted(incident.trigger);
230 out.incidents.push(incident);
231 }
232 }
233 }
234
235 pub fn tick(&self, state: &mut RiskState, now: i64, metrics: &dyn MetricsHook) {
238 let Some(cfg) = self.incident.as_ref() else {
239 return;
240 };
241 let evicted = state.tick(cfg, now);
242 for _ in 0..evicted {
243 metrics.on_risk_eviction();
244 }
245 metrics.set_risk_entities_open(state.len() as i64);
246 metrics.set_risk_state_entries(state.total_entries() as i64);
247 }
248
249 pub fn snapshot(&self, state: &RiskState) -> RiskStateSnapshot {
251 state.snapshot()
252 }
253
254 pub fn restore(&self, state: &mut RiskState, snap: RiskStateSnapshot, now: i64) -> bool {
258 let Some(cfg) = self.incident.as_ref() else {
259 return false;
260 };
261 state.restore(
262 snap,
263 cfg.window.as_secs() as i64,
264 cfg.caps.max_open_entities,
265 now,
266 )
267 }
268}
269
270fn annotate(result: &mut EvaluationResult, score: i64, objects: &[RiskObject]) {
273 let map = result
274 .header
275 .enrichments
276 .get_or_insert_with(serde_json::Map::new);
277 if map.contains_key(RISK_SCORE_KEY) || map.contains_key(RISK_OBJECTS_KEY) {
278 tracing::debug!("risk layer: overwriting a user-set `risk.*` enrichment key");
281 }
282 map.insert(RISK_SCORE_KEY.to_string(), Value::from(score));
283 if !objects.is_empty() {
284 let value = serde_json::to_value(objects).unwrap_or(Value::Null);
285 map.insert(RISK_OBJECTS_KEY.to_string(), value);
286 }
287}
288
289fn strip_event_payloads(result: &mut EvaluationResult) {
292 if let Some(detection) = result.as_detection_mut() {
293 detection.event = None;
294 }
295 if let Some(correlation) = result.as_correlation_mut() {
296 correlation.events = None;
297 correlation.event_refs = None;
298 }
299}
300
301fn risk_event(result: &EvaluationResult, score: i64, object: &RiskObject, now: i64) -> Value {
304 let mut map = serde_json::Map::new();
305 map.insert("risk_event".to_string(), Value::Bool(true));
306 map.insert("timestamp".to_string(), Value::from(now));
307 map.insert(
308 "rule".to_string(),
309 Value::String(
310 result
311 .header
312 .rule_id
313 .clone()
314 .unwrap_or_else(|| result.header.rule_title.clone()),
315 ),
316 );
317 map.insert(
318 "rule_title".to_string(),
319 Value::String(result.header.rule_title.clone()),
320 );
321 if let Some(level) = result.header.level {
322 map.insert(
323 "level".to_string(),
324 Value::String(level.as_str().to_string()),
325 );
326 }
327 map.insert("risk_score".to_string(), Value::from(score));
328 map.insert(
329 "risk_object".to_string(),
330 serde_json::to_value(object).unwrap_or(Value::Null),
331 );
332 Value::Object(map)
333}
334
335#[cfg(test)]
336mod tests {
337 use super::*;
338 use crate::NoopMetrics;
339 use rsigma_eval::{DetectionBody, FieldMatch, ResultBody, RuleHeader};
340 use rsigma_parser::Level;
341 use std::collections::HashMap;
342 use std::sync::Arc;
343
344 fn layer(yaml: &str) -> RiskLayer {
345 parse_risk_config(yaml).unwrap()
346 }
347
348 fn detection(ip: &str, level: Level, tags: Vec<&str>) -> EvaluationResult {
349 EvaluationResult {
350 header: RuleHeader {
351 rule_title: "Suspicious activity".to_string(),
352 rule_id: Some("rule-1".to_string()),
353 level: Some(level),
354 tags: tags.into_iter().map(str::to_string).collect(),
355 custom_attributes: Arc::new(HashMap::new()),
356 enrichments: None,
357 },
358 body: ResultBody::Detection(DetectionBody {
359 matched_selections: vec![],
360 matched_fields: vec![FieldMatch::new("SourceIp", serde_json::json!(ip))],
361 event: Some(serde_json::json!({"raw": "event"})),
362 }),
363 }
364 }
365
366 #[test]
367 fn annotates_score_and_objects() {
368 let p = layer(
369 "score:\n level_scores:\n high: 40\nobjects:\n - type: src_ip\n selector: match.SourceIp\n",
370 );
371 let out = p.process(
372 vec![detection("10.0.0.1", Level::High, vec![])],
373 &mut RiskState::default(),
374 0,
375 &NoopMetrics,
376 );
377 assert_eq!(out.kept.len(), 1);
378 let enr = out.kept[0].header.enrichments.as_ref().unwrap();
379 assert_eq!(enr["risk.score"], serde_json::json!(40));
380 assert_eq!(
381 enr["risk.objects"],
382 serde_json::json!([{"type": "src_ip", "value": "10.0.0.1"}])
383 );
384 }
385
386 #[test]
387 fn out_of_scope_passes_through_unannotated() {
388 let p = layer(
389 "scope:\n levels: [critical]\nobjects:\n - type: src_ip\n selector: match.SourceIp\n",
390 );
391 let out = p.process(
392 vec![detection("10.0.0.1", Level::High, vec![])],
393 &mut RiskState::default(),
394 0,
395 &NoopMetrics,
396 );
397 assert_eq!(out.kept.len(), 1);
398 assert!(out.kept[0].header.enrichments.is_none());
399 }
400
401 #[test]
402 fn no_entity_still_annotates_score_only() {
403 let p = layer(
404 "score:\n default_score: 7\nobjects:\n - type: user\n selector: enrichment.user\n",
405 );
406 let out = p.process(
407 vec![detection("10.0.0.1", Level::High, vec![])],
408 &mut RiskState::default(),
409 0,
410 &NoopMetrics,
411 );
412 let enr = out.kept[0].header.enrichments.as_ref().unwrap();
413 assert_eq!(enr["risk.score"], serde_json::json!(7));
414 assert!(!enr.contains_key("risk.objects"));
415 }
416
417 #[test]
418 fn emits_risk_event_per_object_when_opted_in() {
419 let p = layer(
420 "emit_risk_events: true\nscore:\n default_score: 5\nobjects:\n - type: src_ip\n selector: match.SourceIp\n",
421 );
422 let out = p.process(
423 vec![detection("10.0.0.1", Level::High, vec![])],
424 &mut RiskState::default(),
425 1234,
426 &NoopMetrics,
427 );
428 assert_eq!(out.risk_events.len(), 1);
429 let ev = &out.risk_events[0];
430 assert_eq!(ev["risk_event"], serde_json::json!(true));
431 assert_eq!(ev["risk_score"], serde_json::json!(5));
432 assert_eq!(ev["timestamp"], serde_json::json!(1234));
433 assert_eq!(ev["risk_object"]["value"], serde_json::json!("10.0.0.1"));
434 }
435
436 #[test]
437 fn strip_event_drops_payload_after_extraction() {
438 let p = layer("strip_event: true\nobjects:\n - type: host\n selector: event.raw\n");
439 let out = p.process(
440 vec![detection("10.0.0.1", Level::High, vec![])],
441 &mut RiskState::default(),
442 0,
443 &NoopMetrics,
444 );
445 assert!(out.kept[0].as_detection().unwrap().event.is_none());
447 assert_eq!(
448 out.kept[0].header.enrichments.as_ref().unwrap()["risk.objects"],
449 serde_json::json!([{"type": "host", "value": "event"}])
450 );
451 }
452}