1use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use tokio::sync::mpsc;
9use uuid::Uuid;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub enum ProgressUpdate {
14 Started {
18 job_id: Uuid,
19 job_type: JobType,
20 total_steps: usize,
21 total_workspaces: usize,
22 timestamp: DateTime<Utc>,
23 },
24
25 Completed {
27 job_id: Uuid,
28 duration_ms: u64,
29 timestamp: DateTime<Utc>,
30 },
31
32 Failed {
34 job_id: Uuid,
35 error: String,
36 timestamp: DateTime<Utc>,
37 },
38
39 WorkspaceStarted {
43 job_id: Uuid,
44 workspace: String,
45 timestamp: DateTime<Utc>,
46 },
47
48 WorkspaceCompleted {
50 job_id: Uuid,
51 workspace: String,
52 duration_ms: u64,
53 timestamp: DateTime<Utc>,
54 },
55
56 StepStarted {
60 job_id: Uuid,
61 step_name: String,
62 workspace: String,
63 phase: ExecutionPhase,
64 timestamp: DateTime<Utc>,
65 },
66
67 StepCompleted {
69 job_id: Uuid,
70 step_name: String,
71 workspace: String,
72 phase: ExecutionPhase,
73 rows: usize,
74 duration_ms: u64,
75 timestamp: DateTime<Utc>,
76 },
77
78 StepFailed {
80 job_id: Uuid,
81 step_name: String,
82 workspace: String,
83 phase: ExecutionPhase,
84 error: String,
85 timestamp: DateTime<Utc>,
86 },
87
88 StepSkipped {
90 job_id: Uuid,
91 step_name: String,
92 workspace: String,
93 phase: ExecutionPhase,
94 reason: String,
95 timestamp: DateTime<Utc>,
96 },
97
98 VariablesExtracted {
102 job_id: Uuid,
103 step_name: String,
104 workspace: String,
105 variables: Vec<String>,
106 timestamp: DateTime<Utc>,
107 },
108
109 ConditionEvaluated {
111 job_id: Uuid,
112 step_name: String,
113 condition: String,
114 result: bool,
115 timestamp: DateTime<Utc>,
116 },
117
118 ForeachProgress {
120 job_id: Uuid,
121 step_name: String,
122 workspace: String,
123 current: usize,
124 total: usize,
125 timestamp: DateTime<Utc>,
126 },
127
128 HttpExecuted {
130 job_id: Uuid,
131 step_name: String,
132 url: String,
133 status: u16,
134 duration_ms: u64,
135 timestamp: DateTime<Utc>,
136 },
137
138 Debug {
142 job_id: Uuid,
143 message: String,
144 timestamp: DateTime<Utc>,
145 },
146}
147
148impl ProgressUpdate {
149 pub fn job_id(&self) -> Uuid {
151 match self {
152 Self::Started { job_id, .. }
153 | Self::Completed { job_id, .. }
154 | Self::Failed { job_id, .. }
155 | Self::WorkspaceStarted { job_id, .. }
156 | Self::WorkspaceCompleted { job_id, .. }
157 | Self::StepStarted { job_id, .. }
158 | Self::StepCompleted { job_id, .. }
159 | Self::StepFailed { job_id, .. }
160 | Self::StepSkipped { job_id, .. }
161 | Self::VariablesExtracted { job_id, .. }
162 | Self::ConditionEvaluated { job_id, .. }
163 | Self::ForeachProgress { job_id, .. }
164 | Self::HttpExecuted { job_id, .. }
165 | Self::Debug { job_id, .. } => *job_id,
166 }
167 }
168
169 pub fn timestamp(&self) -> DateTime<Utc> {
171 match self {
172 Self::Started { timestamp, .. }
173 | Self::Completed { timestamp, .. }
174 | Self::Failed { timestamp, .. }
175 | Self::WorkspaceStarted { timestamp, .. }
176 | Self::WorkspaceCompleted { timestamp, .. }
177 | Self::StepStarted { timestamp, .. }
178 | Self::StepCompleted { timestamp, .. }
179 | Self::StepFailed { timestamp, .. }
180 | Self::StepSkipped { timestamp, .. }
181 | Self::VariablesExtracted { timestamp, .. }
182 | Self::ConditionEvaluated { timestamp, .. }
183 | Self::ForeachProgress { timestamp, .. }
184 | Self::HttpExecuted { timestamp, .. }
185 | Self::Debug { timestamp, .. } => *timestamp,
186 }
187 }
188
189 pub fn is_error(&self) -> bool {
191 matches!(self, Self::Failed { .. } | Self::StepFailed { .. })
192 }
193
194 pub fn debug(job_id: Uuid, message: impl Into<String>) -> Self {
196 Self::Debug {
197 job_id,
198 message: message.into(),
199 timestamp: Utc::now(),
200 }
201 }
202}
203
204#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
206pub enum JobType {
207 Query,
209 Investigation,
211}
212
213impl std::fmt::Display for JobType {
214 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 match self {
216 Self::Query => write!(f, "Query"),
217 Self::Investigation => write!(f, "Investigation"),
218 }
219 }
220}
221
222#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
224pub enum ExecutionPhase {
225 #[default]
227 Acquisition,
228 Processing,
230 Reporting,
232}
233
234impl std::fmt::Display for ExecutionPhase {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 match self {
237 Self::Acquisition => write!(f, "Acquisition"),
238 Self::Processing => write!(f, "Processing"),
239 Self::Reporting => write!(f, "Reporting"),
240 }
241 }
242}
243
244#[derive(Clone)]
248pub struct ProgressSender {
249 tx: mpsc::UnboundedSender<ProgressUpdate>,
250 job_id: Uuid,
251}
252
253impl ProgressSender {
254 pub fn new(tx: mpsc::UnboundedSender<ProgressUpdate>, job_id: Uuid) -> Self {
256 Self { tx, job_id }
257 }
258
259 pub fn job_id(&self) -> Uuid {
261 self.job_id
262 }
263
264 pub fn send(&self, update: ProgressUpdate) {
266 let _ = self.tx.send(update);
267 }
268
269 pub fn started(&self, job_type: JobType, total_steps: usize, total_workspaces: usize) {
271 self.send(ProgressUpdate::Started {
272 job_id: self.job_id,
273 job_type,
274 total_steps,
275 total_workspaces,
276 timestamp: Utc::now(),
277 });
278 }
279
280 pub fn completed(&self, duration_ms: u64) {
282 self.send(ProgressUpdate::Completed {
283 job_id: self.job_id,
284 duration_ms,
285 timestamp: Utc::now(),
286 });
287 }
288
289 pub fn failed(&self, error: impl Into<String>) {
291 self.send(ProgressUpdate::Failed {
292 job_id: self.job_id,
293 error: error.into(),
294 timestamp: Utc::now(),
295 });
296 }
297
298 pub fn workspace_started(&self, workspace: impl Into<String>) {
300 self.send(ProgressUpdate::WorkspaceStarted {
301 job_id: self.job_id,
302 workspace: workspace.into(),
303 timestamp: Utc::now(),
304 });
305 }
306
307 pub fn workspace_completed(&self, workspace: impl Into<String>, duration_ms: u64) {
309 self.send(ProgressUpdate::WorkspaceCompleted {
310 job_id: self.job_id,
311 workspace: workspace.into(),
312 duration_ms,
313 timestamp: Utc::now(),
314 });
315 }
316
317 pub fn step_started(
319 &self,
320 step_name: impl Into<String>,
321 workspace: impl Into<String>,
322 phase: ExecutionPhase,
323 ) {
324 self.send(ProgressUpdate::StepStarted {
325 job_id: self.job_id,
326 step_name: step_name.into(),
327 workspace: workspace.into(),
328 phase,
329 timestamp: Utc::now(),
330 });
331 }
332
333 pub fn step_completed(
335 &self,
336 step_name: impl Into<String>,
337 workspace: impl Into<String>,
338 phase: ExecutionPhase,
339 rows: usize,
340 duration_ms: u64,
341 ) {
342 self.send(ProgressUpdate::StepCompleted {
343 job_id: self.job_id,
344 step_name: step_name.into(),
345 workspace: workspace.into(),
346 phase,
347 rows,
348 duration_ms,
349 timestamp: Utc::now(),
350 });
351 }
352
353 pub fn step_failed(
355 &self,
356 step_name: impl Into<String>,
357 workspace: impl Into<String>,
358 phase: ExecutionPhase,
359 error: impl Into<String>,
360 ) {
361 self.send(ProgressUpdate::StepFailed {
362 job_id: self.job_id,
363 step_name: step_name.into(),
364 workspace: workspace.into(),
365 phase,
366 error: error.into(),
367 timestamp: Utc::now(),
368 });
369 }
370
371 pub fn step_skipped(
373 &self,
374 step_name: impl Into<String>,
375 workspace: impl Into<String>,
376 phase: ExecutionPhase,
377 reason: impl Into<String>,
378 ) {
379 self.send(ProgressUpdate::StepSkipped {
380 job_id: self.job_id,
381 step_name: step_name.into(),
382 workspace: workspace.into(),
383 phase,
384 reason: reason.into(),
385 timestamp: Utc::now(),
386 });
387 }
388
389 pub fn debug(&self, message: impl Into<String>) {
391 self.send(ProgressUpdate::debug(self.job_id, message));
392 }
393
394 pub fn foreach_progress(
396 &self,
397 step_name: impl Into<String>,
398 workspace: impl Into<String>,
399 current: usize,
400 total: usize,
401 ) {
402 self.send(ProgressUpdate::ForeachProgress {
403 job_id: self.job_id,
404 step_name: step_name.into(),
405 workspace: workspace.into(),
406 current,
407 total,
408 timestamp: Utc::now(),
409 });
410 }
411}
412
413pub type ProgressReceiver = mpsc::UnboundedReceiver<ProgressUpdate>;
415
416pub fn progress_channel(job_id: Uuid) -> (ProgressSender, ProgressReceiver) {
418 let (tx, rx) = mpsc::unbounded_channel();
419 (ProgressSender::new(tx, job_id), rx)
420}
421
422#[cfg(test)]
423mod tests {
424 use super::*;
425
426 #[tokio::test]
427 async fn test_progress_sender() {
428 let job_id = Uuid::new_v4();
429 let (sender, mut receiver) = progress_channel(job_id);
430
431 sender.started(JobType::Query, 5, 3);
432 sender.workspace_started("workspace1");
433 sender.step_started("query1", "workspace1", ExecutionPhase::Acquisition);
434 sender.step_completed("query1", "workspace1", ExecutionPhase::Acquisition, 100, 500);
435 sender.workspace_completed("workspace1", 600);
436 sender.completed(1000);
437
438 let mut count = 0;
439 while let Ok(update) = receiver.try_recv() {
440 assert_eq!(update.job_id(), job_id);
441 count += 1;
442 }
443
444 assert_eq!(count, 6);
445 }
446}