use std::collections::HashMap;

use tokio::sync::RwLock;
use serde::{Deserialize, Serialize};

use crate::ir::{WorkflowExecutionId, ActivityExecutionId, ExecutionStatus, ExecutionEventType};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MetricType {
    Counter,
    Gauge,
    Histogram,
    Summary,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MetricValue {
    Counter(u64),
    Gauge(f64),
    Histogram { count: u64, sum: f64, buckets: HashMap<String, u64> },
    Summary { count: u64, sum: f64, quantiles: HashMap<String, f64> },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Metric {
    pub name: String,
    pub metric_type: MetricType,
    pub value: MetricValue,
    pub labels: HashMap<String, String>,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub description: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceSpan {
    pub span_id: String,
    pub trace_id: String,
    pub parent_span_id: Option<String>,
    pub name: String,
    pub start_time: chrono::DateTime<chrono::Utc>,
    pub end_time: Option<chrono::DateTime<chrono::Utc>>,
    pub attributes: HashMap<String, serde_json::Value>,
    pub events: Vec<TraceEvent>,
    pub status: TraceStatus,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceEvent {
    pub name: String,
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub attributes: HashMap<String, serde_json::Value>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TraceStatus {
    Ok,
    Error { message: String },
}
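
// Illustrative convenience helpers for the span types above. `start` and
// `finish` are assumptions of this sketch, not part of any existing API;
// callers are expected to supply their own trace/span identifiers.
impl TraceSpan {
    /// Opens a root span (no parent) starting now.
    pub fn start(trace_id: impl Into<String>, span_id: impl Into<String>, name: impl Into<String>) -> Self {
        Self {
            span_id: span_id.into(),
            trace_id: trace_id.into(),
            parent_span_id: None,
            name: name.into(),
            start_time: chrono::Utc::now(),
            end_time: None,
            attributes: HashMap::new(),
            events: Vec::new(),
            status: TraceStatus::Ok,
        }
    }

    /// Stamps the end time and records the final status.
    pub fn finish(&mut self, status: TraceStatus) {
        self.end_time = Some(chrono::Utc::now());
        self.status = status;
    }
}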

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub level: LogLevel,
    pub message: String,
    pub context: HashMap<String, serde_json::Value>,
    pub execution_id: Option<WorkflowExecutionId>,
    pub activity_id: Option<ActivityExecutionId>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogLevel {
    Debug,
    Info,
    Warn,
    Error,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    pub enable_metrics: bool,
    pub enable_tracing: bool,
    pub enable_logging: bool,
    pub metrics_interval: std::time::Duration,
    pub log_level: String,
    pub exporters: Vec<MonitoringExporter>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MonitoringExporter {
    Prometheus { endpoint: String },
    Jaeger { endpoint: String },
    Elasticsearch { endpoint: String, index: String },
    File { path: String },
    Stdout,
}
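
// A possible default configuration, shown as an illustrative sketch: the
// concrete values (15s metrics interval, "info" level, stdout export) are
// assumptions of this example, not defaults mandated by the engine.
impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            enable_metrics: true,
            enable_tracing: true,
            enable_logging: true,
            metrics_interval: std::time::Duration::from_secs(15),
            log_level: "info".to_string(),
            exporters: vec![MonitoringExporter::Stdout],
        }
    }
}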

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WorkflowStats {
    pub total_executions: u64,
    pub successful_executions: u64,
    pub failed_executions: u64,
    pub cancelled_executions: u64,
    pub timed_out_executions: u64,
    pub avg_execution_time: std::time::Duration,
    pub min_execution_time: std::time::Duration,
    pub max_execution_time: std::time::Duration,
    pub execution_time_histogram: HashMap<String, u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ActivityStats {
    pub activity_name: String,
    pub total_executions: u64,
    pub successful_executions: u64,
    pub failed_executions: u64,
    pub avg_execution_time: std::time::Duration,
    pub error_rate: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemHealth {
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub status: HealthStatus,
    pub components: HashMap<String, ComponentHealth>,
    pub metrics: SystemMetrics,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealth {
    pub name: String,
    pub status: HealthStatus,
    pub last_check: chrono::DateTime<chrono::Utc>,
    pub message: Option<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
    pub cpu_usage: f64,
    pub memory_usage: f64,
    pub active_workflows: u64,
    pub pending_tasks: u64,
    pub queue_depth: u64,
    pub throughput: f64,
}

pub struct MonitoringManager {
    config: MonitoringConfig,
    metrics: RwLock<HashMap<String, Metric>>,
    traces: RwLock<HashMap<String, Vec<TraceSpan>>>,
    logs: RwLock<Vec<LogEntry>>,
    workflow_stats: RwLock<HashMap<String, WorkflowStats>>,
    activity_stats: RwLock<HashMap<String, ActivityStats>>,
    exporters: Vec<Box<dyn MonitoringExporterImpl>>,
}

#[async_trait::async_trait]
pub trait MonitoringExporterImpl: Send + Sync {
    async fn export_metrics(&self, metrics: &[Metric]) -> Result<(), MonitoringError>;
    async fn export_traces(&self, traces: &[TraceSpan]) -> Result<(), MonitoringError>;
    async fn export_logs(&self, logs: &[LogEntry]) -> Result<(), MonitoringError>;
}
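
// A minimal example exporter, corresponding to the `MonitoringExporter::Stdout`
// variant. This is an illustrative sketch of how the trait might be implemented,
// not part of the core API; it serializes each item as one JSON line.
pub struct StdoutExporter;

#[async_trait::async_trait]
impl MonitoringExporterImpl for StdoutExporter {
    async fn export_metrics(&self, metrics: &[Metric]) -> Result<(), MonitoringError> {
        for metric in metrics {
            let json = serde_json::to_string(metric)
                .map_err(|e| MonitoringError::ExportFailed(e.to_string()))?;
            println!("[metric] {json}");
        }
        Ok(())
    }

    async fn export_traces(&self, traces: &[TraceSpan]) -> Result<(), MonitoringError> {
        for span in traces {
            let json = serde_json::to_string(span)
                .map_err(|e| MonitoringError::ExportFailed(e.to_string()))?;
            println!("[trace] {json}");
        }
        Ok(())
    }

    async fn export_logs(&self, logs: &[LogEntry]) -> Result<(), MonitoringError> {
        for entry in logs {
            let json = serde_json::to_string(entry)
                .map_err(|e| MonitoringError::ExportFailed(e.to_string()))?;
            println!("[log] {json}");
        }
        Ok(())
    }
}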

#[derive(Debug, thiserror::Error)]
pub enum MonitoringError {
    #[error("Export failed: {0}")]
    ExportFailed(String),
    #[error("Connection error: {0}")]
    ConnectionError(String),
    #[error("Configuration error: {0}")]
    ConfigurationError(String),
}

impl MonitoringManager {
    pub fn new(config: MonitoringConfig) -> Self {
        Self {
            config,
            metrics: RwLock::new(HashMap::new()),
            traces: RwLock::new(HashMap::new()),
            logs: RwLock::new(Vec::new()),
            workflow_stats: RwLock::new(HashMap::new()),
            activity_stats: RwLock::new(HashMap::new()),
            exporters: Vec::new(),
        }
    }

    pub fn add_exporter(&mut self, exporter: Box<dyn MonitoringExporterImpl>) {
        self.exporters.push(exporter);
    }

    pub async fn record_metric(&self, metric: Metric) -> Result<(), MonitoringError> {
        // Store the latest value, then release the lock before exporting so the
        // write lock is not held across exporter I/O.
        {
            let mut metrics = self.metrics.write().await;
            metrics.insert(metric.name.clone(), metric.clone());
        }

        if self.config.enable_metrics {
            for exporter in &self.exporters {
                exporter.export_metrics(&[metric.clone()]).await?;
            }
        }

        Ok(())
    }

    pub async fn increment_counter(&self, name: &str, labels: HashMap<String, String>, value: u64) -> Result<(), MonitoringError> {
        // Update (or create) the counter under the lock, clone a snapshot, and
        // drop the guard before exporting.
        let snapshot = {
            let mut metrics = self.metrics.write().await;

            let metric = metrics.entry(name.to_string()).or_insert_with(|| Metric {
                name: name.to_string(),
                metric_type: MetricType::Counter,
                value: MetricValue::Counter(0),
                labels: labels.clone(),
                timestamp: chrono::Utc::now(),
                description: None,
            });

            if let MetricValue::Counter(ref mut current) = metric.value {
                *current += value;
            }

            metric.timestamp = chrono::Utc::now();
            metric.clone()
        };

        if self.config.enable_metrics {
            for exporter in &self.exporters {
                exporter.export_metrics(&[snapshot.clone()]).await?;
            }
        }

        Ok(())
    }

    pub async fn update_gauge(&self, name: &str, labels: HashMap<String, String>, value: f64) -> Result<(), MonitoringError> {
        let metric = Metric {
            name: name.to_string(),
            metric_type: MetricType::Gauge,
            value: MetricValue::Gauge(value),
            labels,
            timestamp: chrono::Utc::now(),
            description: None,
        };

        self.record_metric(metric).await
    }

    pub async fn record_trace_span(&self, span: TraceSpan) -> Result<(), MonitoringError> {
        // Append the span to its trace, releasing the lock before exporting.
        {
            let mut traces = self.traces.write().await;
            traces.entry(span.trace_id.clone())
                .or_insert_with(Vec::new)
                .push(span.clone());
        }

        if self.config.enable_tracing {
            for exporter in &self.exporters {
                exporter.export_traces(&[span.clone()]).await?;
            }
        }

        Ok(())
    }

    pub async fn record_log(&self, entry: LogEntry) -> Result<(), MonitoringError> {
        {
            let mut logs = self.logs.write().await;
            logs.push(entry.clone());
        }

        if self.config.enable_logging {
            for exporter in &self.exporters {
                exporter.export_logs(&[entry.clone()]).await?;
            }
        }

        Ok(())
    }

    pub async fn update_workflow_stats(&self, workflow_id: &str, execution_time: std::time::Duration, status: &ExecutionStatus) -> Result<(), MonitoringError> {
        let mut stats = self.workflow_stats.write().await;
        let workflow_stats = stats.entry(workflow_id.to_string()).or_insert_with(WorkflowStats::default);

        workflow_stats.total_executions += 1;

        match status {
            ExecutionStatus::Completed => workflow_stats.successful_executions += 1,
            ExecutionStatus::Failed => workflow_stats.failed_executions += 1,
            ExecutionStatus::Cancelled => workflow_stats.cancelled_executions += 1,
            ExecutionStatus::TimedOut => workflow_stats.timed_out_executions += 1,
            _ => {}
        }

        // Running average: reconstruct the previous total from the old average,
        // add the new sample, and divide by the new count.
        let total_time = workflow_stats.avg_execution_time * (workflow_stats.total_executions - 1) as u32;
        workflow_stats.avg_execution_time = (total_time + execution_time) / workflow_stats.total_executions as u32;

        // `Default` initializes min_execution_time to zero, so the first sample
        // must seed the minimum unconditionally or it would stay at zero forever.
        if workflow_stats.total_executions == 1 || execution_time < workflow_stats.min_execution_time {
            workflow_stats.min_execution_time = execution_time;
        }

        if execution_time > workflow_stats.max_execution_time {
            workflow_stats.max_execution_time = execution_time;
        }

        // Coarse histogram keyed by whole seconds, e.g. "3s".
        let bucket = format!("{:.0}s", execution_time.as_secs_f64());
        *workflow_stats.execution_time_histogram.entry(bucket).or_insert(0) += 1;

        Ok(())
    }

    pub async fn update_activity_stats(&self, activity_name: &str, execution_time: std::time::Duration, success: bool) -> Result<(), MonitoringError> {
        let mut stats = self.activity_stats.write().await;
        let activity_stats = stats.entry(activity_name.to_string()).or_insert_with(|| ActivityStats {
            activity_name: activity_name.to_string(),
            ..Default::default()
        });

        activity_stats.total_executions += 1;

        if success {
            activity_stats.successful_executions += 1;
        } else {
            activity_stats.failed_executions += 1;
        }

        let total_time = activity_stats.avg_execution_time * (activity_stats.total_executions - 1) as u32;
        activity_stats.avg_execution_time = (total_time + execution_time) / activity_stats.total_executions as u32;

        activity_stats.error_rate = activity_stats.failed_executions as f64 / activity_stats.total_executions as f64;

        Ok(())
    }

    pub async fn track_workflow_event(&self, execution_id: &WorkflowExecutionId, event_type: ExecutionEventType, metadata: HashMap<String, serde_json::Value>) -> Result<(), MonitoringError> {
        self.increment_counter(
            &format!("workflow_events_{:?}", event_type),
            HashMap::from([("execution_id".to_string(), execution_id.0.clone())]),
            1,
        ).await?;

        let log_entry = LogEntry {
            timestamp: chrono::Utc::now(),
            level: match event_type {
                ExecutionEventType::WorkflowFailed | ExecutionEventType::ActivityFailed => LogLevel::Error,
                ExecutionEventType::WorkflowCancelled => LogLevel::Warn,
                _ => LogLevel::Info,
            },
            message: format!("Workflow event: {:?}", event_type),
            context: metadata,
            execution_id: Some(execution_id.clone()),
            activity_id: None,
        };

        self.record_log(log_entry).await?;

        Ok(())
    }

    pub async fn track_activity_event(&self, execution_id: &WorkflowExecutionId, activity_id: &ActivityExecutionId, activity_name: &str, success: bool, execution_time: std::time::Duration) -> Result<(), MonitoringError> {
        let status = if success { "success" } else { "failure" };
        self.increment_counter(
            &format!("activity_executions_{}", status),
            HashMap::from([
                ("execution_id".to_string(), execution_id.0.clone()),
                ("activity_name".to_string(), activity_name.to_string()),
            ]),
            1,
        ).await?;

        self.update_activity_stats(activity_name, execution_time, success).await?;

        let log_entry = LogEntry {
            timestamp: chrono::Utc::now(),
            level: if success { LogLevel::Info } else { LogLevel::Error },
            message: format!("Activity {} {}", activity_name, if success { "completed" } else { "failed" }),
            context: HashMap::from([
                ("execution_time_ms".to_string(), serde_json::json!(execution_time.as_millis())),
                ("success".to_string(), serde_json::json!(success)),
            ]),
            execution_id: Some(execution_id.clone()),
            activity_id: Some(activity_id.clone()),
        };

        self.record_log(log_entry).await?;

        Ok(())
    }

    pub async fn perform_health_check(&self) -> SystemHealth {
        let timestamp = chrono::Utc::now();

        let components = HashMap::from([
            ("workflow_engine".to_string(), ComponentHealth {
                name: "workflow_engine".to_string(),
                status: HealthStatus::Healthy,
                last_check: timestamp,
                message: None,
            }),
            ("activity_registry".to_string(), ComponentHealth {
                name: "activity_registry".to_string(),
                status: HealthStatus::Healthy,
                last_check: timestamp,
                message: None,
            }),
        ]);

        // Placeholder values; real system metrics collection is not wired up here.
        let metrics = SystemMetrics {
            cpu_usage: 0.0,
            memory_usage: 0.0,
            active_workflows: 0,
            pending_tasks: 0,
            queue_depth: 0,
            throughput: 0.0,
        };

        SystemHealth {
            timestamp,
            status: HealthStatus::Healthy,
            components,
            metrics,
        }
    }

    pub async fn get_workflow_stats(&self, workflow_id: &str) -> Option<WorkflowStats> {
        let stats = self.workflow_stats.read().await;
        stats.get(workflow_id).cloned()
    }

    pub async fn get_activity_stats(&self, activity_name: &str) -> Option<ActivityStats> {
        let stats = self.activity_stats.read().await;
        stats.get(activity_name).cloned()
    }

    pub async fn get_all_metrics(&self) -> Vec<Metric> {
        let metrics = self.metrics.read().await;
        metrics.values().cloned().collect()
    }

    pub async fn get_recent_logs(&self, limit: usize) -> Vec<LogEntry> {
        let logs = self.logs.read().await;
        logs.iter().rev().take(limit).cloned().collect()
    }
}
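
#[cfg(test)]
mod tests {
    use super::*;

    // Illustrative smoke test: records a gauge and reads it back. The config
    // values here are arbitrary assumptions of the test, not required defaults.
    #[tokio::test]
    async fn gauge_is_stored_and_retrievable() {
        let manager = MonitoringManager::new(MonitoringConfig {
            enable_metrics: true,
            enable_tracing: false,
            enable_logging: false,
            metrics_interval: std::time::Duration::from_secs(15),
            log_level: "info".to_string(),
            exporters: Vec::new(),
        });

        manager.update_gauge("queue_depth", HashMap::new(), 3.0).await.unwrap();

        let metrics = manager.get_all_metrics().await;
        assert_eq!(metrics.len(), 1);
        assert!(matches!(metrics[0].value, MetricValue::Gauge(v) if v == 3.0));
    }
}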

#[macro_export]
macro_rules! track_execution {
    ($monitor:expr, $execution_id:expr, $operation:expr) => {
        async {
            let start = std::time::Instant::now();
            let result = $operation.await;
            let duration = start.elapsed();

            // `$crate` (rather than `crate`) keeps the exported macro usable
            // from other crates.
            match &result {
                Ok(_) => {
                    $monitor.update_workflow_stats(&$execution_id.0, duration, &$crate::ir::ExecutionStatus::Completed).await.ok();
                }
                Err(_) => {
                    $monitor.update_workflow_stats(&$execution_id.0, duration, &$crate::ir::ExecutionStatus::Failed).await.ok();
                }
            }

            result
        }
    };
}
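
// Illustrative usage sketch (names are hypothetical: assumes a `MonitoringManager`
// in `monitor`, a `WorkflowExecutionId` in `execution_id`, and an async
// `run_workflow()` returning a `Result`):
//
//     let result = track_execution!(monitor, execution_id, run_workflow()).await;
//
// The macro expands to an async block, so it must be `.await`ed; workflow stats
// are updated on both the `Ok` and `Err` paths before the result is returned.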