1use std::sync::Mutex;
43use std::time::Instant;
44
/// Default number of most-recent inter-token gaps kept by
/// [`RequestRateTracker::new`] for its p50/p95 time-between-tokens estimates.
const DEFAULT_WINDOW_TBT_SAMPLES: usize = 128;
/// Default number of per-request snapshots retained by
/// [`RequestRateAggregator::new`].
const DEFAULT_AGGREGATOR_WINDOW: usize = 256;
/// Default EMA weight given to each new instantaneous tokens-per-second
/// reading in [`RequestRateTracker::new`] (higher = reacts faster).
const DEFAULT_TPS_ALPHA: f64 = 0.20;
48
/// Point-in-time view of one request's token-rate metrics, as produced by
/// `RequestRateTracker::snapshot`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RequestRateSnapshot {
    /// Number of tokens recorded so far.
    pub tokens_emitted: u64,
    /// Smoothed (exponential moving average) decode rate in tokens per second;
    /// `0.0` until a positive inter-token gap has been observed.
    pub tokens_per_second: f64,
    /// Median time between tokens, in seconds, over the tracker's recent
    /// window of gaps (`0.0` when no gap has been recorded).
    pub tbt_p50_seconds: f64,
    /// 95th-percentile time between tokens, in seconds, over the same window
    /// (`0.0` when no gap has been recorded).
    pub tbt_p95_seconds: f64,
    /// Seconds from admission to the first token; `None` unless both were
    /// recorded.
    pub queue_wait_seconds: Option<f64>,
    /// Seconds since admission at the moment the snapshot was taken; `0.0` if
    /// admission was never recorded.
    pub elapsed_seconds: f64,
}
68
/// Per-request token-rate tracker.
///
/// Records admission, first-token and per-token timestamps and derives the
/// queue wait, an exponentially weighted tokens-per-second estimate, and
/// p50/p95 time-between-tokens (TBT) over a bounded window of recent gaps.
/// All mutating methods take `&mut self`; there is no internal locking.
#[derive(Debug, Clone)]
pub struct RequestRateTracker {
    /// When `record_admission` was last called.
    admission: Option<Instant>,
    /// Timestamp of the first recorded token.
    first_token: Option<Instant>,
    /// Timestamp of the most recent token; the reference point for the next gap.
    last_token: Option<Instant>,
    /// Number of tokens recorded (saturating counter).
    tokens_emitted: u64,
    /// Exponential moving average of the instantaneous rate `1 / gap`, in tokens/s.
    tps_ema: f64,
    /// EMA weight of the newest reading; clamped into `[0.0, 1.0]` by `with_params`.
    tps_alpha: f64,
    /// Fixed-size ring buffer of inter-token gaps, in seconds.
    tbt_samples: Vec<f64>,
    /// Length of `tbt_samples` (always at least 1).
    tbt_capacity: usize,
    /// Ring slot the next gap will be written to.
    tbt_next_idx: usize,
    /// Number of valid slots in `tbt_samples`; stops growing at `tbt_capacity`.
    tbt_filled: usize,
}
87
88impl RequestRateTracker {
89 pub fn new() -> Self {
92 Self::with_params(DEFAULT_WINDOW_TBT_SAMPLES, DEFAULT_TPS_ALPHA)
93 }
94
95 pub fn with_params(tbt_capacity: usize, alpha: f64) -> Self {
100 let cap = tbt_capacity.max(1);
101 Self {
102 admission: None,
103 first_token: None,
104 last_token: None,
105 tokens_emitted: 0,
106 tps_ema: 0.0,
107 tps_alpha: alpha.clamp(0.0, 1.0),
108 tbt_samples: vec![0.0; cap],
109 tbt_capacity: cap,
110 tbt_next_idx: 0,
111 tbt_filled: 0,
112 }
113 }
114
115 pub fn record_admission(&mut self) {
117 self.admission = Some(Instant::now());
118 }
119
120 pub fn record_first_token(&mut self) {
123 let now = Instant::now();
124 self.first_token = Some(now);
125 self.last_token = Some(now);
126 self.tokens_emitted = self.tokens_emitted.saturating_add(1);
127 }
128
129 pub fn record_token(&mut self) {
134 let now = Instant::now();
135 if self.first_token.is_none() {
136 self.first_token = Some(now);
137 }
138 if let Some(prev) = self.last_token {
139 let delta = (now - prev).as_secs_f64();
140 self.push_tbt_sample(delta);
141 if delta > 0.0 {
143 let inst = 1.0 / delta;
144 if self.tokens_emitted < 2 {
145 self.tps_ema = inst;
147 } else {
148 self.tps_ema = self.tps_alpha * inst + (1.0 - self.tps_alpha) * self.tps_ema;
149 }
150 }
151 }
152 self.last_token = Some(now);
153 self.tokens_emitted = self.tokens_emitted.saturating_add(1);
154 }
155
156 pub fn snapshot(&self) -> RequestRateSnapshot {
158 let now = Instant::now();
159 let elapsed = self
160 .admission
161 .map(|t| (now - t).as_secs_f64())
162 .unwrap_or(0.0);
163 let queue_wait = self.queue_wait_seconds();
164 let (p50, p95) = self.tbt_quantiles();
165
166 RequestRateSnapshot {
167 tokens_emitted: self.tokens_emitted,
168 tokens_per_second: self.tps_ema,
169 tbt_p50_seconds: p50,
170 tbt_p95_seconds: p95,
171 queue_wait_seconds: queue_wait,
172 elapsed_seconds: elapsed,
173 }
174 }
175
176 pub fn queue_wait_seconds(&self) -> Option<f64> {
178 match (self.admission, self.first_token) {
179 (Some(a), Some(f)) => Some((f - a).as_secs_f64()),
180 _ => None,
181 }
182 }
183
184 pub fn tokens_emitted(&self) -> u64 {
186 self.tokens_emitted
187 }
188
189 pub fn tokens_per_second(&self) -> f64 {
191 self.tps_ema
192 }
193
194 fn tbt_quantiles(&self) -> (f64, f64) {
197 if self.tbt_filled == 0 {
198 return (0.0, 0.0);
199 }
200 let n = self.tbt_filled.min(self.tbt_capacity);
201 let mut buf: Vec<f64> = self.tbt_samples[..n].to_vec();
202 buf.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
203 let p50 = quantile_sorted(&buf, 0.50);
204 let p95 = quantile_sorted(&buf, 0.95);
205 (p50, p95)
206 }
207
208 fn push_tbt_sample(&mut self, delta: f64) {
209 self.tbt_samples[self.tbt_next_idx] = delta;
210 self.tbt_next_idx = (self.tbt_next_idx + 1) % self.tbt_capacity;
211 if self.tbt_filled < self.tbt_capacity {
212 self.tbt_filled += 1;
213 }
214 }
215}
216
217impl Default for RequestRateTracker {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
/// Summary of the per-request snapshots currently held in a
/// `RequestRateAggregator`'s window.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct AggregateRateSnapshot {
    /// Number of requests recorded over the aggregator's lifetime, not only
    /// those still in the window.
    pub completed_requests: u64,
    /// Arithmetic mean of `tokens_per_second` across the window.
    pub mean_tokens_per_second: f64,
    /// Median of the per-request `tbt_p50_seconds` values in the window.
    pub tbt_p50_seconds: f64,
    /// 95th percentile of the per-request `tbt_p95_seconds` values in the
    /// window (a percentile of percentiles, not a pooled p95 over all gaps).
    pub tbt_p95_seconds: f64,
    /// Mean queue wait over window entries that have one; `0.0` if none do.
    pub mean_queue_wait_seconds: f64,
}
239
/// Aggregates `RequestRateSnapshot`s from finished requests over a sliding
/// window of the most recent snapshots.
///
/// State lives behind a `Mutex`, so `record`, `snapshot`, `completed` and
/// `clear` all take `&self` and one aggregator can be shared between threads
/// (e.g. via `Arc`).
#[derive(Debug)]
pub struct RequestRateAggregator {
    /// Ring buffer of snapshots plus the lifetime counter.
    inner: Mutex<RingState>,
}
247
/// Mutable state of a `RequestRateAggregator`, guarded by its mutex.
#[derive(Debug)]
struct RingState {
    /// Recorded snapshots. Grows by `push` until `capacity`, after which the
    /// slot at `next_idx` is overwritten (oldest first).
    samples: Vec<RequestRateSnapshot>,
    /// Maximum number of snapshots kept (at least 1).
    capacity: usize,
    /// Slot the next snapshot will occupy.
    next_idx: usize,
    /// Number of valid snapshots; mirrors `samples.len()`, capped at `capacity`.
    filled: usize,
    /// Lifetime count of recorded snapshots; `clear` does not reset it.
    completed: u64,
}
256
257impl RequestRateAggregator {
258 pub fn new() -> Self {
260 Self::with_window(DEFAULT_AGGREGATOR_WINDOW)
261 }
262
263 pub fn with_window(window: usize) -> Self {
265 let cap = window.max(1);
266 Self {
267 inner: Mutex::new(RingState {
268 samples: Vec::with_capacity(cap),
269 capacity: cap,
270 next_idx: 0,
271 filled: 0,
272 completed: 0,
273 }),
274 }
275 }
276
277 pub fn record(&self, snap: RequestRateSnapshot) {
279 let mut g = match self.inner.lock() {
280 Ok(g) => g,
281 Err(poisoned) => poisoned.into_inner(),
282 };
283 if g.samples.len() < g.capacity {
284 g.samples.push(snap);
285 } else {
286 let idx = g.next_idx;
287 g.samples[idx] = snap;
288 }
289 g.next_idx = (g.next_idx + 1) % g.capacity;
290 if g.filled < g.capacity {
291 g.filled += 1;
292 }
293 g.completed = g.completed.saturating_add(1);
294 }
295
296 pub fn snapshot(&self) -> AggregateRateSnapshot {
298 let g = match self.inner.lock() {
299 Ok(g) => g,
300 Err(poisoned) => poisoned.into_inner(),
301 };
302 let n = g.filled;
303 if n == 0 {
304 return AggregateRateSnapshot {
305 completed_requests: 0,
306 mean_tokens_per_second: 0.0,
307 tbt_p50_seconds: 0.0,
308 tbt_p95_seconds: 0.0,
309 mean_queue_wait_seconds: 0.0,
310 };
311 }
312
313 let mut tps_sum = 0.0;
314 let mut wait_sum = 0.0;
315 let mut wait_n = 0;
316 let mut tbt_p50: Vec<f64> = Vec::with_capacity(n);
317 let mut tbt_p95: Vec<f64> = Vec::with_capacity(n);
318
319 for s in &g.samples[..n] {
320 tps_sum += s.tokens_per_second;
321 if let Some(w) = s.queue_wait_seconds {
322 wait_sum += w;
323 wait_n += 1;
324 }
325 tbt_p50.push(s.tbt_p50_seconds);
326 tbt_p95.push(s.tbt_p95_seconds);
327 }
328
329 tbt_p50.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
330 tbt_p95.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
331
332 let mean_tps = tps_sum / n as f64;
333 let mean_wait = if wait_n == 0 {
334 0.0
335 } else {
336 wait_sum / wait_n as f64
337 };
338
339 let p50_window = quantile_sorted(&tbt_p50, 0.50);
343 let p95_window = quantile_sorted(&tbt_p95, 0.95);
344
345 AggregateRateSnapshot {
346 completed_requests: g.completed,
347 mean_tokens_per_second: mean_tps,
348 tbt_p50_seconds: p50_window,
349 tbt_p95_seconds: p95_window,
350 mean_queue_wait_seconds: mean_wait,
351 }
352 }
353
354 pub fn completed(&self) -> u64 {
356 match self.inner.lock() {
357 Ok(g) => g.completed,
358 Err(poisoned) => poisoned.into_inner().completed,
359 }
360 }
361
362 pub fn clear(&self) {
364 let mut g = match self.inner.lock() {
365 Ok(g) => g,
366 Err(poisoned) => poisoned.into_inner(),
367 };
368 g.samples.clear();
369 g.next_idx = 0;
370 g.filled = 0;
371 }
372}
373
374impl Default for RequestRateAggregator {
375 fn default() -> Self {
376 Self::new()
377 }
378}
379
/// Linearly interpolated quantile of an ascending-sorted slice.
///
/// `q` is clamped into `[0.0, 1.0]` and mapped onto the fractional index
/// `q * (len - 1)`. The result blends the two neighbouring elements by the
/// fractional part of that index. An empty slice yields `0.0`, and a
/// one-element slice yields its only element regardless of `q`. The slice must
/// already be sorted ascending; this is not checked.
fn quantile_sorted(sorted: &[f64], q: f64) -> f64 {
    let last = match sorted {
        [] => return 0.0,
        [only] => return *only,
        _ => sorted.len() - 1,
    };
    let rank = q.clamp(0.0, 1.0) * last as f64;
    let (below, above) = (rank.floor() as usize, rank.ceil() as usize);
    if below == above {
        return sorted[below];
    }
    let weight = rank - below as f64;
    sorted[below] * (1.0 - weight) + sorted[above] * weight
}
404
// Unit tests for the per-request tracker, the shared aggregator and the
// quantile helper. Several tracker tests use real sleeps, so their timing
// assertions are deliberately loose lower bounds.
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;
    use std::time::Duration;

    // Shorthand for a millisecond `Duration`.
    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    // A tracker with no events reports zero tokens, zero rate and no queue wait.
    #[test]
    fn fresh_tracker_has_zero_tokens() {
        let t = RequestRateTracker::new();
        let s = t.snapshot();
        assert_eq!(s.tokens_emitted, 0);
        assert!(s.tokens_per_second.abs() < f64::EPSILON);
        assert!(s.queue_wait_seconds.is_none());
    }

    // The first token counts as one token and, with admission set, yields a queue wait.
    #[test]
    fn first_token_records_count() {
        let mut t = RequestRateTracker::new();
        t.record_admission();
        t.record_first_token();
        assert_eq!(t.tokens_emitted(), 1);
        assert!(t.queue_wait_seconds().is_some());
    }

    // Queue wait reflects the real delay between admission and first token.
    #[test]
    fn queue_wait_measured() {
        let mut t = RequestRateTracker::new();
        t.record_admission();
        sleep(ms(2));
        t.record_first_token();
        let wait = t.queue_wait_seconds().expect("wait recorded");
        assert!(wait >= 0.001, "queue wait should be >= 1ms, got {wait}");
    }

    // Decoding tokens with positive gaps produces a positive rate and ordered quantiles.
    #[test]
    fn token_rate_increases_with_decoding() {
        let mut t = RequestRateTracker::new();
        t.record_admission();
        t.record_first_token();
        for _ in 0..5 {
            sleep(ms(2));
            t.record_token();
        }
        let s = t.snapshot();
        assert_eq!(s.tokens_emitted, 6);
        assert!(s.tokens_per_second > 0.0);
        assert!(s.tbt_p50_seconds > 0.0);
        assert!(s.tbt_p95_seconds >= s.tbt_p50_seconds);
    }

    // 20 fast (~1ms) gaps followed by 5 slow (~10ms) gaps: the p95 should land
    // in the slow tail. The 3ms bound is intentionally loose to tolerate sleep jitter.
    #[test]
    fn tbt_quantiles_match_expectations() {
        let mut t = RequestRateTracker::with_params(64, 0.20);
        t.record_admission();
        t.record_first_token();
        for _ in 0..20 {
            // Fast phase: ~1ms between tokens.
            sleep(ms(1));
            t.record_token();
        }
        for _ in 0..5 {
            // Slow tail: ~10ms between tokens.
            sleep(ms(10));
            t.record_token();
        }
        let s = t.snapshot();
        assert!(s.tbt_p95_seconds >= s.tbt_p50_seconds);
        assert!(
            s.tbt_p95_seconds >= 0.003,
            "p95 should reflect slow tail; got {}",
            s.tbt_p95_seconds
        );
    }

    // With a 4-slot TBT window, 10 gaps wrap the ring; quantiles still come from real samples.
    #[test]
    fn tbt_ring_buffer_overwrites_oldest() {
        let mut t = RequestRateTracker::with_params(4, 0.20);
        t.record_admission();
        t.record_first_token();
        for _ in 0..10 {
            sleep(ms(1));
            t.record_token();
        }
        let s = t.snapshot();
        // Only the 4 most recent gaps are retained; all were >= ~1ms.
        assert!(s.tbt_p50_seconds > 0.0);
    }

    // Empty, single-element and endpoint/median cases of the interpolating quantile.
    #[test]
    fn quantile_sorted_basic() {
        assert!((quantile_sorted(&[], 0.5) - 0.0).abs() < f64::EPSILON);
        assert!((quantile_sorted(&[5.0], 0.5) - 5.0).abs() < f64::EPSILON);
        let v = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        assert!((quantile_sorted(&v, 0.0) - 1.0).abs() < f64::EPSILON);
        assert!((quantile_sorted(&v, 1.0) - 5.0).abs() < f64::EPSILON);
        assert!((quantile_sorted(&v, 0.5) - 3.0).abs() < f64::EPSILON);
    }

    // Five snapshots under an 8-slot window are all aggregated.
    #[test]
    fn aggregator_records_and_aggregates() {
        let agg = RequestRateAggregator::with_window(8);
        for i in 0..5 {
            let snap = RequestRateSnapshot {
                tokens_emitted: 100,
                tokens_per_second: 50.0 + i as f64,
                tbt_p50_seconds: 0.020,
                tbt_p95_seconds: 0.050,
                queue_wait_seconds: Some(0.010),
                elapsed_seconds: 2.0,
            };
            agg.record(snap);
        }
        let agg_snap = agg.snapshot();
        assert_eq!(agg_snap.completed_requests, 5);
        assert!(agg_snap.mean_tokens_per_second >= 50.0);
        assert!(agg_snap.tbt_p50_seconds > 0.0);
        assert!(agg_snap.tbt_p95_seconds >= agg_snap.tbt_p50_seconds);
        assert!(agg_snap.mean_queue_wait_seconds > 0.0);
    }

    // A fresh aggregator reports all-zero statistics.
    #[test]
    fn aggregator_handles_empty() {
        let agg = RequestRateAggregator::new();
        let s = agg.snapshot();
        assert_eq!(s.completed_requests, 0);
        assert!(s.mean_tokens_per_second.abs() < f64::EPSILON);
        assert!(s.tbt_p50_seconds.abs() < f64::EPSILON);
        assert!(s.tbt_p95_seconds.abs() < f64::EPSILON);
    }

    // Ten records into a 4-slot window: the lifetime count is 10, but only the
    // last four rates (6, 7, 8, 9) remain, giving a mean of 7.5.
    #[test]
    fn aggregator_window_overwrites() {
        let agg = RequestRateAggregator::with_window(4);
        for i in 0..10 {
            let snap = RequestRateSnapshot {
                tokens_emitted: 1,
                tokens_per_second: i as f64,
                tbt_p50_seconds: 0.01,
                tbt_p95_seconds: 0.02,
                queue_wait_seconds: None,
                elapsed_seconds: 0.0,
            };
            agg.record(snap);
        }
        let s = agg.snapshot();
        assert_eq!(s.completed_requests, 10);
        assert!((s.mean_tokens_per_second - 7.5).abs() < 1e-6);
    }

    // clear() empties the window but keeps the lifetime completed count.
    #[test]
    fn aggregator_clear() {
        let agg = RequestRateAggregator::new();
        agg.record(RequestRateSnapshot {
            tokens_emitted: 1,
            tokens_per_second: 100.0,
            tbt_p50_seconds: 0.01,
            tbt_p95_seconds: 0.02,
            queue_wait_seconds: None,
            elapsed_seconds: 0.0,
        });
        assert_eq!(agg.completed(), 1);
        agg.clear();
        let s = agg.snapshot();
        assert_eq!(s.mean_tokens_per_second, 0.0);
        assert_eq!(agg.completed(), 1);
    }

    // record_token alone (no record_first_token) still counts tokens and sets
    // the first-token timestamp used for queue wait.
    #[test]
    fn record_token_without_first_token_works() {
        let mut t = RequestRateTracker::new();
        t.record_admission();
        t.record_token();
        // The first call had no previous token, so no gap was measured yet.
        sleep(ms(2));
        t.record_token();
        assert_eq!(t.tokens_emitted(), 2);
        assert!(t.queue_wait_seconds().is_some());
    }

    // Four threads each record 50 snapshots concurrently; none are lost.
    #[test]
    fn aggregator_is_thread_safe() {
        use std::sync::Arc;
        use std::thread;

        let agg = Arc::new(RequestRateAggregator::with_window(64));
        let mut handles = Vec::new();
        for tid in 0..4 {
            let agg = Arc::clone(&agg);
            handles.push(thread::spawn(move || {
                for i in 0..50 {
                    agg.record(RequestRateSnapshot {
                        tokens_emitted: 1,
                        tokens_per_second: (tid * 100 + i) as f64,
                        tbt_p50_seconds: 0.01,
                        tbt_p95_seconds: 0.02,
                        queue_wait_seconds: None,
                        elapsed_seconds: 0.0,
                    });
                }
            }));
        }
        for h in handles {
            h.join().expect("worker panicked");
        }
        assert_eq!(agg.completed(), 4 * 50);
    }
}
622}