Skip to main content

simple_agents_workflow/observability/
metrics.rs

1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
/// Coarse health classification that metrics adapters report for a worker.
///
/// This module only carries the value; it does not define what thresholds
/// separate one state from another — that is up to whoever reports it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WorkerHealthState {
    /// The worker is reported as healthy.
    Healthy,
    /// The worker is reported as degraded.
    Degraded,
    /// The worker is reported as unhealthy.
    Unhealthy,
}
12
13/// Prometheus-friendly workflow metrics surface.
14pub trait WorkflowMetrics: Send + Sync {
15    fn observe_node_latency(&self, workflow_name: &str, node_id: &str, latency: Duration);
16    fn increment_node_success(&self, workflow_name: &str, node_id: &str);
17    fn increment_node_failure(&self, workflow_name: &str, node_id: &str);
18    fn set_queue_depth(&self, queue_name: &str, depth: u64);
19    fn set_worker_health(&self, worker_id: &str, health: WorkerHealthState);
20}
21
/// Metrics adapter that discards every observation.
///
/// The type is a field-less unit struct, so it is trivially `Clone` and
/// `Copy`. Deriving both keeps it interchangeable with the cloneable
/// in-memory adapter wherever a caller requires a cloneable metrics handle.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopWorkflowMetrics;
25
26impl WorkflowMetrics for NoopWorkflowMetrics {
27    fn observe_node_latency(&self, _workflow_name: &str, _node_id: &str, _latency: Duration) {}
28
29    fn increment_node_success(&self, _workflow_name: &str, _node_id: &str) {}
30
31    fn increment_node_failure(&self, _workflow_name: &str, _node_id: &str) {}
32
33    fn set_queue_depth(&self, _queue_name: &str, _depth: u64) {}
34
35    fn set_worker_health(&self, _worker_id: &str, _health: WorkerHealthState) {}
36}
37
38/// In-memory adapter useful for tests and lightweight local diagnostics.
39#[derive(Debug, Clone, Default)]
40pub struct InMemoryWorkflowMetrics {
41    inner: Arc<Mutex<MetricsState>>,
42}
43
44#[derive(Debug, Default)]
45struct MetricsState {
46    success: BTreeMap<(String, String), u64>,
47    failure: BTreeMap<(String, String), u64>,
48    latency_ms: BTreeMap<(String, String), Vec<u128>>,
49    queue_depth: BTreeMap<String, u64>,
50    worker_health: BTreeMap<String, WorkerHealthState>,
51}
52
53impl InMemoryWorkflowMetrics {
54    pub fn snapshot(&self) -> InMemoryMetricsSnapshot {
55        let state = self
56            .inner
57            .lock()
58            .unwrap_or_else(|poisoned| poisoned.into_inner());
59        InMemoryMetricsSnapshot {
60            success: state.success.clone(),
61            failure: state.failure.clone(),
62            latency_ms: state.latency_ms.clone(),
63            queue_depth: state.queue_depth.clone(),
64            worker_health: state.worker_health.clone(),
65        }
66    }
67}
68
69impl WorkflowMetrics for InMemoryWorkflowMetrics {
70    fn observe_node_latency(&self, workflow_name: &str, node_id: &str, latency: Duration) {
71        let mut state = self
72            .inner
73            .lock()
74            .unwrap_or_else(|poisoned| poisoned.into_inner());
75        state
76            .latency_ms
77            .entry((workflow_name.to_string(), node_id.to_string()))
78            .or_default()
79            .push(latency.as_millis());
80    }
81
82    fn increment_node_success(&self, workflow_name: &str, node_id: &str) {
83        let mut state = self
84            .inner
85            .lock()
86            .unwrap_or_else(|poisoned| poisoned.into_inner());
87        *state
88            .success
89            .entry((workflow_name.to_string(), node_id.to_string()))
90            .or_insert(0) += 1;
91    }
92
93    fn increment_node_failure(&self, workflow_name: &str, node_id: &str) {
94        let mut state = self
95            .inner
96            .lock()
97            .unwrap_or_else(|poisoned| poisoned.into_inner());
98        *state
99            .failure
100            .entry((workflow_name.to_string(), node_id.to_string()))
101            .or_insert(0) += 1;
102    }
103
104    fn set_queue_depth(&self, queue_name: &str, depth: u64) {
105        let mut state = self
106            .inner
107            .lock()
108            .unwrap_or_else(|poisoned| poisoned.into_inner());
109        state.queue_depth.insert(queue_name.to_string(), depth);
110    }
111
112    fn set_worker_health(&self, worker_id: &str, health: WorkerHealthState) {
113        let mut state = self
114            .inner
115            .lock()
116            .unwrap_or_else(|poisoned| poisoned.into_inner());
117        state.worker_health.insert(worker_id.to_string(), health);
118    }
119}
120
121/// Snapshot view of in-memory metrics state.
122#[derive(Debug, Clone, PartialEq, Eq)]
123pub struct InMemoryMetricsSnapshot {
124    pub success: BTreeMap<(String, String), u64>,
125    pub failure: BTreeMap<(String, String), u64>,
126    pub latency_ms: BTreeMap<(String, String), Vec<u128>>,
127    pub queue_depth: BTreeMap<String, u64>,
128    pub worker_health: BTreeMap<String, WorkerHealthState>,
129}
130
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{InMemoryWorkflowMetrics, WorkerHealthState, WorkflowMetrics};

    /// Builds the owned `(workflow, node)` key used to look up per-node maps.
    fn node(workflow: &str, node_id: &str) -> (String, String) {
        (workflow.to_string(), node_id.to_string())
    }

    /// Calls every `WorkflowMetrics` hook once and checks that the snapshot
    /// reflects each recorded value.
    #[test]
    fn records_metrics_in_memory() {
        let recorder = InMemoryWorkflowMetrics::default();

        recorder.observe_node_latency("wf", "llm", Duration::from_millis(23));
        recorder.increment_node_success("wf", "llm");
        recorder.increment_node_failure("wf", "tool");
        recorder.set_queue_depth("worker-q", 7);
        recorder.set_worker_health("worker-1", WorkerHealthState::Degraded);

        let observed = recorder.snapshot();
        let llm = node("wf", "llm");
        let tool = node("wf", "tool");

        assert_eq!(observed.success.get(&llm), Some(&1));
        assert_eq!(observed.failure.get(&tool), Some(&1));
        assert_eq!(observed.latency_ms.get(&llm), Some(&vec![23]));
        assert_eq!(observed.queue_depth.get("worker-q"), Some(&7));
        assert_eq!(
            observed.worker_health.get("worker-1"),
            Some(&WorkerHealthState::Degraded)
        );
    }
}