Skip to main content

oxigdal_workflow/monitoring/
mod.rs

1//! Workflow monitoring and observability.
2//!
3//! Provides comprehensive monitoring capabilities including:
4//! - Real-time metrics collection
5//! - Execution history tracking
6//! - DAG visualization
7//! - Performance profiling
8//! - Bottleneck detection
9
10pub mod debugging;
11pub mod logging;
12pub mod metrics;
13pub mod visualization;
14
15use crate::error::Result;
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::time::Duration;
20
21pub use debugging::{DebugInfo, DebugSession, Debugger};
22pub use logging::{LogEntry, LogLevel, WorkflowLogger};
23pub use metrics::{MetricsCollector, WorkflowMetrics};
24pub use visualization::{DagVisualizer, GraphFormat, VisualizationConfig};
25
26/// Execution history entry.
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct ExecutionHistory {
29    /// Execution ID.
30    pub execution_id: String,
31    /// Workflow ID.
32    pub workflow_id: String,
33    /// Workflow name.
34    pub workflow_name: String,
35    /// Execution start time.
36    pub start_time: DateTime<Utc>,
37    /// Execution end time.
38    pub end_time: Option<DateTime<Utc>>,
39    /// Execution duration.
40    pub duration: Option<Duration>,
41    /// Execution status.
42    pub status: ExecutionHistoryStatus,
43    /// Task execution records.
44    pub tasks: Vec<TaskExecutionRecord>,
45    /// Total tasks count.
46    pub total_tasks: usize,
47    /// Completed tasks count.
48    pub completed_tasks: usize,
49    /// Failed tasks count.
50    pub failed_tasks: usize,
51    /// Execution metadata.
52    pub metadata: HashMap<String, String>,
53    /// Error message if failed.
54    pub error_message: Option<String>,
55}
56
57/// Execution history status.
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
59pub enum ExecutionHistoryStatus {
60    /// Execution is running.
61    Running,
62    /// Execution completed successfully.
63    Success,
64    /// Execution failed.
65    Failed,
66    /// Execution was cancelled.
67    Cancelled,
68    /// Execution timed out.
69    TimedOut,
70    /// Execution is paused.
71    Paused,
72}
73
74/// Task execution record.
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct TaskExecutionRecord {
77    /// Task ID.
78    pub task_id: String,
79    /// Task name.
80    pub task_name: String,
81    /// Task start time.
82    pub start_time: DateTime<Utc>,
83    /// Task end time.
84    pub end_time: Option<DateTime<Utc>>,
85    /// Task duration.
86    pub duration: Option<Duration>,
87    /// Task status.
88    pub status: TaskExecutionStatus,
89    /// Retry count.
90    pub retry_count: usize,
91    /// Task output size in bytes.
92    pub output_size_bytes: usize,
93    /// Peak memory usage in bytes.
94    pub peak_memory_bytes: Option<usize>,
95    /// CPU time in milliseconds.
96    pub cpu_time_ms: Option<u64>,
97    /// Error message if failed.
98    pub error_message: Option<String>,
99}
100
101/// Task execution status.
102#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
103pub enum TaskExecutionStatus {
104    /// Task is pending.
105    Pending,
106    /// Task is running.
107    Running,
108    /// Task completed successfully.
109    Success,
110    /// Task failed.
111    Failed,
112    /// Task was skipped.
113    Skipped,
114    /// Task was cancelled.
115    Cancelled,
116}
117
118/// Performance metrics for a workflow execution.
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct PerformanceMetrics {
121    /// Total execution time.
122    pub total_duration: Duration,
123    /// Time spent in task execution.
124    pub task_execution_time: Duration,
125    /// Time spent in scheduling/coordination.
126    pub coordination_overhead: Duration,
127    /// Average task duration.
128    pub avg_task_duration: Duration,
129    /// Longest task duration.
130    pub longest_task_duration: Duration,
131    /// Shortest task duration.
132    pub shortest_task_duration: Duration,
133    /// Parallelism factor (average concurrent tasks).
134    pub parallelism_factor: f64,
135    /// Throughput (tasks per second).
136    pub throughput: f64,
137    /// Critical path length.
138    pub critical_path_length: Duration,
139}
140
141/// Bottleneck analysis result.
142#[derive(Debug, Clone, Serialize, Deserialize)]
143pub struct BottleneckAnalysis {
144    /// Critical path tasks.
145    pub critical_path: Vec<String>,
146    /// Slowest tasks.
147    pub slowest_tasks: Vec<(String, Duration)>,
148    /// Tasks with high retry count.
149    pub high_retry_tasks: Vec<(String, usize)>,
150    /// Resource bottlenecks.
151    pub resource_bottlenecks: Vec<ResourceBottleneck>,
152    /// Suggestions for optimization.
153    pub suggestions: Vec<String>,
154}
155
156/// Resource bottleneck information.
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct ResourceBottleneck {
159    /// Resource type (CPU, memory, I/O).
160    pub resource_type: String,
161    /// Affected tasks.
162    pub affected_tasks: Vec<String>,
163    /// Severity (0.0 - 1.0).
164    pub severity: f64,
165    /// Description.
166    pub description: String,
167}
168
169/// Monitoring service for workflow executions.
170pub struct MonitoringService {
171    metrics_collector: MetricsCollector,
172    logger: WorkflowLogger,
173    debugger: Debugger,
174    visualizer: DagVisualizer,
175}
176
177impl MonitoringService {
178    /// Create a new monitoring service.
179    pub fn new() -> Self {
180        Self {
181            metrics_collector: MetricsCollector::new(),
182            logger: WorkflowLogger::new(),
183            debugger: Debugger::new(),
184            visualizer: DagVisualizer::new(),
185        }
186    }
187
188    /// Get the metrics collector.
189    pub fn metrics(&self) -> &MetricsCollector {
190        &self.metrics_collector
191    }
192
193    /// Get the logger.
194    pub fn logger(&self) -> &WorkflowLogger {
195        &self.logger
196    }
197
198    /// Get the debugger.
199    pub fn debugger(&self) -> &Debugger {
200        &self.debugger
201    }
202
203    /// Get the visualizer.
204    pub fn visualizer(&self) -> &DagVisualizer {
205        &self.visualizer
206    }
207
208    /// Analyze execution performance.
209    pub fn analyze_performance(&self, history: &ExecutionHistory) -> Result<PerformanceMetrics> {
210        let total_duration = history
211            .duration
212            .ok_or_else(|| crate::error::WorkflowError::monitoring("Duration not available"))?;
213
214        let task_durations: Vec<Duration> =
215            history.tasks.iter().filter_map(|t| t.duration).collect();
216
217        if task_durations.is_empty() {
218            return Err(crate::error::WorkflowError::monitoring(
219                "No task durations available",
220            ));
221        }
222
223        let task_execution_time: Duration = task_durations.iter().sum();
224        let coordination_overhead = total_duration.saturating_sub(task_execution_time);
225
226        let avg_task_duration = task_execution_time
227            .checked_div(task_durations.len() as u32)
228            .unwrap_or(Duration::ZERO);
229
230        let longest_task_duration = task_durations
231            .iter()
232            .max()
233            .copied()
234            .unwrap_or(Duration::ZERO);
235
236        let shortest_task_duration = task_durations
237            .iter()
238            .min()
239            .copied()
240            .unwrap_or(Duration::ZERO);
241
242        let parallelism_factor = if total_duration.as_secs() > 0 {
243            task_execution_time.as_secs_f64() / total_duration.as_secs_f64()
244        } else {
245            0.0
246        };
247
248        let throughput = if total_duration.as_secs_f64() > 0.0 {
249            history.total_tasks as f64 / total_duration.as_secs_f64()
250        } else {
251            0.0
252        };
253
254        Ok(PerformanceMetrics {
255            total_duration,
256            task_execution_time,
257            coordination_overhead,
258            avg_task_duration,
259            longest_task_duration,
260            shortest_task_duration,
261            parallelism_factor,
262            throughput,
263            critical_path_length: longest_task_duration, // Simplified
264        })
265    }
266
267    /// Detect bottlenecks in execution.
268    pub fn detect_bottlenecks(&self, history: &ExecutionHistory) -> Result<BottleneckAnalysis> {
269        let mut slowest_tasks: Vec<(String, Duration)> = history
270            .tasks
271            .iter()
272            .filter_map(|t| t.duration.map(|d| (t.task_id.clone(), d)))
273            .collect();
274
275        slowest_tasks.sort_by_key(|x| std::cmp::Reverse(x.1));
276        slowest_tasks.truncate(5);
277
278        let mut high_retry_tasks: Vec<(String, usize)> = history
279            .tasks
280            .iter()
281            .filter(|t| t.retry_count > 0)
282            .map(|t| (t.task_id.clone(), t.retry_count))
283            .collect();
284
285        high_retry_tasks.sort_by_key(|x| std::cmp::Reverse(x.1));
286        high_retry_tasks.truncate(5);
287
288        let mut suggestions = Vec::new();
289
290        if !slowest_tasks.is_empty() {
291            suggestions.push(format!(
292                "Consider optimizing task '{}' which took {:?}",
293                slowest_tasks[0].0, slowest_tasks[0].1
294            ));
295        }
296
297        if !high_retry_tasks.is_empty() {
298            suggestions.push(format!(
299                "Task '{}' has {} retries, investigate failure causes",
300                high_retry_tasks[0].0, high_retry_tasks[0].1
301            ));
302        }
303
304        Ok(BottleneckAnalysis {
305            critical_path: Vec::new(), // Would need DAG structure to compute
306            slowest_tasks,
307            high_retry_tasks,
308            resource_bottlenecks: Vec::new(),
309            suggestions,
310        })
311    }
312}
313
314impl Default for MonitoringService {
315    fn default() -> Self {
316        Self::new()
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a successful, never-retried task record lasting `secs` seconds.
    fn successful_task(
        task_id: &str,
        task_name: &str,
        secs: u64,
        output_size_bytes: usize,
    ) -> TaskExecutionRecord {
        TaskExecutionRecord {
            task_id: task_id.to_string(),
            task_name: task_name.to_string(),
            start_time: Utc::now(),
            end_time: Some(Utc::now()),
            duration: Some(Duration::from_secs(secs)),
            status: TaskExecutionStatus::Success,
            retry_count: 0,
            output_size_bytes,
            peak_memory_bytes: None,
            cpu_time_ms: None,
            error_message: None,
        }
    }

    #[test]
    fn test_monitoring_service_creation() {
        let service = MonitoringService::new();
        assert!(service.metrics().get_all_metrics().is_empty());
    }

    #[test]
    fn test_execution_history_status() {
        let status = ExecutionHistoryStatus::Running;
        assert_eq!(status, ExecutionHistoryStatus::Running);
    }

    #[test]
    fn test_performance_metrics() {
        // Two tasks (30 s and 40 s) inside a 100 s execution.
        let history = ExecutionHistory {
            execution_id: "exec1".to_string(),
            workflow_id: "wf1".to_string(),
            workflow_name: "Test Workflow".to_string(),
            start_time: Utc::now(),
            end_time: Some(Utc::now()),
            duration: Some(Duration::from_secs(100)),
            status: ExecutionHistoryStatus::Success,
            tasks: vec![
                successful_task("task1", "Task 1", 30, 1024),
                successful_task("task2", "Task 2", 40, 2048),
            ],
            total_tasks: 2,
            completed_tasks: 2,
            failed_tasks: 0,
            metadata: HashMap::new(),
            error_message: None,
        };

        let service = MonitoringService::new();
        let metrics = service
            .analyze_performance(&history)
            .expect("Analysis failed");

        assert_eq!(metrics.total_duration, Duration::from_secs(100));
        assert!(metrics.avg_task_duration.as_secs() > 0);

        // Pin the derived figures exactly, not just their sign.
        assert_eq!(metrics.task_execution_time, Duration::from_secs(70));
        assert_eq!(metrics.coordination_overhead, Duration::from_secs(30));
        assert_eq!(metrics.avg_task_duration, Duration::from_secs(35));
        assert_eq!(metrics.longest_task_duration, Duration::from_secs(40));
        assert_eq!(metrics.shortest_task_duration, Duration::from_secs(30));
        // 70 s of task time over 100 s total; 2 tasks over 100 s.
        assert!((metrics.parallelism_factor - 0.7).abs() < 1e-9);
        assert!((metrics.throughput - 0.02).abs() < 1e-9);
    }
}
385
386        assert_eq!(metrics.total_duration, Duration::from_secs(100));
387        assert!(metrics.avg_task_duration.as_secs() > 0);
388    }
389}