// kql_panopticon/execution/processing/steps/scoring.rs

//! Scoring step execution handler
//!
//! Evaluates scoring indicators against acquisition data to produce risk scores.

use std::time::Instant;

use async_trait::async_trait;
use serde_json::json;
use tracing::debug;

use crate::error::{Error, Result};
use crate::execution::processing::{
    ProcessingContext, ProcessingStepHandler, ProcessingStepOutput, ProcessingStepType,
};
use crate::pack::{
    MatchedIndicator, ProcessingStep, ProcessingStepConfig, ScoringConfig, ScoringResult,
};
use crate::variable::{evaluate_condition_new as evaluate_condition, EvaluationContext};

/// Handler for scoring processing steps
pub struct ScoringStepHandler;

impl ScoringStepHandler {
    /// Create a new scoring handler
    pub fn new() -> Self {
        Self
    }
}

impl Default for ScoringStepHandler {
    fn default() -> Self {
        Self::new()
    }
}

34#[async_trait]
35impl ProcessingStepHandler for ScoringStepHandler {
36    fn handles(&self) -> ProcessingStepType {
37        ProcessingStepType::Scoring
38    }
39
40    async fn execute(
41        &self,
42        step: &ProcessingStep,
43        ctx: &ProcessingContext<'_>,
44    ) -> Result<ProcessingStepOutput> {
45        let start = Instant::now();
46
47        // Extract scoring config
48        let ProcessingStepConfig::Scoring(scoring_config) = &step.config;
49
50        debug!(
51            "Executing scoring step '{}' with {} indicators",
52            step.name,
53            scoring_config.indicators.len()
54        );
55
56        // Evaluate scoring against acquisition results
57        let result = evaluate_scoring(scoring_config, ctx)?;
58
59        debug!(
60            "Scoring step '{}' completed: score={}, level={}, {} matched indicators",
61            step.name,
62            result.score,
63            result.level,
64            result.matched_indicators.len()
65        );
66
67        // Write result to JSONL
68        let mut writer = ctx.writer(&step.name)?;
69        writer.write_row(&json!({
70            "score": result.score,
71            "level": result.level,
72            "matched_indicators": result.matched_indicators,
73            "summary": result.summary,
74            "recommendation": result.recommendation,
75        }))?;
76
77        let handle = writer.finish()?;
78        Ok(ProcessingStepOutput::new(handle, start.elapsed()))
79    }
80
81    fn validate(&self, step: &ProcessingStep) -> Result<()> {
82        let ProcessingStepConfig::Scoring(config) = &step.config;
83
84        // Must have at least one indicator
85        if config.indicators.is_empty() {
86            return Err(Error::pack(format!(
87                "Scoring step '{}' must have at least one indicator",
88                step.name
89            )));
90        }
91
92        // Validate indicator conditions (basic syntax check)
93        for indicator in &config.indicators {
94            if indicator.condition.trim().is_empty() {
95                return Err(Error::pack(format!(
96                    "Indicator '{}' in step '{}' has empty condition",
97                    indicator.name, step.name
98                )));
99            }
100        }
101
102        // Should have at least one threshold
103        if config.thresholds.is_empty() {
104            return Err(Error::pack(format!(
105                "Scoring step '{}' should have at least one threshold",
106                step.name
107            )));
108        }
109
110        Ok(())
111    }
112}
113
114/// Evaluate scoring indicators against step data
115fn evaluate_scoring(
116    config: &ScoringConfig,
117    ctx: &ProcessingContext<'_>,
118) -> Result<ScoringResult> {
119    let mut total_score: i32 = 0;
120    let mut matched_indicators = Vec::new();
121
122    // Create evaluation context from acquisition results
123    let eval_ctx = EvaluationContext::new()
124        .with_step_results(ctx.acquisition_results().clone());
125
126    // Evaluate each indicator against acquisition results
127    for indicator in &config.indicators {
128        let matched = match evaluate_condition(&indicator.condition, &eval_ctx) {
129            Ok(met) => met,
130            Err(e) => {
131                debug!(
132                    "Indicator '{}': condition='{}' evaluation failed: {}",
133                    indicator.name, indicator.condition, e
134                );
135                false
136            }
137        };
138
139        debug!(
140            "Indicator '{}': condition='{}' matched={}",
141            indicator.name, indicator.condition, matched
142        );
143
144        if matched {
145            total_score += indicator.weight;
146            matched_indicators.push(MatchedIndicator {
147                name: indicator.name.clone(),
148                weight: indicator.weight,
149                description: indicator.description.clone(),
150            });
151        }
152    }
153
154    // Determine level based on thresholds (sorted descending by min_score)
155    let mut thresholds: Vec<_> = config.thresholds.iter().collect();
156    thresholds.sort_by(|a, b| b.min_score.cmp(&a.min_score));
157
158    let (level, summary, recommendation) = thresholds
159        .iter()
160        .find(|t| total_score >= t.min_score)
161        .map(|t| {
162            // Replace {{score}} placeholder in summary if present
163            let summary = t
164                .summary
165                .as_ref()
166                .map(|s| s.replace("{{score}}", &total_score.to_string()));
167            (t.level.clone(), summary, t.recommendation.clone())
168        })
169        .unwrap_or_else(|| ("NONE".to_string(), None, None));
170
171    Ok(ScoringResult {
172        score: total_score,
173        level,
174        matched_indicators,
175        summary,
176        recommendation,
177    })
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::result::{ResultContext, ResultWriter};
    use crate::pack::{ScoringIndicator, ScoringThreshold};
    use serde_json::Value as JsonValue;

    /// Write `rows` to a JSONL file named after `step_name` and return the
    /// finished result handle.
    fn create_test_handle(
        dir: &std::path::Path,
        step_name: &str,
        rows: &[JsonValue],
    ) -> crate::execution::result::ResultHandle {
        let path = dir.join(format!("{}.jsonl", step_name));
        let mut writer = ResultWriter::new(&path, step_name).unwrap();
        writer.write_rows(rows).unwrap();
        writer.finish().unwrap()
    }

    /// Build an acquisition result context with "signins" and "alerts" steps.
    fn make_result_context(temp_dir: &std::path::Path) -> ResultContext {
        let mut results = ResultContext::new();

        let signins_handle = create_test_handle(
            temp_dir,
            "signins",
            &[
                json!({"user": "alice", "failed": 15, "country": "US"}),
                json!({"user": "bob", "failed": 2, "country": "UK"}),
            ],
        );
        results.insert("signins", signins_handle);

        let alerts_handle = create_test_handle(
            temp_dir,
            "alerts",
            &[json!({"severity": "High", "count": 3})],
        );
        results.insert("alerts", alerts_handle);

        results
    }

    /// Two indicators (weights 25 + 30) with HIGH/MEDIUM/LOW thresholds.
    fn make_scoring_config() -> ScoringConfig {
        ScoringConfig {
            indicators: vec![
                ScoringIndicator {
                    name: "high_failures".to_string(),
                    condition: "{{signins | any(failed > 10)}}".to_string(),
                    weight: 25,
                    description: Some("High number of failed logins".to_string()),
                },
                ScoringIndicator {
                    name: "high_alerts".to_string(),
                    condition: "{{alerts | any(severity == 'High')}}".to_string(),
                    weight: 30,
                    description: Some("High severity alerts present".to_string()),
                },
            ],
            thresholds: vec![
                ScoringThreshold {
                    level: "HIGH".to_string(),
                    min_score: 50,
                    summary: Some("High risk detected".to_string()),
                    recommendation: Some("Review recommended".to_string()),
                },
                ScoringThreshold {
                    level: "MEDIUM".to_string(),
                    min_score: 25,
                    summary: Some("Moderate risk".to_string()),
                    recommendation: None,
                },
                ScoringThreshold {
                    level: "LOW".to_string(),
                    min_score: 0,
                    summary: None,
                    recommendation: None,
                },
            ],
        }
    }

    #[tokio::test]
    async fn test_scoring_execution() {
        let temp_dir = tempfile::tempdir().unwrap();
        let handler = ScoringStepHandler::new();
        let acquisition_results = make_result_context(temp_dir.path());

        // Create output directory for processing results
        let output_dir = temp_dir.path().join("processing");
        std::fs::create_dir_all(&output_dir).unwrap();

        let ctx = ProcessingContext::new(&acquisition_results, &output_dir);

        let step = ProcessingStep {
            name: "risk_score".to_string(),
            config: ProcessingStepConfig::Scoring(make_scoring_config()),
            when: None,
        };

        let output = handler.execute(&step, &ctx).await.unwrap();

        // Verify output has a result handle
        assert!(!output.is_empty());
        assert_eq!(output.handle().row_count().unwrap(), 1);

        // Read back the result
        let rows = output.handle().materialize().unwrap();
        assert_eq!(rows.len(), 1);

        let result = &rows[0];
        // Should match high_failures (25) and high_alerts (30) = 55
        assert_eq!(result["score"], json!(55));
        assert_eq!(result["level"], json!("HIGH"));
    }

    #[tokio::test]
    async fn test_scoring_empty_data() {
        let temp_dir = tempfile::tempdir().unwrap();
        let handler = ScoringStepHandler::new();
        let acquisition_results = ResultContext::new();

        let output_dir = temp_dir.path().join("processing");
        std::fs::create_dir_all(&output_dir).unwrap();

        let ctx = ProcessingContext::new(&acquisition_results, &output_dir);

        let step = ProcessingStep {
            name: "risk_score".to_string(),
            config: ProcessingStepConfig::Scoring(make_scoring_config()),
            when: None,
        };

        let output = handler.execute(&step, &ctx).await.unwrap();

        // Read back the result
        let rows = output.handle().materialize().unwrap();
        assert_eq!(rows.len(), 1);

        let result = &rows[0];
        // No matches with empty data
        assert_eq!(result["score"], json!(0));
        assert_eq!(result["level"], json!("LOW"));
    }

    #[test]
    fn test_scoring_validation() {
        let handler = ScoringStepHandler::new();

        // Valid step
        let valid_step = ProcessingStep {
            name: "test".to_string(),
            config: ProcessingStepConfig::Scoring(make_scoring_config()),
            when: None,
        };
        assert!(handler.validate(&valid_step).is_ok());

        // Empty indicators
        let empty_indicators = ProcessingStep {
            name: "test".to_string(),
            config: ProcessingStepConfig::Scoring(ScoringConfig {
                indicators: vec![],
                thresholds: vec![ScoringThreshold {
                    level: "LOW".to_string(),
                    min_score: 0,
                    summary: None,
                    recommendation: None,
                }],
            }),
            when: None,
        };
        assert!(handler.validate(&empty_indicators).is_err());
    }
}