1pub mod debugging;
11pub mod logging;
12pub mod metrics;
13pub mod visualization;
14
15use crate::error::Result;
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::time::Duration;
20
21pub use debugging::{DebugInfo, DebugSession, Debugger};
22pub use logging::{LogEntry, LogLevel, WorkflowLogger};
23pub use metrics::{MetricsCollector, WorkflowMetrics};
24pub use visualization::{DagVisualizer, GraphFormat, VisualizationConfig};
25
/// Immutable record of a single workflow execution: overall timing,
/// per-task results, and aggregate counters used by later analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionHistory {
    /// Unique identifier of this execution run.
    pub execution_id: String,
    /// Identifier of the workflow definition that was executed.
    pub workflow_id: String,
    /// Human-readable name of the workflow.
    pub workflow_name: String,
    /// When the execution started (UTC).
    pub start_time: DateTime<Utc>,
    /// When the execution finished; `None` while still running.
    pub end_time: Option<DateTime<Utc>>,
    /// Total wall-clock duration; `None` while still running.
    pub duration: Option<Duration>,
    /// Final (or current) status of the execution.
    pub status: ExecutionHistoryStatus,
    /// Per-task execution records for this run.
    pub tasks: Vec<TaskExecutionRecord>,
    /// Total number of tasks in the workflow.
    pub total_tasks: usize,
    /// Number of tasks that finished successfully.
    pub completed_tasks: usize,
    /// Number of tasks that failed.
    pub failed_tasks: usize,
    /// Arbitrary key/value metadata attached to the run.
    pub metadata: HashMap<String, String>,
    /// Top-level error message, if the execution failed.
    pub error_message: Option<String>,
}
56
/// Lifecycle status of a whole workflow execution.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExecutionHistoryStatus {
    /// Execution is currently in progress.
    Running,
    /// Execution finished successfully.
    Success,
    /// Execution finished with a failure.
    Failed,
    /// Execution was cancelled before completing.
    Cancelled,
    /// Execution exceeded its time limit.
    TimedOut,
    /// Execution is paused.
    Paused,
}
73
/// Execution record for a single task within a workflow run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskExecutionRecord {
    /// Unique identifier of the task.
    pub task_id: String,
    /// Human-readable name of the task.
    pub task_name: String,
    /// When the task started (UTC).
    pub start_time: DateTime<Utc>,
    /// When the task finished; `None` while still running.
    pub end_time: Option<DateTime<Utc>>,
    /// Wall-clock duration of the task; `None` while still running.
    pub duration: Option<Duration>,
    /// Final (or current) status of the task.
    pub status: TaskExecutionStatus,
    /// How many times the task was retried.
    pub retry_count: usize,
    /// Size of the task's output, in bytes.
    pub output_size_bytes: usize,
    /// Peak memory usage in bytes, when measured.
    pub peak_memory_bytes: Option<usize>,
    /// CPU time consumed in milliseconds, when measured.
    pub cpu_time_ms: Option<u64>,
    /// Error message, if the task failed.
    pub error_message: Option<String>,
}
100
/// Lifecycle status of an individual task execution.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskExecutionStatus {
    /// Task has not started yet.
    Pending,
    /// Task is currently running.
    Running,
    /// Task finished successfully.
    Success,
    /// Task finished with a failure.
    Failed,
    /// Task was skipped.
    Skipped,
    /// Task was cancelled before completing.
    Cancelled,
}
117
/// Derived timing statistics for a completed execution,
/// produced by `MonitoringService::analyze_performance`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Total wall-clock duration of the execution.
    pub total_duration: Duration,
    /// Sum of all individual task durations.
    pub task_execution_time: Duration,
    /// `total_duration` minus `task_execution_time` (scheduling overhead).
    pub coordination_overhead: Duration,
    /// Mean task duration.
    pub avg_task_duration: Duration,
    /// Duration of the slowest task.
    pub longest_task_duration: Duration,
    /// Duration of the fastest task.
    pub shortest_task_duration: Duration,
    /// Ratio of summed task time to wall-clock time; > 1.0 implies
    /// tasks ran concurrently.
    pub parallelism_factor: f64,
    /// Tasks completed per second of wall-clock time.
    pub throughput: f64,
    /// Length of the critical path through the task graph.
    pub critical_path_length: Duration,
}
140
/// Result of bottleneck detection over an execution history,
/// produced by `MonitoringService::detect_bottlenecks`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BottleneckAnalysis {
    /// Task ids forming the critical path through the workflow.
    pub critical_path: Vec<String>,
    /// Slowest tasks as `(task_id, duration)`, longest first.
    pub slowest_tasks: Vec<(String, Duration)>,
    /// Tasks with retries as `(task_id, retry_count)`, highest first.
    pub high_retry_tasks: Vec<(String, usize)>,
    /// Detected resource contention issues.
    pub resource_bottlenecks: Vec<ResourceBottleneck>,
    /// Human-readable optimization suggestions.
    pub suggestions: Vec<String>,
}
155
/// A single resource-contention finding within a bottleneck analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceBottleneck {
    /// Kind of resource under contention (e.g. CPU, memory, I/O).
    pub resource_type: String,
    /// Ids of the tasks affected by this bottleneck.
    pub affected_tasks: Vec<String>,
    /// Severity score; higher means more severe.
    pub severity: f64,
    /// Human-readable description of the bottleneck.
    pub description: String,
}
168
/// Facade bundling the monitoring subsystems: metrics collection,
/// workflow logging, debugging, and DAG visualization.
pub struct MonitoringService {
    /// Collects workflow and task metrics.
    metrics_collector: MetricsCollector,
    /// Structured workflow logger.
    logger: WorkflowLogger,
    /// Interactive/step debugging support.
    debugger: Debugger,
    /// Renders workflow DAGs for inspection.
    visualizer: DagVisualizer,
}
176
177impl MonitoringService {
178 pub fn new() -> Self {
180 Self {
181 metrics_collector: MetricsCollector::new(),
182 logger: WorkflowLogger::new(),
183 debugger: Debugger::new(),
184 visualizer: DagVisualizer::new(),
185 }
186 }
187
188 pub fn metrics(&self) -> &MetricsCollector {
190 &self.metrics_collector
191 }
192
193 pub fn logger(&self) -> &WorkflowLogger {
195 &self.logger
196 }
197
198 pub fn debugger(&self) -> &Debugger {
200 &self.debugger
201 }
202
203 pub fn visualizer(&self) -> &DagVisualizer {
205 &self.visualizer
206 }
207
208 pub fn analyze_performance(&self, history: &ExecutionHistory) -> Result<PerformanceMetrics> {
210 let total_duration = history
211 .duration
212 .ok_or_else(|| crate::error::WorkflowError::monitoring("Duration not available"))?;
213
214 let task_durations: Vec<Duration> =
215 history.tasks.iter().filter_map(|t| t.duration).collect();
216
217 if task_durations.is_empty() {
218 return Err(crate::error::WorkflowError::monitoring(
219 "No task durations available",
220 ));
221 }
222
223 let task_execution_time: Duration = task_durations.iter().sum();
224 let coordination_overhead = total_duration.saturating_sub(task_execution_time);
225
226 let avg_task_duration = task_execution_time
227 .checked_div(task_durations.len() as u32)
228 .unwrap_or(Duration::ZERO);
229
230 let longest_task_duration = task_durations
231 .iter()
232 .max()
233 .copied()
234 .unwrap_or(Duration::ZERO);
235
236 let shortest_task_duration = task_durations
237 .iter()
238 .min()
239 .copied()
240 .unwrap_or(Duration::ZERO);
241
242 let parallelism_factor = if total_duration.as_secs() > 0 {
243 task_execution_time.as_secs_f64() / total_duration.as_secs_f64()
244 } else {
245 0.0
246 };
247
248 let throughput = if total_duration.as_secs_f64() > 0.0 {
249 history.total_tasks as f64 / total_duration.as_secs_f64()
250 } else {
251 0.0
252 };
253
254 Ok(PerformanceMetrics {
255 total_duration,
256 task_execution_time,
257 coordination_overhead,
258 avg_task_duration,
259 longest_task_duration,
260 shortest_task_duration,
261 parallelism_factor,
262 throughput,
263 critical_path_length: longest_task_duration, })
265 }
266
267 pub fn detect_bottlenecks(&self, history: &ExecutionHistory) -> Result<BottleneckAnalysis> {
269 let mut slowest_tasks: Vec<(String, Duration)> = history
270 .tasks
271 .iter()
272 .filter_map(|t| t.duration.map(|d| (t.task_id.clone(), d)))
273 .collect();
274
275 slowest_tasks.sort_by_key(|x| std::cmp::Reverse(x.1));
276 slowest_tasks.truncate(5);
277
278 let mut high_retry_tasks: Vec<(String, usize)> = history
279 .tasks
280 .iter()
281 .filter(|t| t.retry_count > 0)
282 .map(|t| (t.task_id.clone(), t.retry_count))
283 .collect();
284
285 high_retry_tasks.sort_by_key(|x| std::cmp::Reverse(x.1));
286 high_retry_tasks.truncate(5);
287
288 let mut suggestions = Vec::new();
289
290 if !slowest_tasks.is_empty() {
291 suggestions.push(format!(
292 "Consider optimizing task '{}' which took {:?}",
293 slowest_tasks[0].0, slowest_tasks[0].1
294 ));
295 }
296
297 if !high_retry_tasks.is_empty() {
298 suggestions.push(format!(
299 "Task '{}' has {} retries, investigate failure causes",
300 high_retry_tasks[0].0, high_retry_tasks[0].1
301 ));
302 }
303
304 Ok(BottleneckAnalysis {
305 critical_path: Vec::new(), slowest_tasks,
307 high_retry_tasks,
308 resource_bottlenecks: Vec::new(),
309 suggestions,
310 })
311 }
312}
313
314impl Default for MonitoringService {
315 fn default() -> Self {
316 Self::new()
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a successful task record with the given id, name,
    /// duration (seconds), and output size.
    fn sample_task(
        id: &str,
        name: &str,
        secs: u64,
        output_size_bytes: usize,
    ) -> TaskExecutionRecord {
        TaskExecutionRecord {
            task_id: id.to_string(),
            task_name: name.to_string(),
            start_time: Utc::now(),
            end_time: Some(Utc::now()),
            duration: Some(Duration::from_secs(secs)),
            status: TaskExecutionStatus::Success,
            retry_count: 0,
            output_size_bytes,
            peak_memory_bytes: None,
            cpu_time_ms: None,
            error_message: None,
        }
    }

    #[test]
    fn test_monitoring_service_creation() {
        // A fresh service starts with no recorded metrics.
        let service = MonitoringService::new();
        assert!(service.metrics().get_all_metrics().is_empty());
    }

    #[test]
    fn test_execution_history_status() {
        let status = ExecutionHistoryStatus::Running;
        assert_eq!(status, ExecutionHistoryStatus::Running);
    }

    #[test]
    fn test_performance_metrics() {
        // Two successful tasks (30s + 40s) inside a 100s execution.
        let history = ExecutionHistory {
            execution_id: "exec1".to_string(),
            workflow_id: "wf1".to_string(),
            workflow_name: "Test Workflow".to_string(),
            start_time: Utc::now(),
            end_time: Some(Utc::now()),
            duration: Some(Duration::from_secs(100)),
            status: ExecutionHistoryStatus::Success,
            tasks: vec![
                sample_task("task1", "Task 1", 30, 1024),
                sample_task("task2", "Task 2", 40, 2048),
            ],
            total_tasks: 2,
            completed_tasks: 2,
            failed_tasks: 0,
            metadata: HashMap::new(),
            error_message: None,
        };

        let metrics = MonitoringService::new()
            .analyze_performance(&history)
            .expect("Analysis failed");

        assert_eq!(metrics.total_duration, Duration::from_secs(100));
        assert!(metrics.avg_task_duration.as_secs() > 0);
    }
}