ceylon_observability/
metrics.rs1use dashmap::DashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Arc;
4
5pub struct Metrics {
6 pub message_count: AtomicU64,
8 pub message_latency_sum_us: AtomicU64,
9 pub message_latency_count: AtomicU64,
10
11 pub agent_execution_time_sum_us: AtomicU64,
13 pub agent_execution_time_count: AtomicU64,
14
15 pub llm_tokens_total: AtomicU64,
17 pub llm_latency_sum_us: AtomicU64,
18 pub llm_latency_count: AtomicU64,
19 pub llm_cost_total_us: AtomicU64,
20
21 pub memory_hits: AtomicU64,
23 pub memory_misses: AtomicU64,
24 pub memory_writes: AtomicU64,
25
26 pub errors: DashMap<String, AtomicU64>,
28}
29
30impl Metrics {
31 pub fn new() -> Self {
32 Self {
33 message_count: AtomicU64::new(0),
34 message_latency_sum_us: AtomicU64::new(0),
35 message_latency_count: AtomicU64::new(0),
36 agent_execution_time_sum_us: AtomicU64::new(0),
37 agent_execution_time_count: AtomicU64::new(0),
38 llm_tokens_total: AtomicU64::new(0),
39 llm_latency_sum_us: AtomicU64::new(0),
40 llm_latency_count: AtomicU64::new(0),
41 llm_cost_total_us: AtomicU64::new(0),
42 memory_hits: AtomicU64::new(0),
43 memory_misses: AtomicU64::new(0),
44 memory_writes: AtomicU64::new(0),
45 errors: DashMap::new(),
46 }
47 }
48
49 pub fn record_message(&self, latency_us: u64) {
50 self.message_count.fetch_add(1, Ordering::Relaxed);
51 self.message_latency_sum_us
52 .fetch_add(latency_us, Ordering::Relaxed);
53 self.message_latency_count.fetch_add(1, Ordering::Relaxed);
54 }
55
56 pub fn record_agent_execution(&self, duration_us: u64) {
57 self.agent_execution_time_sum_us
58 .fetch_add(duration_us, Ordering::Relaxed);
59 self.agent_execution_time_count
60 .fetch_add(1, Ordering::Relaxed);
61 }
62
63 pub fn record_llm_call(&self, duration_us: u64, tokens: u64, cost_us: u64) {
64 self.llm_latency_sum_us
65 .fetch_add(duration_us, Ordering::Relaxed);
66 self.llm_latency_count.fetch_add(1, Ordering::Relaxed);
67 self.llm_tokens_total.fetch_add(tokens, Ordering::Relaxed);
68 self.llm_cost_total_us.fetch_add(cost_us, Ordering::Relaxed);
69 }
70
71 pub fn record_memory_hit(&self) {
72 self.memory_hits.fetch_add(1, Ordering::Relaxed);
73 }
74
75 pub fn record_memory_miss(&self) {
76 self.memory_misses.fetch_add(1, Ordering::Relaxed);
77 }
78
79 pub fn record_memory_write(&self) {
80 self.memory_writes.fetch_add(1, Ordering::Relaxed);
81 }
82
83 pub fn record_error(&self, error_type: &str) {
84 self.errors
85 .entry(error_type.to_string())
86 .and_modify(|c| {
87 c.fetch_add(1, Ordering::Relaxed);
88 })
89 .or_insert(AtomicU64::new(1));
90 }
91
92 pub fn snapshot(&self) -> MetricsSnapshot {
93 MetricsSnapshot {
94 message_throughput: self.message_count.load(Ordering::Relaxed),
95 avg_message_latency_us: self.avg(
96 self.message_latency_sum_us.load(Ordering::Relaxed),
97 self.message_latency_count.load(Ordering::Relaxed),
98 ),
99 avg_agent_execution_time_us: self.avg(
100 self.agent_execution_time_sum_us.load(Ordering::Relaxed),
101 self.agent_execution_time_count.load(Ordering::Relaxed),
102 ),
103 total_llm_tokens: self.llm_tokens_total.load(Ordering::Relaxed),
104 avg_llm_latency_us: self.avg(
105 self.llm_latency_sum_us.load(Ordering::Relaxed),
106 self.llm_latency_count.load(Ordering::Relaxed),
107 ),
108 total_llm_cost_us: self.llm_cost_total_us.load(Ordering::Relaxed),
109 memory_hits: self.memory_hits.load(Ordering::Relaxed),
110 memory_misses: self.memory_misses.load(Ordering::Relaxed),
111 memory_writes: self.memory_writes.load(Ordering::Relaxed),
112 errors: self
113 .errors
114 .iter()
115 .map(|r| (r.key().clone(), r.value().load(Ordering::Relaxed)))
116 .collect(),
117 }
118 }
119
120 fn avg(&self, sum: u64, count: u64) -> f64 {
121 if count == 0 {
122 0.0
123 } else {
124 sum as f64 / count as f64
125 }
126 }
127}
128
129impl Default for Metrics {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135#[derive(Debug, Clone, serde::Serialize)]
136pub struct MetricsSnapshot {
137 pub message_throughput: u64,
138 pub avg_message_latency_us: f64,
139 pub avg_agent_execution_time_us: f64,
140 pub total_llm_tokens: u64,
141 pub avg_llm_latency_us: f64,
142 pub total_llm_cost_us: u64,
143 pub memory_hits: u64,
144 pub memory_misses: u64,
145 pub memory_writes: u64,
146 pub errors: std::collections::HashMap<String, u64>,
147}
148
149use std::sync::OnceLock;
151
152pub static GLOBAL_METRICS: OnceLock<Arc<Metrics>> = OnceLock::new();
153
154pub fn metrics() -> &'static Arc<Metrics> {
155 GLOBAL_METRICS.get_or_init(|| Arc::new(Metrics::new()))
156}