//! Workflow monitoring: event collection, per-workflow metrics, and alerting.

use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
    time::Duration,
};

use crate::SystemTime;

use serde::{Deserialize, Serialize};
use tokio::{
    sync::{mpsc, RwLock, Mutex},
    time::interval,
};
use tracing::{debug, info, warn};

use crate::workflow::{
    StageId, WorkflowId, WorkflowStatus, WorkflowError, WorkflowMetrics,
};

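/// Configuration for the workflow monitor: collection cadence, retention, and alert thresholds.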
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    pub collection_interval: Duration,
    pub retention_period: Duration,
    pub max_events_per_workflow: usize,
    pub enable_tracing: bool,
    pub alert_config: AlertConfig,
}

impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            collection_interval: Duration::from_secs(5),
            retention_period: Duration::from_secs(24 * 3600),
            max_events_per_workflow: 1000,
            enable_tracing: true,
            alert_config: AlertConfig::default(),
        }
    }
}

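/// Thresholds that determine when the monitor raises alerts.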
#[derive(Debug, Clone)]
pub struct AlertConfig {
    pub max_workflow_duration: Duration,
    pub max_stage_duration: Duration,
    pub error_rate_threshold: f32,
    pub memory_threshold_mb: u64,
    pub cpu_threshold_percent: f32,
}

impl Default for AlertConfig {
    fn default() -> Self {
        Self {
            max_workflow_duration: Duration::from_secs(3600),
            max_stage_duration: Duration::from_secs(600),
            error_rate_threshold: 10.0,
            memory_threshold_mb: 1024,
            cpu_threshold_percent: 80.0,
        }
    }
}

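/// Events recorded by the monitor for workflows, stages, errors, metrics, and alerts.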
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MonitoringEvent {
    WorkflowStarted {
        workflow_id: WorkflowId,
        definition_id: String,
        timestamp: SystemTime,
    },
    WorkflowCompleted {
        workflow_id: WorkflowId,
        duration: Duration,
        success: bool,
        timestamp: SystemTime,
    },
    StageStarted {
        workflow_id: WorkflowId,
        stage_id: StageId,
        timestamp: SystemTime,
    },
    StageCompleted {
        workflow_id: WorkflowId,
        stage_id: StageId,
        duration: Duration,
        timestamp: SystemTime,
    },
    ErrorOccurred {
        workflow_id: WorkflowId,
        stage_id: Option<StageId>,
        error: String,
        timestamp: SystemTime,
    },
    MetricRecorded {
        workflow_id: WorkflowId,
        metric_name: String,
        value: f64,
        timestamp: SystemTime,
    },
    AlertTriggered {
        workflow_id: Option<WorkflowId>,
        alert_type: AlertType,
        message: String,
        timestamp: SystemTime,
    },
}

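/// Categories of alerts the monitor can raise.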
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AlertType {
    WorkflowTimeout,
    StageTimeout,
    HighErrorRate,
    HighMemoryUsage,
    HighCpuUsage,
    SystemError,
}

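/// Collects monitoring events, tracks per-workflow and system-wide metrics, and dispatches alerts
/// to registered handlers. Cloning is cheap: all state is shared behind `Arc`s.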
pub struct WorkflowMonitor {
    config: MonitoringConfig,
    event_history: Arc<RwLock<VecDeque<MonitoringEvent>>>,
    workflow_metrics: Arc<RwLock<HashMap<WorkflowId, WorkflowMonitoringData>>>,
    system_metrics: Arc<RwLock<SystemMetrics>>,
    alert_handlers: Arc<RwLock<Vec<Box<dyn AlertHandler>>>>,
    event_tx: mpsc::Sender<MonitoringEvent>,
    event_rx: Arc<Mutex<mpsc::Receiver<MonitoringEvent>>>,
}

impl WorkflowMonitor {
    pub fn new(config: MonitoringConfig) -> Self {
        let (event_tx, event_rx) = mpsc::channel(1000);

        Self {
            config,
            event_history: Arc::new(RwLock::new(VecDeque::new())),
            workflow_metrics: Arc::new(RwLock::new(HashMap::new())),
            system_metrics: Arc::new(RwLock::new(SystemMetrics::default())),
            alert_handlers: Arc::new(RwLock::new(Vec::new())),
            event_tx,
            event_rx: Arc::new(Mutex::new(event_rx)),
        }
    }

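    /// Spawns the background tasks: event processing, periodic metrics collection, and cleanup.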
    pub async fn start(&self) -> Result<(), WorkflowError> {
        info!("Starting workflow monitor");

        // Event-processing task.
        let monitor = self.clone();
        tokio::spawn(async move {
            monitor.event_processing_loop().await;
        });

        // Periodic metrics-collection task.
        let monitor = self.clone();
        tokio::spawn(async move {
            monitor.metrics_collection_loop().await;
        });

        // Periodic cleanup task.
        let monitor = self.clone();
        tokio::spawn(async move {
            monitor.cleanup_loop().await;
        });

        Ok(())
    }

    pub async fn register_alert_handler(&self, handler: Box<dyn AlertHandler>) {
        let mut handlers = self.alert_handlers.write().await;
        handlers.push(handler);
    }

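    /// Queues a monitoring event for asynchronous processing by the event loop.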
    pub async fn record_event(&self, event: MonitoringEvent) -> Result<(), WorkflowError> {
        self.event_tx.send(event).await
            .map_err(|_| WorkflowError {
                code: "MONITORING_ERROR".to_string(),
                message: "Failed to record monitoring event".to_string(),
                stage: None,
                trace: None,
                recovery_hints: vec![],
            })
    }

    pub async fn get_workflow_metrics(&self, workflow_id: &WorkflowId) -> Option<WorkflowMonitoringData> {
        let metrics = self.workflow_metrics.read().await;
        metrics.get(workflow_id).cloned()
    }

    pub async fn get_system_metrics(&self) -> SystemMetrics {
        self.system_metrics.read().await.clone()
    }

    pub async fn get_recent_events(&self, count: usize) -> Vec<MonitoringEvent> {
        let history = self.event_history.read().await;
        history.iter().rev().take(count).cloned().collect()
    }

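    /// Builds an aggregate summary across all tracked workflows.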
    pub async fn get_workflow_summary(&self) -> WorkflowSummary {
        let metrics = self.workflow_metrics.read().await;
        let system = self.system_metrics.read().await;

        let active_workflows = metrics.values()
            .filter(|data| matches!(data.status, WorkflowStatus::Running { .. }))
            .count();

        let completed_workflows = metrics.values()
            .filter(|data| matches!(data.status, WorkflowStatus::Completed { .. }))
            .count();

        let failed_workflows = metrics.values()
            .filter(|data| matches!(data.status, WorkflowStatus::Failed { .. }))
            .count();

        // Average duration is computed over completed workflows only.
        let total_duration: Duration = metrics.values()
            .filter(|data| matches!(data.status, WorkflowStatus::Completed { .. }))
            .filter_map(|data| data.end_time.map(|end| end.duration_since(data.start_time).unwrap_or_default()))
            .sum();

        let avg_duration = if completed_workflows > 0 {
            total_duration / completed_workflows as u32
        } else {
            Duration::default()
        };

        WorkflowSummary {
            active_workflows,
            completed_workflows,
            failed_workflows,
            total_workflows: metrics.len(),
            average_duration: avg_duration,
            system_metrics: system.clone(),
        }
    }

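    /// Drains the event channel and applies each event to the in-memory state.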
    async fn event_processing_loop(&self) {
        let mut receiver = self.event_rx.lock().await;

        while let Some(event) = receiver.recv().await {
            if let Err(e) = self.process_event(event).await {
                warn!("Error processing monitoring event: {:?}", e);
            }
        }
    }

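    /// Appends the event to the history and updates per-workflow and system metrics,
    /// triggering alerts when thresholds are exceeded.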
    async fn process_event(&self, event: MonitoringEvent) -> Result<(), WorkflowError> {
        // Append to the bounded global event history.
        {
            let mut history = self.event_history.write().await;
            history.push_back(event.clone());

            while history.len() > 10000 {
                history.pop_front();
            }
        }

        match &event {
            MonitoringEvent::WorkflowStarted { workflow_id, definition_id, timestamp } => {
                let mut metrics = self.workflow_metrics.write().await;
                metrics.insert(*workflow_id, WorkflowMonitoringData {
                    workflow_id: *workflow_id,
                    definition_id: definition_id.clone(),
                    status: WorkflowStatus::Running { current_stage: StageId("init".to_string()) },
                    start_time: *timestamp,
                    end_time: None,
                    stages_completed: 0,
                    errors: Vec::new(),
                    metrics: WorkflowMetrics::default(),
                });
            }
            MonitoringEvent::WorkflowCompleted { workflow_id, duration, success, timestamp } => {
                let mut metrics = self.workflow_metrics.write().await;
                if let Some(data) = metrics.get_mut(workflow_id) {
                    data.status = if *success {
                        WorkflowStatus::Completed {
                            result: crate::workflow::WorkflowResult {
                                output: HashMap::new(),
                                duration: *duration,
                                metrics: data.metrics.clone(),
                            }
                        }
                    } else {
                        WorkflowStatus::Failed {
                            error: WorkflowError {
                                code: "WORKFLOW_FAILED".to_string(),
                                message: "Workflow failed".to_string(),
                                stage: None,
                                trace: None,
                                recovery_hints: vec![],
                            }
                        }
                    };
                    data.end_time = Some(*timestamp);
                }

                let mut system = self.system_metrics.write().await;
                system.total_workflows_completed += 1;
                if *success {
                    system.successful_workflows += 1;
                } else {
                    system.failed_workflows += 1;
                }
            }
            MonitoringEvent::StageCompleted { workflow_id, stage_id: _, duration, timestamp: _ } => {
                // Update stage counters, releasing the lock before any alert is dispatched.
                {
                    let mut metrics = self.workflow_metrics.write().await;
                    if let Some(data) = metrics.get_mut(workflow_id) {
                        data.stages_completed += 1;
                        data.metrics.stages_executed += 1;
                    }
                }

                if *duration > self.config.alert_config.max_stage_duration {
                    self.trigger_alert(AlertType::StageTimeout,
                        format!("Stage took {:?}, exceeding threshold", duration),
                        Some(*workflow_id),
                    ).await;
                }
            }
            MonitoringEvent::ErrorOccurred { workflow_id, stage_id: _, error, timestamp: _ } => {
                // Record the error, then release the locks before computing alerts.
                let error_rate = {
                    let mut metrics = self.workflow_metrics.write().await;
                    if let Some(data) = metrics.get_mut(workflow_id) {
                        data.errors.push(error.clone());
                        data.metrics.error_count += 1;
                    }

                    let mut system = self.system_metrics.write().await;
                    system.error_count += 1;
                    system.calculate_error_rate()
                };

                if error_rate > self.config.alert_config.error_rate_threshold {
                    self.trigger_alert(AlertType::HighErrorRate,
                        format!("Error rate {:.1}/min exceeds threshold", error_rate),
                        None,
                    ).await;
                }
            }
            // Alert events are only kept in the history; remaining events need no aggregation.
            MonitoringEvent::AlertTriggered { .. } => {}
            _ => {}
        }

        Ok(())
    }

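    /// Periodically samples system metrics and evaluates alert conditions.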
    async fn metrics_collection_loop(&self) {
        let mut interval = interval(self.config.collection_interval);

        loop {
            interval.tick().await;

            if let Err(e) = self.collect_system_metrics().await {
                warn!("Failed to collect system metrics: {:?}", e);
            }

            self.check_alerts().await;
        }
    }

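    /// Refreshes system metrics; CPU and memory readings are currently hard-coded placeholders.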
    async fn collect_system_metrics(&self) -> Result<(), WorkflowError> {
        let mut system = self.system_metrics.write().await;

        system.last_updated = SystemTime::now();

        // Placeholder readings; a real implementation would sample the OS or a metrics library.
        system.cpu_usage = 45.0;
        system.memory_usage_mb = 512;

        Ok(())
    }

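    /// Checks CPU, memory, and workflow-duration thresholds and raises the corresponding alerts.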
    async fn check_alerts(&self) {
        // Snapshot the state first so no lock is held while alerts are dispatched.
        let (cpu_usage, memory_usage_mb) = {
            let system = self.system_metrics.read().await;
            (system.cpu_usage, system.memory_usage_mb)
        };

        if cpu_usage > self.config.alert_config.cpu_threshold_percent {
            self.trigger_alert(AlertType::HighCpuUsage,
                format!("CPU usage {:.1}% exceeds threshold", cpu_usage),
                None,
            ).await;
        }

        if memory_usage_mb > self.config.alert_config.memory_threshold_mb {
            self.trigger_alert(AlertType::HighMemoryUsage,
                format!("Memory usage {}MB exceeds threshold", memory_usage_mb),
                None,
            ).await;
        }

        let now = SystemTime::now();
        let timed_out: Vec<(WorkflowId, Duration)> = {
            let metrics = self.workflow_metrics.read().await;
            metrics.iter()
                .filter(|(_, data)| matches!(data.status, WorkflowStatus::Running { .. }))
                .filter_map(|(workflow_id, data)| {
                    now.duration_since(data.start_time).ok()
                        .filter(|duration| *duration > self.config.alert_config.max_workflow_duration)
                        .map(|duration| (*workflow_id, duration))
                })
                .collect()
        };

        for (workflow_id, duration) in timed_out {
            self.trigger_alert(AlertType::WorkflowTimeout,
                format!("Workflow running for {:?}, exceeding threshold", duration),
                Some(workflow_id),
            ).await;
        }
    }

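    /// Records an `AlertTriggered` event and notifies all registered alert handlers.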
    async fn trigger_alert(&self, alert_type: AlertType, message: String, workflow_id: Option<WorkflowId>) {
        let event = MonitoringEvent::AlertTriggered {
            workflow_id,
            alert_type: alert_type.clone(),
            message: message.clone(),
            timestamp: SystemTime::now(),
        };

        let _ = self.record_event(event).await;

        let handlers = self.alert_handlers.read().await;
        for handler in handlers.iter() {
            handler.handle_alert(alert_type.clone(), message.clone(), workflow_id).await;
        }
    }

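    /// Hourly task that evicts workflow data and events older than the retention period.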
    async fn cleanup_loop(&self) {
        let mut interval = interval(Duration::from_secs(3600));

        loop {
            interval.tick().await;

            let now = SystemTime::now();
            let retention_cutoff = now - self.config.retention_period;

            // Drop data for workflows that finished before the retention cutoff.
            let mut metrics = self.workflow_metrics.write().await;
            metrics.retain(|_, data| {
                if let Some(end_time) = data.end_time {
                    end_time > retention_cutoff
                } else {
                    true
                }
            });

            // Drop events older than the retention cutoff.
            let mut history = self.event_history.write().await;
            history.retain(|event| {
                match event {
                    MonitoringEvent::WorkflowStarted { timestamp, .. } |
                    MonitoringEvent::WorkflowCompleted { timestamp, .. } |
                    MonitoringEvent::StageStarted { timestamp, .. } |
                    MonitoringEvent::StageCompleted { timestamp, .. } |
                    MonitoringEvent::ErrorOccurred { timestamp, .. } |
                    MonitoringEvent::MetricRecorded { timestamp, .. } |
                    MonitoringEvent::AlertTriggered { timestamp, .. } => {
                        *timestamp > retention_cutoff
                    }
                }
            });

            debug!("Cleaned up old monitoring data");
        }
    }
}

impl Clone for WorkflowMonitor {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            event_history: self.event_history.clone(),
            workflow_metrics: self.workflow_metrics.clone(),
            system_metrics: self.system_metrics.clone(),
            alert_handlers: self.alert_handlers.clone(),
            event_tx: self.event_tx.clone(),
            event_rx: self.event_rx.clone(),
        }
    }
}

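/// Per-workflow monitoring state tracked by the monitor.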
#[derive(Debug, Clone)]
pub struct WorkflowMonitoringData {
    pub workflow_id: WorkflowId,
    pub definition_id: String,
    pub status: WorkflowStatus,
    pub start_time: SystemTime,
    pub end_time: Option<SystemTime>,
    pub stages_completed: u32,
    pub errors: Vec<String>,
    pub metrics: WorkflowMetrics,
}

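/// System-wide counters and resource readings maintained by the monitor.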
#[derive(Debug, Clone)]
pub struct SystemMetrics {
    pub total_workflows_completed: u64,
    pub successful_workflows: u64,
    pub failed_workflows: u64,
    pub error_count: u64,
    pub cpu_usage: f32,
    pub memory_usage_mb: u64,
    pub last_updated: SystemTime,
    pub start_time: SystemTime,
}

impl Default for SystemMetrics {
    fn default() -> Self {
        let now = SystemTime::now();
        Self {
            total_workflows_completed: 0,
            successful_workflows: 0,
            failed_workflows: 0,
            error_count: 0,
            cpu_usage: 0.0,
            memory_usage_mb: 0,
            last_updated: now,
            start_time: now,
        }
    }
}

impl SystemMetrics {
    /// Errors per minute, measured from monitor start to the last update.
    pub fn calculate_error_rate(&self) -> f32 {
        if let Ok(duration) = self.last_updated.duration_since(self.start_time) {
            let minutes = duration.as_secs_f32() / 60.0;
            if minutes > 0.0 {
                return self.error_count as f32 / minutes;
            }
        }
        0.0
    }
}

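/// Aggregate snapshot returned by `WorkflowMonitor::get_workflow_summary`.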
#[derive(Debug, Clone)]
pub struct WorkflowSummary {
    pub active_workflows: usize,
    pub completed_workflows: usize,
    pub failed_workflows: usize,
    pub total_workflows: usize,
    pub average_duration: Duration,
    pub system_metrics: SystemMetrics,
}

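/// Receives alerts raised by the monitor; implementations must be thread-safe.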
#[async_trait::async_trait]
pub trait AlertHandler: Send + Sync {
    async fn handle_alert(&self, alert_type: AlertType, message: String, workflow_id: Option<WorkflowId>);
}

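/// Alert handler that logs every alert at `warn` level via `tracing`.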
pub struct LoggingAlertHandler;

#[async_trait::async_trait]
impl AlertHandler for LoggingAlertHandler {
    async fn handle_alert(&self, alert_type: AlertType, message: String, workflow_id: Option<WorkflowId>) {
        warn!("ALERT [{:?}] {}: {:?}", alert_type, message, workflow_id);
    }
}

trait DurationExt {
    fn from_hours(hours: u64) -> Duration;
    fn from_mins(mins: u64) -> Duration;
}

impl DurationExt for Duration {
    fn from_hours(hours: u64) -> Duration {
        Duration::from_secs(hours * 3600)
    }

    fn from_mins(mins: u64) -> Duration {
        Duration::from_secs(mins * 60)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_workflow_monitor() {
        let monitor = WorkflowMonitor::new(MonitoringConfig::default());
        monitor.start().await.unwrap();

        monitor.register_alert_handler(Box::new(LoggingAlertHandler)).await;

        let workflow_id = WorkflowId::generate();

        monitor.record_event(MonitoringEvent::WorkflowStarted {
            workflow_id,
            definition_id: "test_workflow".to_string(),
            timestamp: SystemTime::now(),
        }).await.unwrap();

        monitor.record_event(MonitoringEvent::StageCompleted {
            workflow_id,
            stage_id: StageId("stage1".to_string()),
            duration: Duration::from_secs(5),
            timestamp: SystemTime::now(),
        }).await.unwrap();

        monitor.record_event(MonitoringEvent::WorkflowCompleted {
            workflow_id,
            duration: Duration::from_secs(10),
            success: true,
            timestamp: SystemTime::now(),
        }).await.unwrap();

        // Give the background event-processing task time to drain the channel.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let summary = monitor.get_workflow_summary().await;
        assert_eq!(summary.completed_workflows, 1);
        assert_eq!(summary.total_workflows, 1);

        let events = monitor.get_recent_events(10).await;
        assert_eq!(events.len(), 3);
    }
}