kql_panopticon/execution/
progress.rs

1//! Progress reporting for execution engines
2//!
3//! Provides a unified progress reporting system that works for both
4//! query packs and investigation packs.
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use tokio::sync::mpsc;
9use uuid::Uuid;
10
11/// Progress update message sent during execution
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub enum ProgressUpdate {
14    // === Lifecycle Events ===
15
16    /// Execution has started
17    Started {
18        job_id: Uuid,
19        job_type: JobType,
20        total_steps: usize,
21        total_workspaces: usize,
22        timestamp: DateTime<Utc>,
23    },
24
25    /// Execution completed successfully
26    Completed {
27        job_id: Uuid,
28        duration_ms: u64,
29        timestamp: DateTime<Utc>,
30    },
31
32    /// Execution failed
33    Failed {
34        job_id: Uuid,
35        error: String,
36        timestamp: DateTime<Utc>,
37    },
38
39    // === Workspace Lifecycle Events ===
40
41    /// Workspace execution started
42    WorkspaceStarted {
43        job_id: Uuid,
44        workspace: String,
45        timestamp: DateTime<Utc>,
46    },
47
48    /// Workspace execution completed
49    WorkspaceCompleted {
50        job_id: Uuid,
51        workspace: String,
52        duration_ms: u64,
53        timestamp: DateTime<Utc>,
54    },
55
56    // === Step-Level Events ===
57
58    /// A step/query has started
59    StepStarted {
60        job_id: Uuid,
61        step_name: String,
62        workspace: String,
63        phase: ExecutionPhase,
64        timestamp: DateTime<Utc>,
65    },
66
67    /// A step/query completed successfully
68    StepCompleted {
69        job_id: Uuid,
70        step_name: String,
71        workspace: String,
72        phase: ExecutionPhase,
73        rows: usize,
74        duration_ms: u64,
75        timestamp: DateTime<Utc>,
76    },
77
78    /// A step/query failed
79    StepFailed {
80        job_id: Uuid,
81        step_name: String,
82        workspace: String,
83        phase: ExecutionPhase,
84        error: String,
85        timestamp: DateTime<Utc>,
86    },
87
88    /// A step was skipped (condition not met, dependency failed, etc.)
89    StepSkipped {
90        job_id: Uuid,
91        step_name: String,
92        workspace: String,
93        phase: ExecutionPhase,
94        reason: String,
95        timestamp: DateTime<Utc>,
96    },
97
98    // === Investigation-Specific Events ===
99
100    /// Variables extracted from a step (investigation only)
101    VariablesExtracted {
102        job_id: Uuid,
103        step_name: String,
104        workspace: String,
105        variables: Vec<String>,
106        timestamp: DateTime<Utc>,
107    },
108
109    /// Condition evaluated (investigation only)
110    ConditionEvaluated {
111        job_id: Uuid,
112        step_name: String,
113        condition: String,
114        result: bool,
115        timestamp: DateTime<Utc>,
116    },
117
118    /// Foreach iteration progress (investigation only)
119    ForeachProgress {
120        job_id: Uuid,
121        step_name: String,
122        workspace: String,
123        current: usize,
124        total: usize,
125        timestamp: DateTime<Utc>,
126    },
127
128    /// HTTP step executed (investigation only)
129    HttpExecuted {
130        job_id: Uuid,
131        step_name: String,
132        url: String,
133        status: u16,
134        duration_ms: u64,
135        timestamp: DateTime<Utc>,
136    },
137
138    // === Debug/Verbose Events ===
139
140    /// Debug message (only when verbose mode is enabled)
141    Debug {
142        job_id: Uuid,
143        message: String,
144        timestamp: DateTime<Utc>,
145    },
146}
147
148impl ProgressUpdate {
149    /// Get the job ID from any progress update
150    pub fn job_id(&self) -> Uuid {
151        match self {
152            Self::Started { job_id, .. }
153            | Self::Completed { job_id, .. }
154            | Self::Failed { job_id, .. }
155            | Self::WorkspaceStarted { job_id, .. }
156            | Self::WorkspaceCompleted { job_id, .. }
157            | Self::StepStarted { job_id, .. }
158            | Self::StepCompleted { job_id, .. }
159            | Self::StepFailed { job_id, .. }
160            | Self::StepSkipped { job_id, .. }
161            | Self::VariablesExtracted { job_id, .. }
162            | Self::ConditionEvaluated { job_id, .. }
163            | Self::ForeachProgress { job_id, .. }
164            | Self::HttpExecuted { job_id, .. }
165            | Self::Debug { job_id, .. } => *job_id,
166        }
167    }
168
169    /// Get the timestamp from any progress update
170    pub fn timestamp(&self) -> DateTime<Utc> {
171        match self {
172            Self::Started { timestamp, .. }
173            | Self::Completed { timestamp, .. }
174            | Self::Failed { timestamp, .. }
175            | Self::WorkspaceStarted { timestamp, .. }
176            | Self::WorkspaceCompleted { timestamp, .. }
177            | Self::StepStarted { timestamp, .. }
178            | Self::StepCompleted { timestamp, .. }
179            | Self::StepFailed { timestamp, .. }
180            | Self::StepSkipped { timestamp, .. }
181            | Self::VariablesExtracted { timestamp, .. }
182            | Self::ConditionEvaluated { timestamp, .. }
183            | Self::ForeachProgress { timestamp, .. }
184            | Self::HttpExecuted { timestamp, .. }
185            | Self::Debug { timestamp, .. } => *timestamp,
186        }
187    }
188
189    /// Check if this is an error event
190    pub fn is_error(&self) -> bool {
191        matches!(self, Self::Failed { .. } | Self::StepFailed { .. })
192    }
193
194    /// Create a debug message
195    pub fn debug(job_id: Uuid, message: impl Into<String>) -> Self {
196        Self::Debug {
197            job_id,
198            message: message.into(),
199            timestamp: Utc::now(),
200        }
201    }
202}
203
204/// Type of job being executed
205#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
206pub enum JobType {
207    /// Simple query or query pack
208    Query,
209    /// Investigation pack with chained steps
210    Investigation,
211}
212
213impl std::fmt::Display for JobType {
214    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215        match self {
216            Self::Query => write!(f, "Query"),
217            Self::Investigation => write!(f, "Investigation"),
218        }
219    }
220}
221
222/// Execution phase for progress tracking
223#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
224pub enum ExecutionPhase {
225    /// Data acquisition phase (KQL, HTTP, File steps)
226    #[default]
227    Acquisition,
228    /// Data processing phase (scoring, transforms)
229    Processing,
230    /// Report generation phase (templates)
231    Reporting,
232}
233
234impl std::fmt::Display for ExecutionPhase {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        match self {
237            Self::Acquisition => write!(f, "Acquisition"),
238            Self::Processing => write!(f, "Processing"),
239            Self::Reporting => write!(f, "Reporting"),
240        }
241    }
242}
243
244/// Wrapper around mpsc sender for progress updates
245///
246/// Provides helper methods for sending common progress events.
247#[derive(Clone)]
248pub struct ProgressSender {
249    tx: mpsc::UnboundedSender<ProgressUpdate>,
250    job_id: Uuid,
251}
252
253impl ProgressSender {
254    /// Create a new progress sender
255    pub fn new(tx: mpsc::UnboundedSender<ProgressUpdate>, job_id: Uuid) -> Self {
256        Self { tx, job_id }
257    }
258
259    /// Get the job ID
260    pub fn job_id(&self) -> Uuid {
261        self.job_id
262    }
263
264    /// Send a progress update (ignores send errors)
265    pub fn send(&self, update: ProgressUpdate) {
266        let _ = self.tx.send(update);
267    }
268
269    /// Send the started event
270    pub fn started(&self, job_type: JobType, total_steps: usize, total_workspaces: usize) {
271        self.send(ProgressUpdate::Started {
272            job_id: self.job_id,
273            job_type,
274            total_steps,
275            total_workspaces,
276            timestamp: Utc::now(),
277        });
278    }
279
280    /// Send the completed event
281    pub fn completed(&self, duration_ms: u64) {
282        self.send(ProgressUpdate::Completed {
283            job_id: self.job_id,
284            duration_ms,
285            timestamp: Utc::now(),
286        });
287    }
288
289    /// Send the failed event
290    pub fn failed(&self, error: impl Into<String>) {
291        self.send(ProgressUpdate::Failed {
292            job_id: self.job_id,
293            error: error.into(),
294            timestamp: Utc::now(),
295        });
296    }
297
298    /// Send workspace started event
299    pub fn workspace_started(&self, workspace: impl Into<String>) {
300        self.send(ProgressUpdate::WorkspaceStarted {
301            job_id: self.job_id,
302            workspace: workspace.into(),
303            timestamp: Utc::now(),
304        });
305    }
306
307    /// Send workspace completed event
308    pub fn workspace_completed(&self, workspace: impl Into<String>, duration_ms: u64) {
309        self.send(ProgressUpdate::WorkspaceCompleted {
310            job_id: self.job_id,
311            workspace: workspace.into(),
312            duration_ms,
313            timestamp: Utc::now(),
314        });
315    }
316
317    /// Send step started event
318    pub fn step_started(
319        &self,
320        step_name: impl Into<String>,
321        workspace: impl Into<String>,
322        phase: ExecutionPhase,
323    ) {
324        self.send(ProgressUpdate::StepStarted {
325            job_id: self.job_id,
326            step_name: step_name.into(),
327            workspace: workspace.into(),
328            phase,
329            timestamp: Utc::now(),
330        });
331    }
332
333    /// Send step completed event
334    pub fn step_completed(
335        &self,
336        step_name: impl Into<String>,
337        workspace: impl Into<String>,
338        phase: ExecutionPhase,
339        rows: usize,
340        duration_ms: u64,
341    ) {
342        self.send(ProgressUpdate::StepCompleted {
343            job_id: self.job_id,
344            step_name: step_name.into(),
345            workspace: workspace.into(),
346            phase,
347            rows,
348            duration_ms,
349            timestamp: Utc::now(),
350        });
351    }
352
353    /// Send step failed event
354    pub fn step_failed(
355        &self,
356        step_name: impl Into<String>,
357        workspace: impl Into<String>,
358        phase: ExecutionPhase,
359        error: impl Into<String>,
360    ) {
361        self.send(ProgressUpdate::StepFailed {
362            job_id: self.job_id,
363            step_name: step_name.into(),
364            workspace: workspace.into(),
365            phase,
366            error: error.into(),
367            timestamp: Utc::now(),
368        });
369    }
370
371    /// Send step skipped event
372    pub fn step_skipped(
373        &self,
374        step_name: impl Into<String>,
375        workspace: impl Into<String>,
376        phase: ExecutionPhase,
377        reason: impl Into<String>,
378    ) {
379        self.send(ProgressUpdate::StepSkipped {
380            job_id: self.job_id,
381            step_name: step_name.into(),
382            workspace: workspace.into(),
383            phase,
384            reason: reason.into(),
385            timestamp: Utc::now(),
386        });
387    }
388
389    /// Send debug message
390    pub fn debug(&self, message: impl Into<String>) {
391        self.send(ProgressUpdate::debug(self.job_id, message));
392    }
393
394    /// Send foreach progress event
395    pub fn foreach_progress(
396        &self,
397        step_name: impl Into<String>,
398        workspace: impl Into<String>,
399        current: usize,
400        total: usize,
401    ) {
402        self.send(ProgressUpdate::ForeachProgress {
403            job_id: self.job_id,
404            step_name: step_name.into(),
405            workspace: workspace.into(),
406            current,
407            total,
408            timestamp: Utc::now(),
409        });
410    }
411}
412
413/// Type alias for the receiver side
414pub type ProgressReceiver = mpsc::UnboundedReceiver<ProgressUpdate>;
415
416/// Create a new progress channel
417pub fn progress_channel(job_id: Uuid) -> (ProgressSender, ProgressReceiver) {
418    let (tx, rx) = mpsc::unbounded_channel();
419    (ProgressSender::new(tx, job_id), rx)
420}
421
#[cfg(test)]
mod tests {
    use super::*;

    /// Sends one event per helper and checks that all six arrive tagged with
    /// the channel's job ID.
    #[tokio::test]
    async fn test_progress_sender() {
        let job_id = Uuid::new_v4();
        let (sender, mut receiver) = progress_channel(job_id);

        sender.started(JobType::Query, 5, 3);
        sender.workspace_started("workspace1");
        sender.step_started("query1", "workspace1", ExecutionPhase::Acquisition);
        sender.step_completed("query1", "workspace1", ExecutionPhase::Acquisition, 100, 500);
        sender.workspace_completed("workspace1", 600);
        sender.completed(1000);

        // Drain everything currently queued; stops at the first `Err` from
        // `try_recv` (the sender is still alive, so that means "empty").
        let received: Vec<ProgressUpdate> =
            std::iter::from_fn(|| receiver.try_recv().ok()).collect();

        for update in &received {
            assert_eq!(update.job_id(), job_id);
        }
        assert_eq!(received.len(), 6);
    }
}