1use std::collections::HashMap;
31use std::sync::RwLock;
32use std::time::{Instant, SystemTime, UNIX_EPOCH};
33
34use serde::Serialize;
35
/// Upper bound on retained latency samples per route/endpoint buffer;
/// `push_latency` evicts the oldest half once this is reached.
const MAX_LATENCY_SAMPLES: usize = 10_000;
45
/// Thread-safe in-process request metrics, aggregated per route and,
/// within each route, per provider endpoint. Writers call `record`;
/// readers take a consistent copy via `snapshot`.
pub struct MetricsStore {
    /// Creation time of the store, used to report uptime in snapshots.
    started_at: Instant,
    /// All mutable aggregate state lives behind a single RwLock.
    inner: RwLock<StoreInner>,
}
65
/// One completed request, as reported to [`MetricsStore::record`].
///
/// `Debug` and `Clone` are derived so events can be logged and re-used
/// by callers; this is purely additive and changes no existing behavior.
#[derive(Debug, Clone)]
pub struct RequestMetrics {
    /// Logical route name the request was dispatched under.
    pub route: String,
    /// Provider endpoint key, conventionally `"provider:model"` as
    /// produced by `format_endpoint`.
    pub endpoint: String,
    /// End-to-end latency of the request in milliseconds.
    pub latency_ms: u64,
    /// Whether the request ultimately failed.
    pub is_error: bool,
    /// Input token count, when the provider reported usage.
    pub input_tokens: Option<u32>,
    /// Output token count, when the provider reported usage.
    pub output_tokens: Option<u32>,
}
81
/// Builds the canonical endpoint key `"provider:model"` used to index
/// per-endpoint metrics.
pub fn format_endpoint(provider_name: &str, model_id: &str) -> String {
    // Preallocate exactly: provider + ':' + model.
    let mut key = String::with_capacity(provider_name.len() + 1 + model_id.len());
    key.push_str(provider_name);
    key.push(':');
    key.push_str(model_id);
    key
}
88
/// Point-in-time, serializable view of all metrics, produced by
/// [`MetricsStore::snapshot`].
#[derive(Debug, Serialize)]
pub struct MetricsSnapshot {
    /// Seconds elapsed since the store was created.
    pub uptime_seconds: u64,
    /// Per-route aggregates, keyed by route name.
    pub routes: HashMap<String, RouteSnapshot>,
}
99
/// Serializable aggregates for one route. `Option` fields are omitted
/// from the JSON output when `None` (no data recorded yet).
#[derive(Debug, Serialize)]
pub struct RouteSnapshot {
    pub total_requests: u64,
    pub total_errors: u64,
    /// `total_errors / total_requests`; 0.0 when no requests were recorded.
    pub error_rate: f64,
    /// Nearest-rank p50 over the retained latency samples; `None` if empty.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_p50_ms: Option<u64>,
    /// Nearest-rank p99 over the retained latency samples; `None` if empty.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_p99_ms: Option<u64>,
    /// Integer mean over requests that reported both token counts.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avg_input_tokens: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub avg_output_tokens: Option<u64>,
    /// Unix timestamp (seconds) of the most recent request on this route.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_used: Option<u64>,
    /// Per-endpoint breakdown, keyed by `"provider:model"`.
    pub by_endpoint: HashMap<String, EndpointSnapshot>,
}
120
/// Serializable aggregates for a single endpoint within a route.
/// Latency percentiles are omitted from JSON when no samples exist.
#[derive(Debug, Serialize)]
pub struct EndpointSnapshot {
    pub total_requests: u64,
    pub total_errors: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_p50_ms: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latency_p99_ms: Option<u64>,
}
131
/// Lock-protected interior of [`MetricsStore`]: live per-route
/// aggregates keyed by route name.
struct StoreInner {
    routes: HashMap<String, RouteData>,
}
137
/// Running (mutable) aggregates for one route.
struct RouteData {
    total_requests: u64,
    total_errors: u64,
    /// Bounded buffer of recent latency samples; `push_latency` evicts
    /// the oldest half once `MAX_LATENCY_SAMPLES` is reached.
    latencies_ms: Vec<u64>,
    /// Token sums, accumulated only for requests that reported BOTH
    /// input and output counts (see `record`).
    total_input_tokens: u64,
    total_output_tokens: u64,
    /// Number of requests contributing to the token sums above.
    token_request_count: u64,
    /// Wall-clock time of the most recent request, if any.
    last_used: Option<SystemTime>,
    endpoints: HashMap<String, EndpointData>,
}
149
/// Running (mutable) aggregates for a single endpoint within a route.
struct EndpointData {
    total_requests: u64,
    total_errors: u64,
    /// Bounded buffer of recent latency samples (same policy as RouteData).
    latencies_ms: Vec<u64>,
}
155
/// Owned copy of a `RouteData`, taken while holding the read lock in
/// `snapshot` so that percentile sorting and averaging can run after
/// the lock is released.
struct ClonedRouteData {
    total_requests: u64,
    total_errors: u64,
    latencies_ms: Vec<u64>,
    total_input_tokens: u64,
    total_output_tokens: u64,
    token_request_count: u64,
    last_used: Option<SystemTime>,
    endpoints: Vec<(String, ClonedEndpointData)>,
}
168
/// Owned copy of an `EndpointData`, captured under the read lock in
/// `snapshot` (see `ClonedRouteData`).
struct ClonedEndpointData {
    total_requests: u64,
    total_errors: u64,
    latencies_ms: Vec<u64>,
}
175
impl MetricsStore {
    /// Creates an empty store. `uptime_seconds` in later snapshots is
    /// measured from this call.
    pub fn new() -> Self {
        Self {
            started_at: Instant::now(),
            inner: RwLock::new(StoreInner {
                routes: HashMap::new(),
            }),
        }
    }

    /// Records one completed request under `event.route` /
    /// `event.endpoint`, creating the aggregates on first use.
    ///
    /// Token counts are accumulated only when BOTH input and output
    /// counts are present, keeping the averages internally consistent.
    pub fn record(&self, event: RequestMetrics) {
        // Recover from a poisoned lock: a panic in another writer must
        // not permanently disable metrics collection.
        let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner());

        // `event.route` is moved into the map key here; the remaining
        // fields of `event` stay usable afterwards (partial move).
        let route = inner
            .routes
            .entry(event.route)
            .or_insert_with(RouteData::new);

        route.total_requests += 1;
        if event.is_error {
            route.total_errors += 1;
        }
        push_latency(&mut route.latencies_ms, event.latency_ms);
        route.last_used = Some(SystemTime::now());

        // Only count token usage when the provider reported both sides.
        if let (Some(input), Some(output)) = (event.input_tokens, event.output_tokens) {
            route.total_input_tokens += input as u64;
            route.total_output_tokens += output as u64;
            route.token_request_count += 1;
        }

        // Mirror the same counters at endpoint granularity.
        let ep = route
            .endpoints
            .entry(event.endpoint)
            .or_insert_with(EndpointData::new);
        ep.total_requests += 1;
        if event.is_error {
            ep.total_errors += 1;
        }
        push_latency(&mut ep.latencies_ms, event.latency_ms);
    }

    /// Convenience wrapper: records a successful request, deriving the
    /// latency from `start.elapsed()`.
    pub fn record_success(
        &self,
        route: String,
        endpoint: String,
        start: Instant,
        input_tokens: Option<u32>,
        output_tokens: Option<u32>,
    ) {
        self.record(RequestMetrics {
            route,
            endpoint,
            latency_ms: start.elapsed().as_millis() as u64,
            is_error: false,
            input_tokens,
            output_tokens,
        });
    }

    /// Convenience wrapper: records an outcome (success or error)
    /// without token usage, deriving the latency from `start.elapsed()`.
    pub fn record_outcome(&self, route: String, endpoint: String, start: Instant, is_error: bool) {
        self.record(RequestMetrics {
            route,
            endpoint,
            latency_ms: start.elapsed().as_millis() as u64,
            is_error,
            input_tokens: None,
            output_tokens: None,
        });
    }

    /// Returns a point-in-time copy of all metrics.
    ///
    /// Raw counters and latency buffers are cloned while holding the
    /// read lock; the comparatively expensive percentile sorting and
    /// averaging run afterwards, so the lock is held only for the copy.
    pub fn snapshot(&self) -> MetricsSnapshot {
        let uptime_seconds = self.started_at.elapsed().as_secs();

        // Phase 1: copy everything out under the read lock. The lock
        // guard's scope is exactly this block.
        let cloned_routes: Vec<(String, ClonedRouteData)> = {
            // As in `record`, a poisoned lock is recovered, not propagated.
            let inner = self.inner.read().unwrap_or_else(|e| e.into_inner());
            inner
                .routes
                .iter()
                .map(|(name, data)| {
                    let endpoints: Vec<(String, ClonedEndpointData)> = data
                        .endpoints
                        .iter()
                        .map(|(ep_name, ep)| {
                            (
                                ep_name.clone(),
                                ClonedEndpointData {
                                    total_requests: ep.total_requests,
                                    total_errors: ep.total_errors,
                                    latencies_ms: ep.latencies_ms.clone(),
                                },
                            )
                        })
                        .collect();

                    (
                        name.clone(),
                        ClonedRouteData {
                            total_requests: data.total_requests,
                            total_errors: data.total_errors,
                            latencies_ms: data.latencies_ms.clone(),
                            total_input_tokens: data.total_input_tokens,
                            total_output_tokens: data.total_output_tokens,
                            token_request_count: data.token_request_count,
                            last_used: data.last_used,
                            endpoints,
                        },
                    )
                })
                .collect()
        };

        // Phase 2: lock released — derive percentiles, rates and averages.
        let routes = cloned_routes
            .into_iter()
            .map(|(name, data)| {
                let by_endpoint = data
                    .endpoints
                    .into_iter()
                    .map(|(ep_name, ep)| {
                        let ep_snap = EndpointSnapshot {
                            total_requests: ep.total_requests,
                            total_errors: ep.total_errors,
                            latency_p50_ms: percentile(&ep.latencies_ms, 50.0),
                            latency_p99_ms: percentile(&ep.latencies_ms, 99.0),
                        };
                        (ep_name, ep_snap)
                    })
                    .collect();

                let route_snapshot = RouteSnapshot {
                    total_requests: data.total_requests,
                    total_errors: data.total_errors,
                    error_rate: error_rate(data.total_requests, data.total_errors),
                    latency_p50_ms: percentile(&data.latencies_ms, 50.0),
                    latency_p99_ms: percentile(&data.latencies_ms, 99.0),
                    avg_input_tokens: avg(data.total_input_tokens, data.token_request_count),
                    avg_output_tokens: avg(data.total_output_tokens, data.token_request_count),
                    last_used: data.last_used.map(system_time_to_unix_secs),
                    by_endpoint,
                };
                (name, route_snapshot)
            })
            .collect();

        MetricsSnapshot {
            uptime_seconds,
            routes,
        }
    }
}
350
351impl Default for MetricsStore {
352 fn default() -> Self {
353 Self::new()
354 }
355}
356
357impl RouteData {
360 fn new() -> Self {
361 Self {
362 total_requests: 0,
363 total_errors: 0,
364 latencies_ms: Vec::new(),
365 total_input_tokens: 0,
366 total_output_tokens: 0,
367 token_request_count: 0,
368 last_used: None,
369 endpoints: HashMap::new(),
370 }
371 }
372}
373
374impl EndpointData {
375 fn new() -> Self {
376 Self {
377 total_requests: 0,
378 total_errors: 0,
379 latencies_ms: Vec::new(),
380 }
381 }
382}
383
384fn push_latency(latencies: &mut Vec<u64>, value: u64) {
386 if latencies.len() >= MAX_LATENCY_SAMPLES {
387 let half = latencies.len() / 2;
388 latencies.drain(..half);
389 }
390 latencies.push(value);
391}
392
/// Nearest-rank percentile of `latencies` for `p` in `0.0..=100.0`.
///
/// Returns `None` for an empty slice. Uses `select_nth_unstable` (O(n)
/// average) instead of a full sort (O(n log n)); the selected element is
/// exactly the one a sorted lookup at the same index would yield, so the
/// result is unchanged.
fn percentile(latencies: &[u64], p: f64) -> Option<u64> {
    if latencies.is_empty() {
        return None;
    }
    let n = latencies.len();
    // Nearest-rank definition: rank = ceil(p/100 * n), 1-based;
    // clamp into [0, n-1] so p = 0.0 and p = 100.0 are both safe.
    let rank = (p / 100.0 * n as f64).ceil() as usize;
    let idx = rank.saturating_sub(1).min(n - 1);
    let mut scratch = latencies.to_vec();
    let (_, value, _) = scratch.select_nth_unstable(idx);
    Some(*value)
}
405
/// Fraction of requests that failed, defined as 0.0 when nothing has
/// been recorded (avoids 0/0 = NaN).
fn error_rate(total: u64, errors: u64) -> f64 {
    if total == 0 {
        return 0.0;
    }
    errors as f64 / total as f64
}
413
/// Integer mean `total / count`, or `None` when there were no samples.
fn avg(total: u64, count: u64) -> Option<u64> {
    // The closure form keeps the division lazy, so count == 0 never divides.
    (count != 0).then(|| total / count)
}
421
/// Whole seconds since the Unix epoch; times before the epoch clamp to 0.
fn system_time_to_unix_secs(t: SystemTime) -> u64 {
    match t.duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        // Pre-epoch (clock skew) — report 0 rather than erroring.
        Err(_) => 0,
    }
}
425
#[cfg(test)]
mod tests {
    use super::*;

    // A brand-new store has no routes and near-zero uptime.
    #[test]
    fn empty_store_returns_empty_snapshot() {
        let store = MetricsStore::new();
        let snap = store.snapshot();
        assert!(snap.routes.is_empty());
        // Loose bound: snapshot is taken within 2s of construction.
        assert!(snap.uptime_seconds < 2);
    }

    // One successful request populates route- and endpoint-level stats,
    // including token averages and percentiles over a single sample.
    #[test]
    fn record_single_success() {
        let store = MetricsStore::new();
        store.record(RequestMetrics {
            route: "fast".into(),
            endpoint: "openai:gpt-4o-mini".into(),
            latency_ms: 300,
            is_error: false,
            input_tokens: Some(100),
            output_tokens: Some(50),
        });

        let snap = store.snapshot();
        let route = snap.routes.get("fast").expect("route should exist");
        assert_eq!(route.total_requests, 1);
        assert_eq!(route.total_errors, 0);
        assert!((route.error_rate - 0.0).abs() < f64::EPSILON);
        // With a single sample, every percentile equals that sample.
        assert_eq!(route.latency_p50_ms, Some(300));
        assert_eq!(route.latency_p99_ms, Some(300));
        assert_eq!(route.avg_input_tokens, Some(100));
        assert_eq!(route.avg_output_tokens, Some(50));
        assert!(route.last_used.is_some());

        let ep = route
            .by_endpoint
            .get("openai:gpt-4o-mini")
            .expect("endpoint should exist");
        assert_eq!(ep.total_requests, 1);
        assert_eq!(ep.total_errors, 0);
    }

    // An error event counts toward both totals and yields a 100% error
    // rate; with no token data, the averages stay None.
    #[test]
    fn record_error_increments_counters() {
        let store = MetricsStore::new();
        store.record(RequestMetrics {
            route: "fast".into(),
            endpoint: "openai:gpt-4o-mini".into(),
            latency_ms: 500,
            is_error: true,
            input_tokens: None,
            output_tokens: None,
        });

        let snap = store.snapshot();
        let route = &snap.routes["fast"];
        assert_eq!(route.total_requests, 1);
        assert_eq!(route.total_errors, 1);
        assert!((route.error_rate - 1.0).abs() < f64::EPSILON);
        assert_eq!(route.avg_input_tokens, None);
    }

    // Requests to different endpoints on the same route are aggregated
    // at route level but broken out separately per endpoint.
    #[test]
    fn multiple_endpoints_tracked_separately() {
        let store = MetricsStore::new();
        for _ in 0..3 {
            store.record(RequestMetrics {
                route: "fast".into(),
                endpoint: "openai:gpt-4o-mini".into(),
                latency_ms: 200,
                is_error: false,
                input_tokens: Some(50),
                output_tokens: Some(25),
            });
        }
        for _ in 0..2 {
            store.record(RequestMetrics {
                route: "fast".into(),
                endpoint: "anthropic:claude-haiku".into(),
                latency_ms: 400,
                is_error: false,
                input_tokens: Some(60),
                output_tokens: Some(30),
            });
        }

        let snap = store.snapshot();
        let route = &snap.routes["fast"];
        assert_eq!(route.total_requests, 5);
        assert_eq!(route.by_endpoint.len(), 2);
        assert_eq!(route.by_endpoint["openai:gpt-4o-mini"].total_requests, 3);
        assert_eq!(
            route.by_endpoint["anthropic:claude-haiku"].total_requests,
            2
        );
    }

    // Nearest-rank percentiles over 1..=100: p50 -> 50, p99 -> 99;
    // an empty slice has no percentile.
    #[test]
    fn percentile_calculation() {
        let latencies: Vec<u64> = (1..=100).collect();
        assert_eq!(percentile(&latencies, 50.0), Some(50));
        assert_eq!(percentile(&latencies, 99.0), Some(99));
        assert_eq!(percentile(&[], 50.0), None);
    }

    // Overflowing the latency buffer triggers eviction, so it never
    // exceeds MAX_LATENCY_SAMPLES (inspects private state directly).
    #[test]
    fn latency_buffer_eviction() {
        let store = MetricsStore::new();
        for i in 0..(MAX_LATENCY_SAMPLES + 10) {
            store.record(RequestMetrics {
                route: "r".into(),
                endpoint: "e".into(),
                latency_ms: i as u64,
                is_error: false,
                input_tokens: None,
                output_tokens: None,
            });
        }
        let inner = store.inner.read().unwrap_or_else(|e| e.into_inner());
        let route = &inner.routes["r"];
        assert!(route.latencies_ms.len() <= MAX_LATENCY_SAMPLES);
    }

    // End-to-end serde check: the snapshot round-trips to the expected
    // JSON structure, nested by route then endpoint.
    #[test]
    fn snapshot_serialises_to_json() {
        let store = MetricsStore::new();
        store.record(RequestMetrics {
            route: "default".into(),
            endpoint: "openai:gpt-4o".into(),
            latency_ms: 250,
            is_error: false,
            input_tokens: Some(10),
            output_tokens: Some(5),
        });
        let snap = store.snapshot();
        let json = serde_json::to_value(&snap).expect("should serialise");
        assert!(json["uptime_seconds"].is_number());
        assert_eq!(json["routes"]["default"]["total_requests"], 1);
        assert_eq!(
            json["routes"]["default"]["by_endpoint"]["openai:gpt-4o"]["total_requests"],
            1
        );
    }
}