simple_agents_workflow/observability/
metrics.rs1use std::collections::BTreeMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
/// Health classification that can be reported for a single worker.
///
/// The type is a small `Copy` value. It implements `Eq` and `Hash`, so it can
/// be compared directly and used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WorkerHealthState {
    /// The worker is reported as healthy.
    Healthy,
    /// The worker is reported as degraded.
    Degraded,
    /// The worker is reported as unhealthy.
    Unhealthy,
}
12
13pub trait WorkflowMetrics: Send + Sync {
15 fn observe_node_latency(&self, workflow_name: &str, node_id: &str, latency: Duration);
16 fn increment_node_success(&self, workflow_name: &str, node_id: &str);
17 fn increment_node_failure(&self, workflow_name: &str, node_id: &str);
18 fn set_queue_depth(&self, queue_name: &str, depth: u64);
19 fn set_worker_health(&self, worker_id: &str, health: WorkerHealthState);
20}
21
/// Metrics sink that holds no state.
///
/// Being a field-less unit struct, it is free to copy; `Clone`/`Copy` are
/// derived so it can be used wherever a cloneable metrics handle is expected.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopWorkflowMetrics;
25
26impl WorkflowMetrics for NoopWorkflowMetrics {
27 fn observe_node_latency(&self, _workflow_name: &str, _node_id: &str, _latency: Duration) {}
28
29 fn increment_node_success(&self, _workflow_name: &str, _node_id: &str) {}
30
31 fn increment_node_failure(&self, _workflow_name: &str, _node_id: &str) {}
32
33 fn set_queue_depth(&self, _queue_name: &str, _depth: u64) {}
34
35 fn set_worker_health(&self, _worker_id: &str, _health: WorkerHealthState) {}
36}
37
38#[derive(Debug, Clone, Default)]
40pub struct InMemoryWorkflowMetrics {
41 inner: Arc<Mutex<MetricsState>>,
42}
43
44#[derive(Debug, Default)]
45struct MetricsState {
46 success: BTreeMap<(String, String), u64>,
47 failure: BTreeMap<(String, String), u64>,
48 latency_ms: BTreeMap<(String, String), Vec<u128>>,
49 queue_depth: BTreeMap<String, u64>,
50 worker_health: BTreeMap<String, WorkerHealthState>,
51}
52
53impl InMemoryWorkflowMetrics {
54 pub fn snapshot(&self) -> InMemoryMetricsSnapshot {
55 let state = self
56 .inner
57 .lock()
58 .unwrap_or_else(|poisoned| poisoned.into_inner());
59 InMemoryMetricsSnapshot {
60 success: state.success.clone(),
61 failure: state.failure.clone(),
62 latency_ms: state.latency_ms.clone(),
63 queue_depth: state.queue_depth.clone(),
64 worker_health: state.worker_health.clone(),
65 }
66 }
67}
68
69impl WorkflowMetrics for InMemoryWorkflowMetrics {
70 fn observe_node_latency(&self, workflow_name: &str, node_id: &str, latency: Duration) {
71 let mut state = self
72 .inner
73 .lock()
74 .unwrap_or_else(|poisoned| poisoned.into_inner());
75 state
76 .latency_ms
77 .entry((workflow_name.to_string(), node_id.to_string()))
78 .or_default()
79 .push(latency.as_millis());
80 }
81
82 fn increment_node_success(&self, workflow_name: &str, node_id: &str) {
83 let mut state = self
84 .inner
85 .lock()
86 .unwrap_or_else(|poisoned| poisoned.into_inner());
87 *state
88 .success
89 .entry((workflow_name.to_string(), node_id.to_string()))
90 .or_insert(0) += 1;
91 }
92
93 fn increment_node_failure(&self, workflow_name: &str, node_id: &str) {
94 let mut state = self
95 .inner
96 .lock()
97 .unwrap_or_else(|poisoned| poisoned.into_inner());
98 *state
99 .failure
100 .entry((workflow_name.to_string(), node_id.to_string()))
101 .or_insert(0) += 1;
102 }
103
104 fn set_queue_depth(&self, queue_name: &str, depth: u64) {
105 let mut state = self
106 .inner
107 .lock()
108 .unwrap_or_else(|poisoned| poisoned.into_inner());
109 state.queue_depth.insert(queue_name.to_string(), depth);
110 }
111
112 fn set_worker_health(&self, worker_id: &str, health: WorkerHealthState) {
113 let mut state = self
114 .inner
115 .lock()
116 .unwrap_or_else(|poisoned| poisoned.into_inner());
117 state.worker_health.insert(worker_id.to_string(), health);
118 }
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
123pub struct InMemoryMetricsSnapshot {
124 pub success: BTreeMap<(String, String), u64>,
125 pub failure: BTreeMap<(String, String), u64>,
126 pub latency_ms: BTreeMap<(String, String), Vec<u128>>,
127 pub queue_depth: BTreeMap<String, u64>,
128 pub worker_health: BTreeMap<String, WorkerHealthState>,
129}
130
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{InMemoryWorkflowMetrics, WorkerHealthState, WorkflowMetrics};

    /// Builds the owned `(workflow, node)` key used by the per-node maps.
    fn node_key(workflow: &str, node: &str) -> (String, String) {
        (workflow.to_string(), node.to_string())
    }

    /// Each recording method must be visible in a snapshot taken afterwards.
    #[test]
    fn records_metrics_in_memory() {
        let recorder = InMemoryWorkflowMetrics::default();
        recorder.observe_node_latency("wf", "llm", Duration::from_millis(23));
        recorder.increment_node_success("wf", "llm");
        recorder.increment_node_failure("wf", "tool");
        recorder.set_queue_depth("worker-q", 7);
        recorder.set_worker_health("worker-1", WorkerHealthState::Degraded);

        let snapshot = recorder.snapshot();
        let llm = node_key("wf", "llm");
        let tool = node_key("wf", "tool");

        assert_eq!(snapshot.success.get(&llm).copied(), Some(1));
        assert_eq!(snapshot.failure.get(&tool).copied(), Some(1));
        assert_eq!(
            snapshot.latency_ms.get(&llm).map(Vec::as_slice),
            Some(&[23_u128][..])
        );
        assert_eq!(snapshot.queue_depth.get("worker-q").copied(), Some(7));
        assert_eq!(
            snapshot.worker_health.get("worker-1").copied(),
            Some(WorkerHealthState::Degraded)
        );
    }
}