Skip to main content

bitrouter_api/
metrics.rs

//! In-memory per-route metrics for BitRouter.
//!
//! This module provides a thread-safe, in-memory metrics store that records
//! per-route and per-endpoint performance data. Metrics accumulate during the
//! lifetime of the process and are reset on restart.
//!
//! # Persistence
//!
//! Metrics are currently held **in memory only** and are lost on process
//! restart. This is intentional for the initial release — the consuming plugin
//! layer can handle its own persistence if needed. A future release may back
//! this store with a database or Redis cache.
//!
//! # Latency tracking
//!
//! Latency samples are stored in a bounded vector (up to
//! [`MAX_LATENCY_SAMPLES`] per route and per endpoint). When the cap is
//! reached the oldest half of the samples are discarded so percentile
//! calculations remain representative of recent traffic. A future optimisation
//! could replace this with an HDR histogram or t-digest sketch for constant
//! memory overhead.
//!
//! # Streaming requests
//!
//! For streaming requests only the time-to-stream-start latency and
//! request/error counts are recorded — token usage is not available until the
//! stream completes, so `avg_input_tokens` and `avg_output_tokens` only
//! reflect non-streaming (generate) requests.
29
30use std::collections::HashMap;
31use std::sync::RwLock;
32use std::time::{Instant, SystemTime, UNIX_EPOCH};
33
34use serde::Serialize;
35
/// Upper bound on the number of latency samples kept for any single route or
/// endpoint.
///
/// Once a sample buffer holds this many entries, the oldest half is dropped
/// before the next sample is appended. This caps memory use while keeping the
/// most recent measurements available for percentile estimates.
///
/// The value is deliberately modest so that deployments with many
/// routes/endpoints do not pay an excessive memory cost, yet large enough for
/// percentile estimates to stay stable.
const MAX_LATENCY_SAMPLES: usize = 10_000;
45
46// ── Public types ────────────────────────────────────────────────────────────
47
48/// Thread-safe, in-memory store for per-route request metrics.
49///
50/// Create a single instance at server startup and share it (via `Arc`) with
51/// all request-handling filters. Call [`MetricsStore::record`] after each
52/// upstream request completes, and [`MetricsStore::snapshot`] to produce a
53/// serializable view for the `GET /v1/metrics` endpoint.
54///
55/// # Persistence
56///
57/// Metrics are currently held in memory only and are lost on process restart.
58/// This is intentional for the initial release — the consuming plugin layer
59/// (OpenClaw) can handle its own persistence if needed. A future release may
60/// back this store with a database or Redis cache.
61pub struct MetricsStore {
62    started_at: Instant,
63    inner: RwLock<StoreInner>,
64}
65
/// Data captured from a single completed request, used to update the store.
///
/// Every field is a plain standard-library value, so the type derives the
/// common traits (`Debug`, `Clone`, `PartialEq`, `Eq`). This lets callers log
/// an event, duplicate it, or compare it in tests without extra boilerplate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestMetrics {
    /// The route name (incoming model name).
    pub route: String,
    /// The endpoint identifier, typically `"provider:model_id"`.
    pub endpoint: String,
    /// Request latency in milliseconds.
    pub latency_ms: u64,
    /// Whether the request resulted in an error.
    pub is_error: bool,
    /// Input token count from the response (if available).
    pub input_tokens: Option<u32>,
    /// Output token count from the response (if available).
    pub output_tokens: Option<u32>,
}
81
/// Builds the `"provider:model_id"` endpoint key from the two components of a
/// routing target.
///
/// Provider handlers call this to obtain the endpoint identifier before the
/// routing step consumes the target. The two parts are joined with a single
/// `:`; no escaping or validation is applied.
pub fn format_endpoint(provider_name: &str, model_id: &str) -> String {
    let mut key = String::with_capacity(provider_name.len() + 1 + model_id.len());
    key.push_str(provider_name);
    key.push(':');
    key.push_str(model_id);
    key
}
88
89// ── Serialisable snapshot types ─────────────────────────────────────────────
90
91/// Top-level metrics snapshot returned by `GET /v1/metrics`.
92#[derive(Debug, Serialize)]
93pub struct MetricsSnapshot {
94    /// Seconds since the metrics store was created (≈ process uptime).
95    pub uptime_seconds: u64,
96    /// Per-route aggregate metrics keyed by route name.
97    pub routes: HashMap<String, RouteSnapshot>,
98}
99
100/// Aggregate metrics for a single route.
101#[derive(Debug, Serialize)]
102pub struct RouteSnapshot {
103    pub total_requests: u64,
104    pub total_errors: u64,
105    pub error_rate: f64,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub latency_p50_ms: Option<u64>,
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub latency_p99_ms: Option<u64>,
110    #[serde(skip_serializing_if = "Option::is_none")]
111    pub avg_input_tokens: Option<u64>,
112    #[serde(skip_serializing_if = "Option::is_none")]
113    pub avg_output_tokens: Option<u64>,
114    /// Unix timestamp (seconds) of the most recent request on this route.
115    #[serde(skip_serializing_if = "Option::is_none")]
116    pub last_used: Option<u64>,
117    /// Per-endpoint breakdown within this route.
118    pub by_endpoint: HashMap<String, EndpointSnapshot>,
119}
120
121/// Aggregate metrics for a single endpoint within a route.
122#[derive(Debug, Serialize)]
123pub struct EndpointSnapshot {
124    pub total_requests: u64,
125    pub total_errors: u64,
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub latency_p50_ms: Option<u64>,
128    #[serde(skip_serializing_if = "Option::is_none")]
129    pub latency_p99_ms: Option<u64>,
130}
131
132// ── Internal storage ────────────────────────────────────────────────────────
133
134struct StoreInner {
135    routes: HashMap<String, RouteData>,
136}
137
138struct RouteData {
139    total_requests: u64,
140    total_errors: u64,
141    latencies_ms: Vec<u64>,
142    total_input_tokens: u64,
143    total_output_tokens: u64,
144    /// Number of requests that reported token data (for averaging).
145    token_request_count: u64,
146    last_used: Option<SystemTime>,
147    endpoints: HashMap<String, EndpointData>,
148}
149
/// Running totals and raw latency samples accumulated for one endpoint.
struct EndpointData {
    /// Count of all requests recorded against the endpoint.
    total_requests: u64,
    /// Count of requests recorded with the error flag set.
    total_errors: u64,
    /// Bounded buffer of latency samples, in milliseconds.
    latencies_ms: Vec<u64>,
}
155
156/// Lightweight clone of [`RouteData`] used by [`MetricsStore::snapshot`] so
157/// that the read-lock can be released before percentile computation.
158struct ClonedRouteData {
159    total_requests: u64,
160    total_errors: u64,
161    latencies_ms: Vec<u64>,
162    total_input_tokens: u64,
163    total_output_tokens: u64,
164    token_request_count: u64,
165    last_used: Option<SystemTime>,
166    endpoints: Vec<(String, ClonedEndpointData)>,
167}
168
/// Owned copy of an [`EndpointData`] taken by [`MetricsStore::snapshot`].
struct ClonedEndpointData {
    /// Copied request counter.
    total_requests: u64,
    /// Copied error counter.
    total_errors: u64,
    /// Copied latency samples, in milliseconds.
    latencies_ms: Vec<u64>,
}
175
176// ── Implementation ──────────────────────────────────────────────────────────
177
178impl MetricsStore {
179    /// Creates a new, empty metrics store. The uptime clock starts now.
180    pub fn new() -> Self {
181        Self {
182            started_at: Instant::now(),
183            inner: RwLock::new(StoreInner {
184                routes: HashMap::new(),
185            }),
186        }
187    }
188
189    /// Records a completed request into the store.
190    ///
191    /// This method acquires a write-lock for a very short duration (counter
192    /// increments + a `Vec::push`) so contention should be negligible.
193    pub fn record(&self, event: RequestMetrics) {
194        let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner());
195
196        let route = inner
197            .routes
198            .entry(event.route)
199            .or_insert_with(RouteData::new);
200
201        route.total_requests += 1;
202        if event.is_error {
203            route.total_errors += 1;
204        }
205        push_latency(&mut route.latencies_ms, event.latency_ms);
206        route.last_used = Some(SystemTime::now());
207
208        if let (Some(input), Some(output)) = (event.input_tokens, event.output_tokens) {
209            route.total_input_tokens += input as u64;
210            route.total_output_tokens += output as u64;
211            route.token_request_count += 1;
212        }
213
214        let ep = route
215            .endpoints
216            .entry(event.endpoint)
217            .or_insert_with(EndpointData::new);
218        ep.total_requests += 1;
219        if event.is_error {
220            ep.total_errors += 1;
221        }
222        push_latency(&mut ep.latencies_ms, event.latency_ms);
223    }
224
225    /// Records a successful generate (non-streaming) request.
226    ///
227    /// This is a convenience wrapper used by provider handlers to reduce
228    /// duplicated recording logic across OpenAI, Anthropic and Google filters.
229    pub fn record_success(
230        &self,
231        route: String,
232        endpoint: String,
233        start: Instant,
234        input_tokens: Option<u32>,
235        output_tokens: Option<u32>,
236    ) {
237        self.record(RequestMetrics {
238            route,
239            endpoint,
240            latency_ms: start.elapsed().as_millis() as u64,
241            is_error: false,
242            input_tokens,
243            output_tokens,
244        });
245    }
246
247    /// Records a failed or stream-only request.
248    ///
249    /// `is_error` should be `true` for upstream failures and `false` for
250    /// streaming requests where token counts are unavailable.
251    pub fn record_outcome(&self, route: String, endpoint: String, start: Instant, is_error: bool) {
252        self.record(RequestMetrics {
253            route,
254            endpoint,
255            latency_ms: start.elapsed().as_millis() as u64,
256            is_error,
257            input_tokens: None,
258            output_tokens: None,
259        });
260    }
261
262    /// Produces a serialisable snapshot of all collected metrics.
263    ///
264    /// The read-lock is held only long enough to clone raw counters and latency
265    /// vectors. Expensive work (sorting for percentiles, computing averages) is
266    /// performed after the lock is released so that concurrent `record()` calls
267    /// are not blocked by a `/v1/metrics` request.
268    pub fn snapshot(&self) -> MetricsSnapshot {
269        let uptime_seconds = self.started_at.elapsed().as_secs();
270
271        // Clone raw data under a short-lived read lock.
272        let cloned_routes: Vec<(String, ClonedRouteData)> = {
273            let inner = self.inner.read().unwrap_or_else(|e| e.into_inner());
274            inner
275                .routes
276                .iter()
277                .map(|(name, data)| {
278                    let endpoints: Vec<(String, ClonedEndpointData)> = data
279                        .endpoints
280                        .iter()
281                        .map(|(ep_name, ep)| {
282                            (
283                                ep_name.clone(),
284                                ClonedEndpointData {
285                                    total_requests: ep.total_requests,
286                                    total_errors: ep.total_errors,
287                                    latencies_ms: ep.latencies_ms.clone(),
288                                },
289                            )
290                        })
291                        .collect();
292
293                    (
294                        name.clone(),
295                        ClonedRouteData {
296                            total_requests: data.total_requests,
297                            total_errors: data.total_errors,
298                            latencies_ms: data.latencies_ms.clone(),
299                            total_input_tokens: data.total_input_tokens,
300                            total_output_tokens: data.total_output_tokens,
301                            token_request_count: data.token_request_count,
302                            last_used: data.last_used,
303                            endpoints,
304                        },
305                    )
306                })
307                .collect()
308            // lock released here
309        };
310
311        // Compute percentiles and build snapshots without holding the lock.
312        let routes = cloned_routes
313            .into_iter()
314            .map(|(name, data)| {
315                let by_endpoint = data
316                    .endpoints
317                    .into_iter()
318                    .map(|(ep_name, ep)| {
319                        let ep_snap = EndpointSnapshot {
320                            total_requests: ep.total_requests,
321                            total_errors: ep.total_errors,
322                            latency_p50_ms: percentile(&ep.latencies_ms, 50.0),
323                            latency_p99_ms: percentile(&ep.latencies_ms, 99.0),
324                        };
325                        (ep_name, ep_snap)
326                    })
327                    .collect();
328
329                let route_snapshot = RouteSnapshot {
330                    total_requests: data.total_requests,
331                    total_errors: data.total_errors,
332                    error_rate: error_rate(data.total_requests, data.total_errors),
333                    latency_p50_ms: percentile(&data.latencies_ms, 50.0),
334                    latency_p99_ms: percentile(&data.latencies_ms, 99.0),
335                    avg_input_tokens: avg(data.total_input_tokens, data.token_request_count),
336                    avg_output_tokens: avg(data.total_output_tokens, data.token_request_count),
337                    last_used: data.last_used.map(system_time_to_unix_secs),
338                    by_endpoint,
339                };
340                (name, route_snapshot)
341            })
342            .collect();
343
344        MetricsSnapshot {
345            uptime_seconds,
346            routes,
347        }
348    }
349}
350
351impl Default for MetricsStore {
352    fn default() -> Self {
353        Self::new()
354    }
355}
356
357// ── Private helpers ─────────────────────────────────────────────────────────
358
359impl RouteData {
360    fn new() -> Self {
361        Self {
362            total_requests: 0,
363            total_errors: 0,
364            latencies_ms: Vec::new(),
365            total_input_tokens: 0,
366            total_output_tokens: 0,
367            token_request_count: 0,
368            last_used: None,
369            endpoints: HashMap::new(),
370        }
371    }
372}
373
374impl EndpointData {
375    fn new() -> Self {
376        Self {
377            total_requests: 0,
378            total_errors: 0,
379            latencies_ms: Vec::new(),
380        }
381    }
382}
383
384/// Appends a latency sample, evicting the oldest half when the buffer is full.
385fn push_latency(latencies: &mut Vec<u64>, value: u64) {
386    if latencies.len() >= MAX_LATENCY_SAMPLES {
387        let half = latencies.len() / 2;
388        latencies.drain(..half);
389    }
390    latencies.push(value);
391}
392
/// Computes the `p`-th percentile of an unsorted slice using the nearest-rank
/// (ceiling-rank) method.
///
/// The rank is `ceil(p * n / 100)` (1-based) over the `n` samples, clamped to
/// `1..=n`. Values of `p` at or below zero therefore select the minimum and
/// values at or above 100 select the maximum.
///
/// Returns `None` when `latencies` is empty. The input slice is not modified;
/// selection happens on a scratch copy.
fn percentile(latencies: &[u64], p: f64) -> Option<u64> {
    let n = latencies.len();
    if n == 0 {
        return None;
    }

    // Multiply before dividing. `p / 100.0 * n` can land a hair above an exact
    // integer rank (7.0 / 100.0 * 100.0 == 7.000000000000001), and `ceil`
    // would then select the next rank. `p * n` is exact for whole-number `p`,
    // so the division yields the true rank.
    let rank = (p * n as f64 / 100.0).ceil() as usize;
    let idx = rank.saturating_sub(1).min(n - 1);

    // Only one order statistic is needed, so a selection (O(n) on average)
    // replaces a full sort.
    let mut scratch = latencies.to_vec();
    let (_, value, _) = scratch.select_nth_unstable(idx);
    Some(*value)
}
405
/// Returns `errors / total` as a fraction, or `0.0` when `total` is zero.
fn error_rate(total: u64, errors: u64) -> f64 {
    match total {
        0 => 0.0,
        _ => errors as f64 / total as f64,
    }
}
413
/// Integer mean `total / count` (truncating), or `None` when `count` is zero.
fn avg(total: u64, count: u64) -> Option<u64> {
    total.checked_div(count)
}
421
/// Converts a wall-clock time to whole seconds since the Unix epoch.
///
/// Times earlier than the epoch map to `0`.
fn system_time_to_unix_secs(t: SystemTime) -> u64 {
    match t.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
425
426// ── Tests ───────────────────────────────────────────────────────────────────
427
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`RequestMetrics`] event. `tokens` is `(input, output)` when
    /// token usage is known and `None` otherwise.
    fn event(
        route: &str,
        endpoint: &str,
        latency_ms: u64,
        is_error: bool,
        tokens: Option<(u32, u32)>,
    ) -> RequestMetrics {
        RequestMetrics {
            route: route.into(),
            endpoint: endpoint.into(),
            latency_ms,
            is_error,
            input_tokens: tokens.map(|(input, _)| input),
            output_tokens: tokens.map(|(_, output)| output),
        }
    }

    /// A freshly created store reports no routes and near-zero uptime.
    #[test]
    fn empty_store_returns_empty_snapshot() {
        let store = MetricsStore::new();
        let snap = store.snapshot();
        assert!(snap.routes.is_empty());
        assert!(snap.uptime_seconds < 2);
    }

    /// One successful request shows up in both route and endpoint aggregates.
    #[test]
    fn record_single_success() {
        let store = MetricsStore::new();
        store.record(event(
            "fast",
            "openai:gpt-4o-mini",
            300,
            false,
            Some((100, 50)),
        ));

        let snap = store.snapshot();
        let route = snap.routes.get("fast").expect("route should exist");
        assert_eq!(route.total_requests, 1);
        assert_eq!(route.total_errors, 0);
        assert!((route.error_rate - 0.0).abs() < f64::EPSILON);
        assert_eq!(route.latency_p50_ms, Some(300));
        assert_eq!(route.latency_p99_ms, Some(300));
        assert_eq!(route.avg_input_tokens, Some(100));
        assert_eq!(route.avg_output_tokens, Some(50));
        assert!(route.last_used.is_some());

        let ep = route
            .by_endpoint
            .get("openai:gpt-4o-mini")
            .expect("endpoint should exist");
        assert_eq!(ep.total_requests, 1);
        assert_eq!(ep.total_errors, 0);
    }

    /// An error event bumps the error counter and leaves token averages unset.
    #[test]
    fn record_error_increments_counters() {
        let store = MetricsStore::new();
        store.record(event("fast", "openai:gpt-4o-mini", 500, true, None));

        let snap = store.snapshot();
        let route = &snap.routes["fast"];
        assert_eq!(route.total_requests, 1);
        assert_eq!(route.total_errors, 1);
        assert!((route.error_rate - 1.0).abs() < f64::EPSILON);
        assert_eq!(route.avg_input_tokens, None);
    }

    /// Two endpoints under one route keep independent request counts.
    #[test]
    fn multiple_endpoints_tracked_separately() {
        let store = MetricsStore::new();
        (0..3).for_each(|_| {
            store.record(event(
                "fast",
                "openai:gpt-4o-mini",
                200,
                false,
                Some((50, 25)),
            ));
        });
        (0..2).for_each(|_| {
            store.record(event(
                "fast",
                "anthropic:claude-haiku",
                400,
                false,
                Some((60, 30)),
            ));
        });

        let snap = store.snapshot();
        let route = &snap.routes["fast"];
        assert_eq!(route.total_requests, 5);
        assert_eq!(route.by_endpoint.len(), 2);
        assert_eq!(route.by_endpoint["openai:gpt-4o-mini"].total_requests, 3);
        assert_eq!(
            route.by_endpoint["anthropic:claude-haiku"].total_requests,
            2
        );
    }

    /// Percentiles over the known sequence 1..=100.
    #[test]
    fn percentile_calculation() {
        let latencies: Vec<u64> = (1..=100).collect();
        assert_eq!(percentile(&latencies, 50.0), Some(50));
        assert_eq!(percentile(&latencies, 99.0), Some(99));
        assert_eq!(percentile(&[], 50.0), None);
    }

    /// Recording more samples than the cap never leaves the buffer over it.
    #[test]
    fn latency_buffer_eviction() {
        let store = MetricsStore::new();
        let sample_count = MAX_LATENCY_SAMPLES + 10;
        for i in 0..sample_count {
            store.record(event("r", "e", i as u64, false, None));
        }

        let inner = store.inner.read().unwrap_or_else(|e| e.into_inner());
        let route = &inner.routes["r"];
        assert!(route.latencies_ms.len() <= MAX_LATENCY_SAMPLES);
    }

    /// The snapshot serialises to JSON with the expected nesting.
    #[test]
    fn snapshot_serialises_to_json() {
        let store = MetricsStore::new();
        store.record(event("default", "openai:gpt-4o", 250, false, Some((10, 5))));

        let snap = store.snapshot();
        let json = serde_json::to_value(&snap).expect("should serialise");
        assert!(json["uptime_seconds"].is_number());
        assert_eq!(json["routes"]["default"]["total_requests"], 1);
        assert_eq!(
            json["routes"]["default"]["by_endpoint"]["openai:gpt-4o"]["total_requests"],
            1
        );
    }
}