1use std::fmt::Write as FmtWrite;
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::time::Instant;
24
/// A `u64` metric that only moves upward via [`Counter::inc`] / [`Counter::inc_by`],
/// rendered as a Prometheus `counter`.
///
/// The value lives in an `AtomicU64` updated with relaxed ordering, so a shared
/// counter can be bumped from several threads without a lock.
pub struct Counter {
    name: &'static str,
    help: &'static str,
    value: AtomicU64,
}

impl Counter {
    /// Builds a counter named `name` with help text `help`, starting at zero.
    pub fn new(name: &'static str, help: &'static str) -> Self {
        let value = AtomicU64::new(0);
        Counter { name, help, value }
    }

    /// Adds one.
    pub fn inc(&self) {
        self.inc_by(1);
    }

    /// Adds `n` (wraps around on `u64` overflow, as `fetch_add` does).
    pub fn inc_by(&self, n: u64) {
        let _previous = self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Current value.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Metric name used in the exposition output.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Help text emitted on the `# HELP` line.
    pub fn help(&self) -> &'static str {
        self.help
    }
}
69
/// An `f64` metric that can go up and down, rendered as a Prometheus `gauge`.
///
/// std has no atomic float, so the value is stored as its `f64::to_bits` pattern
/// in an `AtomicU64`. Arithmetic updates use a compare-and-swap retry loop
/// (`fetch_update`), so concurrent `inc`/`dec`/`add` calls never lose an update.
pub struct Gauge {
    /// Bit pattern of the current value (`f64::to_bits`).
    value: AtomicU64,
    name: &'static str,
    help: &'static str,
}

impl Gauge {
    /// Creates a gauge named `name` with help text `help`, starting at 0.0.
    pub fn new(name: &'static str, help: &'static str) -> Self {
        Self {
            value: AtomicU64::new(f64::to_bits(0.0)),
            name,
            help,
        }
    }

    /// Overwrites the current value with `val`.
    pub fn set(&self, val: f64) {
        self.value.store(f64::to_bits(val), Ordering::Relaxed);
    }

    /// Adds 1.0.
    pub fn inc(&self) {
        self.add(1.0);
    }

    /// Subtracts 1.0.
    pub fn dec(&self) {
        self.add(-1.0);
    }

    /// Atomically adds `delta` (which may be negative) to the current value.
    pub fn add(&self, delta: f64) {
        // The closure always returns `Some`, so `fetch_update` keeps retrying its
        // internal CAS until the new value is stored; the `Result` is always `Ok`.
        let _ = self
            .value
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                Some(f64::to_bits(f64::from_bits(bits) + delta))
            });
    }

    /// Current value.
    pub fn get(&self) -> f64 {
        f64::from_bits(self.value.load(Ordering::Relaxed))
    }

    /// Metric name used in the exposition output.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Help text emitted on the `# HELP` line.
    pub fn help(&self) -> &'static str {
        self.help
    }
}
138
/// A cumulative histogram in the Prometheus sense.
///
/// Finite bucket `i` counts observations `<= bucket_boundaries()[i]`. A trailing
/// implicit `+Inf` bucket (index `bucket_boundaries().len()`) counts every
/// observation. Buckets, sum and count are independent relaxed atomics, so each
/// is individually exact, but a concurrent reader may see them mid-update.
pub struct Histogram {
    /// Sorted, de-duplicated, finite upper bounds.
    buckets: Vec<f64>,
    /// Cumulative counts; `counts.len() == buckets.len() + 1`, last entry is `+Inf`.
    counts: Vec<AtomicU64>,
    /// Running sum of observations, stored as `f64::to_bits`.
    sum: AtomicU64,
    /// Number of observations.
    count: AtomicU64,
    name: &'static str,
    help: &'static str,
}

impl Histogram {
    /// Creates a histogram with the given upper bucket bounds.
    ///
    /// Bounds are sorted ascending and de-duplicated. Non-finite bounds are dropped:
    /// NaN cannot be ordered (it made the sort order unspecified and was never
    /// de-duplicated), and `+Inf` is always present as the implicit overflow bucket,
    /// so listing it explicitly would render a second `le="+Inf"` series.
    pub fn new(name: &'static str, help: &'static str, mut buckets: Vec<f64>) -> Self {
        buckets.retain(|b| b.is_finite());
        // Every remaining value is finite, so `partial_cmp` always returns `Some`.
        buckets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
        buckets.dedup();

        // One slot per finite bound plus the trailing `+Inf` bucket.
        let counts: Vec<AtomicU64> = (0..=buckets.len()).map(|_| AtomicU64::new(0)).collect();

        Self {
            buckets,
            counts,
            sum: AtomicU64::new(f64::to_bits(0.0)),
            count: AtomicU64::new(0),
            name,
            help,
        }
    }

    /// Records one observation.
    ///
    /// Every bucket whose bound is `>= value` is incremented, as is the `+Inf`
    /// bucket. A NaN observation compares false against every bound, so it lands
    /// only in `+Inf` and turns the running sum into NaN.
    pub fn observe(&self, value: f64) {
        // Index of the first bound `>= value`. Bounds are sorted, so every bucket from
        // there on (ending with `+Inf`, the last slot of `counts`) contains `value`.
        let first = if value.is_nan() {
            self.buckets.len()
        } else {
            self.buckets.partition_point(|&bound| bound < value)
        };
        for bucket in &self.counts[first..] {
            bucket.fetch_add(1, Ordering::Relaxed);
        }

        // No atomic f64 add in std: CAS on the bit pattern. The closure always returns
        // `Some`, so `fetch_update` only returns after the new sum has been stored.
        let _ = self
            .sum
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                Some(f64::to_bits(f64::from_bits(bits) + value))
            });

        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Runs `f`, observes its elapsed time in seconds (measured with `Instant`), and
    /// returns its result. If `f` panics, nothing is recorded.
    pub fn time<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        let start = Instant::now();
        let result = f();
        let elapsed = start.elapsed().as_secs_f64();
        self.observe(elapsed);
        result
    }

    /// Metric name used in the exposition output.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Help text emitted on the `# HELP` line.
    pub fn help(&self) -> &'static str {
        self.help
    }

    /// Total number of observations.
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Sum of all observed values.
    pub fn sum(&self) -> f64 {
        f64::from_bits(self.sum.load(Ordering::Relaxed))
    }

    /// Cumulative count of bucket `index`; `bucket_boundaries().len()` is the `+Inf`
    /// bucket. Out-of-range indices return 0.
    pub fn bucket_count(&self, index: usize) -> u64 {
        self.counts
            .get(index)
            .map(|c| c.load(Ordering::Relaxed))
            .unwrap_or(0)
    }

    /// The sorted, de-duplicated, finite upper bounds (without the implicit `+Inf`).
    pub fn bucket_boundaries(&self) -> &[f64] {
        &self.buckets
    }
}
255
/// Bucket bounds (in seconds) for latency histograms, spanning 1 ms to 10 s.
pub fn default_latency_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 12] = [
        0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];
    BOUNDS.to_vec()
}
266
/// Bucket bounds for rate histograms (e.g. tokens per second), spanning 1 to 200.
pub fn default_rate_buckets() -> Vec<f64> {
    Vec::from([1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0])
}
273
274pub struct InferenceMetrics {
299 pub tokens_generated_total: Counter,
302 pub requests_total: Counter,
304 pub errors_total: Counter,
306 pub prompt_tokens_total: Counter,
308
309 pub prefill_duration_seconds: Histogram,
312 pub decode_token_duration_seconds: Histogram,
314 pub request_duration_seconds: Histogram,
316 pub tokens_per_second: Histogram,
318
319 pub active_requests: Gauge,
322 pub kv_cache_utilization: Gauge,
324 pub model_memory_bytes: Gauge,
326 pub request_tokens_per_second: Gauge,
328 pub inter_token_latency_p50_seconds: Gauge,
330 pub inter_token_latency_p95_seconds: Gauge,
332 pub queue_wait_seconds: Gauge,
334 pub kv_cache_compression_level: Gauge,
337}
338
339impl InferenceMetrics {
340 pub fn update_memory_from_rss(&self) {
345 let rss = crate::memory::get_rss_bytes();
346 self.model_memory_bytes.set(rss as f64);
347 }
348
349 pub fn new() -> Self {
351 Self {
352 tokens_generated_total: Counter::new(
353 "oxibonsai_tokens_generated_total",
354 "Total tokens generated",
355 ),
356 requests_total: Counter::new("oxibonsai_requests_total", "Total inference requests"),
357 errors_total: Counter::new("oxibonsai_errors_total", "Total inference errors"),
358 prompt_tokens_total: Counter::new(
359 "oxibonsai_prompt_tokens_total",
360 "Total prompt tokens processed",
361 ),
362
363 prefill_duration_seconds: Histogram::new(
364 "oxibonsai_prefill_duration_seconds",
365 "Prefill (prompt processing) duration in seconds",
366 default_latency_buckets(),
367 ),
368 decode_token_duration_seconds: Histogram::new(
369 "oxibonsai_decode_token_duration_seconds",
370 "Per-token decode step duration in seconds",
371 default_latency_buckets(),
372 ),
373 request_duration_seconds: Histogram::new(
374 "oxibonsai_request_duration_seconds",
375 "End-to-end request duration in seconds",
376 default_latency_buckets(),
377 ),
378 tokens_per_second: Histogram::new(
379 "oxibonsai_tokens_per_second",
380 "Observed tokens per second rate",
381 default_rate_buckets(),
382 ),
383
384 active_requests: Gauge::new(
385 "oxibonsai_active_requests",
386 "Number of currently active requests",
387 ),
388 kv_cache_utilization: Gauge::new(
389 "oxibonsai_kv_cache_utilization",
390 "KV cache utilization ratio (0.0 to 1.0)",
391 ),
392 model_memory_bytes: Gauge::new(
393 "oxibonsai_model_memory_bytes",
394 "Model memory usage in bytes",
395 ),
396 request_tokens_per_second: Gauge::new(
397 "oxibonsai_request_tokens_per_second",
398 "EWMA tokens-per-second across recent requests",
399 ),
400 inter_token_latency_p50_seconds: Gauge::new(
401 "oxibonsai_inter_token_latency_p50_seconds",
402 "Median inter-token latency across recent requests (seconds)",
403 ),
404 inter_token_latency_p95_seconds: Gauge::new(
405 "oxibonsai_inter_token_latency_p95_seconds",
406 "p95 inter-token latency across recent requests (seconds)",
407 ),
408 queue_wait_seconds: Gauge::new(
409 "oxibonsai_queue_wait_seconds",
410 "Mean queue-wait (admission to first token) across recent requests (seconds)",
411 ),
412 kv_cache_compression_level: Gauge::new(
413 "oxibonsai_kv_cache_compression_level",
414 "KV cache compression tier: 0=FP16, 1=Q8, 2=Q4",
415 ),
416 }
417 }
418
419 pub fn update_request_rate(&self, snap: &crate::request_metrics::AggregateRateSnapshot) {
421 self.request_tokens_per_second
422 .set(snap.mean_tokens_per_second);
423 self.inter_token_latency_p50_seconds
424 .set(snap.tbt_p50_seconds);
425 self.inter_token_latency_p95_seconds
426 .set(snap.tbt_p95_seconds);
427 self.queue_wait_seconds.set(snap.mean_queue_wait_seconds);
428 }
429
430 pub fn update_kv_cache_level(&self, level: crate::kv_cache_policy::KvCacheLevel) {
432 self.kv_cache_compression_level.set(level.ordinal() as f64);
433 }
434
435 pub fn render_prometheus(&self) -> String {
437 let mut out = String::with_capacity(4096);
438
439 render_counter(&mut out, &self.tokens_generated_total);
441 render_counter(&mut out, &self.requests_total);
442 render_counter(&mut out, &self.errors_total);
443 render_counter(&mut out, &self.prompt_tokens_total);
444
445 render_histogram(&mut out, &self.prefill_duration_seconds);
447 render_histogram(&mut out, &self.decode_token_duration_seconds);
448 render_histogram(&mut out, &self.request_duration_seconds);
449 render_histogram(&mut out, &self.tokens_per_second);
450
451 render_gauge(&mut out, &self.active_requests);
453 render_gauge(&mut out, &self.kv_cache_utilization);
454 render_gauge(&mut out, &self.model_memory_bytes);
455 render_gauge(&mut out, &self.request_tokens_per_second);
456 render_gauge(&mut out, &self.inter_token_latency_p50_seconds);
457 render_gauge(&mut out, &self.inter_token_latency_p95_seconds);
458 render_gauge(&mut out, &self.queue_wait_seconds);
459 render_gauge(&mut out, &self.kv_cache_compression_level);
460
461 out
462 }
463}
464
465impl Default for InferenceMetrics {
466 fn default() -> Self {
467 Self::new()
468 }
469}
470
471fn render_counter(out: &mut String, counter: &Counter) {
474 let _ = writeln!(out, "# HELP {} {}", counter.name(), counter.help());
475 let _ = writeln!(out, "# TYPE {} counter", counter.name());
476 let _ = writeln!(out, "{} {}", counter.name(), counter.get());
477 let _ = writeln!(out);
478}
479
480fn render_gauge(out: &mut String, gauge: &Gauge) {
481 let _ = writeln!(out, "# HELP {} {}", gauge.name(), gauge.help());
482 let _ = writeln!(out, "# TYPE {} gauge", gauge.name());
483 let value = gauge.get();
484 if value.fract() == 0.0 && value.is_finite() {
486 let _ = writeln!(out, "{} {}", gauge.name(), value as i64);
487 } else {
488 let _ = writeln!(out, "{} {value}", gauge.name());
489 }
490 let _ = writeln!(out);
491}
492
493fn render_histogram(out: &mut String, hist: &Histogram) {
494 let _ = writeln!(out, "# HELP {} {}", hist.name(), hist.help());
495 let _ = writeln!(out, "# TYPE {} histogram", hist.name());
496
497 for (i, &boundary) in hist.bucket_boundaries().iter().enumerate() {
498 let count = hist.bucket_count(i);
499 let le = format_f64_prometheus(boundary);
501 let _ = writeln!(out, "{}_bucket{{le=\"{le}\"}} {count}", hist.name());
502 }
503
504 let inf_count = hist.bucket_count(hist.bucket_boundaries().len());
506 let _ = writeln!(out, "{}_bucket{{le=\"+Inf\"}} {inf_count}", hist.name());
507
508 let sum = hist.sum();
509 let _ = writeln!(out, "{}_sum {}", hist.name(), format_f64_prometheus(sum));
510 let _ = writeln!(out, "{}_count {}", hist.name(), hist.count());
511 let _ = writeln!(out);
512}
513
/// Formats a sample or label value for the Prometheus text exposition format.
///
/// Finite values use Rust's `Display` for `f64`, the shortest decimal that
/// round-trips to the same value, without exponent notation. Integral values
/// therefore print without a fractional part (`42.0` → `"42"`), and no precision is
/// lost to fixed-decimal rounding (`1e-7` → `"0.0000001"`, not `"0"`). Negative zero
/// is normalised to `"0"`. Non-finite values use the spellings the exposition format
/// defines: `+Inf`, `-Inf` and `NaN`.
fn format_f64_prometheus(value: f64) -> String {
    if value.is_nan() {
        "NaN".to_owned()
    } else if value.is_infinite() {
        let spelled = if value.is_sign_positive() { "+Inf" } else { "-Inf" };
        spelled.to_owned()
    } else if value == 0.0 {
        "0".to_owned()
    } else {
        value.to_string()
    }
}
529
// Unit tests for the metric primitives (Counter, Gauge, Histogram), the
// `InferenceMetrics` registry and the Prometheus text rendering helpers.
#[cfg(test)]
mod tests {
    use super::*;

    /// `inc`/`inc_by` accumulate, and `inc_by(0)` leaves the value unchanged.
    #[test]
    fn counter_basic() {
        let c = Counter::new("test_counter", "A test counter");
        assert_eq!(c.get(), 0);
        c.inc();
        assert_eq!(c.get(), 1);
        c.inc_by(5);
        assert_eq!(c.get(), 6);
        c.inc_by(0);
        assert_eq!(c.get(), 6);
    }

    /// 10 threads x 1000 `inc()` calls must total exactly 10 000 (no lost updates).
    #[test]
    fn counter_concurrent() {
        use std::sync::Arc;
        let c = Arc::new(Counter::new("concurrent_counter", "concurrent test"));
        let mut handles = Vec::new();
        for _ in 0..10 {
            let c = Arc::clone(&c);
            handles.push(std::thread::spawn(move || {
                for _ in 0..1000 {
                    c.inc();
                }
            }));
        }
        for h in handles {
            h.join().expect("thread should not panic");
        }
        assert_eq!(c.get(), 10_000);
    }

    /// A fresh gauge reads 0.0, and `set` overwrites the value.
    #[test]
    fn gauge_set_and_get() {
        let g = Gauge::new("test_gauge", "A test gauge");
        assert!((g.get() - 0.0).abs() < f64::EPSILON);
        g.set(42.5);
        assert!((g.get() - 42.5).abs() < f64::EPSILON);
    }

    /// `inc`/`dec` move the gauge by +1.0 / -1.0.
    #[test]
    fn gauge_inc_dec() {
        let g = Gauge::new("test_gauge_incdec", "inc dec test");
        g.inc();
        assert!((g.get() - 1.0).abs() < f64::EPSILON);
        g.inc();
        assert!((g.get() - 2.0).abs() < f64::EPSILON);
        g.dec();
        assert!((g.get() - 1.0).abs() < f64::EPSILON);
        g.dec();
        assert!(g.get().abs() < f64::EPSILON);
    }

    /// Five threads increment and five decrement, 1000 times each. Concurrent
    /// updates must not be lost, so the gauge ends back at zero.
    #[test]
    fn gauge_concurrent() {
        use std::sync::Arc;
        let g = Arc::new(Gauge::new("concurrent_gauge", "concurrent gauge"));
        let mut handles = Vec::new();
        for i in 0..10 {
            // Threads 0..5 increment, threads 5..10 decrement.
            let g = Arc::clone(&g);
            handles.push(std::thread::spawn(move || {
                for _ in 0..1000 {
                    if i < 5 {
                        g.inc();
                    } else {
                        g.dec();
                    }
                }
            }));
        }
        for h in handles {
            h.join().expect("thread should not panic");
        }
        assert!(g.get().abs() < f64::EPSILON);
    }

    /// Buckets are cumulative. With bounds [1, 5, 10], observations 0.5, 3, 7 and 15
    /// give bucket counts 1, 2, 3 and 4 (index 3 being the implicit +Inf bucket).
    #[test]
    fn histogram_observe() {
        let h = Histogram::new("test_hist", "A test histogram", vec![1.0, 5.0, 10.0]);
        h.observe(0.5);
        h.observe(3.0);
        h.observe(7.0);
        h.observe(15.0);

        assert_eq!(h.bucket_count(0), 1);
        assert_eq!(h.bucket_count(1), 2);
        assert_eq!(h.bucket_count(2), 3);
        // +Inf bucket: every observation.
        assert_eq!(h.bucket_count(3), 4);
        assert_eq!(h.count(), 4);
        let expected_sum = 0.5 + 3.0 + 7.0 + 15.0;
        assert!((h.sum() - expected_sum).abs() < 1e-9);
    }

    /// A histogram with no observations reports zero everywhere, including +Inf.
    #[test]
    fn histogram_empty() {
        let h = Histogram::new("empty_hist", "empty", vec![1.0, 5.0]);
        assert_eq!(h.count(), 0);
        assert!(h.sum().abs() < f64::EPSILON);
        assert_eq!(h.bucket_count(0), 0);
        assert_eq!(h.bucket_count(1), 0);
        assert_eq!(h.bucket_count(2), 0);
    }

    /// `time` passes the closure's result through and records exactly one
    /// (well under one second) duration.
    #[test]
    fn histogram_time_closure() {
        let h = Histogram::new("timed_hist", "timed", vec![0.001, 0.01, 0.1, 1.0]);
        let result = h.time(|| {
            42
        });
        assert_eq!(result, 42);
        assert_eq!(h.count(), 1);
        assert!(h.sum() < 1.0);
    }

    /// A value equal to a bound is counted in that bucket (`le` is inclusive) and
    /// in every bucket above it, but not below.
    #[test]
    fn histogram_boundary_values() {
        let h = Histogram::new("boundary_hist", "boundary", vec![1.0, 5.0, 10.0]);
        h.observe(5.0);
        assert_eq!(h.bucket_count(0), 0);
        assert_eq!(h.bucket_count(1), 1);
        assert_eq!(h.bucket_count(2), 1);
        assert_eq!(h.bucket_count(3), 1);
    }

    /// The default bucket sets are strictly increasing.
    #[test]
    fn default_buckets_sorted() {
        let latency = default_latency_buckets();
        for pair in latency.windows(2) {
            assert!(pair[0] < pair[1], "latency buckets must be sorted");
        }

        let rate = default_rate_buckets();
        for pair in rate.windows(2) {
            assert!(pair[0] < pair[1], "rate buckets must be sorted");
        }
    }

    /// `Default` yields a registry with all sampled metrics at zero.
    #[test]
    fn inference_metrics_default() {
        let m = InferenceMetrics::default();
        assert_eq!(m.tokens_generated_total.get(), 0);
        assert_eq!(m.requests_total.get(), 0);
        assert_eq!(m.errors_total.get(), 0);
        assert!(m.active_requests.get().abs() < f64::EPSILON);
    }

    /// Counters render HELP, TYPE and an integer sample line.
    #[test]
    fn render_prometheus_counter_format() {
        let m = InferenceMetrics::new();
        m.requests_total.inc_by(42);
        let output = m.render_prometheus();

        assert!(output.contains("# HELP oxibonsai_requests_total Total inference requests"));
        assert!(output.contains("# TYPE oxibonsai_requests_total counter"));
        assert!(output.contains("oxibonsai_requests_total 42"));
    }

    /// Integral gauge values render without a fractional part.
    #[test]
    fn render_prometheus_gauge_format() {
        let m = InferenceMetrics::new();
        m.active_requests.set(3.0);
        let output = m.render_prometheus();

        assert!(output.contains("# HELP oxibonsai_active_requests"));
        assert!(output.contains("# TYPE oxibonsai_active_requests gauge"));
        assert!(output.contains("oxibonsai_active_requests 3"));
    }

    /// Histograms render `le`-labelled buckets, the +Inf bucket and `_count`.
    #[test]
    fn render_prometheus_histogram_format() {
        let m = InferenceMetrics::new();
        m.request_duration_seconds.observe(0.002);
        m.request_duration_seconds.observe(0.05);
        let output = m.render_prometheus();

        assert!(output.contains("# HELP oxibonsai_request_duration_seconds"));
        assert!(output.contains("# TYPE oxibonsai_request_duration_seconds histogram"));
        assert!(output.contains("oxibonsai_request_duration_seconds_bucket{le=\"0.001\"} 0"));
        assert!(output.contains("oxibonsai_request_duration_seconds_bucket{le=\"+Inf\"} 2"));
        assert!(output.contains("oxibonsai_request_duration_seconds_count 2"));
    }

    /// Every metric family emits exactly one HELP and one TYPE line:
    /// 4 counters + 4 histograms + 8 gauges = 16 families.
    #[test]
    fn render_prometheus_full_output_parseable() {
        let m = InferenceMetrics::new();
        m.tokens_generated_total.inc_by(100);
        m.requests_total.inc_by(5);
        m.errors_total.inc();
        m.prompt_tokens_total.inc_by(50);
        m.active_requests.set(2.0);
        m.kv_cache_utilization.set(0.75);
        m.model_memory_bytes.set(1_073_741_824.0);
        m.request_duration_seconds.observe(0.1);
        m.prefill_duration_seconds.observe(0.01);
        m.decode_token_duration_seconds.observe(0.001);
        m.tokens_per_second.observe(42.0);

        let output = m.render_prometheus();

        let help_count = output.lines().filter(|l| l.starts_with("# HELP")).count();
        let type_count = output.lines().filter(|l| l.starts_with("# TYPE")).count();
        assert_eq!(help_count, type_count);

        assert_eq!(help_count, 16);
    }

    /// `update_request_rate` copies each snapshot field into its gauge.
    #[test]
    fn update_request_rate_writes_gauges() {
        use crate::request_metrics::AggregateRateSnapshot;
        let m = InferenceMetrics::new();
        let snap = AggregateRateSnapshot {
            completed_requests: 10,
            mean_tokens_per_second: 42.5,
            tbt_p50_seconds: 0.020,
            tbt_p95_seconds: 0.080,
            mean_queue_wait_seconds: 0.005,
        };
        m.update_request_rate(&snap);
        assert!((m.request_tokens_per_second.get() - 42.5).abs() < 1e-6);
        assert!((m.inter_token_latency_p50_seconds.get() - 0.020).abs() < 1e-6);
        assert!((m.inter_token_latency_p95_seconds.get() - 0.080).abs() < 1e-6);
        assert!((m.queue_wait_seconds.get() - 0.005).abs() < 1e-6);
    }

    /// Each KV-cache level is exported as its ordinal: Fp16=0, Q8=1, Fp8=2, Q4=3.
    #[test]
    fn update_kv_cache_level_writes_gauge() {
        use crate::kv_cache_policy::KvCacheLevel;
        let m = InferenceMetrics::new();
        m.update_kv_cache_level(KvCacheLevel::Fp16);
        assert!(m.kv_cache_compression_level.get().abs() < 1e-6);
        m.update_kv_cache_level(KvCacheLevel::Q8);
        assert!((m.kv_cache_compression_level.get() - 1.0).abs() < 1e-6);
        m.update_kv_cache_level(KvCacheLevel::Fp8);
        assert!((m.kv_cache_compression_level.get() - 2.0).abs() < 1e-6);
        m.update_kv_cache_level(KvCacheLevel::Q4);
        assert!((m.kv_cache_compression_level.get() - 3.0).abs() < 1e-6);
    }

    /// The rate/latency/queue/compression gauges appear in the rendered output.
    #[test]
    fn render_prometheus_includes_new_gauges() {
        let m = InferenceMetrics::new();
        let output = m.render_prometheus();
        assert!(output.contains("oxibonsai_request_tokens_per_second"));
        assert!(output.contains("oxibonsai_inter_token_latency_p50_seconds"));
        assert!(output.contains("oxibonsai_inter_token_latency_p95_seconds"));
        assert!(output.contains("oxibonsai_queue_wait_seconds"));
        assert!(output.contains("oxibonsai_kv_cache_compression_level"));
    }

    /// Integral values print without a decimal point.
    #[test]
    fn format_f64_prometheus_integers() {
        assert_eq!(format_f64_prometheus(0.0), "0");
        assert_eq!(format_f64_prometheus(42.0), "42");
        assert_eq!(format_f64_prometheus(1000.0), "1000");
    }

    /// Fractional values print without trailing zeros.
    #[test]
    fn format_f64_prometheus_fractions() {
        assert_eq!(format_f64_prometheus(0.001), "0.001");
        assert_eq!(format_f64_prometheus(0.5), "0.5");
        assert_eq!(format_f64_prometheus(2.5), "2.5");
    }

    /// Histogram bounds are sorted ascending and duplicates collapsed.
    #[test]
    fn histogram_deduplicates_and_sorts_buckets() {
        let h = Histogram::new("dedup", "test", vec![5.0, 1.0, 5.0, 3.0, 1.0]);
        assert_eq!(h.bucket_boundaries(), &[1.0, 3.0, 5.0]);
    }
}