Skip to main content

lean_ctx/core/
telemetry.rs

//! Telemetry and metrics collection following OpenTelemetry GenAI conventions.
//!
//! Recording is lock-free and allocation-free (plain atomic counter updates);
//! exporting a snapshot as text or JSON allocates. Metrics are collected for:
//! - Token usage (input, output, saved, compression ratio)
//! - Tool call latency and success rates
//! - Search quality metrics (latency, result counts)
//! - Embedding inference performance
//! - Cache hit/miss rates
//!
//! Naming follows the OpenTelemetry GenAI Semantic Conventions:
//! <https://opentelemetry.io/docs/specs/semconv/gen-ai/>
12
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::OnceLock;
15use std::time::Instant;
16
17static METRICS: OnceLock<Metrics> = OnceLock::new();
18
19pub fn global_metrics() -> &'static Metrics {
20    METRICS.get_or_init(Metrics::new)
21}
22
/// Counters behind every telemetry export in this module.
///
/// Each counter is an [`AtomicU64`] that the `record_*` methods only ever add
/// to, so a shared `&Metrics` is enough to record from any thread.
#[derive(Debug)]
pub struct Metrics {
    // Exported as gen_ai.usage.input_tokens / gen_ai.usage.output_tokens.
    /// Sum of input tokens passed to `record_tokens`.
    pub tokens_input: AtomicU64,
    /// Sum of output tokens passed to `record_tokens`.
    pub tokens_output: AtomicU64,
    /// Sum of saved tokens passed to `record_tokens`.
    pub tokens_saved: AtomicU64,

    /// Number of tool calls recorded, successful or not.
    pub tool_calls_total: AtomicU64,
    /// Number of recorded tool calls that were reported as unsuccessful.
    pub tool_calls_error: AtomicU64,
    /// Accumulated tool call latency in microseconds.
    pub tool_call_latency_sum_us: AtomicU64,

    /// Number of search queries recorded.
    pub search_queries_total: AtomicU64,
    /// Accumulated search latency in microseconds.
    pub search_latency_sum_us: AtomicU64,
    /// Accumulated result counts over all recorded searches.
    pub search_results_total: AtomicU64,

    /// Number of embedding inferences recorded.
    pub embedding_inferences_total: AtomicU64,
    /// Accumulated embedding latency in microseconds.
    pub embedding_latency_sum_us: AtomicU64,
    /// Accumulated token counts over all recorded embedding inferences.
    pub embedding_tokens_total: AtomicU64,

    /// Number of cache lookups recorded as hits.
    pub cache_hits: AtomicU64,
    /// Number of cache lookups recorded as misses.
    pub cache_misses: AtomicU64,

    /// Number of compression calls recorded.
    pub compression_calls: AtomicU64,
    /// Accumulated compression input size in bytes.
    pub compression_input_bytes: AtomicU64,
    /// Accumulated compression output size in bytes.
    pub compression_output_bytes: AtomicU64,

    /// Reference point from which the snapshot's uptime is measured.
    pub session_start: Instant,
}
51
52impl Default for Metrics {
53    fn default() -> Self {
54        Self {
55            tokens_input: AtomicU64::new(0),
56            tokens_output: AtomicU64::new(0),
57            tokens_saved: AtomicU64::new(0),
58            tool_calls_total: AtomicU64::new(0),
59            tool_calls_error: AtomicU64::new(0),
60            tool_call_latency_sum_us: AtomicU64::new(0),
61            search_queries_total: AtomicU64::new(0),
62            search_latency_sum_us: AtomicU64::new(0),
63            search_results_total: AtomicU64::new(0),
64            embedding_inferences_total: AtomicU64::new(0),
65            embedding_latency_sum_us: AtomicU64::new(0),
66            embedding_tokens_total: AtomicU64::new(0),
67            cache_hits: AtomicU64::new(0),
68            cache_misses: AtomicU64::new(0),
69            compression_calls: AtomicU64::new(0),
70            compression_input_bytes: AtomicU64::new(0),
71            compression_output_bytes: AtomicU64::new(0),
72            session_start: Instant::now(),
73        }
74    }
75}
76
77impl Metrics {
78    pub fn new() -> Self {
79        Self::default()
80    }
81
82    pub fn record_tool_call(&self, latency_us: u64, success: bool) {
83        self.tool_calls_total.fetch_add(1, Ordering::Relaxed);
84        self.tool_call_latency_sum_us
85            .fetch_add(latency_us, Ordering::Relaxed);
86        if !success {
87            self.tool_calls_error.fetch_add(1, Ordering::Relaxed);
88        }
89    }
90
91    pub fn record_tokens(&self, input: u64, output: u64, saved: u64) {
92        self.tokens_input.fetch_add(input, Ordering::Relaxed);
93        self.tokens_output.fetch_add(output, Ordering::Relaxed);
94        self.tokens_saved.fetch_add(saved, Ordering::Relaxed);
95    }
96
97    pub fn record_search(&self, latency_us: u64, result_count: u64) {
98        self.search_queries_total.fetch_add(1, Ordering::Relaxed);
99        self.search_latency_sum_us
100            .fetch_add(latency_us, Ordering::Relaxed);
101        self.search_results_total
102            .fetch_add(result_count, Ordering::Relaxed);
103    }
104
105    pub fn record_embedding(&self, latency_us: u64, token_count: u64) {
106        self.embedding_inferences_total
107            .fetch_add(1, Ordering::Relaxed);
108        self.embedding_latency_sum_us
109            .fetch_add(latency_us, Ordering::Relaxed);
110        self.embedding_tokens_total
111            .fetch_add(token_count, Ordering::Relaxed);
112    }
113
114    pub fn record_cache(&self, hit: bool) {
115        if hit {
116            self.cache_hits.fetch_add(1, Ordering::Relaxed);
117        } else {
118            self.cache_misses.fetch_add(1, Ordering::Relaxed);
119        }
120    }
121
122    pub fn record_compression(&self, input_bytes: u64, output_bytes: u64) {
123        self.compression_calls.fetch_add(1, Ordering::Relaxed);
124        self.compression_input_bytes
125            .fetch_add(input_bytes, Ordering::Relaxed);
126        self.compression_output_bytes
127            .fetch_add(output_bytes, Ordering::Relaxed);
128    }
129
130    pub fn snapshot(&self) -> MetricsSnapshot {
131        let tool_calls = self.tool_calls_total.load(Ordering::Relaxed);
132        let tool_latency = self.tool_call_latency_sum_us.load(Ordering::Relaxed);
133        let cache_hits = self.cache_hits.load(Ordering::Relaxed);
134        let cache_misses = self.cache_misses.load(Ordering::Relaxed);
135        let comp_in = self.compression_input_bytes.load(Ordering::Relaxed);
136        let comp_out = self.compression_output_bytes.load(Ordering::Relaxed);
137
138        MetricsSnapshot {
139            tokens_input: self.tokens_input.load(Ordering::Relaxed),
140            tokens_output: self.tokens_output.load(Ordering::Relaxed),
141            tokens_saved: self.tokens_saved.load(Ordering::Relaxed),
142            tool_calls_total: tool_calls,
143            tool_calls_error: self.tool_calls_error.load(Ordering::Relaxed),
144            tool_call_avg_latency_ms: if tool_calls > 0 {
145                tool_latency as f64 / tool_calls as f64 / 1000.0
146            } else {
147                0.0
148            },
149            search_queries_total: self.search_queries_total.load(Ordering::Relaxed),
150            search_avg_latency_ms: {
151                let q = self.search_queries_total.load(Ordering::Relaxed);
152                if q > 0 {
153                    self.search_latency_sum_us.load(Ordering::Relaxed) as f64 / q as f64 / 1000.0
154                } else {
155                    0.0
156                }
157            },
158            embedding_inferences: self.embedding_inferences_total.load(Ordering::Relaxed),
159            embedding_avg_latency_ms: {
160                let e = self.embedding_inferences_total.load(Ordering::Relaxed);
161                if e > 0 {
162                    self.embedding_latency_sum_us.load(Ordering::Relaxed) as f64 / e as f64 / 1000.0
163                } else {
164                    0.0
165                }
166            },
167            cache_hit_rate: if cache_hits + cache_misses > 0 {
168                cache_hits as f64 / (cache_hits + cache_misses) as f64
169            } else {
170                0.0
171            },
172            compression_ratio: if comp_in > 0 {
173                1.0 - (comp_out as f64 / comp_in as f64)
174            } else {
175                0.0
176            },
177            session_uptime_secs: self.session_start.elapsed().as_secs(),
178        }
179    }
180
181    /// Format as OpenTelemetry-compatible attributes for logging.
182    pub fn to_otel_attributes(&self) -> Vec<(&'static str, String)> {
183        let snap = self.snapshot();
184        vec![
185            ("gen_ai.usage.input_tokens", snap.tokens_input.to_string()),
186            ("gen_ai.usage.output_tokens", snap.tokens_output.to_string()),
187            ("lean_ctx.tokens.saved", snap.tokens_saved.to_string()),
188            (
189                "lean_ctx.tool.calls.total",
190                snap.tool_calls_total.to_string(),
191            ),
192            (
193                "lean_ctx.tool.calls.error",
194                snap.tool_calls_error.to_string(),
195            ),
196            (
197                "lean_ctx.tool.latency_avg_ms",
198                format!("{:.2}", snap.tool_call_avg_latency_ms),
199            ),
200            (
201                "lean_ctx.search.queries",
202                snap.search_queries_total.to_string(),
203            ),
204            (
205                "lean_ctx.search.latency_avg_ms",
206                format!("{:.2}", snap.search_avg_latency_ms),
207            ),
208            (
209                "lean_ctx.embedding.inferences",
210                snap.embedding_inferences.to_string(),
211            ),
212            (
213                "lean_ctx.embedding.latency_avg_ms",
214                format!("{:.2}", snap.embedding_avg_latency_ms),
215            ),
216            (
217                "lean_ctx.cache.hit_rate",
218                format!("{:.4}", snap.cache_hit_rate),
219            ),
220            (
221                "lean_ctx.compression.ratio",
222                format!("{:.4}", snap.compression_ratio),
223            ),
224            (
225                "lean_ctx.session.uptime_secs",
226                snap.session_uptime_secs.to_string(),
227            ),
228        ]
229    }
230}
231
/// Plain-value copy of the metrics as read at one moment, including the
/// averages and rates derived from the raw counters.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// Total input tokens recorded.
    pub tokens_input: u64,
    /// Total output tokens recorded.
    pub tokens_output: u64,
    /// Total saved tokens recorded.
    pub tokens_saved: u64,
    /// Number of tool calls recorded.
    pub tool_calls_total: u64,
    /// Number of tool calls recorded as unsuccessful.
    pub tool_calls_error: u64,
    /// Mean tool call latency in milliseconds; `0.0` when no call was recorded.
    pub tool_call_avg_latency_ms: f64,
    /// Number of search queries recorded.
    pub search_queries_total: u64,
    /// Mean search latency in milliseconds; `0.0` when no query was recorded.
    pub search_avg_latency_ms: f64,
    /// Number of embedding inferences recorded.
    pub embedding_inferences: u64,
    /// Mean embedding latency in milliseconds; `0.0` when none was recorded.
    pub embedding_avg_latency_ms: f64,
    /// Hits divided by hits plus misses; `0.0` when no lookup was recorded.
    pub cache_hit_rate: f64,
    /// One minus output bytes over input bytes; `0.0` when no input was recorded.
    pub compression_ratio: f64,
    /// Whole seconds elapsed since the registry's session start.
    pub session_uptime_secs: u64,
}
249
250impl MetricsSnapshot {
251    pub fn to_compact_string(&self) -> String {
252        format!(
253            "tok={}/{}/{} calls={}/{} search={} embed={} cache={:.0}% comp={:.0}% up={}s",
254            self.tokens_input,
255            self.tokens_output,
256            self.tokens_saved,
257            self.tool_calls_total,
258            self.tool_calls_error,
259            self.search_queries_total,
260            self.embedding_inferences,
261            self.cache_hit_rate * 100.0,
262            self.compression_ratio * 100.0,
263            self.session_uptime_secs,
264        )
265    }
266
267    pub fn to_json(&self) -> String {
268        serde_json::json!({
269            "gen_ai": {
270                "usage": {
271                    "input_tokens": self.tokens_input,
272                    "output_tokens": self.tokens_output,
273                }
274            },
275            "lean_ctx": {
276                "tokens": { "saved": self.tokens_saved },
277                "tool": {
278                    "calls_total": self.tool_calls_total,
279                    "calls_error": self.tool_calls_error,
280                    "avg_latency_ms": self.tool_call_avg_latency_ms,
281                },
282                "search": {
283                    "queries": self.search_queries_total,
284                    "avg_latency_ms": self.search_avg_latency_ms,
285                },
286                "embedding": {
287                    "inferences": self.embedding_inferences,
288                    "avg_latency_ms": self.embedding_avg_latency_ms,
289                },
290                "cache": { "hit_rate": self.cache_hit_rate },
291                "compression": { "ratio": self.compression_ratio },
292                "session": { "uptime_secs": self.session_uptime_secs },
293            }
294        })
295        .to_string()
296    }
297}
298
299/// RAII guard that records tool call latency on drop.
300pub struct ToolCallTimer {
301    start: Instant,
302    tool_name: &'static str,
303}
304
305impl ToolCallTimer {
306    pub fn new(tool_name: &'static str) -> Self {
307        Self {
308            start: Instant::now(),
309            tool_name,
310        }
311    }
312
313    pub fn finish(self, success: bool) {
314        let elapsed = self.start.elapsed();
315        let us = elapsed.as_micros() as u64;
316        global_metrics().record_tool_call(us, success);
317        tracing::debug!(
318            tool = self.tool_name,
319            latency_ms = elapsed.as_millis() as u64,
320            success,
321            "tool_call"
322        );
323    }
324}
325
326// ---------------------------------------------------------------------------
327// Prometheus text format export (Zero-PII)
328// ---------------------------------------------------------------------------
329
330impl Metrics {
331    pub fn to_prometheus(&self) -> String {
332        let snap = self.snapshot();
333        let budget = crate::core::budget_tracker::BudgetTracker::global().check();
334        let slo_snap = crate::core::slo::evaluate_quiet();
335        let slo_violations = slo_snap.violations.len();
336
337        let mut lines = Vec::with_capacity(32);
338
339        lines.push("# HELP lean_ctx_tokens_saved_total Total tokens saved by compression".into());
340        lines.push("# TYPE lean_ctx_tokens_saved_total counter".into());
341        lines.push(format!("lean_ctx_tokens_saved_total {}", snap.tokens_saved));
342
343        lines.push("# HELP lean_ctx_tokens_input_total Total input tokens processed".into());
344        lines.push("# TYPE lean_ctx_tokens_input_total counter".into());
345        lines.push(format!("lean_ctx_tokens_input_total {}", snap.tokens_input));
346
347        lines.push("# HELP lean_ctx_tokens_output_total Total output tokens generated".into());
348        lines.push("# TYPE lean_ctx_tokens_output_total counter".into());
349        lines.push(format!(
350            "lean_ctx_tokens_output_total {}",
351            snap.tokens_output
352        ));
353
354        lines.push("# HELP lean_ctx_compression_ratio Current compression ratio".into());
355        lines.push("# TYPE lean_ctx_compression_ratio gauge".into());
356        lines.push(format!(
357            "lean_ctx_compression_ratio {:.4}",
358            snap.compression_ratio
359        ));
360
361        lines.push("# HELP lean_ctx_tool_calls_total Total tool calls".into());
362        lines.push("# TYPE lean_ctx_tool_calls_total counter".into());
363        lines.push(format!(
364            "lean_ctx_tool_calls_total {}",
365            snap.tool_calls_total
366        ));
367
368        lines.push("# HELP lean_ctx_tool_calls_error_total Total failed tool calls".into());
369        lines.push("# TYPE lean_ctx_tool_calls_error_total counter".into());
370        lines.push(format!(
371            "lean_ctx_tool_calls_error_total {}",
372            snap.tool_calls_error
373        ));
374
375        lines.push("# HELP lean_ctx_session_cost_usd Estimated session cost in USD".into());
376        lines.push("# TYPE lean_ctx_session_cost_usd gauge".into());
377        lines.push(format!(
378            "lean_ctx_session_cost_usd {:.4}",
379            budget.cost.used_usd
380        ));
381
382        lines.push("# HELP lean_ctx_session_context_tokens Current context token count".into());
383        lines.push("# TYPE lean_ctx_session_context_tokens gauge".into());
384        lines.push(format!(
385            "lean_ctx_session_context_tokens {}",
386            budget.tokens.used
387        ));
388
389        lines.push("# HELP lean_ctx_shell_invocations_total Total shell invocations".into());
390        lines.push("# TYPE lean_ctx_shell_invocations_total counter".into());
391        lines.push(format!(
392            "lean_ctx_shell_invocations_total {}",
393            budget.shell.used
394        ));
395
396        lines.push("# HELP lean_ctx_slo_violations_total Total active SLO violations".into());
397        lines.push("# TYPE lean_ctx_slo_violations_total gauge".into());
398        lines.push(format!("lean_ctx_slo_violations_total {slo_violations}"));
399
400        lines.push("# HELP lean_ctx_cache_hit_rate Cache hit rate (0-1)".into());
401        lines.push("# TYPE lean_ctx_cache_hit_rate gauge".into());
402        lines.push(format!(
403            "lean_ctx_cache_hit_rate {:.4}",
404            snap.cache_hit_rate
405        ));
406
407        lines.push("# HELP lean_ctx_anomalies_total Total anomaly detections".into());
408        lines.push("# TYPE lean_ctx_anomalies_total gauge".into());
409        let anomaly_count = crate::core::anomaly::summary()
410            .iter()
411            .filter(|m| m.count > 0)
412            .count();
413        lines.push(format!("lean_ctx_anomalies_total {anomaly_count}"));
414
415        let verify_snap = crate::core::output_verification::stats_snapshot();
416        lines.push("# HELP lean_ctx_verification_pass_total Total verification passes".into());
417        lines.push("# TYPE lean_ctx_verification_pass_total counter".into());
418        lines.push(format!(
419            "lean_ctx_verification_pass_total {}",
420            verify_snap.pass
421        ));
422
423        lines.push("# HELP lean_ctx_verification_warn_total Total verification warnings".into());
424        lines.push("# TYPE lean_ctx_verification_warn_total counter".into());
425        lines.push(format!(
426            "lean_ctx_verification_warn_total {}",
427            verify_snap.warn_items
428        ));
429
430        lines.push(
431            "# HELP lean_ctx_verification_warn_runs_total Total runs with verification warnings"
432                .into(),
433        );
434        lines.push("# TYPE lean_ctx_verification_warn_runs_total counter".into());
435        lines.push(format!(
436            "lean_ctx_verification_warn_runs_total {}",
437            verify_snap.warn_runs
438        ));
439
440        lines.push("# HELP lean_ctx_verification_pass_rate Verification pass rate".into());
441        lines.push("# TYPE lean_ctx_verification_pass_rate gauge".into());
442        lines.push(format!(
443            "lean_ctx_verification_pass_rate {:.4}",
444            verify_snap.pass_rate
445        ));
446
447        lines.push("# HELP lean_ctx_info_loss_score Average info loss score (0..1)".into());
448        lines.push("# TYPE lean_ctx_info_loss_score gauge".into());
449        lines.push(format!(
450            "lean_ctx_info_loss_score {:.6}",
451            verify_snap.avg_info_loss_score
452        ));
453
454        lines.push("# HELP lean_ctx_session_uptime_seconds Session uptime in seconds".into());
455        lines.push("# TYPE lean_ctx_session_uptime_seconds gauge".into());
456        lines.push(format!(
457            "lean_ctx_session_uptime_seconds {}",
458            snap.session_uptime_secs
459        ));
460
461        lines.join("\n") + "\n"
462    }
463}
464
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    /// Fresh registries must report zero for every derived value instead of
    /// dividing by zero.
    #[test]
    fn empty_snapshot_reports_zero_rates() {
        let snap = Metrics::new().snapshot();
        assert_eq!(snap.tool_calls_total, 0);
        assert_eq!(snap.tool_call_avg_latency_ms, 0.0);
        assert_eq!(snap.search_avg_latency_ms, 0.0);
        assert_eq!(snap.embedding_avg_latency_ms, 0.0);
        assert_eq!(snap.cache_hit_rate, 0.0);
        assert_eq!(snap.compression_ratio, 0.0);
    }

    #[test]
    fn record_tool_call() {
        let m = Metrics::new();
        m.record_tool_call(5000, true);
        m.record_tool_call(3000, false);

        let snap = m.snapshot();
        assert_eq!(snap.tool_calls_total, 2);
        assert_eq!(snap.tool_calls_error, 1);
        // (5000 us + 3000 us) / 2 calls = 4 ms.
        assert!((snap.tool_call_avg_latency_ms - 4.0).abs() < 1e-9);
    }

    #[test]
    fn record_tokens() {
        let m = Metrics::new();
        m.record_tokens(100, 50, 200);
        m.record_tokens(150, 75, 300);

        let snap = m.snapshot();
        assert_eq!(snap.tokens_input, 250);
        assert_eq!(snap.tokens_output, 125);
        assert_eq!(snap.tokens_saved, 500);
    }

    #[test]
    fn record_search() {
        let m = Metrics::new();
        m.record_search(2000, 5);
        m.record_search(4000, 3);

        let snap = m.snapshot();
        assert_eq!(snap.search_queries_total, 2);
        assert!((snap.search_avg_latency_ms - 3.0).abs() < 0.01);
        assert_eq!(m.search_results_total.load(Ordering::Relaxed), 8);
    }

    #[test]
    fn record_embedding() {
        let m = Metrics::new();
        m.record_embedding(1500, 10);
        m.record_embedding(2500, 20);

        let snap = m.snapshot();
        assert_eq!(snap.embedding_inferences, 2);
        assert!((snap.embedding_avg_latency_ms - 2.0).abs() < 1e-9);
        assert_eq!(m.embedding_tokens_total.load(Ordering::Relaxed), 30);
    }

    #[test]
    fn cache_hit_rate() {
        let m = Metrics::new();
        m.record_cache(true);
        m.record_cache(true);
        m.record_cache(false);

        let snap = m.snapshot();
        assert!((snap.cache_hit_rate - 0.6667).abs() < 0.01);
    }

    #[test]
    fn compression_ratio() {
        let m = Metrics::new();
        m.record_compression(1000, 200);

        let snap = m.snapshot();
        assert!((snap.compression_ratio - 0.8).abs() < 0.01);
        assert_eq!(m.compression_calls.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn snapshot_compact_string() {
        let m = Metrics::new();
        m.record_tokens(100, 50, 200);
        m.record_tool_call(5000, true);
        let compact = m.snapshot().to_compact_string();
        assert!(compact.contains("tok=100/50/200"));
        assert!(compact.contains("calls=1/0"));
        assert!(compact.contains("search=0 embed=0 cache=0% comp=0%"));
    }

    #[test]
    fn snapshot_json() {
        let m = Metrics::new();
        m.record_tokens(100, 50, 200);
        let json = m.snapshot().to_json();
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed["gen_ai"]["usage"]["input_tokens"], 100);
        assert_eq!(parsed["gen_ai"]["usage"]["output_tokens"], 50);
        assert_eq!(parsed["lean_ctx"]["tokens"]["saved"], 200);
    }

    #[test]
    fn otel_attributes() {
        let m = Metrics::new();
        m.record_tokens(100, 50, 200);
        let attrs = m.to_otel_attributes();
        assert!(attrs
            .iter()
            .any(|(k, v)| *k == "gen_ai.usage.input_tokens" && v == "100"));
        assert!(attrs
            .iter()
            .any(|(k, v)| *k == "lean_ctx.tokens.saved" && v == "200"));
    }

    #[test]
    fn global_metrics_singleton() {
        let m1 = global_metrics();
        let m2 = global_metrics();
        assert!(std::ptr::eq(m1, m2));
    }

    /// `finish` must land in the global registry; other tests may record there
    /// concurrently, so only lower bounds are asserted.
    #[test]
    fn tool_call_timer() {
        let calls_before = global_metrics().snapshot().tool_calls_total;

        let timer = ToolCallTimer::new("test_tool");
        std::thread::sleep(Duration::from_millis(5));
        timer.finish(true);

        let calls_after = global_metrics().snapshot().tool_calls_total;
        assert!(calls_after > calls_before);
        // The timed section slept for 5 ms, so at least 5000 us were added.
        assert!(
            global_metrics()
                .tool_call_latency_sum_us
                .load(Ordering::Relaxed)
                >= 5_000
        );
    }
}