Skip to main content

oxibonsai_runtime/
request_metrics.rs

1//! Per-request token-rate and latency metrics.
2//!
3//! [`RequestRateTracker`] records per-token timing for a single in-flight
4//! request and produces:
5//! - EMA-smoothed tokens per second
6//! - p50 / p95 inter-token latency (TBT — time-between-tokens)
7//! - queue-wait time (admission → first token)
8//!
//! These per-request rollups are aggregated by [`RequestRateAggregator`]
//! across a window of recent requests, exposing window-level p50/p95
//! inter-token latency and the mean of the per-request EMA tokens-per-second.
12//!
13//! ## Design
14//!
15//! - **TBT samples** are stored in a small ring buffer (default 128 slots).
16//!   Quantiles are computed by sorting on demand: O(n log n) but n is
17//!   bounded so the cost is constant.
18//! - **EMA tokens/sec** uses an exponentially-weighted moving average with
19//!   alpha = 0.20 (20% new sample, 80% prior smoothed value).
//! - **Aggregator** keeps the last `window` request rollups in a fixed-size
//!   ring; the workload-level p50 is the median of the per-request p50s and
//!   the p95 is the 95th percentile of the per-request p95s (a proxy — raw
//!   per-token samples are not retained).
23//!
24//! ## Usage
25//!
26//! ```
27//! use oxibonsai_runtime::request_metrics::RequestRateTracker;
28//!
29//! let mut t = RequestRateTracker::new();
30//! t.record_admission();              // request received at queue
31//! std::thread::sleep(std::time::Duration::from_micros(100));
32//! t.record_first_token();            // first token emitted
33//! for _ in 0..10 {
34//!     std::thread::sleep(std::time::Duration::from_micros(50));
35//!     t.record_token();              // subsequent tokens
36//! }
37//! let snap = t.snapshot();
38//! assert!(snap.tokens_emitted >= 11);
39//! assert!(snap.tokens_per_second > 0.0);
40//! ```
41
42use std::sync::Mutex;
43use std::time::Instant;
44
/// Default number of inter-token (TBT) samples retained per request.
const DEFAULT_WINDOW_TBT_SAMPLES: usize = 128;
/// Default number of request snapshots retained by the aggregator.
const DEFAULT_AGGREGATOR_WINDOW: usize = 256;
/// Default EMA weight given to each new tokens/sec observation.
const DEFAULT_TPS_ALPHA: f64 = 0.20;
48
49// ─── Per-request tracker ───────────────────────────────────────────────────
50
/// Snapshot of a single request's rate metrics at one point in time.
///
/// Produced by `RequestRateTracker::snapshot`. Every field is a plain value,
/// so snapshots can be copied freely and handed to an aggregator.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RequestRateSnapshot {
    /// Number of tokens emitted so far.
    pub tokens_emitted: u64,
    /// EMA-smoothed tokens per second; `0.0` until a measurable (non-zero)
    /// gap between two tokens has been observed.
    pub tokens_per_second: f64,
    /// Median (p50) inter-token latency in seconds; `0.0` when no
    /// inter-token gap has been recorded.
    pub tbt_p50_seconds: f64,
    /// 95th-percentile inter-token latency in seconds; `0.0` when no
    /// inter-token gap has been recorded.
    pub tbt_p95_seconds: f64,
    /// Queue wait time (admission → first token) in seconds.
    ///
    /// `None` unless *both* an admission and a first token have been
    /// recorded.
    pub queue_wait_seconds: Option<f64>,
    /// Time from admission to the moment the snapshot was taken, in
    /// seconds; `0.0` if no admission was recorded.
    pub elapsed_seconds: f64,
}
68
69/// Per-request rate tracker.
70///
71/// Not thread-safe — intended to be owned by a single request handler.
72/// Callers needing concurrent access should wrap in a `Mutex`.
73#[derive(Debug, Clone)]
74pub struct RequestRateTracker {
75    admission: Option<Instant>,
76    first_token: Option<Instant>,
77    last_token: Option<Instant>,
78    tokens_emitted: u64,
79    tps_ema: f64,
80    tps_alpha: f64,
81    /// Ring buffer of recent inter-token deltas in seconds.
82    tbt_samples: Vec<f64>,
83    tbt_capacity: usize,
84    tbt_next_idx: usize,
85    tbt_filled: usize,
86}
87
88impl RequestRateTracker {
89    /// Create a tracker with default settings (window = 128 TBT samples,
90    /// alpha = 0.20).
91    pub fn new() -> Self {
92        Self::with_params(DEFAULT_WINDOW_TBT_SAMPLES, DEFAULT_TPS_ALPHA)
93    }
94
95    /// Create a tracker with custom parameters.
96    ///
97    /// `tbt_capacity` is clamped to at least 1; `alpha` is clamped to
98    /// `[0.0, 1.0]`.
99    pub fn with_params(tbt_capacity: usize, alpha: f64) -> Self {
100        let cap = tbt_capacity.max(1);
101        Self {
102            admission: None,
103            first_token: None,
104            last_token: None,
105            tokens_emitted: 0,
106            tps_ema: 0.0,
107            tps_alpha: alpha.clamp(0.0, 1.0),
108            tbt_samples: vec![0.0; cap],
109            tbt_capacity: cap,
110            tbt_next_idx: 0,
111            tbt_filled: 0,
112        }
113    }
114
115    /// Mark the request as admitted (e.g. dequeued from the request queue).
116    pub fn record_admission(&mut self) {
117        self.admission = Some(Instant::now());
118    }
119
120    /// Mark the first token as emitted. This implicitly counts the token,
121    /// so callers should not also call `record_token` for the first token.
122    pub fn record_first_token(&mut self) {
123        let now = Instant::now();
124        self.first_token = Some(now);
125        self.last_token = Some(now);
126        self.tokens_emitted = self.tokens_emitted.saturating_add(1);
127    }
128
129    /// Mark a subsequent (non-first) token as emitted.
130    ///
131    /// If `record_first_token` was never called, this also doubles as
132    /// the first-token marker.
133    pub fn record_token(&mut self) {
134        let now = Instant::now();
135        if self.first_token.is_none() {
136            self.first_token = Some(now);
137        }
138        if let Some(prev) = self.last_token {
139            let delta = (now - prev).as_secs_f64();
140            self.push_tbt_sample(delta);
141            // Update tokens/sec EMA from instantaneous rate.
142            if delta > 0.0 {
143                let inst = 1.0 / delta;
144                if self.tokens_emitted < 2 {
145                    // Seed the EMA with the first observation.
146                    self.tps_ema = inst;
147                } else {
148                    self.tps_ema = self.tps_alpha * inst + (1.0 - self.tps_alpha) * self.tps_ema;
149                }
150            }
151        }
152        self.last_token = Some(now);
153        self.tokens_emitted = self.tokens_emitted.saturating_add(1);
154    }
155
156    /// Take a snapshot of the current state without disturbing the tracker.
157    pub fn snapshot(&self) -> RequestRateSnapshot {
158        let now = Instant::now();
159        let elapsed = self
160            .admission
161            .map(|t| (now - t).as_secs_f64())
162            .unwrap_or(0.0);
163        let queue_wait = self.queue_wait_seconds();
164        let (p50, p95) = self.tbt_quantiles();
165
166        RequestRateSnapshot {
167            tokens_emitted: self.tokens_emitted,
168            tokens_per_second: self.tps_ema,
169            tbt_p50_seconds: p50,
170            tbt_p95_seconds: p95,
171            queue_wait_seconds: queue_wait,
172            elapsed_seconds: elapsed,
173        }
174    }
175
176    /// Queue wait time (admission → first token), if both are recorded.
177    pub fn queue_wait_seconds(&self) -> Option<f64> {
178        match (self.admission, self.first_token) {
179            (Some(a), Some(f)) => Some((f - a).as_secs_f64()),
180            _ => None,
181        }
182    }
183
184    /// Number of tokens emitted so far.
185    pub fn tokens_emitted(&self) -> u64 {
186        self.tokens_emitted
187    }
188
189    /// EMA-smoothed tokens per second.
190    pub fn tokens_per_second(&self) -> f64 {
191        self.tps_ema
192    }
193
194    /// Compute (p50, p95) inter-token latency in seconds from the current
195    /// ring-buffer contents.
196    fn tbt_quantiles(&self) -> (f64, f64) {
197        if self.tbt_filled == 0 {
198            return (0.0, 0.0);
199        }
200        let n = self.tbt_filled.min(self.tbt_capacity);
201        let mut buf: Vec<f64> = self.tbt_samples[..n].to_vec();
202        buf.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
203        let p50 = quantile_sorted(&buf, 0.50);
204        let p95 = quantile_sorted(&buf, 0.95);
205        (p50, p95)
206    }
207
208    fn push_tbt_sample(&mut self, delta: f64) {
209        self.tbt_samples[self.tbt_next_idx] = delta;
210        self.tbt_next_idx = (self.tbt_next_idx + 1) % self.tbt_capacity;
211        if self.tbt_filled < self.tbt_capacity {
212            self.tbt_filled += 1;
213        }
214    }
215}
216
217impl Default for RequestRateTracker {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
223// ─── Aggregator ────────────────────────────────────────────────────────────
224
225/// Workload-level rollup of recent request rate snapshots.
226#[derive(Debug, Clone, Copy, PartialEq)]
227pub struct AggregateRateSnapshot {
228    /// Number of completed requests in the window.
229    pub completed_requests: u64,
230    /// Mean tokens-per-second across the window.
231    pub mean_tokens_per_second: f64,
232    /// p50 inter-token latency across the window.
233    pub tbt_p50_seconds: f64,
234    /// p95 inter-token latency across the window.
235    pub tbt_p95_seconds: f64,
236    /// Mean queue wait time across the window.
237    pub mean_queue_wait_seconds: f64,
238}
239
240/// Aggregator over the most recent `window` per-request snapshots.
241///
242/// Thread-safe via an internal mutex.
243#[derive(Debug)]
244pub struct RequestRateAggregator {
245    inner: Mutex<RingState>,
246}
247
248#[derive(Debug)]
249struct RingState {
250    samples: Vec<RequestRateSnapshot>,
251    capacity: usize,
252    next_idx: usize,
253    filled: usize,
254    completed: u64,
255}
256
257impl RequestRateAggregator {
258    /// Create an aggregator with the default window of 256 requests.
259    pub fn new() -> Self {
260        Self::with_window(DEFAULT_AGGREGATOR_WINDOW)
261    }
262
263    /// Create an aggregator with a custom window size (clamped to >= 1).
264    pub fn with_window(window: usize) -> Self {
265        let cap = window.max(1);
266        Self {
267            inner: Mutex::new(RingState {
268                samples: Vec::with_capacity(cap),
269                capacity: cap,
270                next_idx: 0,
271                filled: 0,
272                completed: 0,
273            }),
274        }
275    }
276
277    /// Record a completed request's snapshot.
278    pub fn record(&self, snap: RequestRateSnapshot) {
279        let mut g = match self.inner.lock() {
280            Ok(g) => g,
281            Err(poisoned) => poisoned.into_inner(),
282        };
283        if g.samples.len() < g.capacity {
284            g.samples.push(snap);
285        } else {
286            let idx = g.next_idx;
287            g.samples[idx] = snap;
288        }
289        g.next_idx = (g.next_idx + 1) % g.capacity;
290        if g.filled < g.capacity {
291            g.filled += 1;
292        }
293        g.completed = g.completed.saturating_add(1);
294    }
295
296    /// Compute the workload-level snapshot from the current window.
297    pub fn snapshot(&self) -> AggregateRateSnapshot {
298        let g = match self.inner.lock() {
299            Ok(g) => g,
300            Err(poisoned) => poisoned.into_inner(),
301        };
302        let n = g.filled;
303        if n == 0 {
304            return AggregateRateSnapshot {
305                completed_requests: 0,
306                mean_tokens_per_second: 0.0,
307                tbt_p50_seconds: 0.0,
308                tbt_p95_seconds: 0.0,
309                mean_queue_wait_seconds: 0.0,
310            };
311        }
312
313        let mut tps_sum = 0.0;
314        let mut wait_sum = 0.0;
315        let mut wait_n = 0;
316        let mut tbt_p50: Vec<f64> = Vec::with_capacity(n);
317        let mut tbt_p95: Vec<f64> = Vec::with_capacity(n);
318
319        for s in &g.samples[..n] {
320            tps_sum += s.tokens_per_second;
321            if let Some(w) = s.queue_wait_seconds {
322                wait_sum += w;
323                wait_n += 1;
324            }
325            tbt_p50.push(s.tbt_p50_seconds);
326            tbt_p95.push(s.tbt_p95_seconds);
327        }
328
329        tbt_p50.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
330        tbt_p95.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
331
332        let mean_tps = tps_sum / n as f64;
333        let mean_wait = if wait_n == 0 {
334            0.0
335        } else {
336            wait_sum / wait_n as f64
337        };
338
339        // Window-level p50/p95: take the median and 95th-percentile of
340        // per-request medians/95s — a reasonable proxy for global
341        // latency without needing all per-token samples.
342        let p50_window = quantile_sorted(&tbt_p50, 0.50);
343        let p95_window = quantile_sorted(&tbt_p95, 0.95);
344
345        AggregateRateSnapshot {
346            completed_requests: g.completed,
347            mean_tokens_per_second: mean_tps,
348            tbt_p50_seconds: p50_window,
349            tbt_p95_seconds: p95_window,
350            mean_queue_wait_seconds: mean_wait,
351        }
352    }
353
354    /// Number of requests recorded since construction (not capped by window).
355    pub fn completed(&self) -> u64 {
356        match self.inner.lock() {
357            Ok(g) => g.completed,
358            Err(poisoned) => poisoned.into_inner().completed,
359        }
360    }
361
362    /// Drop all recorded samples. Counters are not reset.
363    pub fn clear(&self) {
364        let mut g = match self.inner.lock() {
365            Ok(g) => g,
366            Err(poisoned) => poisoned.into_inner(),
367        };
368        g.samples.clear();
369        g.next_idx = 0;
370        g.filled = 0;
371    }
372}
373
374impl Default for RequestRateAggregator {
375    fn default() -> Self {
376        Self::new()
377    }
378}
379
380// ─── Quantile helper ───────────────────────────────────────────────────────
381
/// Return the `q`-quantile of an ascending-sorted slice.
///
/// `q` is clamped into `[0.0, 1.0]`. The result interpolates linearly between
/// the two samples that bracket rank `q * (len - 1)`. An empty slice yields
/// `0.0` and a one-element slice yields that element.
fn quantile_sorted(sorted: &[f64], q: f64) -> f64 {
    match sorted {
        [] => 0.0,
        [only] => *only,
        _ => {
            let rank = q.clamp(0.0, 1.0) * (sorted.len() - 1) as f64;
            let below = rank.floor() as usize;
            let above = rank.ceil() as usize;
            if below == above {
                return sorted[below];
            }
            let weight = rank - below as f64;
            sorted[below] * (1.0 - weight) + sorted[above] * weight
        }
    }
}
404
405// ─── Tests ─────────────────────────────────────────────────────────────────
406
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;
    use std::time::Duration;

    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    /// Build a snapshot with fixed latency/wait values for aggregator tests.
    fn fixed_snapshot(tps: f64, p50: f64, p95: f64, wait: Option<f64>) -> RequestRateSnapshot {
        RequestRateSnapshot {
            tokens_emitted: 1,
            tokens_per_second: tps,
            tbt_p50_seconds: p50,
            tbt_p95_seconds: p95,
            queue_wait_seconds: wait,
            elapsed_seconds: 0.0,
        }
    }

    #[test]
    fn fresh_tracker_has_zero_tokens() {
        let t = RequestRateTracker::new();
        let s = t.snapshot();
        assert_eq!(s.tokens_emitted, 0);
        assert!(s.tokens_per_second.abs() < f64::EPSILON);
        assert!(s.queue_wait_seconds.is_none());
    }

    #[test]
    fn first_token_records_count() {
        let mut t = RequestRateTracker::new();
        t.record_admission();
        t.record_first_token();
        assert_eq!(t.tokens_emitted(), 1);
        assert!(t.queue_wait_seconds().is_some());
    }

    #[test]
    fn queue_wait_measured() {
        let mut t = RequestRateTracker::new();
        t.record_admission();
        sleep(ms(2));
        t.record_first_token();
        let wait = t.queue_wait_seconds().expect("wait recorded");
        assert!(wait >= 0.001, "queue wait should be >= 1ms, got {wait}");
    }

    #[test]
    fn token_rate_increases_with_decoding() {
        let mut t = RequestRateTracker::new();
        t.record_admission();
        t.record_first_token();
        for _ in 0..5 {
            sleep(ms(2));
            t.record_token();
        }
        let s = t.snapshot();
        assert_eq!(s.tokens_emitted, 6);
        assert!(s.tokens_per_second > 0.0);
        assert!(s.tbt_p50_seconds > 0.0);
        assert!(s.tbt_p95_seconds >= s.tbt_p50_seconds);
    }

    #[test]
    fn tbt_quantiles_match_expectations() {
        let mut t = RequestRateTracker::with_params(64, 0.20);
        t.record_admission();
        t.record_first_token();
        // 20 fast (~1ms) tokens followed by a tail of 5 slow (~10ms) tokens.
        // With only 25 total samples, p95 (~position 22.8) lands inside the
        // slow tail — exercising the quantile interpolation path.
        for _ in 0..20 {
            sleep(ms(1));
            t.record_token();
        }
        for _ in 0..5 {
            sleep(ms(10));
            t.record_token();
        }
        let s = t.snapshot();
        assert!(s.tbt_p95_seconds >= s.tbt_p50_seconds);
        // The slow tail dominates p95 (timing is OS-scheduler dependent so we
        // use a relaxed threshold).
        assert!(
            s.tbt_p95_seconds >= 0.003,
            "p95 should reflect slow tail; got {}",
            s.tbt_p95_seconds
        );
    }

    #[test]
    fn tbt_ring_buffer_overwrites_oldest() {
        let mut t = RequestRateTracker::with_params(4, 0.20);
        t.record_admission();
        t.record_first_token();
        for _ in 0..10 {
            sleep(ms(1));
            t.record_token();
        }
        let s = t.snapshot();
        // Capacity 4 and 10 deltas pushed: the ring is full and its write
        // cursor has wrapped around to 10 % 4.
        assert_eq!(t.tbt_filled, 4);
        assert_eq!(t.tbt_next_idx, 10 % 4);
        assert!(s.tbt_p50_seconds > 0.0);
    }

    #[test]
    fn with_params_clamps_inputs() {
        let t = RequestRateTracker::with_params(0, 2.0);
        assert_eq!(t.tbt_capacity, 1);
        assert!((t.tps_alpha - 1.0).abs() < f64::EPSILON);
        let t = RequestRateTracker::with_params(8, -0.5);
        assert_eq!(t.tbt_capacity, 8);
        assert!(t.tps_alpha.abs() < f64::EPSILON);
    }

    #[test]
    fn quantile_sorted_basic() {
        assert!((quantile_sorted(&[], 0.5) - 0.0).abs() < f64::EPSILON);
        assert!((quantile_sorted(&[5.0], 0.5) - 5.0).abs() < f64::EPSILON);
        let v = [1.0, 2.0, 3.0, 4.0, 5.0];
        assert!((quantile_sorted(&v, 0.0) - 1.0).abs() < f64::EPSILON);
        assert!((quantile_sorted(&v, 1.0) - 5.0).abs() < f64::EPSILON);
        assert!((quantile_sorted(&v, 0.5) - 3.0).abs() < f64::EPSILON);
        // Interpolation between neighbours (rank 0.4 → 1.0·0.6 + 2.0·0.4).
        assert!((quantile_sorted(&v, 0.1) - 1.4).abs() < 1e-12);
        // Out-of-range q is clamped.
        assert!((quantile_sorted(&v, -1.0) - 1.0).abs() < f64::EPSILON);
        assert!((quantile_sorted(&v, 2.0) - 5.0).abs() < f64::EPSILON);
    }

    #[test]
    fn aggregator_records_and_aggregates() {
        let agg = RequestRateAggregator::with_window(8);
        for i in 0..5 {
            let snap = RequestRateSnapshot {
                tokens_emitted: 100,
                tokens_per_second: 50.0 + i as f64,
                tbt_p50_seconds: 0.020,
                tbt_p95_seconds: 0.050,
                queue_wait_seconds: Some(0.010),
                elapsed_seconds: 2.0,
            };
            agg.record(snap);
        }
        let agg_snap = agg.snapshot();
        assert_eq!(agg_snap.completed_requests, 5);
        assert!(agg_snap.mean_tokens_per_second >= 50.0);
        assert!(agg_snap.tbt_p50_seconds > 0.0);
        assert!(agg_snap.tbt_p95_seconds >= agg_snap.tbt_p50_seconds);
        assert!(agg_snap.mean_queue_wait_seconds > 0.0);
    }

    #[test]
    fn aggregator_window_quantiles_and_mean_wait() {
        let agg = RequestRateAggregator::with_window(8);
        agg.record(fixed_snapshot(1.0, 0.03, 0.07, Some(0.01)));
        agg.record(fixed_snapshot(2.0, 0.01, 0.05, Some(0.03)));
        agg.record(fixed_snapshot(3.0, 0.02, 0.06, None));
        let s = agg.snapshot();
        // Median of per-request p50s and 95th percentile of per-request p95s.
        assert!((s.tbt_p50_seconds - 0.02).abs() < 1e-12);
        assert!((s.tbt_p95_seconds - 0.069).abs() < 1e-9);
        // Only requests with a recorded wait contribute to the mean.
        assert!((s.mean_queue_wait_seconds - 0.02).abs() < 1e-12);
        assert!((s.mean_tokens_per_second - 2.0).abs() < 1e-12);
    }

    #[test]
    fn aggregator_handles_empty() {
        let agg = RequestRateAggregator::new();
        let s = agg.snapshot();
        assert_eq!(s.completed_requests, 0);
        assert!(s.mean_tokens_per_second.abs() < f64::EPSILON);
        assert!(s.tbt_p50_seconds.abs() < f64::EPSILON);
        assert!(s.tbt_p95_seconds.abs() < f64::EPSILON);
    }

    #[test]
    fn aggregator_window_overwrites() {
        let agg = RequestRateAggregator::with_window(4);
        for i in 0..10 {
            agg.record(fixed_snapshot(i as f64, 0.01, 0.02, None));
        }
        let s = agg.snapshot();
        assert_eq!(s.completed_requests, 10);
        // Only last 4 (6,7,8,9) contribute to mean
        assert!((s.mean_tokens_per_second - 7.5).abs() < 1e-6);
    }

    #[test]
    fn aggregator_clear() {
        let agg = RequestRateAggregator::new();
        agg.record(fixed_snapshot(100.0, 0.01, 0.02, None));
        assert_eq!(agg.completed(), 1);
        agg.clear();
        let s = agg.snapshot();
        assert_eq!(s.mean_tokens_per_second, 0.0);
        // `completed` is a lifetime counter, not affected by clear.
        assert_eq!(agg.completed(), 1);
    }

    #[test]
    fn record_token_without_first_token_works() {
        let mut t = RequestRateTracker::new();
        t.record_admission();
        // Skip explicit first_token; record_token should adopt the role.
        t.record_token();
        sleep(ms(2));
        t.record_token();
        assert_eq!(t.tokens_emitted(), 2);
        assert!(t.queue_wait_seconds().is_some());
    }

    #[test]
    fn aggregator_is_thread_safe() {
        use std::sync::Arc;
        use std::thread;

        let agg = Arc::new(RequestRateAggregator::with_window(64));
        let handles: Vec<_> = (0..4)
            .map(|tid| {
                let agg = Arc::clone(&agg);
                thread::spawn(move || {
                    for i in 0..50 {
                        agg.record(fixed_snapshot((tid * 100 + i) as f64, 0.01, 0.02, None));
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().expect("worker panicked");
        }
        assert_eq!(agg.completed(), 4 * 50);
    }
}