use std::fmt::Write as FmtWrite;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
/// A monotonically increasing metric (Prometheus "counter").
pub struct Counter {
    /// Current count; Relaxed atomics suffice for a statistics counter.
    value: AtomicU64,
    name: &'static str,
    help: &'static str,
}

impl Counter {
    /// Creates a counter starting at zero with the given exposition
    /// name and help text.
    pub fn new(name: &'static str, help: &'static str) -> Self {
        Counter {
            name,
            help,
            value: AtomicU64::new(0),
        }
    }

    /// Adds one to the counter.
    pub fn inc(&self) {
        self.inc_by(1);
    }

    /// Adds `n` to the counter.
    pub fn inc_by(&self, n: u64) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Returns the current count.
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Metric name used in the exposition output.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Help text used in the exposition output.
    pub fn help(&self) -> &'static str {
        self.help
    }
}
/// A float-valued metric that can move up and down (Prometheus "gauge").
///
/// The f64 payload is stored as its raw bit pattern inside an `AtomicU64`
/// so it can be read and updated lock-free from multiple threads.
pub struct Gauge {
    /// `f64::to_bits` representation of the current value.
    value: AtomicU64,
    name: &'static str,
    help: &'static str,
}

impl Gauge {
    /// Creates a gauge initialized to `0.0`.
    pub fn new(name: &'static str, help: &'static str) -> Self {
        Gauge {
            name,
            help,
            value: AtomicU64::new(f64::to_bits(0.0)),
        }
    }

    /// Overwrites the current value.
    pub fn set(&self, val: f64) {
        self.value.store(f64::to_bits(val), Ordering::Relaxed);
    }

    /// Increments the gauge by 1.0.
    pub fn inc(&self) {
        self.add(1.0);
    }

    /// Decrements the gauge by 1.0.
    pub fn dec(&self) {
        self.add(-1.0);
    }

    /// Atomically adds `delta` using a compare-and-swap retry loop
    /// (`fetch_update` wraps the weak-CAS loop for us).
    fn add(&self, delta: f64) {
        // The closure always returns Some, so this can never be Err.
        let _ = self
            .value
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                Some(f64::to_bits(f64::from_bits(bits) + delta))
            });
    }

    /// Returns the current value.
    pub fn get(&self) -> f64 {
        f64::from_bits(self.value.load(Ordering::Relaxed))
    }

    /// Metric name used in the exposition output.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Help text used in the exposition output.
    pub fn help(&self) -> &'static str {
        self.help
    }
}
/// A cumulative histogram with fixed bucket boundaries (Prometheus "histogram").
///
/// `counts[i]` holds the number of observations `<= buckets[i]`; the final
/// slot of `counts` is the implicit `+Inf` bucket and counts everything.
pub struct Histogram {
    /// Sorted, deduplicated upper bounds of the finite buckets.
    buckets: Vec<f64>,
    /// One counter per finite bucket, plus a trailing `+Inf` counter.
    counts: Vec<AtomicU64>,
    /// Running sum of observed values, stored as raw f64 bits.
    sum: AtomicU64,
    /// Total number of observations.
    count: AtomicU64,
    name: &'static str,
    help: &'static str,
}

impl Histogram {
    /// Creates a histogram; `buckets` are sorted and deduplicated first.
    pub fn new(name: &'static str, help: &'static str, mut buckets: Vec<f64>) -> Self {
        buckets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
        buckets.dedup();
        // One slot per finite boundary plus the trailing +Inf slot.
        let mut counts = Vec::with_capacity(buckets.len() + 1);
        counts.resize_with(buckets.len() + 1, || AtomicU64::new(0));
        Histogram {
            buckets,
            counts,
            sum: AtomicU64::new(f64::to_bits(0.0)),
            count: AtomicU64::new(0),
            name,
            help,
        }
    }

    /// Records one observation: bumps every finite bucket whose bound
    /// covers `value`, the `+Inf` bucket, the running sum, and the count.
    pub fn observe(&self, value: f64) {
        // `zip` stops at the shorter side, so the +Inf slot is untouched here.
        for (slot, &bound) in self.counts.iter().zip(self.buckets.iter()) {
            if value <= bound {
                slot.fetch_add(1, Ordering::Relaxed);
            }
        }
        if let Some(inf_slot) = self.counts.last() {
            inf_slot.fetch_add(1, Ordering::Relaxed);
        }
        // CAS loop to add `value` into the f64-bits sum.
        let _ = self
            .sum
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                Some(f64::to_bits(f64::from_bits(bits) + value))
            });
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Runs `f`, observes its wall-clock duration in seconds, and
    /// returns its result.
    pub fn time<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R,
    {
        let start = Instant::now();
        let result = f();
        self.observe(start.elapsed().as_secs_f64());
        result
    }

    /// Metric name used in the exposition output.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Help text used in the exposition output.
    pub fn help(&self) -> &'static str {
        self.help
    }

    /// Total number of observations.
    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Sum of all observed values.
    pub fn sum(&self) -> f64 {
        f64::from_bits(self.sum.load(Ordering::Relaxed))
    }

    /// Cumulative count for bucket `index` (index `buckets.len()` is +Inf);
    /// out-of-range indices yield 0.
    pub fn bucket_count(&self, index: usize) -> u64 {
        match self.counts.get(index) {
            Some(c) => c.load(Ordering::Relaxed),
            None => 0,
        }
    }

    /// The sorted finite bucket boundaries (excludes the implicit +Inf).
    pub fn bucket_boundaries(&self) -> &[f64] {
        &self.buckets
    }
}
/// Default bucket boundaries (seconds) for latency histograms,
/// spanning 1 ms to 10 s.
pub fn default_latency_buckets() -> Vec<f64> {
    [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0].to_vec()
}
/// Default bucket boundaries for rate histograms (e.g. tokens/second).
pub fn default_rate_buckets() -> Vec<f64> {
    [1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0].to_vec()
}
/// The full metric set for the inference server, rendered to Prometheus
/// text format by `render_prometheus`.
pub struct InferenceMetrics {
// Total tokens produced by decode steps.
pub tokens_generated_total: Counter,
// Total inference requests received.
pub requests_total: Counter,
// Total inference errors.
pub errors_total: Counter,
// Total prompt tokens processed.
pub prompt_tokens_total: Counter,
// Prompt-processing (prefill) duration, seconds.
pub prefill_duration_seconds: Histogram,
// Per-token decode step duration, seconds.
pub decode_token_duration_seconds: Histogram,
// End-to-end request duration, seconds.
pub request_duration_seconds: Histogram,
// Observed generation rate, tokens per second.
pub tokens_per_second: Histogram,
// Requests currently in flight.
pub active_requests: Gauge,
// KV cache utilization ratio (0.0 to 1.0 per the help text).
pub kv_cache_utilization: Gauge,
// Model memory usage in bytes (set from process RSS).
pub model_memory_bytes: Gauge,
}
impl InferenceMetrics {
/// Refreshes `model_memory_bytes` from the process resident-set size.
/// NOTE(review): depends on `crate::memory::get_rss_bytes()` — presumably
/// returns bytes as an integer; confirm against that module.
pub fn update_memory_from_rss(&self) {
let rss = crate::memory::get_rss_bytes();
self.model_memory_bytes.set(rss as f64);
}
/// Builds the full metric set with `oxibonsai_`-prefixed names: latency
/// buckets for the duration histograms, rate buckets for tokens/second,
/// counters and gauges starting at zero.
pub fn new() -> Self {
Self {
tokens_generated_total: Counter::new(
"oxibonsai_tokens_generated_total",
"Total tokens generated",
),
requests_total: Counter::new("oxibonsai_requests_total", "Total inference requests"),
errors_total: Counter::new("oxibonsai_errors_total", "Total inference errors"),
prompt_tokens_total: Counter::new(
"oxibonsai_prompt_tokens_total",
"Total prompt tokens processed",
),
prefill_duration_seconds: Histogram::new(
"oxibonsai_prefill_duration_seconds",
"Prefill (prompt processing) duration in seconds",
default_latency_buckets(),
),
decode_token_duration_seconds: Histogram::new(
"oxibonsai_decode_token_duration_seconds",
"Per-token decode step duration in seconds",
default_latency_buckets(),
),
request_duration_seconds: Histogram::new(
"oxibonsai_request_duration_seconds",
"End-to-end request duration in seconds",
default_latency_buckets(),
),
tokens_per_second: Histogram::new(
"oxibonsai_tokens_per_second",
"Observed tokens per second rate",
default_rate_buckets(),
),
active_requests: Gauge::new(
"oxibonsai_active_requests",
"Number of currently active requests",
),
kv_cache_utilization: Gauge::new(
"oxibonsai_kv_cache_utilization",
"KV cache utilization ratio (0.0 to 1.0)",
),
model_memory_bytes: Gauge::new(
"oxibonsai_model_memory_bytes",
"Model memory usage in bytes",
),
}
}
/// Renders every metric into a single Prometheus text-exposition payload
/// (one HELP/TYPE/sample group per metric, blank-line separated).
pub fn render_prometheus(&self) -> String {
let mut out = String::with_capacity(4096);
render_counter(&mut out, &self.tokens_generated_total);
render_counter(&mut out, &self.requests_total);
render_counter(&mut out, &self.errors_total);
render_counter(&mut out, &self.prompt_tokens_total);
render_histogram(&mut out, &self.prefill_duration_seconds);
render_histogram(&mut out, &self.decode_token_duration_seconds);
render_histogram(&mut out, &self.request_duration_seconds);
render_histogram(&mut out, &self.tokens_per_second);
render_gauge(&mut out, &self.active_requests);
render_gauge(&mut out, &self.kv_cache_utilization);
render_gauge(&mut out, &self.model_memory_bytes);
out
}
}
impl Default for InferenceMetrics {
/// Equivalent to `InferenceMetrics::new()`.
fn default() -> Self {
Self::new()
}
}
/// Appends one counter in Prometheus text format: a `# HELP` line,
/// a `# TYPE` line, a single sample line, then a blank separator line.
fn render_counter(out: &mut String, counter: &Counter) {
    let name = counter.name();
    let _ = writeln!(out, "# HELP {name} {}", counter.help());
    let _ = writeln!(out, "# TYPE {name} counter");
    let _ = writeln!(out, "{name} {}", counter.get());
    out.push('\n');
}
/// Appends one gauge in Prometheus text format: `# HELP`, `# TYPE`,
/// a single sample line, then a blank separator line.
///
/// Whole-number values print without a decimal point; everything else
/// (fractions, non-finite values, very large magnitudes) uses f64 Display.
fn render_gauge(out: &mut String, gauge: &Gauge) {
    let _ = writeln!(out, "# HELP {} {}", gauge.name(), gauge.help());
    let _ = writeln!(out, "# TYPE {} gauge", gauge.name());
    let value = gauge.get();
    // Mirror format_f64_prometheus: only take the integer path when the
    // i64 cast is exact. Without the 1e15 guard, `value as i64` saturates
    // for |value| >= 2^63 and silently loses precision above 2^53 — which
    // matters for large byte-count gauges like model_memory_bytes.
    if value.fract() == 0.0 && value.is_finite() && value.abs() < 1e15 {
        let _ = writeln!(out, "{} {}", gauge.name(), value as i64);
    } else {
        let _ = writeln!(out, "{} {value}", gauge.name());
    }
    let _ = writeln!(out);
}
/// Appends one histogram in Prometheus text format: `# HELP`, `# TYPE`,
/// one cumulative `_bucket` line per finite boundary, the `+Inf` bucket,
/// `_sum`, `_count`, then a blank separator line.
fn render_histogram(out: &mut String, hist: &Histogram) {
    let name = hist.name();
    let _ = writeln!(out, "# HELP {name} {}", hist.help());
    let _ = writeln!(out, "# TYPE {name} histogram");
    let boundaries = hist.bucket_boundaries();
    for (i, &bound) in boundaries.iter().enumerate() {
        let le = format_f64_prometheus(bound);
        let _ = writeln!(out, "{name}_bucket{{le=\"{le}\"}} {}", hist.bucket_count(i));
    }
    // The +Inf bucket lives one slot past the last finite boundary.
    let inf = hist.bucket_count(boundaries.len());
    let _ = writeln!(out, "{name}_bucket{{le=\"+Inf\"}} {inf}");
    let _ = writeln!(out, "{name}_sum {}", format_f64_prometheus(hist.sum()));
    let _ = writeln!(out, "{name}_count {}", hist.count());
    let _ = writeln!(out);
}
/// Formats an f64 for Prometheus text exposition.
///
/// Whole numbers whose i64 cast is exact print without a decimal point;
/// infinite values use the Prometheus spellings `+Inf` / `-Inf`; all other
/// values (including NaN, which formats as `NaN`) print with up to six
/// decimal places, trailing zeros stripped.
fn format_f64_prometheus(value: f64) -> String {
    // Prometheus text format spells infinities "+Inf"/"-Inf"; the `{:.6}`
    // fallback below would emit "inf"/"-inf", which parsers reject.
    if value.is_infinite() {
        return if value.is_sign_positive() { "+Inf" } else { "-Inf" }.to_string();
    }
    if value.fract() == 0.0 && value.is_finite() && value.abs() < 1e15 {
        format!("{}", value as i64)
    } else {
        // NaN lands here and renders as "NaN", which is valid output.
        let s = format!("{value:.6}");
        let s = s.trim_end_matches('0');
        let s = s.trim_end_matches('.');
        s.to_string()
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Counter starts at zero; inc/inc_by accumulate, inc_by(0) is a no-op.
#[test]
fn counter_basic() {
let c = Counter::new("test_counter", "A test counter");
assert_eq!(c.get(), 0);
c.inc();
assert_eq!(c.get(), 1);
c.inc_by(5);
assert_eq!(c.get(), 6);
c.inc_by(0);
assert_eq!(c.get(), 6);
}
// 10 threads x 1000 increments must not lose updates (atomicity check).
#[test]
fn counter_concurrent() {
use std::sync::Arc;
let c = Arc::new(Counter::new("concurrent_counter", "concurrent test"));
let mut handles = Vec::new();
for _ in 0..10 {
let c = Arc::clone(&c);
handles.push(std::thread::spawn(move || {
for _ in 0..1000 {
c.inc();
}
}));
}
for h in handles {
h.join().expect("thread should not panic");
}
assert_eq!(c.get(), 10_000);
}
// Gauge starts at 0.0 and set() overwrites the value.
#[test]
fn gauge_set_and_get() {
let g = Gauge::new("test_gauge", "A test gauge");
assert!((g.get() - 0.0).abs() < f64::EPSILON);
g.set(42.5);
assert!((g.get() - 42.5).abs() < f64::EPSILON);
}
// inc/dec step the gauge by exactly 1.0 in each direction.
#[test]
fn gauge_inc_dec() {
let g = Gauge::new("test_gauge_incdec", "inc dec test");
g.inc();
assert!((g.get() - 1.0).abs() < f64::EPSILON);
g.inc();
assert!((g.get() - 2.0).abs() < f64::EPSILON);
g.dec();
assert!((g.get() - 1.0).abs() < f64::EPSILON);
g.dec();
assert!(g.get().abs() < f64::EPSILON);
}
// Balanced concurrent inc/dec (5 threads each) must net out to zero,
// exercising the CAS loop in Gauge::add under contention.
#[test]
fn gauge_concurrent() {
use std::sync::Arc;
let g = Arc::new(Gauge::new("concurrent_gauge", "concurrent gauge"));
let mut handles = Vec::new();
for i in 0..10 {
let g = Arc::clone(&g);
handles.push(std::thread::spawn(move || {
for _ in 0..1000 {
if i < 5 {
g.inc();
} else {
g.dec();
}
}
}));
}
for h in handles {
h.join().expect("thread should not panic");
}
assert!(g.get().abs() < f64::EPSILON);
}
// Bucket counts are cumulative (Prometheus semantics): each observation
// bumps every bucket whose bound covers it, plus the trailing +Inf bucket.
#[test]
fn histogram_observe() {
let h = Histogram::new("test_hist", "A test histogram", vec![1.0, 5.0, 10.0]);
h.observe(0.5);
h.observe(3.0);
h.observe(7.0);
h.observe(15.0);
assert_eq!(h.bucket_count(0), 1);
assert_eq!(h.bucket_count(1), 2);
assert_eq!(h.bucket_count(2), 3);
assert_eq!(h.bucket_count(3), 4);
assert_eq!(h.count(), 4);
let expected_sum = 0.5 + 3.0 + 7.0 + 15.0;
assert!((h.sum() - expected_sum).abs() < 1e-9);
}
// A fresh histogram reports zero everywhere, including the +Inf slot.
#[test]
fn histogram_empty() {
let h = Histogram::new("empty_hist", "empty", vec![1.0, 5.0]);
assert_eq!(h.count(), 0);
assert!(h.sum().abs() < f64::EPSILON);
assert_eq!(h.bucket_count(0), 0);
assert_eq!(h.bucket_count(1), 0);
assert_eq!(h.bucket_count(2), 0); }
// time() passes through the closure's result and records one observation.
#[test]
fn histogram_time_closure() {
let h = Histogram::new("timed_hist", "timed", vec![0.001, 0.01, 0.1, 1.0]);
let result = h.time(|| {
42
});
assert_eq!(result, 42);
assert_eq!(h.count(), 1);
assert!(h.sum() < 1.0);
}
// A value exactly on a boundary counts in that bucket (le is inclusive).
#[test]
fn histogram_boundary_values() {
let h = Histogram::new("boundary_hist", "boundary", vec![1.0, 5.0, 10.0]);
h.observe(5.0);
assert_eq!(h.bucket_count(0), 0);
assert_eq!(h.bucket_count(1), 1);
assert_eq!(h.bucket_count(2), 1);
assert_eq!(h.bucket_count(3), 1);
}
// Both default bucket sets must be strictly increasing.
#[test]
fn default_buckets_sorted() {
let latency = default_latency_buckets();
for pair in latency.windows(2) {
assert!(pair[0] < pair[1], "latency buckets must be sorted");
}
let rate = default_rate_buckets();
for pair in rate.windows(2) {
assert!(pair[0] < pair[1], "rate buckets must be sorted");
}
}
// Default::default() yields an all-zero metric set.
#[test]
fn inference_metrics_default() {
let m = InferenceMetrics::default();
assert_eq!(m.tokens_generated_total.get(), 0);
assert_eq!(m.requests_total.get(), 0);
assert_eq!(m.errors_total.get(), 0);
assert!(m.active_requests.get().abs() < f64::EPSILON);
}
// Counters render as HELP/TYPE lines plus a bare integer sample.
#[test]
fn render_prometheus_counter_format() {
let m = InferenceMetrics::new();
m.requests_total.inc_by(42);
let output = m.render_prometheus();
assert!(output.contains("# HELP oxibonsai_requests_total Total inference requests"));
assert!(output.contains("# TYPE oxibonsai_requests_total counter"));
assert!(output.contains("oxibonsai_requests_total 42"));
}
// Whole-number gauges render without a decimal point ("3", not "3.0").
#[test]
fn render_prometheus_gauge_format() {
let m = InferenceMetrics::new();
m.active_requests.set(3.0);
let output = m.render_prometheus();
assert!(output.contains("# HELP oxibonsai_active_requests"));
assert!(output.contains("# TYPE oxibonsai_active_requests gauge"));
assert!(output.contains("oxibonsai_active_requests 3"));
}
// Histograms render per-bucket le lines, a +Inf line, and a _count line.
#[test]
fn render_prometheus_histogram_format() {
let m = InferenceMetrics::new();
m.request_duration_seconds.observe(0.002);
m.request_duration_seconds.observe(0.05);
let output = m.render_prometheus();
assert!(output.contains("# HELP oxibonsai_request_duration_seconds"));
assert!(output.contains("# TYPE oxibonsai_request_duration_seconds histogram"));
assert!(output.contains("oxibonsai_request_duration_seconds_bucket{le=\"0.001\"} 0"));
assert!(output.contains("oxibonsai_request_duration_seconds_bucket{le=\"+Inf\"} 2"));
assert!(output.contains("oxibonsai_request_duration_seconds_count 2"));
}
// Full render emits matching HELP/TYPE pairs, one per metric (11 total).
#[test]
fn render_prometheus_full_output_parseable() {
let m = InferenceMetrics::new();
m.tokens_generated_total.inc_by(100);
m.requests_total.inc_by(5);
m.errors_total.inc();
m.prompt_tokens_total.inc_by(50);
m.active_requests.set(2.0);
m.kv_cache_utilization.set(0.75);
m.model_memory_bytes.set(1_073_741_824.0);
m.request_duration_seconds.observe(0.1);
m.prefill_duration_seconds.observe(0.01);
m.decode_token_duration_seconds.observe(0.001);
m.tokens_per_second.observe(42.0);
let output = m.render_prometheus();
let help_count = output.lines().filter(|l| l.starts_with("# HELP")).count();
let type_count = output.lines().filter(|l| l.starts_with("# TYPE")).count();
assert_eq!(help_count, type_count);
assert_eq!(help_count, 11);
}
// Exact integers format without a trailing ".0".
#[test]
fn format_f64_prometheus_integers() {
assert_eq!(format_f64_prometheus(0.0), "0");
assert_eq!(format_f64_prometheus(42.0), "42");
assert_eq!(format_f64_prometheus(1000.0), "1000");
}
// Fractional values keep significant digits, trailing zeros stripped.
#[test]
fn format_f64_prometheus_fractions() {
assert_eq!(format_f64_prometheus(0.001), "0.001");
assert_eq!(format_f64_prometheus(0.5), "0.5");
assert_eq!(format_f64_prometheus(2.5), "2.5");
}
// Constructor sorts boundaries ascending and removes duplicates.
#[test]
fn histogram_deduplicates_and_sorts_buckets() {
let h = Histogram::new("dedup", "test", vec![5.0, 1.0, 5.0, 3.0, 1.0]);
assert_eq!(h.bucket_boundaries(), &[1.0, 3.0, 5.0]);
}
}