claude_agent/observability/
metrics.rs1use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
7use std::time::Duration;
8
9use rust_decimal::Decimal;
10
11use crate::budget::COST_SCALE_FACTOR;
12
13#[cfg(feature = "otel")]
14use super::otel::{OtelConfig, OtelMetricsBridge, SERVICE_NAME_DEFAULT};
15#[cfg(feature = "otel")]
16use opentelemetry::global;
17
/// Settings passed when constructing a `MetricsRegistry`.
///
/// Note that the derived `Default` is the *disabled* configuration
/// (`enabled: false`, no export interval), which differs from
/// [`MetricsConfig::new`]. Nothing in this module currently reads either
/// field.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MetricsConfig {
    /// Whether metrics collection is requested.
    pub enabled: bool,
    /// Requested export period, or `None` for no periodic export.
    pub export_interval: Option<Duration>,
}

impl MetricsConfig {
    /// Returns an enabled configuration with a 60-second export interval.
    #[must_use]
    pub fn new() -> Self {
        Self {
            enabled: true,
            export_interval: Some(Duration::from_secs(60)),
        }
    }

    /// Returns a disabled configuration with no export interval.
    ///
    /// This is equal to `MetricsConfig::default()`.
    #[must_use]
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            export_interval: None,
        }
    }
}
40
/// Increment-only `u64` counter backed by an [`AtomicU64`].
///
/// Every operation uses `Ordering::Relaxed`: the value itself is always
/// updated atomically, but reads impose no ordering on other memory.
/// `fetch_add` wraps on overflow rather than panicking.
#[derive(Debug, Default)]
pub struct Counter {
    value: AtomicU64,
}

impl Counter {
    /// Creates a counter starting at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds one to the counter.
    pub fn inc(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `n` to the counter.
    pub fn add(&self, n: u64) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the current value.
    #[must_use]
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}
64
/// Signed gauge backed by an [`AtomicI64`].
///
/// A gauge can be set to an arbitrary value, incremented, and decremented,
/// including below zero. Every operation uses `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct Gauge {
    value: AtomicI64,
}

impl Gauge {
    /// Creates a gauge starting at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Overwrites the current value.
    pub fn set(&self, value: i64) {
        self.value.store(value, Ordering::Relaxed);
    }

    /// Adds one.
    pub fn inc(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
    }

    /// Subtracts one; the value may become negative.
    pub fn dec(&self) {
        self.value.fetch_sub(1, Ordering::Relaxed);
    }

    /// Returns the current value.
    #[must_use]
    pub fn get(&self) -> i64 {
        self.value.load(Ordering::Relaxed)
    }
}
92
/// Fixed-bucket histogram with atomic, lock-free recording.
///
/// With ascending bounds `b[0] < b[1] < …`, bucket `i` counts observations
/// `v` where `v <= b[i]` and `v` exceeds every earlier bound. One extra
/// trailing bucket counts values above the largest bound (and NaN).
///
/// The running sum is stored as an integer scaled by 1000, for example
/// microseconds when observing milliseconds, so it fits in an `AtomicU64`.
#[derive(Debug)]
pub struct Histogram {
    /// One counter per bound, plus a trailing overflow bucket.
    buckets: Vec<AtomicU64>,
    /// Inclusive upper bounds, sorted ascending.
    bucket_bounds: Vec<f64>,
    /// Sum of observations multiplied by `SUM_SCALE`, rounded to an integer.
    sum: AtomicU64,
    /// Number of observations.
    count: AtomicU64,
}

impl Histogram {
    /// Factor applied to each observation before it is added to `sum`.
    const SUM_SCALE: f64 = 1000.0;

    /// Creates a histogram with the given upper bucket bounds.
    ///
    /// The bounds are sorted with `f64::total_cmp`, because bucket selection
    /// picks the *first* bound `>= value`, which is only correct for
    /// ascending bounds.
    pub fn new(mut bucket_bounds: Vec<f64>) -> Self {
        bucket_bounds.sort_by(f64::total_cmp);
        let buckets = (0..=bucket_bounds.len())
            .map(|_| AtomicU64::new(0))
            .collect();
        Self {
            buckets,
            bucket_bounds,
            sum: AtomicU64::new(0),
            count: AtomicU64::new(0),
        }
    }

    /// Creates a histogram with latency-oriented bounds from 10 to 10 000
    /// (milliseconds, as used by `request_latency_ms`).
    pub fn default_latency() -> Self {
        Self::new(vec![
            10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, 10000.0,
        ])
    }

    /// Records one observation.
    ///
    /// Negative and NaN values are counted in a bucket but contribute 0 to
    /// the sum, because the float-to-`u64` cast saturates.
    pub fn observe(&self, value: f64) {
        let bucket_idx = self
            .bucket_bounds
            .iter()
            .position(|&bound| value <= bound)
            .unwrap_or(self.bucket_bounds.len());

        // Invariant: `buckets.len() == bucket_bounds.len() + 1`, so the index
        // is always in range.
        self.buckets[bucket_idx].fetch_add(1, Ordering::Relaxed);
        // Round rather than truncate: 1.005 * 1000.0 evaluates to
        // 1004.999…, which truncation would record as 1004.
        let scaled = (value * Self::SUM_SCALE).round() as u64;
        self.sum.fetch_add(scaled, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Returns the number of observations recorded.
    #[must_use]
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Returns the raw scaled sum: the total of all observations times 1000.
    ///
    /// Use [`Histogram::sum_ms`] for the value in the observation's unit.
    #[must_use]
    pub fn sum(&self) -> u64 {
        self.sum.load(Ordering::Relaxed)
    }

    /// Returns the sum of all observations, undoing the internal scaling.
    #[must_use]
    pub fn sum_ms(&self) -> f64 {
        self.sum.load(Ordering::Relaxed) as f64 / Self::SUM_SCALE
    }

    /// Returns the sorted, inclusive upper bucket bounds.
    #[must_use]
    pub fn bucket_bounds(&self) -> &[f64] {
        &self.bucket_bounds
    }

    /// Returns a snapshot of per-bucket counts.
    ///
    /// There is one entry per bound plus a final overflow entry. Each bucket
    /// is loaded independently, so concurrent `observe` calls may be only
    /// partially reflected.
    #[must_use]
    pub fn bucket_counts(&self) -> Vec<u64> {
        self.buckets
            .iter()
            .map(|bucket| bucket.load(Ordering::Relaxed))
            .collect()
    }
}
149
150pub struct MetricsRegistry {
155 pub requests_total: Counter,
156 pub requests_success: Counter,
157 pub requests_error: Counter,
158 pub tokens_input: Counter,
159 pub tokens_output: Counter,
160 pub cache_read_tokens: Counter,
161 pub cache_creation_tokens: Counter,
162 pub tool_calls_total: Counter,
163 pub tool_errors: Counter,
164 pub active_sessions: Gauge,
165 pub request_latency_ms: Histogram,
166 pub cost_total_micros: Counter,
167 #[cfg(feature = "otel")]
168 otel_bridge: Option<OtelMetricsBridge>,
169}
170
171impl MetricsRegistry {
172 pub fn new(_config: &MetricsConfig) -> Self {
173 Self {
174 requests_total: Counter::new(),
175 requests_success: Counter::new(),
176 requests_error: Counter::new(),
177 tokens_input: Counter::new(),
178 tokens_output: Counter::new(),
179 cache_read_tokens: Counter::new(),
180 cache_creation_tokens: Counter::new(),
181 tool_calls_total: Counter::new(),
182 tool_errors: Counter::new(),
183 active_sessions: Gauge::new(),
184 request_latency_ms: Histogram::default_latency(),
185 cost_total_micros: Counter::new(),
186 #[cfg(feature = "otel")]
187 otel_bridge: None,
188 }
189 }
190
191 #[cfg(feature = "otel")]
192 pub fn otel(_config: &MetricsConfig, otel_config: &OtelConfig) -> Self {
193 let meter = global::meter(SERVICE_NAME_DEFAULT);
194 let bridge = OtelMetricsBridge::new(&meter);
195 let _ = &otel_config.service_name; Self {
198 requests_total: Counter::new(),
199 requests_success: Counter::new(),
200 requests_error: Counter::new(),
201 tokens_input: Counter::new(),
202 tokens_output: Counter::new(),
203 cache_read_tokens: Counter::new(),
204 cache_creation_tokens: Counter::new(),
205 tool_calls_total: Counter::new(),
206 tool_errors: Counter::new(),
207 active_sessions: Gauge::new(),
208 request_latency_ms: Histogram::default_latency(),
209 cost_total_micros: Counter::new(),
210 otel_bridge: Some(bridge),
211 }
212 }
213
214 pub fn record_request_start(&self) {
215 self.requests_total.inc();
216 self.active_sessions.inc();
217
218 #[cfg(feature = "otel")]
219 if let Some(ref bridge) = self.otel_bridge {
220 bridge.record_request_start();
221 }
222 }
223
224 pub fn record_request_end(&self, success: bool, latency_ms: f64) {
225 self.active_sessions.dec();
226 self.request_latency_ms.observe(latency_ms);
227 if success {
228 self.requests_success.inc();
229 } else {
230 self.requests_error.inc();
231 }
232
233 #[cfg(feature = "otel")]
234 if let Some(ref bridge) = self.otel_bridge {
235 bridge.record_request_end(success, latency_ms);
236 }
237 }
238
239 pub fn record_tokens(&self, input: u32, output: u32) {
240 self.tokens_input.add(input as u64);
241 self.tokens_output.add(output as u64);
242
243 #[cfg(feature = "otel")]
244 if let Some(ref bridge) = self.otel_bridge {
245 bridge.record_tokens(input as u64, output as u64);
246 }
247 }
248
249 pub fn record_cache(&self, read: u32, creation: u32) {
250 self.cache_read_tokens.add(read as u64);
251 self.cache_creation_tokens.add(creation as u64);
252
253 #[cfg(feature = "otel")]
254 if let Some(ref bridge) = self.otel_bridge {
255 bridge.record_cache(read as u64, creation as u64);
256 }
257 }
258
259 pub fn record_tool_call(&self, success: bool) {
260 self.tool_calls_total.inc();
261 if !success {
262 self.tool_errors.inc();
263 }
264
265 #[cfg(feature = "otel")]
266 if let Some(ref bridge) = self.otel_bridge {
267 bridge.record_tool_call(success);
268 }
269 }
270
271 pub fn record_cost(&self, cost_usd: Decimal) {
272 let scaled = cost_usd * COST_SCALE_FACTOR;
273 let micros = scaled
274 .try_into()
275 .unwrap_or_else(|_| scaled.to_string().parse::<u64>().unwrap_or(0));
276 self.cost_total_micros.add(micros);
277
278 #[cfg(feature = "otel")]
279 if let Some(ref bridge) = self.otel_bridge {
280 bridge.record_cost(cost_usd);
281 }
282 }
283
284 pub fn total_cost_usd(&self) -> Decimal {
285 Decimal::from(self.cost_total_micros.get()) / COST_SCALE_FACTOR
286 }
287}
288
289impl Default for MetricsRegistry {
290 fn default() -> Self {
291 Self::new(&MetricsConfig::default())
292 }
293}
294
295#[derive(Debug, Clone, Default)]
301pub struct MetricsSummary {
302 pub total_requests: u64,
303 pub successful_requests: u64,
304 pub failed_requests: u64,
305 pub total_input_tokens: u64,
306 pub total_output_tokens: u64,
307 pub cache_read_tokens: u64,
308 pub cache_creation_tokens: u64,
309 pub total_tool_calls: u64,
310 pub failed_tool_calls: u64,
311 pub total_cost_usd: Decimal,
312 pub avg_latency_ms: f64,
313}
314
315impl MetricsSummary {
316 pub fn from_registry(registry: &MetricsRegistry) -> Self {
317 let count = registry.request_latency_ms.count();
318 let avg_latency = if count > 0 {
319 registry.request_latency_ms.sum_ms() / count as f64
320 } else {
321 0.0
322 };
323
324 Self {
325 total_requests: registry.requests_total.get(),
326 successful_requests: registry.requests_success.get(),
327 failed_requests: registry.requests_error.get(),
328 total_input_tokens: registry.tokens_input.get(),
329 total_output_tokens: registry.tokens_output.get(),
330 cache_read_tokens: registry.cache_read_tokens.get(),
331 cache_creation_tokens: registry.cache_creation_tokens.get(),
332 total_tool_calls: registry.tool_calls_total.get(),
333 failed_tool_calls: registry.tool_errors.get(),
334 total_cost_usd: registry.total_cost_usd(),
335 avg_latency_ms: avg_latency,
336 }
337 }
338}
339
#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal_macros::dec;

    #[test]
    fn test_counter() {
        let counter = Counter::new();
        assert_eq!(counter.get(), 0);
        counter.inc();
        assert_eq!(counter.get(), 1);
        counter.add(5);
        assert_eq!(counter.get(), 6);
    }

    #[test]
    fn test_gauge() {
        let gauge = Gauge::new();
        gauge.set(10);
        assert_eq!(gauge.get(), 10);
        gauge.inc();
        assert_eq!(gauge.get(), 11);
        gauge.dec();
        assert_eq!(gauge.get(), 10);
        gauge.set(0);
        // Gauges are signed, so decrementing past zero goes negative.
        gauge.dec();
        assert_eq!(gauge.get(), -1);
    }

    #[test]
    fn test_histogram() {
        let hist = Histogram::new(vec![10.0, 50.0, 100.0]);
        hist.observe(5.0);
        hist.observe(25.0);
        hist.observe(75.0);
        hist.observe(150.0);
        assert_eq!(hist.count(), 4);
        // The raw sum is kept scaled by 1000.
        assert_eq!(hist.sum(), 255_000);
        assert_eq!(hist.sum_ms(), 255.0);
    }

    #[test]
    fn test_metrics_registry() {
        let registry = MetricsRegistry::default();
        registry.record_request_start();
        assert_eq!(registry.active_sessions.get(), 1);
        registry.record_tokens(100, 50);
        registry.record_cache(30, 20);
        registry.record_tool_call(true);
        registry.record_tool_call(false);
        registry.record_cost(dec!(0.001));
        registry.record_request_end(true, 250.0);
        assert_eq!(registry.active_sessions.get(), 0);

        let metrics = MetricsSummary::from_registry(&registry);
        assert_eq!(metrics.total_requests, 1);
        assert_eq!(metrics.successful_requests, 1);
        assert_eq!(metrics.failed_requests, 0);
        assert_eq!(metrics.total_input_tokens, 100);
        assert_eq!(metrics.total_output_tokens, 50);
        assert_eq!(metrics.cache_read_tokens, 30);
        assert_eq!(metrics.cache_creation_tokens, 20);
        assert_eq!(metrics.total_tool_calls, 2);
        assert_eq!(metrics.failed_tool_calls, 1);
        assert_eq!(metrics.avg_latency_ms, 250.0);
    }

    #[test]
    fn test_failed_request_and_cost_round_trip() {
        let registry = MetricsRegistry::default();
        registry.record_request_start();
        registry.record_request_end(false, 100.0);
        // A whole-dollar amount survives scaling exactly, assuming
        // COST_SCALE_FACTOR is a positive whole number.
        registry.record_cost(dec!(2));

        let metrics = MetricsSummary::from_registry(&registry);
        assert_eq!(metrics.failed_requests, 1);
        assert_eq!(metrics.successful_requests, 0);
        assert_eq!(metrics.total_cost_usd, dec!(2));
    }

    #[test]
    fn test_empty_summary_has_zero_latency() {
        let metrics = MetricsSummary::from_registry(&MetricsRegistry::default());
        assert_eq!(metrics.total_requests, 0);
        assert_eq!(metrics.avg_latency_ms, 0.0);
    }
}