ceylon_observability/
metrics.rs

1use dashmap::DashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Arc;
4
5pub struct Metrics {
6    // Message metrics
7    pub message_count: AtomicU64,
8    pub message_latency_sum_us: AtomicU64,
9    pub message_latency_count: AtomicU64,
10
11    // Agent execution metrics
12    pub agent_execution_time_sum_us: AtomicU64,
13    pub agent_execution_time_count: AtomicU64,
14
15    // LLM metrics
16    pub llm_tokens_total: AtomicU64,
17    pub llm_latency_sum_us: AtomicU64,
18    pub llm_latency_count: AtomicU64,
19    pub llm_cost_total_us: AtomicU64,
20
21    // Memory metrics
22    pub memory_hits: AtomicU64,
23    pub memory_misses: AtomicU64,
24    pub memory_writes: AtomicU64,
25
26    // Errors
27    pub errors: DashMap<String, AtomicU64>,
28}
29
30impl Metrics {
31    pub fn new() -> Self {
32        Self {
33            message_count: AtomicU64::new(0),
34            message_latency_sum_us: AtomicU64::new(0),
35            message_latency_count: AtomicU64::new(0),
36            agent_execution_time_sum_us: AtomicU64::new(0),
37            agent_execution_time_count: AtomicU64::new(0),
38            llm_tokens_total: AtomicU64::new(0),
39            llm_latency_sum_us: AtomicU64::new(0),
40            llm_latency_count: AtomicU64::new(0),
41            llm_cost_total_us: AtomicU64::new(0),
42            memory_hits: AtomicU64::new(0),
43            memory_misses: AtomicU64::new(0),
44            memory_writes: AtomicU64::new(0),
45            errors: DashMap::new(),
46        }
47    }
48
49    pub fn record_message(&self, latency_us: u64) {
50        self.message_count.fetch_add(1, Ordering::Relaxed);
51        self.message_latency_sum_us
52            .fetch_add(latency_us, Ordering::Relaxed);
53        self.message_latency_count.fetch_add(1, Ordering::Relaxed);
54    }
55
56    pub fn record_agent_execution(&self, duration_us: u64) {
57        self.agent_execution_time_sum_us
58            .fetch_add(duration_us, Ordering::Relaxed);
59        self.agent_execution_time_count
60            .fetch_add(1, Ordering::Relaxed);
61    }
62
63    pub fn record_llm_call(&self, duration_us: u64, tokens: u64, cost_us: u64) {
64        self.llm_latency_sum_us
65            .fetch_add(duration_us, Ordering::Relaxed);
66        self.llm_latency_count.fetch_add(1, Ordering::Relaxed);
67        self.llm_tokens_total.fetch_add(tokens, Ordering::Relaxed);
68        self.llm_cost_total_us.fetch_add(cost_us, Ordering::Relaxed);
69    }
70
71    pub fn record_memory_hit(&self) {
72        self.memory_hits.fetch_add(1, Ordering::Relaxed);
73    }
74
75    pub fn record_memory_miss(&self) {
76        self.memory_misses.fetch_add(1, Ordering::Relaxed);
77    }
78
79    pub fn record_memory_write(&self) {
80        self.memory_writes.fetch_add(1, Ordering::Relaxed);
81    }
82
83    pub fn record_error(&self, error_type: &str) {
84        self.errors
85            .entry(error_type.to_string())
86            .and_modify(|c| {
87                c.fetch_add(1, Ordering::Relaxed);
88            })
89            .or_insert(AtomicU64::new(1));
90    }
91
92    pub fn snapshot(&self) -> MetricsSnapshot {
93        MetricsSnapshot {
94            message_throughput: self.message_count.load(Ordering::Relaxed),
95            avg_message_latency_us: self.avg(
96                self.message_latency_sum_us.load(Ordering::Relaxed),
97                self.message_latency_count.load(Ordering::Relaxed),
98            ),
99            avg_agent_execution_time_us: self.avg(
100                self.agent_execution_time_sum_us.load(Ordering::Relaxed),
101                self.agent_execution_time_count.load(Ordering::Relaxed),
102            ),
103            total_llm_tokens: self.llm_tokens_total.load(Ordering::Relaxed),
104            avg_llm_latency_us: self.avg(
105                self.llm_latency_sum_us.load(Ordering::Relaxed),
106                self.llm_latency_count.load(Ordering::Relaxed),
107            ),
108            total_llm_cost_us: self.llm_cost_total_us.load(Ordering::Relaxed),
109            memory_hits: self.memory_hits.load(Ordering::Relaxed),
110            memory_misses: self.memory_misses.load(Ordering::Relaxed),
111            memory_writes: self.memory_writes.load(Ordering::Relaxed),
112            errors: self
113                .errors
114                .iter()
115                .map(|r| (r.key().clone(), r.value().load(Ordering::Relaxed)))
116                .collect(),
117        }
118    }
119
120    fn avg(&self, sum: u64, count: u64) -> f64 {
121        if count == 0 {
122            0.0
123        } else {
124            sum as f64 / count as f64
125        }
126    }
127}
128
129impl Default for Metrics {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
/// Plain-value copy of the counters held by `Metrics`, as produced by
/// `Metrics::snapshot`. Unlike `Metrics` it contains no atomics, and it can
/// be cloned, debug-printed and serialized.
#[derive(Debug, Clone, serde::Serialize)]
pub struct MetricsSnapshot {
    /// Total number of messages recorded so far. Despite the name this is a
    /// cumulative count as filled in by `Metrics::snapshot`, not a rate.
    pub message_throughput: u64,
    /// Mean recorded message latency; `0.0` when no messages were recorded.
    pub avg_message_latency_us: f64,
    /// Mean recorded agent execution time; `0.0` when none were recorded.
    pub avg_agent_execution_time_us: f64,
    /// Total number of LLM tokens recorded.
    pub total_llm_tokens: u64,
    /// Mean recorded LLM call duration; `0.0` when no calls were recorded.
    pub avg_llm_latency_us: f64,
    /// Sum of the recorded LLM call costs.
    // NOTE(review): unit implied by `_us` is unclear for a cost — confirm with callers.
    pub total_llm_cost_us: u64,
    /// Number of memory hits recorded.
    pub memory_hits: u64,
    /// Number of memory misses recorded.
    pub memory_misses: u64,
    /// Number of memory writes recorded.
    pub memory_writes: u64,
    /// Occurrence count per error type string.
    pub errors: std::collections::HashMap<String, u64>,
}
148
149// Global metrics instance
150use std::sync::OnceLock;
151
/// Process-wide slot holding the shared `Metrics` handle. It starts out
/// empty; `metrics()` fills it on first use via `get_or_init`.
pub static GLOBAL_METRICS: OnceLock<Arc<Metrics>> = OnceLock::new();
153
154pub fn metrics() -> &'static Arc<Metrics> {
155    GLOBAL_METRICS.get_or_init(|| Arc::new(Metrics::new()))
156}