kql_panopticon/execution/
executor.rs

//! Pack executor for phase-based execution
//!
//! Orchestrates pack execution through three phases, each of which is run
//! once per workspace:
//! 1. Acquisition - Data collection
//! 2. Processing - Data transformation over that workspace's acquisition results
//! 3. Reporting - Output generation from the acquisition and processing results
7
8use crate::client::Client;
9use crate::error::Result;
10use crate::pack::Pack;
11use crate::workspace::Workspace;
12
13use super::acquisition::steps::{FileStepHandler, HttpStepHandler, KqlStepHandler};
14use super::acquisition::AcquisitionPhaseHandler;
15use super::processing::steps::ScoringStepHandler;
16use super::processing::ProcessingPhaseHandler;
17use super::progress::{JobType, ProgressSender};
18use super::reporting::steps::TemplateStepHandler;
19use super::reporting::{ReportingConfig, ReportingPhaseHandler};
20use super::result::ResultContext;
21use super::trace::{ExecutionTrace, TraceStatus};
22use super::types::{
23    ExecutionPhase, ExecutionStatus, PackExecutorConfig, PackExecutorResult, StepResult,
24    WorkspaceResult,
25};
26
27use chrono::Local;
28use std::collections::HashMap;
29use std::path::PathBuf;
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32use tokio::fs;
33use tracing::{debug, info};
34
/// Execution mode for the pack.
///
/// [`ExecutionMode::Normal`] is the default.
///
/// NOTE(review): nothing in the executor code visible in this file branches
/// on the mode, so the dry-run / debug behaviour described on the variants is
/// presumably enforced by callers or step handlers — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExecutionMode {
    /// Normal execution.
    #[default]
    Normal,
    /// Dry run: validate but don't execute.
    DryRun,
    /// Debug mode: extra logging.
    Debug,
}
46
47/// Execution options
48#[derive(Debug, Clone)]
49pub struct ExecutionOptions {
50    /// Execution mode
51    pub mode: ExecutionMode,
52    /// Enable execution tracing
53    pub trace: bool,
54    /// Default timeout for steps
55    pub timeout: Duration,
56}
57
58impl Default for ExecutionOptions {
59    fn default() -> Self {
60        Self {
61            mode: ExecutionMode::Normal,
62            trace: false,
63            timeout: Duration::from_secs(120),
64        }
65    }
66}
67
68/// Pack executor - coordinates phase-based execution
69///
70/// Executes packs through three phases:
71/// 1. **Acquisition**: Per-workspace data collection
72/// 2. **Processing**: Global data transformation
73/// 3. **Reporting**: Global report generation
74pub struct PackExecutor {
75    /// Acquisition phase handler
76    acquisition: AcquisitionPhaseHandler,
77
78    /// Processing phase handler
79    processing: ProcessingPhaseHandler,
80
81    /// Reporting phase handler
82    reporting: ReportingPhaseHandler,
83
84    /// Execution options
85    options: ExecutionOptions,
86}
87
88impl PackExecutor {
89    /// Create a new executor with the given client
90    pub fn new(client: Client) -> Self {
91        let client = Arc::new(client);
92
93        // Initialize acquisition phase with handlers
94        let mut acquisition = AcquisitionPhaseHandler::new();
95        acquisition.register(Box::new(KqlStepHandler::new(client)));
96        acquisition.register(Box::new(HttpStepHandler::new()));
97        acquisition.register(Box::new(FileStepHandler::new()));
98
99        // Initialize processing phase with handlers
100        let mut processing = ProcessingPhaseHandler::new();
101        processing.register(Box::new(ScoringStepHandler::new()));
102
103        // Initialize reporting phase with handlers
104        let mut reporting = ReportingPhaseHandler::new();
105        reporting.register(Box::new(TemplateStepHandler::new()));
106
107        Self {
108            acquisition,
109            processing,
110            reporting,
111            options: ExecutionOptions::default(),
112        }
113    }
114
115    /// Create with custom options
116    pub fn with_options(client: Client, options: ExecutionOptions) -> Self {
117        let mut executor = Self::new(client);
118        executor.options = options;
119        executor
120    }
121
122    /// Set execution options
123    pub fn set_options(&mut self, options: ExecutionOptions) {
124        self.options = options;
125    }
126
127    /// Set default timeout
128    pub fn with_timeout(mut self, timeout: Duration) -> Self {
129        self.options.timeout = timeout;
130        self
131    }
132
133    /// Validate a pack configuration
134    ///
135    /// Validates all phases (acquisition, processing, reporting) to ensure
136    /// the pack can be executed successfully.
137    pub fn validate(&self, config: &PackExecutorConfig) -> Result<()> {
138        let pack = &config.pack;
139
140        // Validate acquisition phase
141        self.acquisition.validate(&pack.acquisition)?;
142
143        // Validate processing phase (if present)
144        if let Some(ref processing) = pack.processing {
145            self.processing.validate(processing)?;
146        }
147
148        // Validate reporting phase (if present)
149        if let Some(ref reporting) = pack.reporting {
150            self.reporting.validate(reporting)?;
151        }
152
153        Ok(())
154    }
155
156    /// Execute a pack across workspaces
157    pub async fn execute(
158        &self,
159        config: PackExecutorConfig,
160        workspaces: Vec<Workspace>,
161        progress: Option<ProgressSender>,
162    ) -> Result<PackExecutorResult> {
163        let job_id = uuid::Uuid::new_v4();
164        let start = Instant::now();
165        let timestamp = Local::now().format("%Y%m%d_%H%M%S").to_string();
166
167        let pack = &config.pack;
168        let mut workspace_results: HashMap<String, WorkspaceResult> = HashMap::new();
169        let mut trace = if self.options.trace {
170            Some(ExecutionTrace::new())
171        } else {
172            None
173        };
174
175        info!(
176            job_id = %job_id,
177            pack = %pack.name,
178            workspaces = workspaces.len(),
179            "Starting pack execution"
180        );
181
182        // Emit job started
183        if let Some(ref tx) = progress {
184            tx.started(JobType::Investigation, pack.acquisition.steps.len(), workspaces.len());
185        }
186
187        // Execute per workspace
188        for workspace in &workspaces {
189            debug!(
190                workspace = %workspace.name,
191                workspace_id = %workspace.workspace_id,
192                "Starting workspace execution"
193            );
194
195            // Emit workspace started
196            if let Some(ref tx) = progress {
197                tx.workspace_started(&workspace.name);
198            }
199
200            let ws_result = self
201                .execute_workspace(pack, workspace, &config, &timestamp, progress.as_ref())
202                .await;
203
204            // Emit workspace completed
205            if let Some(ref tx) = progress {
206                tx.workspace_completed(&workspace.name, ws_result.duration_ms);
207            }
208
209            workspace_results.insert(workspace.name.clone(), ws_result);
210        }
211
212        // Determine final status
213        let all_failed = workspace_results
214            .values()
215            .all(|r| r.status == ExecutionStatus::Failed);
216        let all_succeeded = workspace_results
217            .values()
218            .all(|r| r.status == ExecutionStatus::Success);
219
220        let overall_status = if all_failed {
221            ExecutionStatus::Failed
222        } else if all_succeeded {
223            ExecutionStatus::Success
224        } else {
225            ExecutionStatus::Partial
226        };
227
228        let duration_ms = start.elapsed().as_millis() as u64;
229
230        // Emit job completed
231        if let Some(ref tx) = progress {
232            tx.completed(duration_ms);
233        }
234
235        info!(
236            job_id = %job_id,
237            status = ?overall_status,
238            duration_ms = duration_ms,
239            "Pack execution completed"
240        );
241
242        // Finalize trace
243        if let Some(ref mut t) = trace {
244            t.set_status(match overall_status {
245                ExecutionStatus::Success => TraceStatus::Success,
246                ExecutionStatus::Failed => TraceStatus::Failed,
247                ExecutionStatus::Partial => TraceStatus::Partial,
248                ExecutionStatus::Pending | ExecutionStatus::Running => TraceStatus::Running,
249            });
250        }
251
252        // Build output directory for result
253        let output_dir = config.output_dir.clone();
254
255        Ok(PackExecutorResult {
256            job_id,
257            pack_name: pack.name.clone(),
258            status: overall_status,
259            workspace_results,
260            duration_ms,
261            output_dir,
262            trace,
263        })
264    }
265
266    /// Execute pack for a single workspace
267    async fn execute_workspace(
268        &self,
269        pack: &Pack,
270        workspace: &Workspace,
271        config: &PackExecutorConfig,
272        timestamp: &str,
273        progress: Option<&ProgressSender>,
274    ) -> WorkspaceResult {
275        let start = Instant::now();
276
277        // Build output directory
278        let output_dir = config
279            .output_dir
280            .clone()
281            .unwrap_or_else(|| PathBuf::from("./output"));
282
283        let workspace_output_dir = output_dir
284            .join(Workspace::normalize_name(&workspace.subscription_name))
285            .join(Workspace::normalize_name(&workspace.name))
286            .join(timestamp);
287
288        // Create output directory
289        if let Err(e) = fs::create_dir_all(&workspace_output_dir).await {
290            return WorkspaceResult {
291                workspace_name: workspace.name.clone(),
292                workspace_id: workspace.workspace_id.clone(),
293                status: ExecutionStatus::Failed,
294                step_results: HashMap::new(),
295                step_handles: ResultContext::new(),
296                duration_ms: start.elapsed().as_millis() as u64,
297                failed_step: None,
298                failure_reason: Some(format!("Failed to create output directory: {}", e)),
299            };
300        }
301
302        // Phase 1: Acquisition
303        debug!(phase = "acquisition", "Starting acquisition phase");
304
305        let acq_output = self
306            .acquisition
307            .execute(
308                pack,
309                workspace,
310                config.inputs.clone(),
311                &workspace_output_dir,
312                progress,
313            )
314            .await;
315
316        let acq_output = match acq_output {
317            Ok(output) => output,
318            Err(e) => {
319                return WorkspaceResult {
320                    workspace_name: workspace.name.clone(),
321                    workspace_id: workspace.workspace_id.clone(),
322                    status: ExecutionStatus::Failed,
323                    step_results: HashMap::new(),
324                    step_handles: ResultContext::new(),
325                    duration_ms: start.elapsed().as_millis() as u64,
326                    failed_step: None,
327                    failure_reason: Some(format!("Acquisition phase failed: {}", e)),
328                };
329            }
330        };
331
332        // Check if acquisition had a failure
333        if acq_output.failed_step.is_some() {
334            return WorkspaceResult {
335                workspace_name: workspace.name.clone(),
336                workspace_id: workspace.workspace_id.clone(),
337                status: ExecutionStatus::Failed,
338                step_results: convert_step_statuses(&acq_output.step_statuses),
339                step_handles: acq_output.results,
340                duration_ms: start.elapsed().as_millis() as u64,
341                failed_step: acq_output.failed_step,
342                failure_reason: acq_output.failure_reason,
343            };
344        }
345
346        // Phase 2: Processing (if configured)
347        let proc_output = if let Some(ref processing) = pack.processing {
348            if !processing.is_empty() {
349                debug!(phase = "processing", "Starting processing phase");
350
351                let output = self
352                    .processing
353                    .execute(processing, &acq_output.results, &workspace_output_dir, progress)
354                    .await;
355
356                match output {
357                    Ok(o) => Some(o),
358                    Err(e) => {
359                        return WorkspaceResult {
360                            workspace_name: workspace.name.clone(),
361                            workspace_id: workspace.workspace_id.clone(),
362                            status: ExecutionStatus::Failed,
363                            step_results: convert_step_statuses(&acq_output.step_statuses),
364                            step_handles: acq_output.results,
365                            duration_ms: start.elapsed().as_millis() as u64,
366                            failed_step: None,
367                            failure_reason: Some(format!("Processing phase failed: {}", e)),
368                        };
369                    }
370                }
371            } else {
372                None
373            }
374        } else {
375            None
376        };
377
378        // Phase 3: Reporting (if configured)
379        let rep_output = if let Some(ref reporting) = pack.reporting {
380            if !reporting.is_empty() {
381                debug!(phase = "reporting", "Starting reporting phase");
382
383                // Build reporting config
384                let mut report_config = ReportingConfig::new(
385                    reporting,
386                    &pack.name,
387                    workspace,
388                    &config.inputs,
389                    &acq_output.results,
390                    &workspace_output_dir,
391                );
392
393                if let Some(proc) = proc_output.as_ref() {
394                    report_config = report_config.with_processing_results(&proc.results);
395                }
396
397                if let Some(path) = config.pack_path.as_deref() {
398                    report_config = report_config.with_pack_path(path);
399                }
400
401                // Pass ResultContext refs for lazy materialization
402                match self.reporting.execute(report_config, progress).await
403                {
404                    Ok(output) => Some(output),
405                    Err(e) => {
406                        // Reporting failures don't fail the workspace (logged but continued)
407                        tracing::warn!(error = %e, "Reporting phase failed");
408                        None
409                    }
410                }
411            } else {
412                None
413            }
414        } else {
415            None
416        };
417
418        // Build final step results
419        let mut step_results = convert_step_statuses(&acq_output.step_statuses);
420
421        // Add processing step results and merge handles
422        let mut step_handles = acq_output.results;
423        if let Some(proc) = proc_output {
424            // Merge processing result handles
425            step_handles.merge(proc.results);
426
427            // Add processing step statuses
428            for (name, status) in &proc.step_statuses {
429                step_results.insert(
430                    name.clone(),
431                    StepResult {
432                        name: name.clone(),
433                        phase: ExecutionPhase::Processing,
434                        status: match status.status {
435                            super::processing::ProcessingStatus::Success => {
436                                super::types::StepStatus::Success
437                            }
438                            super::processing::ProcessingStatus::Failed => {
439                                super::types::StepStatus::Failed
440                            }
441                            super::processing::ProcessingStatus::Skipped => {
442                                super::types::StepStatus::Skipped
443                            }
444                        },
445                        row_count: None,
446                        duration_ms: status.duration_ms,
447                        output_path: None,
448                        error: status.error.clone(),
449                    },
450                );
451            }
452        }
453
454        // Add reporting step results
455        if let Some(rep) = rep_output {
456            for (name, status) in &rep.report_statuses {
457                step_results.insert(
458                    name.clone(),
459                    StepResult {
460                        name: name.clone(),
461                        phase: ExecutionPhase::Reporting,
462                        status: match status.status {
463                            super::reporting::ReportingStatus::Success => {
464                                super::types::StepStatus::Success
465                            }
466                            super::reporting::ReportingStatus::Failed => {
467                                super::types::StepStatus::Failed
468                            }
469                            super::reporting::ReportingStatus::Skipped => {
470                                super::types::StepStatus::Skipped
471                            }
472                        },
473                        row_count: None,
474                        duration_ms: status.duration_ms,
475                        output_path: None,
476                        error: status.error.clone(),
477                    },
478                );
479            }
480        }
481
482        WorkspaceResult {
483            workspace_name: workspace.name.clone(),
484            workspace_id: workspace.workspace_id.clone(),
485            status: ExecutionStatus::Success,
486            step_results,
487            step_handles,
488            duration_ms: start.elapsed().as_millis() as u64,
489            failed_step: None,
490            failure_reason: None,
491        }
492    }
493}
494
495/// Convert acquisition step statuses to StepResult map
496fn convert_step_statuses(
497    statuses: &HashMap<String, super::acquisition::StepExecutionStatus>,
498) -> HashMap<String, StepResult> {
499    statuses
500        .iter()
501        .map(|(name, status)| {
502            (
503                name.clone(),
504                StepResult {
505                    name: name.clone(),
506                    phase: ExecutionPhase::Acquisition,
507                    status: match status.status {
508                        super::acquisition::AcquisitionStepStatus::Success => super::types::StepStatus::Success,
509                        super::acquisition::AcquisitionStepStatus::Failed => super::types::StepStatus::Failed,
510                        super::acquisition::AcquisitionStepStatus::Skipped => super::types::StepStatus::Skipped,
511                    },
512                    row_count: status.row_count,
513                    duration_ms: status.duration_ms,
514                    output_path: None,
515                    error: status.error.clone(),
516                },
517            )
518        })
519        .collect()
520}