1use crate::client::Client;
9use crate::error::Result;
10use crate::pack::Pack;
11use crate::workspace::Workspace;
12
13use super::acquisition::steps::{FileStepHandler, HttpStepHandler, KqlStepHandler};
14use super::acquisition::AcquisitionPhaseHandler;
15use super::processing::steps::ScoringStepHandler;
16use super::processing::ProcessingPhaseHandler;
17use super::progress::{JobType, ProgressSender};
18use super::reporting::steps::TemplateStepHandler;
19use super::reporting::{ReportingConfig, ReportingPhaseHandler};
20use super::result::ResultContext;
21use super::trace::{ExecutionTrace, TraceStatus};
22use super::types::{
23 ExecutionPhase, ExecutionStatus, PackExecutorConfig, PackExecutorResult, StepResult,
24 WorkspaceResult,
25};
26
27use chrono::Local;
28use std::collections::HashMap;
29use std::path::PathBuf;
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32use tokio::fs;
33use tracing::{debug, info};
34
/// Selects how a pack run should be carried out.
///
/// [`ExecutionMode::Normal`] is the default variant.
///
/// NOTE(review): nothing in this file reads the mode yet, so the precise
/// behaviour of `DryRun` and `Debug` is defined by whoever consumes
/// [`ExecutionOptions::mode`] — confirm against the callers.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ExecutionMode {
    /// Regular execution; the default.
    #[default]
    Normal,
    /// Presumably validates/plans without performing the work — TODO confirm.
    DryRun,
    /// Presumably a more verbose run for troubleshooting — TODO confirm.
    Debug,
}
46
47#[derive(Debug, Clone)]
49pub struct ExecutionOptions {
50 pub mode: ExecutionMode,
52 pub trace: bool,
54 pub timeout: Duration,
56}
57
58impl Default for ExecutionOptions {
59 fn default() -> Self {
60 Self {
61 mode: ExecutionMode::Normal,
62 trace: false,
63 timeout: Duration::from_secs(120),
64 }
65 }
66}
67
68pub struct PackExecutor {
75 acquisition: AcquisitionPhaseHandler,
77
78 processing: ProcessingPhaseHandler,
80
81 reporting: ReportingPhaseHandler,
83
84 options: ExecutionOptions,
86}
87
88impl PackExecutor {
89 pub fn new(client: Client) -> Self {
91 let client = Arc::new(client);
92
93 let mut acquisition = AcquisitionPhaseHandler::new();
95 acquisition.register(Box::new(KqlStepHandler::new(client)));
96 acquisition.register(Box::new(HttpStepHandler::new()));
97 acquisition.register(Box::new(FileStepHandler::new()));
98
99 let mut processing = ProcessingPhaseHandler::new();
101 processing.register(Box::new(ScoringStepHandler::new()));
102
103 let mut reporting = ReportingPhaseHandler::new();
105 reporting.register(Box::new(TemplateStepHandler::new()));
106
107 Self {
108 acquisition,
109 processing,
110 reporting,
111 options: ExecutionOptions::default(),
112 }
113 }
114
115 pub fn with_options(client: Client, options: ExecutionOptions) -> Self {
117 let mut executor = Self::new(client);
118 executor.options = options;
119 executor
120 }
121
122 pub fn set_options(&mut self, options: ExecutionOptions) {
124 self.options = options;
125 }
126
127 pub fn with_timeout(mut self, timeout: Duration) -> Self {
129 self.options.timeout = timeout;
130 self
131 }
132
133 pub fn validate(&self, config: &PackExecutorConfig) -> Result<()> {
138 let pack = &config.pack;
139
140 self.acquisition.validate(&pack.acquisition)?;
142
143 if let Some(ref processing) = pack.processing {
145 self.processing.validate(processing)?;
146 }
147
148 if let Some(ref reporting) = pack.reporting {
150 self.reporting.validate(reporting)?;
151 }
152
153 Ok(())
154 }
155
156 pub async fn execute(
158 &self,
159 config: PackExecutorConfig,
160 workspaces: Vec<Workspace>,
161 progress: Option<ProgressSender>,
162 ) -> Result<PackExecutorResult> {
163 let job_id = uuid::Uuid::new_v4();
164 let start = Instant::now();
165 let timestamp = Local::now().format("%Y%m%d_%H%M%S").to_string();
166
167 let pack = &config.pack;
168 let mut workspace_results: HashMap<String, WorkspaceResult> = HashMap::new();
169 let mut trace = if self.options.trace {
170 Some(ExecutionTrace::new())
171 } else {
172 None
173 };
174
175 info!(
176 job_id = %job_id,
177 pack = %pack.name,
178 workspaces = workspaces.len(),
179 "Starting pack execution"
180 );
181
182 if let Some(ref tx) = progress {
184 tx.started(JobType::Investigation, pack.acquisition.steps.len(), workspaces.len());
185 }
186
187 for workspace in &workspaces {
189 debug!(
190 workspace = %workspace.name,
191 workspace_id = %workspace.workspace_id,
192 "Starting workspace execution"
193 );
194
195 if let Some(ref tx) = progress {
197 tx.workspace_started(&workspace.name);
198 }
199
200 let ws_result = self
201 .execute_workspace(pack, workspace, &config, ×tamp, progress.as_ref())
202 .await;
203
204 if let Some(ref tx) = progress {
206 tx.workspace_completed(&workspace.name, ws_result.duration_ms);
207 }
208
209 workspace_results.insert(workspace.name.clone(), ws_result);
210 }
211
212 let all_failed = workspace_results
214 .values()
215 .all(|r| r.status == ExecutionStatus::Failed);
216 let all_succeeded = workspace_results
217 .values()
218 .all(|r| r.status == ExecutionStatus::Success);
219
220 let overall_status = if all_failed {
221 ExecutionStatus::Failed
222 } else if all_succeeded {
223 ExecutionStatus::Success
224 } else {
225 ExecutionStatus::Partial
226 };
227
228 let duration_ms = start.elapsed().as_millis() as u64;
229
230 if let Some(ref tx) = progress {
232 tx.completed(duration_ms);
233 }
234
235 info!(
236 job_id = %job_id,
237 status = ?overall_status,
238 duration_ms = duration_ms,
239 "Pack execution completed"
240 );
241
242 if let Some(ref mut t) = trace {
244 t.set_status(match overall_status {
245 ExecutionStatus::Success => TraceStatus::Success,
246 ExecutionStatus::Failed => TraceStatus::Failed,
247 ExecutionStatus::Partial => TraceStatus::Partial,
248 ExecutionStatus::Pending | ExecutionStatus::Running => TraceStatus::Running,
249 });
250 }
251
252 let output_dir = config.output_dir.clone();
254
255 Ok(PackExecutorResult {
256 job_id,
257 pack_name: pack.name.clone(),
258 status: overall_status,
259 workspace_results,
260 duration_ms,
261 output_dir,
262 trace,
263 })
264 }
265
266 async fn execute_workspace(
268 &self,
269 pack: &Pack,
270 workspace: &Workspace,
271 config: &PackExecutorConfig,
272 timestamp: &str,
273 progress: Option<&ProgressSender>,
274 ) -> WorkspaceResult {
275 let start = Instant::now();
276
277 let output_dir = config
279 .output_dir
280 .clone()
281 .unwrap_or_else(|| PathBuf::from("./output"));
282
283 let workspace_output_dir = output_dir
284 .join(Workspace::normalize_name(&workspace.subscription_name))
285 .join(Workspace::normalize_name(&workspace.name))
286 .join(timestamp);
287
288 if let Err(e) = fs::create_dir_all(&workspace_output_dir).await {
290 return WorkspaceResult {
291 workspace_name: workspace.name.clone(),
292 workspace_id: workspace.workspace_id.clone(),
293 status: ExecutionStatus::Failed,
294 step_results: HashMap::new(),
295 step_handles: ResultContext::new(),
296 duration_ms: start.elapsed().as_millis() as u64,
297 failed_step: None,
298 failure_reason: Some(format!("Failed to create output directory: {}", e)),
299 };
300 }
301
302 debug!(phase = "acquisition", "Starting acquisition phase");
304
305 let acq_output = self
306 .acquisition
307 .execute(
308 pack,
309 workspace,
310 config.inputs.clone(),
311 &workspace_output_dir,
312 progress,
313 )
314 .await;
315
316 let acq_output = match acq_output {
317 Ok(output) => output,
318 Err(e) => {
319 return WorkspaceResult {
320 workspace_name: workspace.name.clone(),
321 workspace_id: workspace.workspace_id.clone(),
322 status: ExecutionStatus::Failed,
323 step_results: HashMap::new(),
324 step_handles: ResultContext::new(),
325 duration_ms: start.elapsed().as_millis() as u64,
326 failed_step: None,
327 failure_reason: Some(format!("Acquisition phase failed: {}", e)),
328 };
329 }
330 };
331
332 if acq_output.failed_step.is_some() {
334 return WorkspaceResult {
335 workspace_name: workspace.name.clone(),
336 workspace_id: workspace.workspace_id.clone(),
337 status: ExecutionStatus::Failed,
338 step_results: convert_step_statuses(&acq_output.step_statuses),
339 step_handles: acq_output.results,
340 duration_ms: start.elapsed().as_millis() as u64,
341 failed_step: acq_output.failed_step,
342 failure_reason: acq_output.failure_reason,
343 };
344 }
345
346 let proc_output = if let Some(ref processing) = pack.processing {
348 if !processing.is_empty() {
349 debug!(phase = "processing", "Starting processing phase");
350
351 let output = self
352 .processing
353 .execute(processing, &acq_output.results, &workspace_output_dir, progress)
354 .await;
355
356 match output {
357 Ok(o) => Some(o),
358 Err(e) => {
359 return WorkspaceResult {
360 workspace_name: workspace.name.clone(),
361 workspace_id: workspace.workspace_id.clone(),
362 status: ExecutionStatus::Failed,
363 step_results: convert_step_statuses(&acq_output.step_statuses),
364 step_handles: acq_output.results,
365 duration_ms: start.elapsed().as_millis() as u64,
366 failed_step: None,
367 failure_reason: Some(format!("Processing phase failed: {}", e)),
368 };
369 }
370 }
371 } else {
372 None
373 }
374 } else {
375 None
376 };
377
378 let rep_output = if let Some(ref reporting) = pack.reporting {
380 if !reporting.is_empty() {
381 debug!(phase = "reporting", "Starting reporting phase");
382
383 let mut report_config = ReportingConfig::new(
385 reporting,
386 &pack.name,
387 workspace,
388 &config.inputs,
389 &acq_output.results,
390 &workspace_output_dir,
391 );
392
393 if let Some(proc) = proc_output.as_ref() {
394 report_config = report_config.with_processing_results(&proc.results);
395 }
396
397 if let Some(path) = config.pack_path.as_deref() {
398 report_config = report_config.with_pack_path(path);
399 }
400
401 match self.reporting.execute(report_config, progress).await
403 {
404 Ok(output) => Some(output),
405 Err(e) => {
406 tracing::warn!(error = %e, "Reporting phase failed");
408 None
409 }
410 }
411 } else {
412 None
413 }
414 } else {
415 None
416 };
417
418 let mut step_results = convert_step_statuses(&acq_output.step_statuses);
420
421 let mut step_handles = acq_output.results;
423 if let Some(proc) = proc_output {
424 step_handles.merge(proc.results);
426
427 for (name, status) in &proc.step_statuses {
429 step_results.insert(
430 name.clone(),
431 StepResult {
432 name: name.clone(),
433 phase: ExecutionPhase::Processing,
434 status: match status.status {
435 super::processing::ProcessingStatus::Success => {
436 super::types::StepStatus::Success
437 }
438 super::processing::ProcessingStatus::Failed => {
439 super::types::StepStatus::Failed
440 }
441 super::processing::ProcessingStatus::Skipped => {
442 super::types::StepStatus::Skipped
443 }
444 },
445 row_count: None,
446 duration_ms: status.duration_ms,
447 output_path: None,
448 error: status.error.clone(),
449 },
450 );
451 }
452 }
453
454 if let Some(rep) = rep_output {
456 for (name, status) in &rep.report_statuses {
457 step_results.insert(
458 name.clone(),
459 StepResult {
460 name: name.clone(),
461 phase: ExecutionPhase::Reporting,
462 status: match status.status {
463 super::reporting::ReportingStatus::Success => {
464 super::types::StepStatus::Success
465 }
466 super::reporting::ReportingStatus::Failed => {
467 super::types::StepStatus::Failed
468 }
469 super::reporting::ReportingStatus::Skipped => {
470 super::types::StepStatus::Skipped
471 }
472 },
473 row_count: None,
474 duration_ms: status.duration_ms,
475 output_path: None,
476 error: status.error.clone(),
477 },
478 );
479 }
480 }
481
482 WorkspaceResult {
483 workspace_name: workspace.name.clone(),
484 workspace_id: workspace.workspace_id.clone(),
485 status: ExecutionStatus::Success,
486 step_results,
487 step_handles,
488 duration_ms: start.elapsed().as_millis() as u64,
489 failed_step: None,
490 failure_reason: None,
491 }
492 }
493}
494
495fn convert_step_statuses(
497 statuses: &HashMap<String, super::acquisition::StepExecutionStatus>,
498) -> HashMap<String, StepResult> {
499 statuses
500 .iter()
501 .map(|(name, status)| {
502 (
503 name.clone(),
504 StepResult {
505 name: name.clone(),
506 phase: ExecutionPhase::Acquisition,
507 status: match status.status {
508 super::acquisition::AcquisitionStepStatus::Success => super::types::StepStatus::Success,
509 super::acquisition::AcquisitionStepStatus::Failed => super::types::StepStatus::Failed,
510 super::acquisition::AcquisitionStepStatus::Skipped => super::types::StepStatus::Skipped,
511 },
512 row_count: status.row_count,
513 duration_ms: status.duration_ms,
514 output_path: None,
515 error: status.error.clone(),
516 },
517 )
518 })
519 .collect()
520}