kql_panopticon/execution/processing/
phase.rs

1//! Processing phase handler
2//!
3//! Orchestrates execution of processing steps.
4
5use crate::error::{Error, Result};
6use crate::execution::progress::{ExecutionPhase, ProgressSender};
7use crate::execution::result::ResultContext;
8use crate::pack::{Processing, ProcessingStepConfig};
9use crate::variable::{evaluate_condition_new as evaluate_condition, EvaluationContext};
10use std::collections::HashMap;
11use std::path::Path;
12use std::time::{Duration, Instant};
13use tracing::{debug, info};
14
15use super::context::ProcessingContext;
16use super::handler::{ProcessingStepHandler, ProcessingStepType};
17
/// Phase-level output from processing
///
/// Built by `ProcessingPhaseHandler::execute` after its step loop ends,
/// whether every step was visited or the loop stopped at a failing step.
#[derive(Debug)]
pub struct ProcessingPhaseOutput {
    /// Processing results keyed by step name (file-backed via ResultContext)
    ///
    /// Taken from the processing context at the end of the phase, so it holds
    /// whatever the successfully executed steps registered there.
    pub results: ResultContext,

    /// Per-step execution status
    ///
    /// Keyed by step name. Steps that were never reached because an earlier
    /// step failed have no entry.
    pub step_statuses: HashMap<String, ProcessingStepStatus>,

    /// Total duration
    ///
    /// Wall-clock time measured from the start of `execute` to the point the
    /// output is assembled.
    pub duration: Duration,

    /// First failed step (if any)
    ///
    /// The phase stops at the first failure, so this is also the only failed step.
    pub failed_step: Option<String>,

    /// Failure reason
    ///
    /// Error message belonging to `failed_step`; `None` when no step failed.
    pub failure_reason: Option<String>,
}
36
37/// Status of a processing step execution
38#[derive(Debug, Clone)]
39pub struct ProcessingStepStatus {
40    pub name: String,
41    pub status: ProcessingStatus,
42    pub duration_ms: u64,
43    pub error: Option<String>,
44}
45
/// Processing step execution status
///
/// Cheap to copy and compare; also hashable so it can be used as a map or set
/// key (for example to count steps per outcome).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProcessingStatus {
    /// The step's handler ran and returned a result.
    Success,
    /// No handler was available for the step, or its handler returned an error.
    Failed,
    /// The step's `when` condition was not met, so the step did not run.
    Skipped,
}
53
/// Processing phase handler
///
/// Coordinates execution of all processing steps.
///
/// Holds one step handler per step type; each step in a `Processing`
/// definition is dispatched to the handler registered for its type.
pub struct ProcessingPhaseHandler {
    // Registered step handlers, keyed by the step type each one reports via `handles()`.
    handlers: HashMap<ProcessingStepType, Box<dyn ProcessingStepHandler>>,
}
60
61impl ProcessingPhaseHandler {
62    /// Create a new phase handler
63    pub fn new() -> Self {
64        Self {
65            handlers: HashMap::new(),
66        }
67    }
68
69    /// Register a step handler
70    pub fn register(&mut self, handler: Box<dyn ProcessingStepHandler>) {
71        self.handlers.insert(handler.handles(), handler);
72    }
73
74    /// Get handler for a step type
75    fn get_handler(&self, step_type: ProcessingStepType) -> Option<&dyn ProcessingStepHandler> {
76        self.handlers.get(&step_type).map(|h| h.as_ref())
77    }
78
79    /// Get step type from config
80    fn step_type_from_config(config: &ProcessingStepConfig) -> ProcessingStepType {
81        match config {
82            ProcessingStepConfig::Scoring(_) => ProcessingStepType::Scoring,
83        }
84    }
85
86    /// Execute the processing phase
87    pub async fn execute(
88        &self,
89        processing: &Processing,
90        acquisition_results: &ResultContext,
91        output_dir: &Path,
92        progress: Option<&ProgressSender>,
93    ) -> Result<ProcessingPhaseOutput> {
94        let phase_start = Instant::now();
95        let mut step_statuses: HashMap<String, ProcessingStepStatus> = HashMap::new();
96        let mut failed_step = None;
97        let mut failure_reason = None;
98
99        // Create context with output directory for writing results
100        let mut ctx = ProcessingContext::new(acquisition_results, output_dir);
101
102        debug!(
103            phase = "processing",
104            steps = processing.steps.len(),
105            "Starting processing phase"
106        );
107
108        // Execute each step
109        for step in &processing.steps {
110            // Check `when` condition
111            if let Some(when_condition) = &step.when {
112                // Create evaluation context from acquisition results
113                let eval_ctx = EvaluationContext::new()
114                    .with_step_results(acquisition_results.clone());
115
116                let condition_met = match evaluate_condition(when_condition, &eval_ctx) {
117                    Ok(met) => met,
118                    Err(e) => {
119                        tracing::warn!(
120                            "Processing step '{}' when='{}' evaluation failed: {}",
121                            step.name, when_condition, e
122                        );
123                        false
124                    }
125                };
126
127                debug!(
128                    "Processing step '{}' when='{}' evaluated to: {}",
129                    step.name, when_condition, condition_met
130                );
131
132                if !condition_met {
133                    let reason = format!("Condition not met: {}", when_condition);
134                    if let Some(tx) = progress {
135                        tx.step_skipped(&step.name, "", ExecutionPhase::Processing, &reason);
136                    }
137
138                    step_statuses.insert(
139                        step.name.clone(),
140                        ProcessingStepStatus {
141                            name: step.name.clone(),
142                            status: ProcessingStatus::Skipped,
143                            duration_ms: 0,
144                            error: Some(reason),
145                        },
146                    );
147                    continue;
148                }
149            }
150
151            // Emit step started
152            if let Some(tx) = progress {
153                tx.step_started(&step.name, "", ExecutionPhase::Processing);
154            }
155
156            debug!(step = %step.name, "Starting processing step execution");
157
158            // Get handler for this step type
159            let step_type = Self::step_type_from_config(&step.config);
160            let handler = match self.get_handler(step_type) {
161                Some(h) => h,
162                None => {
163                    let error_msg = format!("No handler for step type {:?}", step_type);
164                    failed_step = Some(step.name.clone());
165                    failure_reason = Some(error_msg.clone());
166
167                    step_statuses.insert(
168                        step.name.clone(),
169                        ProcessingStepStatus {
170                            name: step.name.clone(),
171                            status: ProcessingStatus::Failed,
172                            duration_ms: 0,
173                            error: Some(error_msg),
174                        },
175                    );
176                    break;
177                }
178            };
179
180            // Execute step
181            let result = handler.execute(step, &ctx).await;
182
183            match result {
184                Ok(output) => {
185                    let duration_ms = output.duration_ms();
186
187                    // Emit completed
188                    if let Some(tx) = progress {
189                        tx.step_completed(&step.name, "", ExecutionPhase::Processing, 0, duration_ms);
190                    }
191
192                    // Register result handle in context
193                    ctx.register_result(&step.name, output.into_handle());
194
195                    step_statuses.insert(
196                        step.name.clone(),
197                        ProcessingStepStatus {
198                            name: step.name.clone(),
199                            status: ProcessingStatus::Success,
200                            duration_ms,
201                            error: None,
202                        },
203                    );
204
205                    info!(
206                        message = "processing_step.completed",
207                        step = %step.name,
208                        duration_ms = duration_ms,
209                    );
210                }
211                Err(e) => {
212                    let error_msg = e.to_string();
213
214                    if let Some(tx) = progress {
215                        tx.step_failed(&step.name, "", ExecutionPhase::Processing, &error_msg);
216                    }
217
218                    failed_step = Some(step.name.clone());
219                    failure_reason = Some(error_msg.clone());
220
221                    step_statuses.insert(
222                        step.name.clone(),
223                        ProcessingStepStatus {
224                            name: step.name.clone(),
225                            status: ProcessingStatus::Failed,
226                            duration_ms: 0,
227                            error: Some(error_msg),
228                        },
229                    );
230
231                    // Processing failures stop the phase
232                    break;
233                }
234            }
235        }
236
237        Ok(ProcessingPhaseOutput {
238            results: ctx.take_results(),
239            step_statuses,
240            duration: phase_start.elapsed(),
241            failed_step,
242            failure_reason,
243        })
244    }
245
246    /// Validate all steps in the processing config
247    pub fn validate(&self, processing: &Processing) -> Result<()> {
248        for step in &processing.steps {
249            let step_type = Self::step_type_from_config(&step.config);
250            let handler = self.get_handler(step_type).ok_or_else(|| {
251                Error::pack(format!(
252                    "No handler for step type {:?} in step '{}'",
253                    step_type, step.name
254                ))
255            })?;
256
257            handler.validate(step)?;
258        }
259
260        Ok(())
261    }
262}
263
264impl Default for ProcessingPhaseHandler {
265    fn default() -> Self {
266        Self::new()
267    }
268}