//! Workflow monitoring and observability: metrics collection, structured
//! logging, execution debugging, and DAG visualization.

pub mod debugging;
pub mod logging;
pub mod metrics;
pub mod visualization;
use crate::error::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
pub use debugging::{DebugInfo, DebugSession, Debugger};
pub use logging::{LogEntry, LogLevel, WorkflowLogger};
pub use metrics::{MetricsCollector, WorkflowMetrics};
pub use visualization::{DagVisualizer, GraphFormat, VisualizationConfig};
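
/// Complete record of one workflow execution: per-task records plus
/// aggregate task counts and timing.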
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionHistory {
pub execution_id: String,
pub workflow_id: String,
pub workflow_name: String,
pub start_time: DateTime<Utc>,
pub end_time: Option<DateTime<Utc>>,
pub duration: Option<Duration>,
pub status: ExecutionHistoryStatus,
pub tasks: Vec<TaskExecutionRecord>,
pub total_tasks: usize,
pub completed_tasks: usize,
pub failed_tasks: usize,
pub metadata: HashMap<String, String>,
pub error_message: Option<String>,
}
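
/// Overall state of a workflow execution.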
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ExecutionHistoryStatus {
Running,
Success,
Failed,
Cancelled,
TimedOut,
Paused,
}
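
/// Execution record for a single task within a workflow run.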
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskExecutionRecord {
pub task_id: String,
pub task_name: String,
pub start_time: DateTime<Utc>,
pub end_time: Option<DateTime<Utc>>,
pub duration: Option<Duration>,
pub status: TaskExecutionStatus,
pub retry_count: usize,
pub output_size_bytes: usize,
pub peak_memory_bytes: Option<usize>,
pub cpu_time_ms: Option<u64>,
pub error_message: Option<String>,
}
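
/// Lifecycle state of an individual task execution.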
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskExecutionStatus {
Pending,
Running,
Success,
Failed,
Skipped,
Cancelled,
}
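
/// Aggregate timing statistics derived from an [`ExecutionHistory`].
///
/// `parallelism_factor` is summed task time divided by wall-clock time
/// (values above 1.0 indicate concurrency); `throughput` is tasks per
/// second of wall-clock time.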
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
pub total_duration: Duration,
pub task_execution_time: Duration,
pub coordination_overhead: Duration,
pub avg_task_duration: Duration,
pub longest_task_duration: Duration,
pub shortest_task_duration: Duration,
pub parallelism_factor: f64,
pub throughput: f64,
pub critical_path_length: Duration,
}
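
/// Output of [`MonitoringService::detect_bottlenecks`]: the slowest and
/// most-retried tasks, resource contention, and tuning suggestions.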
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BottleneckAnalysis {
pub critical_path: Vec<String>,
pub slowest_tasks: Vec<(String, Duration)>,
pub high_retry_tasks: Vec<(String, usize)>,
pub resource_bottlenecks: Vec<ResourceBottleneck>,
pub suggestions: Vec<String>,
}
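
/// A resource (e.g. CPU, memory, or I/O) that constrained a set of
/// tasks, with a relative severity score.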
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceBottleneck {
pub resource_type: String,
pub affected_tasks: Vec<String>,
pub severity: f64,
pub description: String,
}
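
/// Facade that bundles the monitoring subsystems behind one entry point.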
pub struct MonitoringService {
metrics_collector: MetricsCollector,
logger: WorkflowLogger,
debugger: Debugger,
visualizer: DagVisualizer,
}
impl MonitoringService {
pub fn new() -> Self {
Self {
metrics_collector: MetricsCollector::new(),
logger: WorkflowLogger::new(),
debugger: Debugger::new(),
visualizer: DagVisualizer::new(),
}
}
pub fn metrics(&self) -> &MetricsCollector {
&self.metrics_collector
}
pub fn logger(&self) -> &WorkflowLogger {
&self.logger
}
pub fn debugger(&self) -> &Debugger {
&self.debugger
}
pub fn visualizer(&self) -> &DagVisualizer {
&self.visualizer
}
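
    /// Derives [`PerformanceMetrics`] from a recorded execution.
    ///
    /// Fails if the history has no overall duration (e.g. still running)
    /// or if no task recorded a duration.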
pub fn analyze_performance(&self, history: &ExecutionHistory) -> Result<PerformanceMetrics> {
let total_duration = history
.duration
.ok_or_else(|| crate::error::WorkflowError::monitoring("Duration not available"))?;
let task_durations: Vec<Duration> =
history.tasks.iter().filter_map(|t| t.duration).collect();
if task_durations.is_empty() {
return Err(crate::error::WorkflowError::monitoring(
"No task durations available",
));
}
let task_execution_time: Duration = task_durations.iter().sum();
let coordination_overhead = total_duration.saturating_sub(task_execution_time);
let avg_task_duration = task_execution_time
.checked_div(task_durations.len() as u32)
.unwrap_or(Duration::ZERO);
let longest_task_duration = task_durations
.iter()
.max()
.copied()
.unwrap_or(Duration::ZERO);
let shortest_task_duration = task_durations
.iter()
.min()
.copied()
.unwrap_or(Duration::ZERO);
        // Use the fractional duration so sub-second executions still
        // yield a meaningful ratio instead of dividing by zero seconds.
        let parallelism_factor = if total_duration.as_secs_f64() > 0.0 {
task_execution_time.as_secs_f64() / total_duration.as_secs_f64()
} else {
0.0
};
let throughput = if total_duration.as_secs_f64() > 0.0 {
history.total_tasks as f64 / total_duration.as_secs_f64()
} else {
0.0
};
Ok(PerformanceMetrics {
total_duration,
task_execution_time,
coordination_overhead,
avg_task_duration,
longest_task_duration,
shortest_task_duration,
parallelism_factor,
throughput,
            // Without the DAG structure, the longest single task serves
            // as a lower bound on the critical path.
            critical_path_length: longest_task_duration,
        })
}
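
    /// Flags likely bottlenecks: the five slowest tasks, the five tasks
    /// with the most retries, and matching suggestions for each.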
pub fn detect_bottlenecks(&self, history: &ExecutionHistory) -> Result<BottleneckAnalysis> {
let mut slowest_tasks: Vec<(String, Duration)> = history
.tasks
.iter()
.filter_map(|t| t.duration.map(|d| (t.task_id.clone(), d)))
.collect();
slowest_tasks.sort_by_key(|x| std::cmp::Reverse(x.1));
slowest_tasks.truncate(5);
let mut high_retry_tasks: Vec<(String, usize)> = history
.tasks
.iter()
.filter(|t| t.retry_count > 0)
.map(|t| (t.task_id.clone(), t.retry_count))
.collect();
high_retry_tasks.sort_by_key(|x| std::cmp::Reverse(x.1));
high_retry_tasks.truncate(5);
let mut suggestions = Vec::new();
if !slowest_tasks.is_empty() {
suggestions.push(format!(
"Consider optimizing task '{}' which took {:?}",
slowest_tasks[0].0, slowest_tasks[0].1
));
}
if !high_retry_tasks.is_empty() {
suggestions.push(format!(
"Task '{}' has {} retries, investigate failure causes",
high_retry_tasks[0].0, high_retry_tasks[0].1
));
}
        Ok(BottleneckAnalysis {
            // The true critical path needs the dependency graph, which
            // the execution history alone does not carry.
            critical_path: Vec::new(),
            slowest_tasks,
high_retry_tasks,
resource_bottlenecks: Vec::new(),
suggestions,
})
}
}
impl Default for MonitoringService {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_monitoring_service_creation() {
let service = MonitoringService::new();
assert!(service.metrics().get_all_metrics().is_empty());
}
#[test]
fn test_execution_history_status() {
        let status = ExecutionHistoryStatus::Running;
        assert_eq!(status, ExecutionHistoryStatus::Running);
        assert_ne!(status, ExecutionHistoryStatus::Success);
    }
#[test]
fn test_performance_metrics() {
let history = ExecutionHistory {
execution_id: "exec1".to_string(),
workflow_id: "wf1".to_string(),
workflow_name: "Test Workflow".to_string(),
start_time: Utc::now(),
end_time: Some(Utc::now()),
duration: Some(Duration::from_secs(100)),
status: ExecutionHistoryStatus::Success,
tasks: vec![
TaskExecutionRecord {
task_id: "task1".to_string(),
task_name: "Task 1".to_string(),
start_time: Utc::now(),
end_time: Some(Utc::now()),
duration: Some(Duration::from_secs(30)),
status: TaskExecutionStatus::Success,
retry_count: 0,
output_size_bytes: 1024,
peak_memory_bytes: None,
cpu_time_ms: None,
error_message: None,
},
TaskExecutionRecord {
task_id: "task2".to_string(),
task_name: "Task 2".to_string(),
start_time: Utc::now(),
end_time: Some(Utc::now()),
duration: Some(Duration::from_secs(40)),
status: TaskExecutionStatus::Success,
retry_count: 0,
output_size_bytes: 2048,
peak_memory_bytes: None,
cpu_time_ms: None,
error_message: None,
},
],
total_tasks: 2,
completed_tasks: 2,
failed_tasks: 0,
metadata: HashMap::new(),
error_message: None,
};
let service = MonitoringService::new();
let metrics = service
.analyze_performance(&history)
.expect("Analysis failed");
assert_eq!(metrics.total_duration, Duration::from_secs(100));
        assert_eq!(metrics.task_execution_time, Duration::from_secs(70));
        assert_eq!(metrics.avg_task_duration, Duration::from_secs(35));
}
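
    // Exercises `detect_bottlenecks` on a two-task history (one slow, one
    // retried) so both suggestion branches fire; the `task` closure is
    // just a local helper to cut down on record boilerplate.
    #[test]
    fn test_detect_bottlenecks() {
        let task = |id: &str, secs: u64, retries: usize| TaskExecutionRecord {
            task_id: id.to_string(),
            task_name: id.to_string(),
            start_time: Utc::now(),
            end_time: Some(Utc::now()),
            duration: Some(Duration::from_secs(secs)),
            status: TaskExecutionStatus::Success,
            retry_count: retries,
            output_size_bytes: 0,
            peak_memory_bytes: None,
            cpu_time_ms: None,
            error_message: None,
        };
        let history = ExecutionHistory {
            execution_id: "exec2".to_string(),
            workflow_id: "wf1".to_string(),
            workflow_name: "Test Workflow".to_string(),
            start_time: Utc::now(),
            end_time: Some(Utc::now()),
            duration: Some(Duration::from_secs(60)),
            status: ExecutionHistoryStatus::Success,
            tasks: vec![task("slow", 50, 0), task("flaky", 5, 3)],
            total_tasks: 2,
            completed_tasks: 2,
            failed_tasks: 0,
            metadata: HashMap::new(),
            error_message: None,
        };
        let analysis = MonitoringService::new()
            .detect_bottlenecks(&history)
            .expect("analysis failed");
        assert_eq!(analysis.slowest_tasks[0].0, "slow");
        assert_eq!(analysis.high_retry_tasks, vec![("flaky".to_string(), 3)]);
        assert_eq!(analysis.suggestions.len(), 2);
    }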
}