kql_panopticon/execution/processing/
phase.rs1use crate::error::{Error, Result};
6use crate::execution::progress::{ExecutionPhase, ProgressSender};
7use crate::execution::result::ResultContext;
8use crate::pack::{Processing, ProcessingStepConfig};
9use crate::variable::{evaluate_condition_new as evaluate_condition, EvaluationContext};
10use std::collections::HashMap;
11use std::path::Path;
12use std::time::{Duration, Instant};
13use tracing::{debug, info};
14
15use super::context::ProcessingContext;
16use super::handler::{ProcessingStepHandler, ProcessingStepType};
17
18#[derive(Debug)]
20pub struct ProcessingPhaseOutput {
21 pub results: ResultContext,
23
24 pub step_statuses: HashMap<String, ProcessingStepStatus>,
26
27 pub duration: Duration,
29
30 pub failed_step: Option<String>,
32
33 pub failure_reason: Option<String>,
35}
36
37#[derive(Debug, Clone)]
39pub struct ProcessingStepStatus {
40 pub name: String,
41 pub status: ProcessingStatus,
42 pub duration_ms: u64,
43 pub error: Option<String>,
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48pub enum ProcessingStatus {
49 Success,
50 Failed,
51 Skipped,
52}
53
54pub struct ProcessingPhaseHandler {
58 handlers: HashMap<ProcessingStepType, Box<dyn ProcessingStepHandler>>,
59}
60
61impl ProcessingPhaseHandler {
62 pub fn new() -> Self {
64 Self {
65 handlers: HashMap::new(),
66 }
67 }
68
69 pub fn register(&mut self, handler: Box<dyn ProcessingStepHandler>) {
71 self.handlers.insert(handler.handles(), handler);
72 }
73
74 fn get_handler(&self, step_type: ProcessingStepType) -> Option<&dyn ProcessingStepHandler> {
76 self.handlers.get(&step_type).map(|h| h.as_ref())
77 }
78
79 fn step_type_from_config(config: &ProcessingStepConfig) -> ProcessingStepType {
81 match config {
82 ProcessingStepConfig::Scoring(_) => ProcessingStepType::Scoring,
83 }
84 }
85
86 pub async fn execute(
88 &self,
89 processing: &Processing,
90 acquisition_results: &ResultContext,
91 output_dir: &Path,
92 progress: Option<&ProgressSender>,
93 ) -> Result<ProcessingPhaseOutput> {
94 let phase_start = Instant::now();
95 let mut step_statuses: HashMap<String, ProcessingStepStatus> = HashMap::new();
96 let mut failed_step = None;
97 let mut failure_reason = None;
98
99 let mut ctx = ProcessingContext::new(acquisition_results, output_dir);
101
102 debug!(
103 phase = "processing",
104 steps = processing.steps.len(),
105 "Starting processing phase"
106 );
107
108 for step in &processing.steps {
110 if let Some(when_condition) = &step.when {
112 let eval_ctx = EvaluationContext::new()
114 .with_step_results(acquisition_results.clone());
115
116 let condition_met = match evaluate_condition(when_condition, &eval_ctx) {
117 Ok(met) => met,
118 Err(e) => {
119 tracing::warn!(
120 "Processing step '{}' when='{}' evaluation failed: {}",
121 step.name, when_condition, e
122 );
123 false
124 }
125 };
126
127 debug!(
128 "Processing step '{}' when='{}' evaluated to: {}",
129 step.name, when_condition, condition_met
130 );
131
132 if !condition_met {
133 let reason = format!("Condition not met: {}", when_condition);
134 if let Some(tx) = progress {
135 tx.step_skipped(&step.name, "", ExecutionPhase::Processing, &reason);
136 }
137
138 step_statuses.insert(
139 step.name.clone(),
140 ProcessingStepStatus {
141 name: step.name.clone(),
142 status: ProcessingStatus::Skipped,
143 duration_ms: 0,
144 error: Some(reason),
145 },
146 );
147 continue;
148 }
149 }
150
151 if let Some(tx) = progress {
153 tx.step_started(&step.name, "", ExecutionPhase::Processing);
154 }
155
156 debug!(step = %step.name, "Starting processing step execution");
157
158 let step_type = Self::step_type_from_config(&step.config);
160 let handler = match self.get_handler(step_type) {
161 Some(h) => h,
162 None => {
163 let error_msg = format!("No handler for step type {:?}", step_type);
164 failed_step = Some(step.name.clone());
165 failure_reason = Some(error_msg.clone());
166
167 step_statuses.insert(
168 step.name.clone(),
169 ProcessingStepStatus {
170 name: step.name.clone(),
171 status: ProcessingStatus::Failed,
172 duration_ms: 0,
173 error: Some(error_msg),
174 },
175 );
176 break;
177 }
178 };
179
180 let result = handler.execute(step, &ctx).await;
182
183 match result {
184 Ok(output) => {
185 let duration_ms = output.duration_ms();
186
187 if let Some(tx) = progress {
189 tx.step_completed(&step.name, "", ExecutionPhase::Processing, 0, duration_ms);
190 }
191
192 ctx.register_result(&step.name, output.into_handle());
194
195 step_statuses.insert(
196 step.name.clone(),
197 ProcessingStepStatus {
198 name: step.name.clone(),
199 status: ProcessingStatus::Success,
200 duration_ms,
201 error: None,
202 },
203 );
204
205 info!(
206 message = "processing_step.completed",
207 step = %step.name,
208 duration_ms = duration_ms,
209 );
210 }
211 Err(e) => {
212 let error_msg = e.to_string();
213
214 if let Some(tx) = progress {
215 tx.step_failed(&step.name, "", ExecutionPhase::Processing, &error_msg);
216 }
217
218 failed_step = Some(step.name.clone());
219 failure_reason = Some(error_msg.clone());
220
221 step_statuses.insert(
222 step.name.clone(),
223 ProcessingStepStatus {
224 name: step.name.clone(),
225 status: ProcessingStatus::Failed,
226 duration_ms: 0,
227 error: Some(error_msg),
228 },
229 );
230
231 break;
233 }
234 }
235 }
236
237 Ok(ProcessingPhaseOutput {
238 results: ctx.take_results(),
239 step_statuses,
240 duration: phase_start.elapsed(),
241 failed_step,
242 failure_reason,
243 })
244 }
245
246 pub fn validate(&self, processing: &Processing) -> Result<()> {
248 for step in &processing.steps {
249 let step_type = Self::step_type_from_config(&step.config);
250 let handler = self.get_handler(step_type).ok_or_else(|| {
251 Error::pack(format!(
252 "No handler for step type {:?} in step '{}'",
253 step_type, step.name
254 ))
255 })?;
256
257 handler.validate(step)?;
258 }
259
260 Ok(())
261 }
262}
263
264impl Default for ProcessingPhaseHandler {
265 fn default() -> Self {
266 Self::new()
267 }
268}