use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Health classification reported for a worker through
/// `WorkflowMetrics::set_worker_health`.
///
/// The type is a plain `Copy` value, so it can be passed around and compared
/// freely.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WorkerHealthState {
    /// The worker is reported as healthy.
    Healthy,
    /// The worker is reported as degraded.
    Degraded,
    /// The worker is reported as unhealthy.
    Unhealthy,
}
/// Sink for workflow execution metrics.
///
/// Implementors must be `Send + Sync`, and every method takes `&self`, so any
/// state an implementation keeps has to use interior mutability.
pub trait WorkflowMetrics: Send + Sync {
    /// Records one latency observation for `node_id` within `workflow_name`.
    fn observe_node_latency(&self, workflow_name: &str, node_id: &str, latency: Duration);

    /// Counts one successful execution of `node_id` within `workflow_name`.
    fn increment_node_success(&self, workflow_name: &str, node_id: &str);

    /// Counts one failed execution of `node_id` within `workflow_name`.
    fn increment_node_failure(&self, workflow_name: &str, node_id: &str);

    /// Reports the current depth of the queue named `queue_name`.
    fn set_queue_depth(&self, queue_name: &str, depth: u64);

    /// Reports the current health of the worker identified by `worker_id`.
    fn set_worker_health(&self, worker_id: &str, health: WorkerHealthState);
}
/// Metrics sink that carries no state.
///
/// Its `WorkflowMetrics` implementation in this file has empty method bodies,
/// so every recorded value is discarded. Being a unit struct, it is trivially
/// `Clone` and `Copy`, which lets it be duplicated as freely as the
/// `Clone`-able in-memory recorder.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopWorkflowMetrics;
/// Every method is intentionally empty: the arguments are accepted and
/// dropped without being stored or forwarded anywhere.
impl WorkflowMetrics for NoopWorkflowMetrics {
    /// Ignores the latency observation.
    fn observe_node_latency(&self, _workflow: &str, _node: &str, _elapsed: Duration) {}

    /// Ignores the success event.
    fn increment_node_success(&self, _workflow: &str, _node: &str) {}

    /// Ignores the failure event.
    fn increment_node_failure(&self, _workflow: &str, _node: &str) {}

    /// Ignores the queue depth.
    fn set_queue_depth(&self, _queue: &str, _depth: u64) {}

    /// Ignores the worker health report.
    fn set_worker_health(&self, _worker: &str, _state: WorkerHealthState) {}
}
/// Metrics recorder that keeps everything in process memory.
///
/// The state sits behind an `Arc<Mutex<_>>`, so cloning this handle yields
/// another handle to the *same* underlying state rather than an independent
/// copy.
#[derive(Clone, Debug, Default)]
pub struct InMemoryWorkflowMetrics {
    /// Shared, mutex-protected storage for all recorded values.
    inner: Arc<Mutex<MetricsState>>,
}
/// Mutable storage behind `InMemoryWorkflowMetrics`.
///
/// Per-node maps are keyed by `(workflow_name, node_id)`.
#[derive(Default, Debug)]
struct MetricsState {
    /// Number of recorded successes per `(workflow_name, node_id)`.
    success: BTreeMap<(String, String), u64>,
    /// Number of recorded failures per `(workflow_name, node_id)`.
    failure: BTreeMap<(String, String), u64>,
    /// Every observed latency, in whole milliseconds, in arrival order,
    /// per `(workflow_name, node_id)`.
    latency_ms: BTreeMap<(String, String), Vec<u128>>,
    /// Most recently reported depth per queue name.
    queue_depth: BTreeMap<String, u64>,
    /// Most recently reported health per worker id.
    worker_health: BTreeMap<String, WorkerHealthState>,
}
impl InMemoryWorkflowMetrics {
pub fn snapshot(&self) -> InMemoryMetricsSnapshot {
let state = self
.inner
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
InMemoryMetricsSnapshot {
success: state.success.clone(),
failure: state.failure.clone(),
latency_ms: state.latency_ms.clone(),
queue_depth: state.queue_depth.clone(),
worker_health: state.worker_health.clone(),
}
}
}
impl WorkflowMetrics for InMemoryWorkflowMetrics {
fn observe_node_latency(&self, workflow_name: &str, node_id: &str, latency: Duration) {
let mut state = self
.inner
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state
.latency_ms
.entry((workflow_name.to_string(), node_id.to_string()))
.or_default()
.push(latency.as_millis());
}
fn increment_node_success(&self, workflow_name: &str, node_id: &str) {
let mut state = self
.inner
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
*state
.success
.entry((workflow_name.to_string(), node_id.to_string()))
.or_insert(0) += 1;
}
fn increment_node_failure(&self, workflow_name: &str, node_id: &str) {
let mut state = self
.inner
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
*state
.failure
.entry((workflow_name.to_string(), node_id.to_string()))
.or_insert(0) += 1;
}
fn set_queue_depth(&self, queue_name: &str, depth: u64) {
let mut state = self
.inner
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state.queue_depth.insert(queue_name.to_string(), depth);
}
fn set_worker_health(&self, worker_id: &str, health: WorkerHealthState) {
let mut state = self
.inner
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner());
state.worker_health.insert(worker_id.to_string(), health);
}
}
/// Owned copy of the values held by an `InMemoryWorkflowMetrics`, as produced
/// by its `snapshot` method.
///
/// Per-node maps are keyed by `(workflow_name, node_id)`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct InMemoryMetricsSnapshot {
    /// Success count per `(workflow_name, node_id)`.
    pub success: BTreeMap<(String, String), u64>,
    /// Failure count per `(workflow_name, node_id)`.
    pub failure: BTreeMap<(String, String), u64>,
    /// Observed latencies in whole milliseconds per `(workflow_name, node_id)`.
    pub latency_ms: BTreeMap<(String, String), Vec<u128>>,
    /// Last reported depth per queue name.
    pub queue_depth: BTreeMap<String, u64>,
    /// Last reported health per worker id.
    pub worker_health: BTreeMap<String, WorkerHealthState>,
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{InMemoryWorkflowMetrics, WorkerHealthState, WorkflowMetrics};

    /// Records one value through each trait method and checks that the
    /// snapshot reflects all of them.
    #[test]
    fn records_metrics_in_memory() {
        // Builds the owned `(workflow, node)` key used by the per-node maps.
        let key = |workflow: &str, node: &str| (workflow.to_string(), node.to_string());

        let recorder = InMemoryWorkflowMetrics::default();
        recorder.observe_node_latency("wf", "llm", Duration::from_millis(23));
        recorder.increment_node_success("wf", "llm");
        recorder.increment_node_failure("wf", "tool");
        recorder.set_queue_depth("worker-q", 7);
        recorder.set_worker_health("worker-1", WorkerHealthState::Degraded);

        let snapshot = recorder.snapshot();

        let llm = key("wf", "llm");
        let tool = key("wf", "tool");
        assert_eq!(snapshot.success.get(&llm), Some(&1));
        assert_eq!(snapshot.failure.get(&tool), Some(&1));
        assert_eq!(snapshot.latency_ms.get(&llm), Some(&vec![23]));
        assert_eq!(snapshot.queue_depth.get("worker-q"), Some(&7));
        assert_eq!(
            snapshot.worker_health.get("worker-1"),
            Some(&WorkerHealthState::Degraded)
        );
    }
}