ant_quic/workflow/monitor.rs

//! Workflow Monitoring and Visualization
//!
//! This module provides real-time monitoring, metrics collection, and
//! visualization capabilities for workflow execution.
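//!
//! A minimal usage sketch. The module path (`ant_quic::workflow::monitor`) and the
//! `WorkflowId::generate()` constructor are assumptions drawn from the tests at the
//! bottom of this file, not guarantees of the public API:
//!
//! ```rust,ignore
//! use std::time::Duration;
//!
//! use ant_quic::SystemTime;
//! use ant_quic::workflow::WorkflowId;
//! use ant_quic::workflow::monitor::{
//!     LoggingAlertHandler, MonitoringConfig, MonitoringEvent, WorkflowMonitor,
//! };
//!
//! async fn demo() {
//!     let monitor = WorkflowMonitor::new(MonitoringConfig::default());
//!     monitor.start().await.unwrap();
//!     monitor.register_alert_handler(Box::new(LoggingAlertHandler)).await;
//!
//!     let workflow_id = WorkflowId::generate();
//!     monitor.record_event(MonitoringEvent::WorkflowStarted {
//!         workflow_id,
//!         definition_id: "example_workflow".to_string(),
//!         timestamp: SystemTime::now(),
//!     }).await.unwrap();
//!
//!     // Events are processed by background tasks; give them a moment before reading.
//!     tokio::time::sleep(Duration::from_millis(100)).await;
//!     let summary = monitor.get_workflow_summary().await;
//!     println!("active workflows: {}", summary.active_workflows);
//! }
//! ```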

use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
    time::Duration,
};

use crate::SystemTime;

use serde::{Deserialize, Serialize};
use tokio::{
    sync::{mpsc, RwLock, Mutex},
    time::interval,
};
use tracing::{debug, info, warn};

use crate::workflow::{
    StageId, WorkflowId, WorkflowStatus, WorkflowError, WorkflowMetrics,
};

/// Monitoring configuration
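///
/// The defaults defined below are a reasonable starting point; individual fields can be
/// overridden with struct-update syntax. A minimal sketch (using only items defined in
/// this module):
///
/// ```rust,ignore
/// use std::time::Duration;
///
/// let config = MonitoringConfig {
///     collection_interval: Duration::from_secs(1),
///     max_events_per_workflow: 500,
///     ..MonitoringConfig::default()
/// };
/// ```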
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    /// Metrics collection interval
    pub collection_interval: Duration,
    /// History retention period
    pub retention_period: Duration,
    /// Maximum events to store per workflow
    pub max_events_per_workflow: usize,
    /// Enable detailed tracing
    pub enable_tracing: bool,
    /// Alert thresholds
    pub alert_config: AlertConfig,
}

impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            collection_interval: Duration::from_secs(5),
            retention_period: Duration::from_secs(24 * 3600),
            max_events_per_workflow: 1000,
            enable_tracing: true,
            alert_config: AlertConfig::default(),
        }
    }
}

/// Alert configuration
#[derive(Debug, Clone)]
pub struct AlertConfig {
    /// Maximum workflow duration before alert
    pub max_workflow_duration: Duration,
    /// Maximum stage duration before alert
    pub max_stage_duration: Duration,
    /// Error rate threshold (errors per minute)
    pub error_rate_threshold: f32,
    /// Memory usage threshold in MB
    pub memory_threshold_mb: u64,
    /// CPU usage threshold percentage
    pub cpu_threshold_percent: f32,
}

impl Default for AlertConfig {
    fn default() -> Self {
        Self {
            max_workflow_duration: Duration::from_secs(3600),
            max_stage_duration: Duration::from_secs(600),
            error_rate_threshold: 10.0,
            memory_threshold_mb: 1024,
            cpu_threshold_percent: 80.0,
        }
    }
}

/// Monitoring event types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MonitoringEvent {
    /// Workflow started
    WorkflowStarted {
        workflow_id: WorkflowId,
        definition_id: String,
        timestamp: SystemTime,
    },
    /// Workflow completed
    WorkflowCompleted {
        workflow_id: WorkflowId,
        duration: Duration,
        success: bool,
        timestamp: SystemTime,
    },
    /// Stage started
    StageStarted {
        workflow_id: WorkflowId,
        stage_id: StageId,
        timestamp: SystemTime,
    },
    /// Stage completed
    StageCompleted {
        workflow_id: WorkflowId,
        stage_id: StageId,
        duration: Duration,
        timestamp: SystemTime,
    },
    /// Error occurred
    ErrorOccurred {
        workflow_id: WorkflowId,
        stage_id: Option<StageId>,
        error: String,
        timestamp: SystemTime,
    },
    /// Metric recorded
    MetricRecorded {
        workflow_id: WorkflowId,
        metric_name: String,
        value: f64,
        timestamp: SystemTime,
    },
    /// Alert triggered
    AlertTriggered {
        workflow_id: Option<WorkflowId>,
        alert_type: AlertType,
        message: String,
        timestamp: SystemTime,
    },
}

/// Types of alerts
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AlertType {
    WorkflowTimeout,
    StageTimeout,
    HighErrorRate,
    HighMemoryUsage,
    HighCpuUsage,
    SystemError,
}

/// Workflow monitor for real-time monitoring
pub struct WorkflowMonitor {
    /// Monitoring configuration
    config: MonitoringConfig,
    /// Event history
    event_history: Arc<RwLock<VecDeque<MonitoringEvent>>>,
    /// Workflow metrics
    workflow_metrics: Arc<RwLock<HashMap<WorkflowId, WorkflowMonitoringData>>>,
    /// System metrics
    system_metrics: Arc<RwLock<SystemMetrics>>,
    /// Alert handlers
    alert_handlers: Arc<RwLock<Vec<Box<dyn AlertHandler>>>>,
    /// Event channel sender
    event_tx: mpsc::Sender<MonitoringEvent>,
    /// Event channel receiver, drained by the event processing loop
    event_rx: Arc<Mutex<mpsc::Receiver<MonitoringEvent>>>,
}

impl WorkflowMonitor {
    /// Create a new workflow monitor
    pub fn new(config: MonitoringConfig) -> Self {
        let (event_tx, event_rx) = mpsc::channel(1000);

        Self {
            config,
            event_history: Arc::new(RwLock::new(VecDeque::new())),
            workflow_metrics: Arc::new(RwLock::new(HashMap::new())),
            system_metrics: Arc::new(RwLock::new(SystemMetrics::default())),
            alert_handlers: Arc::new(RwLock::new(Vec::new())),
            event_tx,
            event_rx: Arc::new(Mutex::new(event_rx)),
        }
    }

    /// Start the monitor
    pub async fn start(&self) -> Result<(), WorkflowError> {
        info!("Starting workflow monitor");

        // Start event processing
        let monitor = self.clone();
        tokio::spawn(async move {
            monitor.event_processing_loop().await;
        });

        // Start metrics collection
        let monitor = self.clone();
        tokio::spawn(async move {
            monitor.metrics_collection_loop().await;
        });

        // Start cleanup task
        let monitor = self.clone();
        tokio::spawn(async move {
            monitor.cleanup_loop().await;
        });

        Ok(())
    }

    /// Register an alert handler
    pub async fn register_alert_handler(&self, handler: Box<dyn AlertHandler>) {
        let mut handlers = self.alert_handlers.write().await;
        handlers.push(handler);
    }

    /// Record a monitoring event
    pub async fn record_event(&self, event: MonitoringEvent) -> Result<(), WorkflowError> {
        self.event_tx.send(event).await
            .map_err(|_| WorkflowError {
                code: "MONITORING_ERROR".to_string(),
                message: "Failed to record monitoring event".to_string(),
                stage: None,
                trace: None,
                recovery_hints: vec![],
            })
    }

    /// Get workflow metrics
    pub async fn get_workflow_metrics(&self, workflow_id: &WorkflowId) -> Option<WorkflowMonitoringData> {
        let metrics = self.workflow_metrics.read().await;
        metrics.get(workflow_id).cloned()
    }

    /// Get system metrics
    pub async fn get_system_metrics(&self) -> SystemMetrics {
        self.system_metrics.read().await.clone()
    }

    /// Get recent events
    pub async fn get_recent_events(&self, count: usize) -> Vec<MonitoringEvent> {
        let history = self.event_history.read().await;
        history.iter().rev().take(count).cloned().collect()
    }

    /// Get workflow summary
    pub async fn get_workflow_summary(&self) -> WorkflowSummary {
        let metrics = self.workflow_metrics.read().await;
        let system = self.system_metrics.read().await;

        let active_workflows = metrics.iter()
            .filter(|(_, data)| matches!(data.status, WorkflowStatus::Running { .. }))
            .count();

        let completed_workflows = metrics.iter()
            .filter(|(_, data)| matches!(data.status, WorkflowStatus::Completed { .. }))
            .count();

        let failed_workflows = metrics.iter()
            .filter(|(_, data)| matches!(data.status, WorkflowStatus::Failed { .. }))
            .count();

        // Average over every workflow that has finished (completed or failed),
        // matching the set of durations summed below.
        let finished_workflows = metrics.values()
            .filter(|data| data.end_time.is_some())
            .count();

        let total_duration: Duration = metrics.values()
            .filter_map(|data| data.end_time.map(|end| end.duration_since(data.start_time).unwrap_or_default()))
            .sum();

        let avg_duration = if finished_workflows > 0 {
            total_duration / finished_workflows as u32
        } else {
            Duration::default()
        };

        WorkflowSummary {
            active_workflows,
            completed_workflows,
            failed_workflows,
            total_workflows: metrics.len(),
            average_duration: avg_duration,
            system_metrics: system.clone(),
        }
    }

    /// Event processing loop
    async fn event_processing_loop(&self) {
        let mut receiver = self.event_rx.lock().await;

        while let Some(event) = receiver.recv().await {
            if let Err(e) = self.process_event(event.clone()).await {
                warn!("Error processing monitoring event: {:?}", e);
            }
        }
    }

    /// Process a monitoring event
    async fn process_event(&self, event: MonitoringEvent) -> Result<(), WorkflowError> {
        // Add to history
        {
            let mut history = self.event_history.write().await;
            history.push_back(event.clone());

            // Trim old events
            while history.len() > 10000 {
                history.pop_front();
            }
        }

        // Update metrics
        match &event {
            MonitoringEvent::WorkflowStarted { workflow_id, definition_id, timestamp } => {
                let mut metrics = self.workflow_metrics.write().await;
                metrics.insert(*workflow_id, WorkflowMonitoringData {
                    workflow_id: *workflow_id,
                    definition_id: definition_id.clone(),
                    status: WorkflowStatus::Running { current_stage: StageId("init".to_string()) },
                    start_time: *timestamp,
                    end_time: None,
                    stages_completed: 0,
                    errors: Vec::new(),
                    metrics: WorkflowMetrics::default(),
                });
            }
            MonitoringEvent::WorkflowCompleted { workflow_id, duration, success, timestamp } => {
                let mut metrics = self.workflow_metrics.write().await;
                if let Some(data) = metrics.get_mut(workflow_id) {
                    data.status = if *success {
                        WorkflowStatus::Completed {
                            result: crate::workflow::WorkflowResult {
                                output: HashMap::new(),
                                duration: *duration,
                                metrics: data.metrics.clone(),
                            }
                        }
                    } else {
                        WorkflowStatus::Failed {
                            error: WorkflowError {
                                code: "WORKFLOW_FAILED".to_string(),
                                message: "Workflow failed".to_string(),
                                stage: None,
                                trace: None,
                                recovery_hints: vec![],
                            }
                        }
                    };
                    data.end_time = Some(*timestamp);
                }

                // Update system metrics
                let mut system = self.system_metrics.write().await;
                system.total_workflows_completed += 1;
                if *success {
                    system.successful_workflows += 1;
                } else {
                    system.failed_workflows += 1;
                }
            }
            MonitoringEvent::StageCompleted { workflow_id, stage_id: _, duration, timestamp: _ } => {
                let mut metrics = self.workflow_metrics.write().await;
                if let Some(data) = metrics.get_mut(workflow_id) {
                    data.stages_completed += 1;
                    data.metrics.stages_executed += 1;
                }

                // Check for stage timeout alert
                if *duration > self.config.alert_config.max_stage_duration {
                    self.trigger_alert(AlertType::StageTimeout,
                        format!("Stage took {:?}, exceeding threshold", duration),
                        Some(*workflow_id),
                    ).await;
                }
            }
            MonitoringEvent::ErrorOccurred { workflow_id, stage_id: _, error, timestamp: _ } => {
                let mut metrics = self.workflow_metrics.write().await;
                if let Some(data) = metrics.get_mut(workflow_id) {
                    data.errors.push(error.clone());
                    data.metrics.error_count += 1;
                }

                // Update system error rate
                let mut system = self.system_metrics.write().await;
                system.error_count += 1;

                // Check error rate
                let error_rate = system.calculate_error_rate();
                if error_rate > self.config.alert_config.error_rate_threshold {
                    self.trigger_alert(AlertType::HighErrorRate,
                        format!("Error rate {:.1}/min exceeds threshold", error_rate),
                        None,
                    ).await;
                }
            }
            MonitoringEvent::AlertTriggered { .. } => {
                // Alerts are already handled
            }
            _ => {}
        }

        Ok(())
    }

    /// Metrics collection loop
    async fn metrics_collection_loop(&self) {
        let mut interval = interval(self.config.collection_interval);

        loop {
            interval.tick().await;

            // Collect system metrics
            if let Err(e) = self.collect_system_metrics().await {
                warn!("Failed to collect system metrics: {:?}", e);
            }

            // Check for alerts
            self.check_alerts().await;
        }
    }

    /// Collect system metrics
    async fn collect_system_metrics(&self) -> Result<(), WorkflowError> {
        let mut system = self.system_metrics.write().await;

        // Update timestamp
        system.last_updated = SystemTime::now();

        // In a real implementation, we would collect actual system metrics
        // For now, we'll use placeholder values
        system.cpu_usage = 45.0;
        system.memory_usage_mb = 512;

        Ok(())
    }

    /// Check for alert conditions
    async fn check_alerts(&self) {
        let metrics = self.workflow_metrics.read().await;
        let system = self.system_metrics.read().await;

        // Check CPU usage
        if system.cpu_usage > self.config.alert_config.cpu_threshold_percent {
            self.trigger_alert(AlertType::HighCpuUsage,
                format!("CPU usage {:.1}% exceeds threshold", system.cpu_usage),
                None,
            ).await;
        }

        // Check memory usage
        if system.memory_usage_mb > self.config.alert_config.memory_threshold_mb {
            self.trigger_alert(AlertType::HighMemoryUsage,
                format!("Memory usage {}MB exceeds threshold", system.memory_usage_mb),
                None,
            ).await;
        }

        // Check workflow timeouts
        let now = SystemTime::now();
        for (workflow_id, data) in metrics.iter() {
            if matches!(data.status, WorkflowStatus::Running { .. }) {
                if let Ok(duration) = now.duration_since(data.start_time) {
                    if duration > self.config.alert_config.max_workflow_duration {
                        self.trigger_alert(AlertType::WorkflowTimeout,
                            format!("Workflow running for {:?}, exceeding threshold", duration),
                            Some(*workflow_id),
                        ).await;
                    }
                }
            }
        }
    }

    /// Trigger an alert
    async fn trigger_alert(&self, alert_type: AlertType, message: String, workflow_id: Option<WorkflowId>) {
        let event = MonitoringEvent::AlertTriggered {
            workflow_id,
            alert_type: alert_type.clone(),
            message: message.clone(),
            timestamp: SystemTime::now(),
        };

        // Record the alert event
        let _ = self.record_event(event).await;

        // Notify all handlers
        let handlers = self.alert_handlers.read().await;
        for handler in handlers.iter() {
            handler.handle_alert(alert_type.clone(), message.clone(), workflow_id).await;
        }
    }

    /// Cleanup loop for old data
    async fn cleanup_loop(&self) {
        let mut interval = interval(Duration::from_secs(3600)); // 1 hour

        loop {
            interval.tick().await;

            let now = SystemTime::now();
            let retention_cutoff = now - self.config.retention_period;

            // Clean up old workflow metrics
            let mut metrics = self.workflow_metrics.write().await;
            metrics.retain(|_, data| {
                if let Some(end_time) = data.end_time {
                    end_time > retention_cutoff
                } else {
                    true // Keep running workflows
                }
            });

            // Clean up old events
            let mut history = self.event_history.write().await;
            history.retain(|event| {
                match event {
                    MonitoringEvent::WorkflowStarted { timestamp, .. } |
                    MonitoringEvent::WorkflowCompleted { timestamp, .. } |
                    MonitoringEvent::StageStarted { timestamp, .. } |
                    MonitoringEvent::StageCompleted { timestamp, .. } |
                    MonitoringEvent::ErrorOccurred { timestamp, .. } |
                    MonitoringEvent::MetricRecorded { timestamp, .. } |
                    MonitoringEvent::AlertTriggered { timestamp, .. } => {
                        *timestamp > retention_cutoff
                    }
                }
            });

            debug!("Cleaned up old monitoring data");
        }
    }
}

impl Clone for WorkflowMonitor {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            event_history: self.event_history.clone(),
            workflow_metrics: self.workflow_metrics.clone(),
            system_metrics: self.system_metrics.clone(),
            alert_handlers: self.alert_handlers.clone(),
            event_tx: self.event_tx.clone(),
            event_rx: self.event_rx.clone(),
        }
    }
}

/// Monitoring data for a workflow
#[derive(Debug, Clone)]
pub struct WorkflowMonitoringData {
    /// Workflow ID
    pub workflow_id: WorkflowId,
    /// Workflow definition ID
    pub definition_id: String,
    /// Current status
    pub status: WorkflowStatus,
    /// Start time
    pub start_time: SystemTime,
    /// End time
    pub end_time: Option<SystemTime>,
    /// Number of stages completed
    pub stages_completed: u32,
    /// Errors encountered
    pub errors: Vec<String>,
    /// Workflow metrics
    pub metrics: WorkflowMetrics,
}

/// System-wide metrics
#[derive(Debug, Clone)]
pub struct SystemMetrics {
    /// Total workflows completed
    pub total_workflows_completed: u64,
    /// Successful workflows
    pub successful_workflows: u64,
    /// Failed workflows
    pub failed_workflows: u64,
    /// Total error count
    pub error_count: u64,
    /// CPU usage percentage
    pub cpu_usage: f32,
    /// Memory usage in MB
    pub memory_usage_mb: u64,
    /// Last update timestamp
    pub last_updated: SystemTime,
    /// Start time for rate calculations
    pub start_time: SystemTime,
}

impl Default for SystemMetrics {
    fn default() -> Self {
        let now = SystemTime::now();
        Self {
            total_workflows_completed: 0,
            successful_workflows: 0,
            failed_workflows: 0,
            error_count: 0,
            cpu_usage: 0.0,
            memory_usage_mb: 0,
            last_updated: now,
            start_time: now,
        }
    }
}

impl SystemMetrics {
    /// Calculate error rate per minute
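    ///
    /// The rate is `error_count` divided by the minutes elapsed between `start_time`
    /// and `last_updated`, and `0.0` before any time has elapsed. A small illustrative
    /// sketch (assuming `SystemMetrics` is reachable from the caller's crate):
    ///
    /// ```rust,ignore
    /// use std::time::Duration;
    ///
    /// let mut metrics = SystemMetrics::default();
    /// metrics.error_count = 4;
    /// metrics.last_updated = metrics.start_time + Duration::from_secs(120);
    /// // 4 errors over 2 minutes -> 2.0 errors per minute.
    /// assert!((metrics.calculate_error_rate() - 2.0).abs() < f32::EPSILON);
    /// ```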
    pub fn calculate_error_rate(&self) -> f32 {
        if let Ok(duration) = self.last_updated.duration_since(self.start_time) {
            let minutes = duration.as_secs_f32() / 60.0;
            if minutes > 0.0 {
                return self.error_count as f32 / minutes;
            }
        }
        0.0
    }
}

/// Summary of workflow system state
#[derive(Debug, Clone)]
pub struct WorkflowSummary {
    /// Number of active workflows
    pub active_workflows: usize,
    /// Number of completed workflows
    pub completed_workflows: usize,
    /// Number of failed workflows
    pub failed_workflows: usize,
    /// Total workflows
    pub total_workflows: usize,
    /// Average workflow duration
    pub average_duration: Duration,
    /// System metrics
    pub system_metrics: SystemMetrics,
}

/// Alert handler trait
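///
/// Handlers are registered with [`WorkflowMonitor::register_alert_handler`] and are
/// invoked for every triggered alert. A hedged sketch of a custom handler that forwards
/// alerts over a channel (the `ChannelAlertHandler` name and its `tx` field are
/// illustrative, not part of this module):
///
/// ```rust,ignore
/// struct ChannelAlertHandler {
///     tx: tokio::sync::mpsc::Sender<(AlertType, String)>,
/// }
///
/// #[async_trait::async_trait]
/// impl AlertHandler for ChannelAlertHandler {
///     async fn handle_alert(&self, alert_type: AlertType, message: String, _workflow_id: Option<WorkflowId>) {
///         // If the receiver has been dropped, silently discard the alert.
///         let _ = self.tx.send((alert_type, message)).await;
///     }
/// }
/// ```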
#[async_trait::async_trait]
pub trait AlertHandler: Send + Sync {
    /// Handle an alert
    async fn handle_alert(&self, alert_type: AlertType, message: String, workflow_id: Option<WorkflowId>);
}

/// Simple logging alert handler
pub struct LoggingAlertHandler;

#[async_trait::async_trait]
impl AlertHandler for LoggingAlertHandler {
    async fn handle_alert(&self, alert_type: AlertType, message: String, workflow_id: Option<WorkflowId>) {
        warn!("ALERT [{:?}] {}: {:?}", alert_type, message, workflow_id);
    }
}

// Helper trait for Duration operations (currently unused within this module)
#[allow(dead_code)]
trait DurationExt {
    fn from_hours(hours: u64) -> Duration;
    fn from_mins(mins: u64) -> Duration;
}

impl DurationExt for Duration {
    fn from_hours(hours: u64) -> Duration {
        Duration::from_secs(hours * 3600)
    }

    fn from_mins(mins: u64) -> Duration {
        Duration::from_secs(mins * 60)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_workflow_monitor() {
        let monitor = WorkflowMonitor::new(MonitoringConfig::default());
        monitor.start().await.unwrap();

        // Register alert handler
        monitor.register_alert_handler(Box::new(LoggingAlertHandler)).await;

        // Record some events
        let workflow_id = WorkflowId::generate();

        monitor.record_event(MonitoringEvent::WorkflowStarted {
            workflow_id,
            definition_id: "test_workflow".to_string(),
            timestamp: SystemTime::now(),
        }).await.unwrap();

        monitor.record_event(MonitoringEvent::StageCompleted {
            workflow_id,
            stage_id: StageId("stage1".to_string()),
            duration: Duration::from_secs(5),
            timestamp: SystemTime::now(),
        }).await.unwrap();

        monitor.record_event(MonitoringEvent::WorkflowCompleted {
            workflow_id,
            duration: Duration::from_secs(10),
            success: true,
            timestamp: SystemTime::now(),
        }).await.unwrap();

        // Give background tasks time to process events
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Check metrics
        let summary = monitor.get_workflow_summary().await;
        assert_eq!(summary.completed_workflows, 1);
        assert_eq!(summary.total_workflows, 1);

        let events = monitor.get_recent_events(10).await;
        assert_eq!(events.len(), 3);
    }
}
699}