use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
/// Thread-safe sliding-window average over the last `window_size` samples.
///
/// A cached running sum makes `average()` O(1) instead of re-summing the
/// window on every call. Interior mutability via `parking_lot` locks lets
/// all methods take `&self`, so one instance can be shared across threads.
#[derive(Debug)]
pub struct MovingAverage {
    // Most recent samples, oldest at the front; length capped at `window_size`.
    values: RwLock<VecDeque<f32>>,
    // Maximum number of samples retained in `values`.
    window_size: usize,
    // Cached sum of everything currently in `values`; kept in step by
    // `add`/`clear`. f32 accumulation drift is bounded by the window size.
    running_sum: RwLock<f32>,
}
impl MovingAverage {
    /// Creates a moving average over a window of `window_size` samples.
    pub fn new(window_size: usize) -> Self {
        Self {
            values: RwLock::new(VecDeque::with_capacity(window_size)),
            window_size,
            running_sum: RwLock::new(0.0),
        }
    }

    /// Pushes a sample, evicting the oldest one once the window is full.
    ///
    /// Both write locks are held for the whole update (same acquisition
    /// order as the readers) so no reader can observe a sum that
    /// disagrees with the stored values.
    pub fn add(&self, value: f32) {
        let mut values = self.values.write();
        let mut sum = self.running_sum.write();
        if values.len() >= self.window_size {
            if let Some(old) = values.pop_front() {
                *sum -= old;
            }
        }
        values.push_back(value);
        *sum += value;
    }

    /// Mean of the current window, or 0.0 when no samples were recorded.
    pub fn average(&self) -> f32 {
        let values = self.values.read();
        let sum = self.running_sum.read();
        if values.is_empty() {
            0.0
        } else {
            *sum / values.len() as f32
        }
    }

    /// Smallest sample in the window, or 0.0 when empty.
    ///
    /// The empty case returns 0.0 (not +inf) so downstream snapshot
    /// serialization stays well-defined: JSON has no representation for
    /// non-finite floats.
    pub fn min(&self) -> f32 {
        let values = self.values.read();
        if values.is_empty() {
            return 0.0;
        }
        values.iter().copied().fold(f32::INFINITY, f32::min)
    }

    /// Largest sample in the window, or 0.0 when empty (see `min`).
    pub fn max(&self) -> f32 {
        let values = self.values.read();
        if values.is_empty() {
            return 0.0;
        }
        values.iter().copied().fold(f32::NEG_INFINITY, f32::max)
    }

    /// Sample (Bessel-corrected, n-1) standard deviation of the window;
    /// 0.0 with fewer than two samples.
    ///
    /// The mean is computed from the guards already held rather than by
    /// calling `self.average()`: re-acquiring `self.values.read()` while
    /// holding it can deadlock under parking_lot when a writer is queued
    /// between the two read acquisitions.
    pub fn std_dev(&self) -> f32 {
        let values = self.values.read();
        if values.len() < 2 {
            return 0.0;
        }
        let sum = self.running_sum.read();
        let mean = *sum / values.len() as f32;
        let variance: f32 =
            values.iter().map(|v| (v - mean).powi(2)).sum::<f32>() / (values.len() - 1) as f32;
        variance.sqrt()
    }

    /// Number of samples currently in the window.
    pub fn count(&self) -> usize {
        self.values.read().len()
    }

    /// Drops all samples and zeroes the cached sum.
    pub fn clear(&self) {
        let mut values = self.values.write();
        let mut sum = self.running_sum.write();
        values.clear();
        *sum = 0.0;
    }

    /// Percentile `p` (0..=100) of the window via nearest-rank on a
    /// sorted copy; 0.0 when empty. O(n log n) per call.
    pub fn percentile(&self, p: f32) -> f32 {
        let values = self.values.read();
        if values.is_empty() {
            return 0.0;
        }
        let mut sorted: Vec<f32> = values.iter().copied().collect();
        // NaN-tolerant comparator: NaNs compare Equal, so sorting never panics.
        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
        let idx = ((p / 100.0) * (sorted.len() - 1) as f32).round() as usize;
        sorted[idx.min(sorted.len() - 1)]
    }
}
impl Default for MovingAverage {
fn default() -> Self {
Self::new(100)
}
}
impl Clone for MovingAverage {
    /// Deep-copies the window contents and cached sum into fresh locks.
    fn clone(&self) -> Self {
        let (snapshot, snapshot_sum) = {
            let values_guard = self.values.read();
            let sum_guard = self.running_sum.read();
            (values_guard.clone(), *sum_guard)
        };
        Self {
            values: RwLock::new(snapshot),
            window_size: self.window_size,
            running_sum: RwLock::new(snapshot_sum),
        }
    }
}
/// Latency histogram over fixed bucket upper bounds (milliseconds).
///
/// Bucket counts and the total are atomics, so `record` is lock-free for
/// the counting part; only the running sum (used by `mean`) takes a lock.
#[derive(Debug)]
pub struct LatencyHistogram {
    // Upper bound of each bucket, in ms. Assumed ascending — the default
    // set is, but `with_buckets` does not enforce it (TODO confirm callers).
    buckets: Vec<f32>,
    // One counter per bucket, index-aligned with `buckets`.
    counts: Vec<AtomicU64>,
    // Total number of recorded samples.
    total: AtomicU64,
    // Sum of all recorded latencies; f64 to limit accumulation error.
    sum: RwLock<f64>,
}
impl LatencyHistogram {
    /// Histogram with default millisecond buckets spanning 1 ms to 5 s.
    pub fn new() -> Self {
        Self::with_buckets(vec![
            1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0, 1000.0, 2000.0, 5000.0,
        ])
    }

    /// Histogram with caller-supplied bucket upper bounds (expected
    /// ascending). Samples above the last bound land in the last bucket.
    pub fn with_buckets(buckets: Vec<f32>) -> Self {
        let counts = buckets.iter().map(|_| AtomicU64::new(0)).collect();
        Self {
            buckets,
            counts,
            total: AtomicU64::new(0),
            sum: RwLock::new(0.0),
        }
    }

    /// Records one latency sample (milliseconds).
    pub fn record(&self, latency_ms: f32) {
        // `with_buckets(vec![])` is representable; without this guard
        // `self.buckets.len() - 1` underflows and the index panics.
        // Total and sum are still tracked so `mean`/`count` keep working.
        if !self.buckets.is_empty() {
            let bucket_idx = self
                .buckets
                .iter()
                .position(|&b| latency_ms <= b)
                .unwrap_or(self.buckets.len() - 1);
            self.counts[bucket_idx].fetch_add(1, Ordering::Relaxed);
        }
        self.total.fetch_add(1, Ordering::Relaxed);
        let mut sum = self.sum.write();
        *sum += latency_ms as f64;
    }

    /// Mean of all recorded samples, or 0.0 if none.
    pub fn mean(&self) -> f32 {
        let total = self.total.load(Ordering::Relaxed);
        if total == 0 {
            return 0.0;
        }
        let sum = self.sum.read();
        (*sum / total as f64) as f32
    }

    /// Approximate percentile `p` (0..=100) by locating the bucket that
    /// contains the target rank and linearly interpolating within it.
    /// Returns 0.0 with no samples; the last bound if the rank is never
    /// reached (e.g. every sample overflowed into the last bucket).
    pub fn percentile(&self, p: f32) -> f32 {
        let total = self.total.load(Ordering::Relaxed);
        if total == 0 {
            return 0.0;
        }
        let target = (p / 100.0 * total as f32) as u64;
        let mut cumulative = 0u64;
        for (i, count) in self.counts.iter().enumerate() {
            let bucket_count = count.load(Ordering::Relaxed);
            cumulative += bucket_count;
            if cumulative >= target {
                // First bucket has no lower edge to interpolate from.
                if i == 0 {
                    return self.buckets[0];
                }
                let prev_cumulative = cumulative - bucket_count;
                // max(1) avoids 0/0 when the rank lands on an empty bucket.
                let fraction = (target - prev_cumulative) as f32 / bucket_count.max(1) as f32;
                // i == 0 returned above, so a lower edge always exists here.
                let prev_bucket = self.buckets[i - 1];
                return prev_bucket + fraction * (self.buckets[i] - prev_bucket);
            }
        }
        *self.buckets.last().unwrap_or(&0.0)
    }

    /// (upper bound, count) pairs for every bucket, in bucket order.
    pub fn bucket_counts(&self) -> Vec<(f32, u64)> {
        self.buckets
            .iter()
            .zip(self.counts.iter())
            .map(|(b, c)| (*b, c.load(Ordering::Relaxed)))
            .collect()
    }

    /// Zeroes every counter and the running sum. Not atomic as a whole:
    /// samples recorded concurrently may be partially lost.
    pub fn reset(&self) {
        for count in &self.counts {
            count.store(0, Ordering::Relaxed);
        }
        self.total.store(0, Ordering::Relaxed);
        *self.sum.write() = 0.0;
    }

    /// Total number of samples recorded since creation (or last reset).
    pub fn count(&self) -> u64 {
        self.total.load(Ordering::Relaxed)
    }
}
impl Default for LatencyHistogram {
fn default() -> Self {
Self::new()
}
}
impl Clone for LatencyHistogram {
    /// Snapshot-copies the atomic counters and running sum.
    ///
    /// Not atomic as a whole: samples recorded concurrently may end up
    /// split between the original and the clone.
    fn clone(&self) -> Self {
        let mut copied_counts = Vec::with_capacity(self.counts.len());
        for counter in &self.counts {
            copied_counts.push(AtomicU64::new(counter.load(Ordering::Relaxed)));
        }
        Self {
            buckets: self.buckets.clone(),
            counts: copied_counts,
            total: AtomicU64::new(self.total.load(Ordering::Relaxed)),
            sum: RwLock::new(*self.sum.read()),
        }
    }
}
/// Central metrics hub for an inference engine: latency and throughput
/// windows, KV-cache counters, memory watermarks, and request bookkeeping.
/// Every recording method takes `&self`, so a single instance can be
/// shared across threads.
#[derive(Debug)]
pub struct InferenceMetrics {
    // Time-to-first-token samples, ms (100-sample window).
    pub ttft_ms: MovingAverage,
    // Tokens-per-second samples (100-sample window).
    pub tps: MovingAverage,
    // Lifetime KV-cache hit counter.
    kv_cache_hits: AtomicU64,
    // Lifetime KV-cache miss counter.
    kv_cache_misses: AtomicU64,
    // Most recently reported memory usage, bytes.
    memory_usage_bytes: AtomicUsize,
    // Highest memory usage seen since creation (re-baselined by `reset`).
    peak_memory_bytes: AtomicUsize,
    // Requests currently in flight (started but not completed).
    active_requests: AtomicUsize,
    // Lifetime request counter.
    total_requests: AtomicU64,
    // Lifetime generated-token counter.
    total_tokens: AtomicU64,
    // Histogram of the TTFT samples fed through `record_ttft`.
    pub latency_histogram: LatencyHistogram,
    // Scheduling-queue depth as last reported via `set_queue_depth`.
    queue_depth: AtomicUsize,
    // Construction time; basis for `uptime`.
    start_time: Instant,
    // When `record_ttft`/`record_tps` last ran; basis for `time_since_update`.
    last_update: RwLock<Instant>,
    // Gap between consecutive tokens, ms (100-sample window).
    pub inter_token_latency_ms: MovingAverage,
    // Batch-size samples (50-sample window).
    pub batch_sizes: MovingAverage,
}
impl InferenceMetrics {
    /// Creates a metrics hub with zeroed counters, 100-sample windows for
    /// latency/throughput metrics, and a 50-sample window for batch sizes.
    pub fn new() -> Self {
        Self {
            ttft_ms: MovingAverage::new(100),
            tps: MovingAverage::new(100),
            kv_cache_hits: AtomicU64::new(0),
            kv_cache_misses: AtomicU64::new(0),
            memory_usage_bytes: AtomicUsize::new(0),
            peak_memory_bytes: AtomicUsize::new(0),
            active_requests: AtomicUsize::new(0),
            total_requests: AtomicU64::new(0),
            total_tokens: AtomicU64::new(0),
            latency_histogram: LatencyHistogram::new(),
            queue_depth: AtomicUsize::new(0),
            start_time: Instant::now(),
            last_update: RwLock::new(Instant::now()),
            inter_token_latency_ms: MovingAverage::new(100),
            batch_sizes: MovingAverage::new(50),
        }
    }

    /// Records a time-to-first-token sample (ms) into both the moving
    /// window and the latency histogram, and bumps the activity timestamp.
    pub fn record_ttft(&self, ttft_ms: f32) {
        self.ttft_ms.add(ttft_ms);
        self.latency_histogram.record(ttft_ms);
        *self.last_update.write() = Instant::now();
    }

    /// Records a throughput sample as `tokens / duration` and adds the
    /// tokens to the lifetime total. A zero-length duration skips the TPS
    /// sample (avoids division by zero) but still counts the tokens.
    pub fn record_tps(&self, tokens: usize, duration: Duration) {
        if duration.as_secs_f32() > 0.0 {
            let tps = tokens as f32 / duration.as_secs_f32();
            self.tps.add(tps);
        }
        self.total_tokens
            .fetch_add(tokens as u64, Ordering::Relaxed);
        *self.last_update.write() = Instant::now();
    }

    /// Records the gap between two consecutive tokens, in ms.
    pub fn record_inter_token_latency(&self, latency_ms: f32) {
        self.inter_token_latency_ms.add(latency_ms);
    }

    /// Records the size of one scheduled batch.
    pub fn record_batch_size(&self, size: usize) {
        self.batch_sizes.add(size as f32);
    }

    /// Counts one KV-cache hit.
    pub fn record_kv_cache_hit(&self) {
        self.kv_cache_hits.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts one KV-cache miss.
    pub fn record_kv_cache_miss(&self) {
        self.kv_cache_misses.fetch_add(1, Ordering::Relaxed);
    }

    /// KV-cache hit ratio in [0, 1]. Reports 1.0 before any lookup so an
    /// idle engine is not flagged as unhealthy.
    pub fn kv_cache_hit_rate(&self) -> f32 {
        let hits = self.kv_cache_hits.load(Ordering::Relaxed);
        let misses = self.kv_cache_misses.load(Ordering::Relaxed);
        let total = hits + misses;
        if total == 0 {
            1.0
        } else {
            hits as f32 / total as f32
        }
    }

    /// Publishes the current memory usage and raises the peak watermark.
    pub fn update_memory_usage(&self, bytes: usize) {
        self.memory_usage_bytes.store(bytes, Ordering::Relaxed);
        // fetch_max keeps the watermark monotonic under racing updaters;
        // the previous load/compare/store could lose a higher peak.
        self.peak_memory_bytes.fetch_max(bytes, Ordering::Relaxed);
    }

    /// Most recently reported memory usage, in bytes.
    pub fn memory_usage_bytes(&self) -> usize {
        self.memory_usage_bytes.load(Ordering::Relaxed)
    }

    /// Highest memory usage observed, in bytes.
    pub fn peak_memory_bytes(&self) -> usize {
        self.peak_memory_bytes.load(Ordering::Relaxed)
    }

    /// Marks a request as started: bumps both the in-flight gauge and the
    /// lifetime counter.
    pub fn request_started(&self) {
        self.active_requests.fetch_add(1, Ordering::Relaxed);
        self.total_requests.fetch_add(1, Ordering::Relaxed);
    }

    /// Marks a request as completed.
    ///
    /// Saturating decrement: an unmatched call (no prior
    /// `request_started`) leaves the gauge at 0 instead of wrapping the
    /// unsigned counter to `usize::MAX`.
    pub fn request_completed(&self) {
        let _ = self
            .active_requests
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| n.checked_sub(1));
    }

    /// Number of requests currently in flight.
    pub fn active_requests(&self) -> usize {
        self.active_requests.load(Ordering::Relaxed)
    }

    /// Lifetime request count.
    pub fn total_requests(&self) -> u64 {
        self.total_requests.load(Ordering::Relaxed)
    }

    /// Lifetime generated-token count.
    pub fn total_tokens(&self) -> u64 {
        self.total_tokens.load(Ordering::Relaxed)
    }

    /// Publishes the current scheduling-queue depth.
    pub fn set_queue_depth(&self, depth: usize) {
        self.queue_depth.store(depth, Ordering::Relaxed);
    }

    /// Last published scheduling-queue depth.
    pub fn queue_depth(&self) -> usize {
        self.queue_depth.load(Ordering::Relaxed)
    }

    /// Time since this metrics hub was constructed.
    pub fn uptime(&self) -> Duration {
        self.start_time.elapsed()
    }

    /// Time since the last TTFT/TPS recording.
    pub fn time_since_update(&self) -> Duration {
        self.last_update.read().elapsed()
    }

    /// Captures a serializable point-in-time copy of every metric.
    ///
    /// Each field is read independently, so the snapshot is not a single
    /// consistent cut under concurrent recording — fine for monitoring.
    pub fn snapshot(&self) -> MetricsSnapshot {
        MetricsSnapshot {
            ttft_avg_ms: self.ttft_ms.average(),
            ttft_p50_ms: self.ttft_ms.percentile(50.0),
            ttft_p95_ms: self.ttft_ms.percentile(95.0),
            ttft_p99_ms: self.ttft_ms.percentile(99.0),
            tps_avg: self.tps.average(),
            tps_min: self.tps.min(),
            tps_max: self.tps.max(),
            kv_cache_hit_rate: self.kv_cache_hit_rate(),
            memory_usage_bytes: self.memory_usage_bytes(),
            peak_memory_bytes: self.peak_memory_bytes(),
            active_requests: self.active_requests(),
            total_requests: self.total_requests(),
            total_tokens: self.total_tokens(),
            queue_depth: self.queue_depth(),
            uptime_secs: self.uptime().as_secs_f32(),
            inter_token_latency_avg_ms: self.inter_token_latency_ms.average(),
            avg_batch_size: self.batch_sizes.average(),
        }
    }

    /// Clears accumulated statistics.
    ///
    /// Deliberately keeps live state: `active_requests`, `queue_depth`,
    /// `memory_usage_bytes`, and `start_time` are untouched, and the peak
    /// watermark is re-baselined to the current usage rather than zeroed.
    pub fn reset(&self) {
        self.ttft_ms.clear();
        self.tps.clear();
        self.kv_cache_hits.store(0, Ordering::Relaxed);
        self.kv_cache_misses.store(0, Ordering::Relaxed);
        self.peak_memory_bytes.store(
            self.memory_usage_bytes.load(Ordering::Relaxed),
            Ordering::Relaxed,
        );
        self.total_requests.store(0, Ordering::Relaxed);
        self.total_tokens.store(0, Ordering::Relaxed);
        self.latency_histogram.reset();
        self.inter_token_latency_ms.clear();
        self.batch_sizes.clear();
    }
}
impl Default for InferenceMetrics {
fn default() -> Self {
Self::new()
}
}
/// Point-in-time, serializable copy of `InferenceMetrics` values, as
/// produced by `InferenceMetrics::snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
    // Mean time-to-first-token over the moving window, ms.
    pub ttft_avg_ms: f32,
    // TTFT 50th percentile, ms.
    pub ttft_p50_ms: f32,
    // TTFT 95th percentile, ms.
    pub ttft_p95_ms: f32,
    // TTFT 99th percentile, ms.
    pub ttft_p99_ms: f32,
    // Mean tokens-per-second over the moving window.
    pub tps_avg: f32,
    // Minimum TPS sample in the window.
    pub tps_min: f32,
    // Maximum TPS sample in the window.
    pub tps_max: f32,
    // KV-cache hit ratio in [0, 1] (1.0 when no lookups yet).
    pub kv_cache_hit_rate: f32,
    // Memory usage at snapshot time, bytes.
    pub memory_usage_bytes: usize,
    // Peak memory usage observed, bytes.
    pub peak_memory_bytes: usize,
    // Requests in flight at snapshot time.
    pub active_requests: usize,
    // Lifetime request count.
    pub total_requests: u64,
    // Lifetime generated-token count.
    pub total_tokens: u64,
    // Scheduling-queue depth at snapshot time.
    pub queue_depth: usize,
    // Seconds since the metrics hub was constructed.
    pub uptime_secs: f32,
    // Mean gap between consecutive tokens, ms.
    pub inter_token_latency_avg_ms: f32,
    // Mean batch size over the moving window.
    pub avg_batch_size: f32,
}
impl MetricsSnapshot {
    /// A snapshot is healthy when mean TTFT is at or below `max_ttft_ms`
    /// and mean throughput is at or above `min_tps`.
    pub fn is_healthy(&self, max_ttft_ms: f32, min_tps: f32) -> bool {
        let latency_ok = self.ttft_avg_ms <= max_ttft_ms;
        let throughput_ok = self.tps_avg >= min_tps;
        latency_ok && throughput_ok
    }

    /// Fraction of `target_tps` achieved, clamped to at most 1.0; a
    /// non-positive target trivially scores 1.0.
    pub fn throughput_efficiency(&self, target_tps: f32) -> f32 {
        if target_tps <= 0.0 {
            1.0
        } else {
            (self.tps_avg / target_tps).min(1.0)
        }
    }

    /// Ratio of target TTFT to observed TTFT, clamped to at most 1.0; a
    /// non-positive observation trivially scores 1.0.
    pub fn latency_score(&self, target_ttft_ms: f32) -> f32 {
        if self.ttft_avg_ms <= 0.0 {
            1.0
        } else {
            (target_ttft_ms / self.ttft_avg_ms).min(1.0)
        }
    }
}
/// Wraps `InferenceMetrics` with a bounded, periodically-sampled snapshot
/// history used for trend analysis.
pub struct MetricsCollector {
    // Live metrics being recorded into.
    metrics: InferenceMetrics,
    // (capture time, snapshot) pairs, oldest at the front.
    history: RwLock<VecDeque<(Instant, MetricsSnapshot)>>,
    // Maximum number of snapshots retained in `history`.
    max_history: usize,
    // Minimum spacing between automatic snapshots.
    snapshot_interval: Duration,
    // When the most recent snapshot was taken.
    last_snapshot: RwLock<Instant>,
}
impl MetricsCollector {
    /// Collector that keeps at most `max_history` snapshots and takes one
    /// automatically at most once per `snapshot_interval`.
    pub fn new(max_history: usize, snapshot_interval: Duration) -> Self {
        Self {
            metrics: InferenceMetrics::new(),
            history: RwLock::new(VecDeque::with_capacity(max_history)),
            max_history,
            snapshot_interval,
            last_snapshot: RwLock::new(Instant::now()),
        }
    }

    /// Direct access to the live metrics for recording paths this
    /// collector does not wrap.
    pub fn metrics(&self) -> &InferenceMetrics {
        &self.metrics
    }

    /// Records a TTFT sample (ms) and snapshots if the interval elapsed.
    pub fn record_ttft(&self, ttft_ms: f32) {
        self.metrics.record_ttft(ttft_ms);
        self.maybe_snapshot();
    }

    /// Records a throughput sample and snapshots if the interval elapsed.
    pub fn record_tps(&self, tokens: usize, duration: Duration) {
        self.metrics.record_tps(tokens, duration);
        self.maybe_snapshot();
    }

    /// Takes a snapshot when `snapshot_interval` has passed since the
    /// last one. Best-effort: two racing callers can both pass the check
    /// and produce adjacent snapshots, which only costs a history slot.
    fn maybe_snapshot(&self) {
        let last = *self.last_snapshot.read();
        if last.elapsed() >= self.snapshot_interval {
            self.take_snapshot();
        }
    }

    /// Captures the current metrics into the bounded history, evicting
    /// the oldest entry when full.
    pub fn take_snapshot(&self) {
        let snapshot = self.metrics.snapshot();
        let now = Instant::now();
        let mut history = self.history.write();
        if history.len() >= self.max_history {
            history.pop_front();
        }
        history.push_back((now, snapshot));
        *self.last_snapshot.write() = now;
    }

    /// Returns up to `count` snapshots, newest first.
    pub fn get_history(&self, count: usize) -> Vec<MetricsSnapshot> {
        let history = self.history.read();
        history
            .iter()
            .rev()
            .take(count)
            .map(|(_, s)| s.clone())
            .collect()
    }

    /// Up to the 10 newest values of one snapshot field, newest first.
    /// Shared by both trend computations.
    fn recent_values<F>(&self, field: F) -> Vec<f32>
    where
        F: Fn(&MetricsSnapshot) -> f32,
    {
        self.history
            .read()
            .iter()
            .rev()
            .take(10)
            .map(|(_, s)| field(s))
            .collect()
    }

    /// Least-squares slope of `series` against its index (0, 1, ...).
    /// Returns 0.0 for fewer than two points. With >= 2 points the
    /// denominator is nonzero, since the x values are distinct.
    fn regression_slope(series: &[f32]) -> f32 {
        if series.len() < 2 {
            return 0.0;
        }
        let n = series.len() as f32;
        let sum_x: f32 = (0..series.len()).map(|i| i as f32).sum();
        let sum_y: f32 = series.iter().sum();
        let sum_xy: f32 = series.iter().enumerate().map(|(i, y)| i as f32 * y).sum();
        let sum_xx: f32 = (0..series.len()).map(|i| (i * i) as f32).sum();
        (n * sum_xy - sum_x * sum_y) / (n * sum_xx - sum_x * sum_x)
    }

    /// TTFT trend over up to the last 10 snapshots. The series is ordered
    /// newest-first and the slope is negated, so a positive result means
    /// recent TTFT is higher than older TTFT (latency worsening).
    pub fn ttft_trend(&self) -> f32 {
        -Self::regression_slope(&self.recent_values(|s| s.ttft_avg_ms))
    }

    /// TPS trend over up to the last 10 snapshots. The series is ordered
    /// newest-first, so a positive result means recent TPS is lower than
    /// older TPS (throughput declining).
    pub fn tps_trend(&self) -> f32 {
        Self::regression_slope(&self.recent_values(|s| s.tps_avg))
    }
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new(1000, Duration::from_secs(60))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_moving_average() {
        let ma = MovingAverage::new(3);
        [1.0f32, 2.0, 3.0].iter().for_each(|&v| ma.add(v));
        assert!((ma.average() - 2.0).abs() < 0.01);
        // Window is full: adding 4.0 evicts 1.0, leaving {2, 3, 4}.
        ma.add(4.0);
        assert!((ma.average() - 3.0).abs() < 0.01);
    }

    #[test]
    fn test_moving_average_percentile() {
        let ma = MovingAverage::new(10);
        (1..=10).for_each(|i| ma.add(i as f32));
        let p50 = ma.percentile(50.0);
        assert!(p50 >= 5.0 && p50 <= 6.0);
        assert!(ma.percentile(90.0) >= 9.0);
    }

    #[test]
    fn test_latency_histogram() {
        let hist = LatencyHistogram::new();
        [5.0f32, 15.0, 50.0].iter().for_each(|&ms| hist.record(ms));
        assert_eq!(hist.count(), 3);
        // (5 + 15 + 50) / 3 ~= 23.33
        assert!((hist.mean() - 23.33).abs() < 1.0);
    }

    #[test]
    fn test_inference_metrics() {
        let metrics = InferenceMetrics::new();
        metrics.record_ttft(10.0);
        metrics.record_ttft(20.0);
        assert!((metrics.ttft_ms.average() - 15.0).abs() < 0.01);
        // Two hits and one miss -> 2/3 hit rate.
        metrics.record_kv_cache_hit();
        metrics.record_kv_cache_hit();
        metrics.record_kv_cache_miss();
        assert!((metrics.kv_cache_hit_rate() - 0.667).abs() < 0.01);
    }

    #[test]
    fn test_metrics_snapshot() {
        let metrics = InferenceMetrics::new();
        metrics.record_ttft(10.0);
        metrics.record_tps(100, Duration::from_secs(1));
        metrics.update_memory_usage(1024 * 1024);
        metrics.request_started();

        let snapshot = metrics.snapshot();
        assert!((snapshot.ttft_avg_ms - 10.0).abs() < 0.01);
        assert!((snapshot.tps_avg - 100.0).abs() < 0.01);
        assert_eq!(snapshot.memory_usage_bytes, 1024 * 1024);
        assert_eq!(snapshot.active_requests, 1);
    }

    #[test]
    fn test_metrics_collector() {
        let collector = MetricsCollector::new(100, Duration::from_millis(10));
        for i in 1..=5 {
            collector.record_ttft(i as f32 * 10.0);
        }
        collector.take_snapshot();
        assert_eq!(collector.get_history(1).len(), 1);
    }

    #[test]
    fn test_snapshot_health_check() {
        let snapshot = MetricsSnapshot {
            ttft_avg_ms: 50.0,
            ttft_p50_ms: 45.0,
            ttft_p95_ms: 80.0,
            ttft_p99_ms: 100.0,
            tps_avg: 150.0,
            tps_min: 100.0,
            tps_max: 200.0,
            kv_cache_hit_rate: 0.95,
            memory_usage_bytes: 1024 * 1024,
            peak_memory_bytes: 2 * 1024 * 1024,
            active_requests: 5,
            total_requests: 1000,
            total_tokens: 100000,
            queue_depth: 2,
            uptime_secs: 3600.0,
            inter_token_latency_avg_ms: 5.0,
            avg_batch_size: 8.0,
        };
        // Healthy within generous limits; unhealthy when either the TTFT
        // ceiling or the TPS floor is violated.
        assert!(snapshot.is_healthy(100.0, 100.0));
        assert!(!snapshot.is_healthy(30.0, 100.0));
        assert!(!snapshot.is_healthy(100.0, 200.0));
    }
}