// lean_ctx/core/telemetry.rs
//! Telemetry and metrics collection following OpenTelemetry GenAI conventions.
//!
//! Provides lock-free, zero-allocation metrics collection for:
//! - Token usage (input, output, saved, compression ratio)
//! - Tool call latency and success rates
//! - Search quality metrics (latency, result counts)
//! - Embedding inference performance
//! - Cache hit/miss rates
//!
//! Naming follows the OpenTelemetry GenAI Semantic Conventions:
//! <https://opentelemetry.io/docs/specs/semconv/gen-ai/>

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;
use std::time::Instant;

// Process-wide metrics instance, lazily initialized on first access.
static METRICS: OnceLock<Metrics> = OnceLock::new();

/// Returns the process-wide [`Metrics`] singleton, creating it on first call.
///
/// Safe to call from any thread; `OnceLock` guarantees exactly one
/// initialization even under concurrent first access.
pub fn global_metrics() -> &'static Metrics {
    METRICS.get_or_init(Metrics::new)
}
22
/// Lock-free metric counters for the whole process.
///
/// All counters are monotonically increasing `AtomicU64`s updated with
/// `Ordering::Relaxed` (see the `record_*` methods); derived averages and
/// rates are computed on demand in [`Metrics::snapshot`].
#[derive(Debug)]
pub struct Metrics {
    // gen_ai.usage.input_tokens / gen_ai.usage.output_tokens
    pub tokens_input: AtomicU64,
    pub tokens_output: AtomicU64,
    // Project-specific counter of tokens avoided (lean_ctx.tokens.saved).
    pub tokens_saved: AtomicU64,

    // Tool invocations: total count, error count, and summed latency (µs).
    pub tool_calls_total: AtomicU64,
    pub tool_calls_error: AtomicU64,
    pub tool_call_latency_sum_us: AtomicU64,

    // Search: query count, summed latency (µs), and total results returned.
    pub search_queries_total: AtomicU64,
    pub search_latency_sum_us: AtomicU64,
    pub search_results_total: AtomicU64,

    // Embedding inference: count, summed latency (µs), and tokens embedded.
    pub embedding_inferences_total: AtomicU64,
    pub embedding_latency_sum_us: AtomicU64,
    pub embedding_tokens_total: AtomicU64,

    // Cache lookup outcomes; hit rate is derived in snapshot().
    pub cache_hits: AtomicU64,
    pub cache_misses: AtomicU64,

    // Compression: call count plus input/output byte totals; the ratio is
    // derived in snapshot().
    pub compression_calls: AtomicU64,
    pub compression_input_bytes: AtomicU64,
    pub compression_output_bytes: AtomicU64,

    // When this Metrics value was constructed; basis for uptime reporting.
    pub session_start: Instant,
}
51
52impl Metrics {
53    pub fn new() -> Self {
54        Self {
55            tokens_input: AtomicU64::new(0),
56            tokens_output: AtomicU64::new(0),
57            tokens_saved: AtomicU64::new(0),
58            tool_calls_total: AtomicU64::new(0),
59            tool_calls_error: AtomicU64::new(0),
60            tool_call_latency_sum_us: AtomicU64::new(0),
61            search_queries_total: AtomicU64::new(0),
62            search_latency_sum_us: AtomicU64::new(0),
63            search_results_total: AtomicU64::new(0),
64            embedding_inferences_total: AtomicU64::new(0),
65            embedding_latency_sum_us: AtomicU64::new(0),
66            embedding_tokens_total: AtomicU64::new(0),
67            cache_hits: AtomicU64::new(0),
68            cache_misses: AtomicU64::new(0),
69            compression_calls: AtomicU64::new(0),
70            compression_input_bytes: AtomicU64::new(0),
71            compression_output_bytes: AtomicU64::new(0),
72            session_start: Instant::now(),
73        }
74    }
75
76    pub fn record_tool_call(&self, latency_us: u64, success: bool) {
77        self.tool_calls_total.fetch_add(1, Ordering::Relaxed);
78        self.tool_call_latency_sum_us
79            .fetch_add(latency_us, Ordering::Relaxed);
80        if !success {
81            self.tool_calls_error.fetch_add(1, Ordering::Relaxed);
82        }
83    }
84
85    pub fn record_tokens(&self, input: u64, output: u64, saved: u64) {
86        self.tokens_input.fetch_add(input, Ordering::Relaxed);
87        self.tokens_output.fetch_add(output, Ordering::Relaxed);
88        self.tokens_saved.fetch_add(saved, Ordering::Relaxed);
89    }
90
91    pub fn record_search(&self, latency_us: u64, result_count: u64) {
92        self.search_queries_total.fetch_add(1, Ordering::Relaxed);
93        self.search_latency_sum_us
94            .fetch_add(latency_us, Ordering::Relaxed);
95        self.search_results_total
96            .fetch_add(result_count, Ordering::Relaxed);
97    }
98
99    pub fn record_embedding(&self, latency_us: u64, token_count: u64) {
100        self.embedding_inferences_total
101            .fetch_add(1, Ordering::Relaxed);
102        self.embedding_latency_sum_us
103            .fetch_add(latency_us, Ordering::Relaxed);
104        self.embedding_tokens_total
105            .fetch_add(token_count, Ordering::Relaxed);
106    }
107
108    pub fn record_cache(&self, hit: bool) {
109        if hit {
110            self.cache_hits.fetch_add(1, Ordering::Relaxed);
111        } else {
112            self.cache_misses.fetch_add(1, Ordering::Relaxed);
113        }
114    }
115
116    pub fn record_compression(&self, input_bytes: u64, output_bytes: u64) {
117        self.compression_calls.fetch_add(1, Ordering::Relaxed);
118        self.compression_input_bytes
119            .fetch_add(input_bytes, Ordering::Relaxed);
120        self.compression_output_bytes
121            .fetch_add(output_bytes, Ordering::Relaxed);
122    }
123
124    pub fn snapshot(&self) -> MetricsSnapshot {
125        let tool_calls = self.tool_calls_total.load(Ordering::Relaxed);
126        let tool_latency = self.tool_call_latency_sum_us.load(Ordering::Relaxed);
127        let cache_hits = self.cache_hits.load(Ordering::Relaxed);
128        let cache_misses = self.cache_misses.load(Ordering::Relaxed);
129        let comp_in = self.compression_input_bytes.load(Ordering::Relaxed);
130        let comp_out = self.compression_output_bytes.load(Ordering::Relaxed);
131
132        MetricsSnapshot {
133            tokens_input: self.tokens_input.load(Ordering::Relaxed),
134            tokens_output: self.tokens_output.load(Ordering::Relaxed),
135            tokens_saved: self.tokens_saved.load(Ordering::Relaxed),
136            tool_calls_total: tool_calls,
137            tool_calls_error: self.tool_calls_error.load(Ordering::Relaxed),
138            tool_call_avg_latency_ms: if tool_calls > 0 {
139                tool_latency as f64 / tool_calls as f64 / 1000.0
140            } else {
141                0.0
142            },
143            search_queries_total: self.search_queries_total.load(Ordering::Relaxed),
144            search_avg_latency_ms: {
145                let q = self.search_queries_total.load(Ordering::Relaxed);
146                if q > 0 {
147                    self.search_latency_sum_us.load(Ordering::Relaxed) as f64 / q as f64 / 1000.0
148                } else {
149                    0.0
150                }
151            },
152            embedding_inferences: self.embedding_inferences_total.load(Ordering::Relaxed),
153            embedding_avg_latency_ms: {
154                let e = self.embedding_inferences_total.load(Ordering::Relaxed);
155                if e > 0 {
156                    self.embedding_latency_sum_us.load(Ordering::Relaxed) as f64 / e as f64 / 1000.0
157                } else {
158                    0.0
159                }
160            },
161            cache_hit_rate: if cache_hits + cache_misses > 0 {
162                cache_hits as f64 / (cache_hits + cache_misses) as f64
163            } else {
164                0.0
165            },
166            compression_ratio: if comp_in > 0 {
167                1.0 - (comp_out as f64 / comp_in as f64)
168            } else {
169                0.0
170            },
171            session_uptime_secs: self.session_start.elapsed().as_secs(),
172        }
173    }
174
175    /// Format as OpenTelemetry-compatible attributes for logging.
176    pub fn to_otel_attributes(&self) -> Vec<(&'static str, String)> {
177        let snap = self.snapshot();
178        vec![
179            ("gen_ai.usage.input_tokens", snap.tokens_input.to_string()),
180            ("gen_ai.usage.output_tokens", snap.tokens_output.to_string()),
181            ("lean_ctx.tokens.saved", snap.tokens_saved.to_string()),
182            ("lean_ctx.tool.calls.total", snap.tool_calls_total.to_string()),
183            ("lean_ctx.tool.calls.error", snap.tool_calls_error.to_string()),
184            ("lean_ctx.tool.latency_avg_ms", format!("{:.2}", snap.tool_call_avg_latency_ms)),
185            ("lean_ctx.search.queries", snap.search_queries_total.to_string()),
186            ("lean_ctx.search.latency_avg_ms", format!("{:.2}", snap.search_avg_latency_ms)),
187            ("lean_ctx.embedding.inferences", snap.embedding_inferences.to_string()),
188            ("lean_ctx.embedding.latency_avg_ms", format!("{:.2}", snap.embedding_avg_latency_ms)),
189            ("lean_ctx.cache.hit_rate", format!("{:.4}", snap.cache_hit_rate)),
190            ("lean_ctx.compression.ratio", format!("{:.4}", snap.compression_ratio)),
191            ("lean_ctx.session.uptime_secs", snap.session_uptime_secs.to_string()),
192        ]
193    }
194}
195
/// Point-in-time snapshot of all metrics.
///
/// Plain owned values (no atomics), so a snapshot can be cloned, formatted,
/// and serialized without touching the live counters.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    pub tokens_input: u64,
    pub tokens_output: u64,
    pub tokens_saved: u64,
    pub tool_calls_total: u64,
    pub tool_calls_error: u64,
    // Derived in Metrics::snapshot(); 0.0 when no calls were recorded.
    pub tool_call_avg_latency_ms: f64,
    pub search_queries_total: u64,
    // Derived; 0.0 when no queries were recorded.
    pub search_avg_latency_ms: f64,
    pub embedding_inferences: u64,
    // Derived; 0.0 when no inferences were recorded.
    pub embedding_avg_latency_ms: f64,
    // hits / (hits + misses), in [0.0, 1.0]; 0.0 when no lookups occurred.
    pub cache_hit_rate: f64,
    // 1 - out/in bytes; 0.0 when nothing was compressed.
    pub compression_ratio: f64,
    pub session_uptime_secs: u64,
}
213
214impl MetricsSnapshot {
215    pub fn to_compact_string(&self) -> String {
216        format!(
217            "tok={}/{}/{} calls={}/{} search={} embed={} cache={:.0}% comp={:.0}% up={}s",
218            self.tokens_input,
219            self.tokens_output,
220            self.tokens_saved,
221            self.tool_calls_total,
222            self.tool_calls_error,
223            self.search_queries_total,
224            self.embedding_inferences,
225            self.cache_hit_rate * 100.0,
226            self.compression_ratio * 100.0,
227            self.session_uptime_secs,
228        )
229    }
230
231    pub fn to_json(&self) -> String {
232        serde_json::json!({
233            "gen_ai": {
234                "usage": {
235                    "input_tokens": self.tokens_input,
236                    "output_tokens": self.tokens_output,
237                }
238            },
239            "lean_ctx": {
240                "tokens": { "saved": self.tokens_saved },
241                "tool": {
242                    "calls_total": self.tool_calls_total,
243                    "calls_error": self.tool_calls_error,
244                    "avg_latency_ms": self.tool_call_avg_latency_ms,
245                },
246                "search": {
247                    "queries": self.search_queries_total,
248                    "avg_latency_ms": self.search_avg_latency_ms,
249                },
250                "embedding": {
251                    "inferences": self.embedding_inferences,
252                    "avg_latency_ms": self.embedding_avg_latency_ms,
253                },
254                "cache": { "hit_rate": self.cache_hit_rate },
255                "compression": { "ratio": self.compression_ratio },
256                "session": { "uptime_secs": self.session_uptime_secs },
257            }
258        })
259        .to_string()
260    }
261}
262
/// Timer that measures a tool call's wall-clock latency.
///
/// Call [`ToolCallTimer::finish`] to record the elapsed time into the global
/// metrics. NOTE(review): the previous doc claimed RAII "records ... on drop",
/// but there is no `Drop` impl — a timer dropped without `finish` records
/// nothing. Either add a `Drop` impl or keep this doc accurate.
pub struct ToolCallTimer {
    start: Instant,          // moment the tool call began
    tool_name: &'static str, // used only for the debug log line in finish()
}
268
269impl ToolCallTimer {
270    pub fn new(tool_name: &'static str) -> Self {
271        Self {
272            start: Instant::now(),
273            tool_name,
274        }
275    }
276
277    pub fn finish(self, success: bool) {
278        let elapsed = self.start.elapsed();
279        let us = elapsed.as_micros() as u64;
280        global_metrics().record_tool_call(us, success);
281        tracing::debug!(
282            tool = self.tool_name,
283            latency_ms = elapsed.as_millis() as u64,
284            success,
285            "tool_call"
286        );
287    }
288}
289
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn record_tool_call() {
        let metrics = Metrics::new();
        // One success, one failure.
        metrics.record_tool_call(5000, true);
        metrics.record_tool_call(3000, false);

        let snap = metrics.snapshot();
        assert_eq!((snap.tool_calls_total, snap.tool_calls_error), (2, 1));
        assert!(snap.tool_call_avg_latency_ms > 0.0);
    }

    #[test]
    fn record_tokens() {
        let metrics = Metrics::new();
        for (input, output, saved) in [(100, 50, 200), (150, 75, 300)] {
            metrics.record_tokens(input, output, saved);
        }

        let snap = metrics.snapshot();
        assert_eq!(
            (snap.tokens_input, snap.tokens_output, snap.tokens_saved),
            (250, 125, 500)
        );
    }

    #[test]
    fn record_search() {
        let metrics = Metrics::new();
        for (latency_us, results) in [(2000, 5), (4000, 3)] {
            metrics.record_search(latency_us, results);
        }

        let snap = metrics.snapshot();
        assert_eq!(snap.search_queries_total, 2);
        // (2000 + 4000) µs / 2 queries = 3.0 ms average.
        assert!((snap.search_avg_latency_ms - 3.0).abs() < 0.01);
    }

    #[test]
    fn cache_hit_rate() {
        let metrics = Metrics::new();
        for hit in [true, true, false] {
            metrics.record_cache(hit);
        }

        // 2 hits out of 3 lookups.
        let rate = metrics.snapshot().cache_hit_rate;
        assert!((rate - 0.6667).abs() < 0.01);
    }

    #[test]
    fn compression_ratio() {
        let metrics = Metrics::new();
        metrics.record_compression(1000, 200);

        // 1000 -> 200 bytes eliminates 80% of the input.
        let ratio = metrics.snapshot().compression_ratio;
        assert!((ratio - 0.8).abs() < 0.01);
    }

    #[test]
    fn snapshot_compact_string() {
        let metrics = Metrics::new();
        metrics.record_tokens(100, 50, 200);
        metrics.record_tool_call(5000, true);

        let compact = metrics.snapshot().to_compact_string();
        for expected in ["tok=100/50/200", "calls=1/0"] {
            assert!(compact.contains(expected));
        }
    }

    #[test]
    fn snapshot_json() {
        let metrics = Metrics::new();
        metrics.record_tokens(100, 50, 200);

        // Round-trip through serde_json to verify structure, not string layout.
        let parsed: serde_json::Value =
            serde_json::from_str(&metrics.snapshot().to_json()).unwrap();
        assert_eq!(parsed["gen_ai"]["usage"]["input_tokens"], 100);
        assert_eq!(parsed["lean_ctx"]["tokens"]["saved"], 200);
    }

    #[test]
    fn otel_attributes() {
        let metrics = Metrics::new();
        metrics.record_tokens(100, 50, 200);

        let attrs = metrics.to_otel_attributes();
        let found = attrs
            .iter()
            .any(|(key, value)| *key == "gen_ai.usage.input_tokens" && value == "100");
        assert!(found);
    }

    #[test]
    fn global_metrics_singleton() {
        // Two calls must hand back the same static instance.
        assert!(std::ptr::eq(global_metrics(), global_metrics()));
    }

    #[test]
    fn tool_call_timer() {
        // Smoke test: timing + finish must not panic (records into globals).
        let timer = ToolCallTimer::new("test_tool");
        std::thread::sleep(std::time::Duration::from_millis(5));
        timer.finish(true);
    }
}