// kotoba_workflow/monitoring.rs

//! Monitoring and Observability - Phase 3
//!
//! Provides monitoring, observability, and metrics collection for workflow execution.
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::RwLock;
8use serde::{Deserialize, Serialize};
9
10use crate::ir::{WorkflowExecutionId, ActivityExecutionId, ExecutionStatus, ExecutionEventType};
11
12/// メトリクス種別
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub enum MetricType {
15    Counter,
16    Gauge,
17    Histogram,
18    Summary,
19}
20
21/// メトリクス値
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub enum MetricValue {
24    Counter(u64),
25    Gauge(f64),
26    Histogram { count: u64, sum: f64, buckets: HashMap<String, u64> },
27    Summary { count: u64, sum: f64, quantiles: HashMap<String, f64> },
28}
29
30/// メトリクスデータ
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct Metric {
33    pub name: String,
34    pub metric_type: MetricType,
35    pub value: MetricValue,
36    pub labels: HashMap<String, String>,
37    pub timestamp: chrono::DateTime<chrono::Utc>,
38    pub description: Option<String>,
39}
40
41/// トレース情報
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct TraceSpan {
44    pub span_id: String,
45    pub trace_id: String,
46    pub parent_span_id: Option<String>,
47    pub name: String,
48    pub start_time: chrono::DateTime<chrono::Utc>,
49    pub end_time: Option<chrono::DateTime<chrono::Utc>>,
50    pub attributes: HashMap<String, serde_json::Value>,
51    pub events: Vec<TraceEvent>,
52    pub status: TraceStatus,
53}
54
55/// トレースイベント
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct TraceEvent {
58    pub name: String,
59    pub timestamp: chrono::DateTime<chrono::Utc>,
60    pub attributes: HashMap<String, serde_json::Value>,
61}
62
63/// トレースステータス
64#[derive(Debug, Clone, Serialize, Deserialize)]
65pub enum TraceStatus {
66    Ok,
67    Error { message: String },
68}
69
70/// ログエントリ
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct LogEntry {
73    pub timestamp: chrono::DateTime<chrono::Utc>,
74    pub level: LogLevel,
75    pub message: String,
76    pub context: HashMap<String, serde_json::Value>,
77    pub execution_id: Option<WorkflowExecutionId>,
78    pub activity_id: Option<ActivityExecutionId>,
79}
80
81/// ログレベル
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub enum LogLevel {
84    Debug,
85    Info,
86    Warn,
87    Error,
88}
89
90/// 監視設定
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct MonitoringConfig {
93    pub enable_metrics: bool,
94    pub enable_tracing: bool,
95    pub enable_logging: bool,
96    pub metrics_interval: std::time::Duration,
97    pub log_level: String,
98    pub exporters: Vec<MonitoringExporter>,
99}
100
101/// 監視エクスポーター設定
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub enum MonitoringExporter {
104    Prometheus { endpoint: String },
105    Jaeger { endpoint: String },
106    Elasticsearch { endpoint: String, index: String },
107    File { path: String },
108    Stdout,
109}
110
111/// ワークフロー実行統計
112#[derive(Debug, Clone, Serialize, Deserialize, Default)]
113pub struct WorkflowStats {
114    pub total_executions: u64,
115    pub successful_executions: u64,
116    pub failed_executions: u64,
117    pub cancelled_executions: u64,
118    pub timed_out_executions: u64,
119    pub avg_execution_time: std::time::Duration,
120    pub min_execution_time: std::time::Duration,
121    pub max_execution_time: std::time::Duration,
122    pub execution_time_histogram: HashMap<String, u64>,
123}
124
125/// Activity実行統計
126#[derive(Debug, Clone, Serialize, Deserialize, Default)]
127pub struct ActivityStats {
128    pub activity_name: String,
129    pub total_executions: u64,
130    pub successful_executions: u64,
131    pub failed_executions: u64,
132    pub avg_execution_time: std::time::Duration,
133    pub error_rate: f64,
134}
135
136/// システムヘルス情報
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct SystemHealth {
139    pub timestamp: chrono::DateTime<chrono::Utc>,
140    pub status: HealthStatus,
141    pub components: HashMap<String, ComponentHealth>,
142    pub metrics: SystemMetrics,
143}
144
145/// ヘルスステータス
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub enum HealthStatus {
148    Healthy,
149    Degraded,
150    Unhealthy,
151}
152
153/// コンポーネントヘルス
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub struct ComponentHealth {
156    pub name: String,
157    pub status: HealthStatus,
158    pub last_check: chrono::DateTime<chrono::Utc>,
159    pub message: Option<String>,
160}
161
162/// システムメトリクス
163#[derive(Debug, Clone, Serialize, Deserialize)]
164pub struct SystemMetrics {
165    pub cpu_usage: f64,
166    pub memory_usage: f64,
167    pub active_workflows: u64,
168    pub pending_tasks: u64,
169    pub queue_depth: u64,
170    pub throughput: f64, // executions per second
171}
172
173/// 監視マネージャー
174pub struct MonitoringManager {
175    config: MonitoringConfig,
176    metrics: RwLock<HashMap<String, Metric>>,
177    traces: RwLock<HashMap<String, Vec<TraceSpan>>>,
178    logs: RwLock<Vec<LogEntry>>,
179    workflow_stats: RwLock<HashMap<String, WorkflowStats>>,
180    activity_stats: RwLock<HashMap<String, ActivityStats>>,
181    exporters: Vec<Box<dyn MonitoringExporterImpl>>,
182}
183
184#[async_trait::async_trait]
185pub trait MonitoringExporterImpl: Send + Sync {
186    async fn export_metrics(&self, metrics: &[Metric]) -> Result<(), MonitoringError>;
187    async fn export_traces(&self, traces: &[TraceSpan]) -> Result<(), MonitoringError>;
188    async fn export_logs(&self, logs: &[LogEntry]) -> Result<(), MonitoringError>;
189}
190
191#[derive(Debug, thiserror::Error)]
192pub enum MonitoringError {
193    #[error("Export failed: {0}")]
194    ExportFailed(String),
195    #[error("Connection error: {0}")]
196    ConnectionError(String),
197    #[error("Configuration error: {0}")]
198    ConfigurationError(String),
199}
200
201impl MonitoringManager {
202    pub fn new(config: MonitoringConfig) -> Self {
203        Self {
204            config,
205            metrics: RwLock::new(HashMap::new()),
206            traces: RwLock::new(HashMap::new()),
207            logs: RwLock::new(Vec::new()),
208            workflow_stats: RwLock::new(HashMap::new()),
209            activity_stats: RwLock::new(HashMap::new()),
210            exporters: Vec::new(),
211        }
212    }
213
214    /// エクスポーターを追加
215    pub fn add_exporter(&mut self, exporter: Box<dyn MonitoringExporterImpl>) {
216        self.exporters.push(exporter);
217    }
218
219    /// メトリクスを記録
220    pub async fn record_metric(&self, metric: Metric) -> Result<(), MonitoringError> {
221        let mut metrics = self.metrics.write().await;
222        metrics.insert(metric.name.clone(), metric.clone());
223
224        // エクスポーターに送信
225        if self.config.enable_metrics {
226            for exporter in &self.exporters {
227                exporter.export_metrics(&[metric.clone()]).await?;
228            }
229        }
230
231        Ok(())
232    }
233
234    /// カウンターメトリクスをインクリメント
235    pub async fn increment_counter(&self, name: &str, labels: HashMap<String, String>, value: u64) -> Result<(), MonitoringError> {
236        let mut metrics = self.metrics.write().await;
237
238        let metric = metrics.entry(name.to_string()).or_insert_with(|| Metric {
239            name: name.to_string(),
240            metric_type: MetricType::Counter,
241            value: MetricValue::Counter(0),
242            labels: labels.clone(),
243            timestamp: chrono::Utc::now(),
244            description: None,
245        });
246
247        if let MetricValue::Counter(ref mut current) = metric.value {
248            *current += value;
249        }
250
251        metric.timestamp = chrono::Utc::now();
252
253        // エクスポーターに送信
254        if self.config.enable_metrics {
255            for exporter in &self.exporters {
256                exporter.export_metrics(&[metric.clone()]).await?;
257            }
258        }
259
260        Ok(())
261    }
262
263    /// ゲージメトリクスを更新
264    pub async fn update_gauge(&self, name: &str, labels: HashMap<String, String>, value: f64) -> Result<(), MonitoringError> {
265        let metric = Metric {
266            name: name.to_string(),
267            metric_type: MetricType::Gauge,
268            value: MetricValue::Gauge(value),
269            labels,
270            timestamp: chrono::Utc::now(),
271            description: None,
272        };
273
274        self.record_metric(metric).await
275    }
276
277    /// トレーススパンを記録
278    pub async fn record_trace_span(&self, span: TraceSpan) -> Result<(), MonitoringError> {
279        let mut traces = self.traces.write().await;
280        traces.entry(span.trace_id.clone())
281            .or_insert_with(Vec::new)
282            .push(span.clone());
283
284        // エクスポーターに送信
285        if self.config.enable_tracing {
286            for exporter in &self.exporters {
287                exporter.export_traces(&[span.clone()]).await?;
288            }
289        }
290
291        Ok(())
292    }
293
294    /// ログエントリを記録
295    pub async fn record_log(&self, entry: LogEntry) -> Result<(), MonitoringError> {
296        let mut logs = self.logs.write().await;
297        logs.push(entry.clone());
298
299        // エクスポーターに送信
300        if self.config.enable_logging {
301            for exporter in &self.exporters {
302                exporter.export_logs(&[entry.clone()]).await?;
303            }
304        }
305
306        Ok(())
307    }
308
309    /// ワークフロー実行統計を更新
310    pub async fn update_workflow_stats(&self, workflow_id: &str, execution_time: std::time::Duration, status: &ExecutionStatus) -> Result<(), MonitoringError> {
311        let mut stats = self.workflow_stats.write().await;
312        let workflow_stats = stats.entry(workflow_id.to_string()).or_insert_with(WorkflowStats::default);
313
314        workflow_stats.total_executions += 1;
315
316        match status {
317            ExecutionStatus::Completed => workflow_stats.successful_executions += 1,
318            ExecutionStatus::Failed => workflow_stats.failed_executions += 1,
319            ExecutionStatus::Cancelled => workflow_stats.cancelled_executions += 1,
320            ExecutionStatus::TimedOut => workflow_stats.timed_out_executions += 1,
321            _ => {}
322        }
323
324        // 実行時間を更新
325        let total_time = workflow_stats.avg_execution_time * (workflow_stats.total_executions - 1) as u32;
326        workflow_stats.avg_execution_time = (total_time + execution_time) / workflow_stats.total_executions as u32;
327
328        if execution_time < workflow_stats.min_execution_time {
329            workflow_stats.min_execution_time = execution_time;
330        }
331
332        if execution_time > workflow_stats.max_execution_time {
333            workflow_stats.max_execution_time = execution_time;
334        }
335
336        // ヒストグラムを更新
337        let bucket = format!("{:.0}s", execution_time.as_secs_f64());
338        *workflow_stats.execution_time_histogram.entry(bucket).or_insert(0) += 1;
339
340        Ok(())
341    }
342
343    /// Activity実行統計を更新
344    pub async fn update_activity_stats(&self, activity_name: &str, execution_time: std::time::Duration, success: bool) -> Result<(), MonitoringError> {
345        let mut stats = self.activity_stats.write().await;
346        let activity_stats = stats.entry(activity_name.to_string()).or_insert_with(|| ActivityStats {
347            activity_name: activity_name.to_string(),
348            ..Default::default()
349        });
350
351        activity_stats.total_executions += 1;
352
353        if success {
354            activity_stats.successful_executions += 1;
355        } else {
356            activity_stats.failed_executions += 1;
357        }
358
359        // 平均実行時間を更新
360        let total_time = activity_stats.avg_execution_time * (activity_stats.total_executions - 1) as u32;
361        activity_stats.avg_execution_time = (total_time + execution_time) / activity_stats.total_executions as u32;
362
363        // エラーレートを更新
364        activity_stats.error_rate = activity_stats.failed_executions as f64 / activity_stats.total_executions as f64;
365
366        Ok(())
367    }
368
369    /// ワークフロー実行イベントを監視
370    pub async fn track_workflow_event(&self, execution_id: &WorkflowExecutionId, event_type: ExecutionEventType, metadata: HashMap<String, serde_json::Value>) -> Result<(), MonitoringError> {
371        // メトリクスを記録
372        self.increment_counter(
373            &format!("workflow_events_{:?}", event_type),
374            HashMap::from([("execution_id".to_string(), execution_id.0.clone())]),
375            1
376        ).await?;
377
378        // ログを記録
379        let log_entry = LogEntry {
380            timestamp: chrono::Utc::now(),
381            level: match event_type {
382                ExecutionEventType::WorkflowFailed | ExecutionEventType::ActivityFailed => LogLevel::Error,
383                ExecutionEventType::WorkflowCancelled => LogLevel::Warn,
384                _ => LogLevel::Info,
385            },
386            message: format!("Workflow event: {:?}", event_type),
387            context: metadata,
388            execution_id: Some(execution_id.clone()),
389            activity_id: None,
390        };
391
392        self.record_log(log_entry).await?;
393
394        Ok(())
395    }
396
397    /// Activity実行イベントを監視
398    pub async fn track_activity_event(&self, execution_id: &WorkflowExecutionId, activity_id: &ActivityExecutionId, activity_name: &str, success: bool, execution_time: std::time::Duration) -> Result<(), MonitoringError> {
399        // メトリクスを記録
400        let status = if success { "success" } else { "failure" };
401        self.increment_counter(
402            &format!("activity_executions_{}", status),
403            HashMap::from([
404                ("execution_id".to_string(), execution_id.0.clone()),
405                ("activity_name".to_string(), activity_name.to_string()),
406            ]),
407            1
408        ).await?;
409
410        // Activity統計を更新
411        self.update_activity_stats(activity_name, execution_time, success).await?;
412
413        // ログを記録
414        let log_entry = LogEntry {
415            timestamp: chrono::Utc::now(),
416            level: if success { LogLevel::Info } else { LogLevel::Error },
417            message: format!("Activity {} {}", activity_name, if success { "completed" } else { "failed" }),
418            context: HashMap::from([
419                ("execution_time_ms".to_string(), serde_json::json!(execution_time.as_millis())),
420                ("success".to_string(), serde_json::json!(success)),
421            ]),
422            execution_id: Some(execution_id.clone()),
423            activity_id: Some(activity_id.clone()),
424        };
425
426        self.record_log(log_entry).await?;
427
428        Ok(())
429    }
430
431    /// システムヘルスチェックを実行
432    pub async fn perform_health_check(&self) -> SystemHealth {
433        let timestamp = chrono::Utc::now();
434
435        // TODO: 実際のヘルスチェックロジックを実装
436        let components = HashMap::from([
437            ("workflow_engine".to_string(), ComponentHealth {
438                name: "workflow_engine".to_string(),
439                status: HealthStatus::Healthy,
440                last_check: timestamp,
441                message: None,
442            }),
443            ("activity_registry".to_string(), ComponentHealth {
444                name: "activity_registry".to_string(),
445                status: HealthStatus::Healthy,
446                last_check: timestamp,
447                message: None,
448            }),
449        ]);
450
451        let metrics = SystemMetrics {
452            cpu_usage: 0.0, // TODO: 実際のCPU使用率を取得
453            memory_usage: 0.0, // TODO: 実際のメモリ使用率を取得
454            active_workflows: 0, // TODO: 実行中のワークフロー数を取得
455            pending_tasks: 0, // TODO: 保留中のタスク数を取得
456            queue_depth: 0, // TODO: キュー深度を取得
457            throughput: 0.0, // TODO: スループットを計算
458        };
459
460        SystemHealth {
461            timestamp,
462            status: HealthStatus::Healthy,
463            components,
464            metrics,
465        }
466    }
467
468    /// ワークフロー統計を取得
469    pub async fn get_workflow_stats(&self, workflow_id: &str) -> Option<WorkflowStats> {
470        let stats = self.workflow_stats.read().await;
471        stats.get(workflow_id).cloned()
472    }
473
474    /// Activity統計を取得
475    pub async fn get_activity_stats(&self, activity_name: &str) -> Option<ActivityStats> {
476        let stats = self.activity_stats.read().await;
477        stats.get(activity_name).cloned()
478    }
479
480    /// 全てのメトリクスを取得
481    pub async fn get_all_metrics(&self) -> Vec<Metric> {
482        let metrics = self.metrics.read().await;
483        metrics.values().cloned().collect()
484    }
485
486    /// 最近のログを取得
487    pub async fn get_recent_logs(&self, limit: usize) -> Vec<LogEntry> {
488        let logs = self.logs.read().await;
489        logs.iter().rev().take(limit).cloned().collect()
490    }
491}
492
/// Helper macro for monitoring: wraps an operation in an `async` block that
/// times it and records the outcome through `update_workflow_stats`.
///
/// * `$monitor` - expression with an async `update_workflow_stats` method
///   (e.g. a `MonitoringManager` or a reference to one).
/// * `$execution_id` - expression whose `.0` field is passed as the
///   `workflow_id` argument, so statistics are keyed by that value.
/// * `$operation` - a future yielding a `Result`; `Ok` is recorded as
///   `ExecutionStatus::Completed`, `Err` as `ExecutionStatus::Failed`.
///
/// The macro expands to a future that yields the operation's own result
/// unchanged. Recording is best-effort: an error from
/// `update_workflow_stats` is discarded.
///
/// `ExecutionStatus` is referenced through `$crate`, so the macro also
/// resolves when invoked from another crate.
#[macro_export]
macro_rules! track_execution {
    ($monitor:expr, $execution_id:expr, $operation:expr) => {
        async {
            let start = ::std::time::Instant::now();
            let result = $operation.await;
            let duration = start.elapsed();

            let status = match &result {
                Ok(_) => $crate::ir::ExecutionStatus::Completed,
                Err(_) => $crate::ir::ExecutionStatus::Failed,
            };

            // Best-effort: a monitoring failure must not mask the operation's result.
            $monitor
                .update_workflow_stats(&$execution_id.0, duration, &status)
                .await
                .ok();

            result
        }
    };
}