kql_panopticon/execution/processing/steps/
scoring.rs1use crate::error::{Error, Result};
6use crate::execution::processing::{
7 ProcessingContext, ProcessingStepHandler, ProcessingStepOutput, ProcessingStepType,
8};
9use crate::pack::{
10 MatchedIndicator, ProcessingStep, ProcessingStepConfig, ScoringConfig, ScoringResult,
11};
12use crate::variable::{evaluate_condition_new as evaluate_condition, EvaluationContext};
13use async_trait::async_trait;
14use serde_json::json;
15use std::time::Instant;
16use tracing::debug;
17
18pub struct ScoringStepHandler;
20
21impl ScoringStepHandler {
22 pub fn new() -> Self {
24 Self
25 }
26}
27
28impl Default for ScoringStepHandler {
29 fn default() -> Self {
30 Self::new()
31 }
32}
33
34#[async_trait]
35impl ProcessingStepHandler for ScoringStepHandler {
36 fn handles(&self) -> ProcessingStepType {
37 ProcessingStepType::Scoring
38 }
39
40 async fn execute(
41 &self,
42 step: &ProcessingStep,
43 ctx: &ProcessingContext<'_>,
44 ) -> Result<ProcessingStepOutput> {
45 let start = Instant::now();
46
47 let ProcessingStepConfig::Scoring(scoring_config) = &step.config;
49
50 debug!(
51 "Executing scoring step '{}' with {} indicators",
52 step.name,
53 scoring_config.indicators.len()
54 );
55
56 let result = evaluate_scoring(scoring_config, ctx)?;
58
59 debug!(
60 "Scoring step '{}' completed: score={}, level={}, {} matched indicators",
61 step.name,
62 result.score,
63 result.level,
64 result.matched_indicators.len()
65 );
66
67 let mut writer = ctx.writer(&step.name)?;
69 writer.write_row(&json!({
70 "score": result.score,
71 "level": result.level,
72 "matched_indicators": result.matched_indicators,
73 "summary": result.summary,
74 "recommendation": result.recommendation,
75 }))?;
76
77 let handle = writer.finish()?;
78 Ok(ProcessingStepOutput::new(handle, start.elapsed()))
79 }
80
81 fn validate(&self, step: &ProcessingStep) -> Result<()> {
82 let ProcessingStepConfig::Scoring(config) = &step.config;
83
84 if config.indicators.is_empty() {
86 return Err(Error::pack(format!(
87 "Scoring step '{}' must have at least one indicator",
88 step.name
89 )));
90 }
91
92 for indicator in &config.indicators {
94 if indicator.condition.trim().is_empty() {
95 return Err(Error::pack(format!(
96 "Indicator '{}' in step '{}' has empty condition",
97 indicator.name, step.name
98 )));
99 }
100 }
101
102 if config.thresholds.is_empty() {
104 return Err(Error::pack(format!(
105 "Scoring step '{}' should have at least one threshold",
106 step.name
107 )));
108 }
109
110 Ok(())
111 }
112}
113
114fn evaluate_scoring(
116 config: &ScoringConfig,
117 ctx: &ProcessingContext<'_>,
118) -> Result<ScoringResult> {
119 let mut total_score: i32 = 0;
120 let mut matched_indicators = Vec::new();
121
122 let eval_ctx = EvaluationContext::new()
124 .with_step_results(ctx.acquisition_results().clone());
125
126 for indicator in &config.indicators {
128 let matched = match evaluate_condition(&indicator.condition, &eval_ctx) {
129 Ok(met) => met,
130 Err(e) => {
131 debug!(
132 "Indicator '{}': condition='{}' evaluation failed: {}",
133 indicator.name, indicator.condition, e
134 );
135 false
136 }
137 };
138
139 debug!(
140 "Indicator '{}': condition='{}' matched={}",
141 indicator.name, indicator.condition, matched
142 );
143
144 if matched {
145 total_score += indicator.weight;
146 matched_indicators.push(MatchedIndicator {
147 name: indicator.name.clone(),
148 weight: indicator.weight,
149 description: indicator.description.clone(),
150 });
151 }
152 }
153
154 let mut thresholds: Vec<_> = config.thresholds.iter().collect();
156 thresholds.sort_by(|a, b| b.min_score.cmp(&a.min_score));
157
158 let (level, summary, recommendation) = thresholds
159 .iter()
160 .find(|t| total_score >= t.min_score)
161 .map(|t| {
162 let summary = t
164 .summary
165 .as_ref()
166 .map(|s| s.replace("{{score}}", &total_score.to_string()));
167 (t.level.clone(), summary, t.recommendation.clone())
168 })
169 .unwrap_or_else(|| ("NONE".to_string(), None, None));
170
171 Ok(ScoringResult {
172 score: total_score,
173 level,
174 matched_indicators,
175 summary,
176 recommendation,
177 })
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183 use crate::execution::result::{ResultContext, ResultWriter};
184 use crate::pack::{ScoringIndicator, ScoringThreshold};
185 use serde_json::Value as JsonValue;
186
187 fn create_test_handle(
188 dir: &std::path::Path,
189 step_name: &str,
190 rows: &[JsonValue],
191 ) -> crate::execution::result::ResultHandle {
192 let path = dir.join(format!("{}.jsonl", step_name));
193 let mut writer = ResultWriter::new(&path, step_name).unwrap();
194 writer.write_rows(rows).unwrap();
195 writer.finish().unwrap()
196 }
197
198 fn make_result_context(temp_dir: &std::path::Path) -> ResultContext {
199 let mut results = ResultContext::new();
200
201 let signins_handle = create_test_handle(
202 temp_dir,
203 "signins",
204 &[
205 json!({"user": "alice", "failed": 15, "country": "US"}),
206 json!({"user": "bob", "failed": 2, "country": "UK"}),
207 ],
208 );
209 results.insert("signins", signins_handle);
210
211 let alerts_handle = create_test_handle(
212 temp_dir,
213 "alerts",
214 &[json!({"severity": "High", "count": 3})],
215 );
216 results.insert("alerts", alerts_handle);
217
218 results
219 }
220
221 fn make_scoring_config() -> ScoringConfig {
222 ScoringConfig {
223 indicators: vec![
224 ScoringIndicator {
225 name: "high_failures".to_string(),
226 condition: "{{signins | any(failed > 10)}}".to_string(),
227 weight: 25,
228 description: Some("High number of failed logins".to_string()),
229 },
230 ScoringIndicator {
231 name: "high_alerts".to_string(),
232 condition: "{{alerts | any(severity == 'High')}}".to_string(),
233 weight: 30,
234 description: Some("High severity alerts present".to_string()),
235 },
236 ],
237 thresholds: vec![
238 ScoringThreshold {
239 level: "HIGH".to_string(),
240 min_score: 50,
241 summary: Some("High risk detected".to_string()),
242 recommendation: Some("Review recommended".to_string()),
243 },
244 ScoringThreshold {
245 level: "MEDIUM".to_string(),
246 min_score: 25,
247 summary: Some("Moderate risk".to_string()),
248 recommendation: None,
249 },
250 ScoringThreshold {
251 level: "LOW".to_string(),
252 min_score: 0,
253 summary: None,
254 recommendation: None,
255 },
256 ],
257 }
258 }
259
260 #[tokio::test]
261 async fn test_scoring_execution() {
262 let temp_dir = tempfile::tempdir().unwrap();
263 let handler = ScoringStepHandler::new();
264 let acquisition_results = make_result_context(temp_dir.path());
265
266 let output_dir = temp_dir.path().join("processing");
268 std::fs::create_dir_all(&output_dir).unwrap();
269
270 let ctx = ProcessingContext::new(&acquisition_results, &output_dir);
271
272 let step = ProcessingStep {
273 name: "risk_score".to_string(),
274 config: ProcessingStepConfig::Scoring(make_scoring_config()),
275 when: None,
276 };
277
278 let output = handler.execute(&step, &ctx).await.unwrap();
279
280 assert!(!output.is_empty());
282 assert_eq!(output.handle().row_count().unwrap(), 1);
283
284 let rows = output.handle().materialize().unwrap();
286 assert_eq!(rows.len(), 1);
287
288 let result = &rows[0];
289 assert_eq!(result["score"], json!(55));
291 assert_eq!(result["level"], json!("HIGH"));
292 }
293
294 #[tokio::test]
295 async fn test_scoring_empty_data() {
296 let temp_dir = tempfile::tempdir().unwrap();
297 let handler = ScoringStepHandler::new();
298 let acquisition_results = ResultContext::new();
299
300 let output_dir = temp_dir.path().join("processing");
301 std::fs::create_dir_all(&output_dir).unwrap();
302
303 let ctx = ProcessingContext::new(&acquisition_results, &output_dir);
304
305 let step = ProcessingStep {
306 name: "risk_score".to_string(),
307 config: ProcessingStepConfig::Scoring(make_scoring_config()),
308 when: None,
309 };
310
311 let output = handler.execute(&step, &ctx).await.unwrap();
312
313 let rows = output.handle().materialize().unwrap();
315 assert_eq!(rows.len(), 1);
316
317 let result = &rows[0];
318 assert_eq!(result["score"], json!(0));
320 assert_eq!(result["level"], json!("LOW"));
321 }
322
323 #[test]
324 fn test_scoring_validation() {
325 let handler = ScoringStepHandler::new();
326
327 let valid_step = ProcessingStep {
329 name: "test".to_string(),
330 config: ProcessingStepConfig::Scoring(make_scoring_config()),
331 when: None,
332 };
333 assert!(handler.validate(&valid_step).is_ok());
334
335 let empty_indicators = ProcessingStep {
337 name: "test".to_string(),
338 config: ProcessingStepConfig::Scoring(ScoringConfig {
339 indicators: vec![],
340 thresholds: vec![ScoringThreshold {
341 level: "LOW".to_string(),
342 min_score: 0,
343 summary: None,
344 recommendation: None,
345 }],
346 }),
347 when: None,
348 };
349 assert!(handler.validate(&empty_indicators).is_err());
350 }
351}