use std::time::Instant;
/// Summarises a set of latency samples as `(mean, p95)`.
///
/// Returns `(None, None)` for an empty slice. Otherwise the mean is the
/// arithmetic mean as `f32`, and the p95 is the element of the ascending-sorted
/// samples at index `round((len - 1) * 0.95)`.
pub(crate) fn itl_stats(xs: &[u32]) -> (Option<f32>, Option<u32>) {
    let count = xs.len();
    if count == 0 {
        return (None, None);
    }

    // Accumulate in u64 so that adding many u32 samples cannot overflow.
    let total = xs.iter().fold(0u64, |acc, &sample| acc + sample as u64);
    let mean = total as f32 / count as f32;

    // Sort a private copy; the caller's slice is left untouched.
    let mut ordered = xs.to_vec();
    ordered.sort_unstable();

    // The rounded rank is at most `count - 1`, so indexing cannot go out of bounds.
    let rank = ((count - 1) as f32 * 0.95).round() as usize;

    (Some(mean), Some(ordered[rank]))
}
/// Telemetry values derived for one streamed generation.
///
/// In this file the struct is built by `compute_streaming_fields`; the field
/// notes below describe how that function fills each value.
#[derive(Debug, Clone)]
pub(crate) struct StreamingTelemetryFields {
/// Time elapsed since the start instant when the fields were computed, in
/// whole milliseconds.
pub generation_time_ms: u64,
/// Generated tokens divided by the elapsed seconds; `0.0` when the elapsed
/// time is zero.
pub tokens_per_second: f32,
/// Milliseconds from the start instant to the first chunk timestamp; `None`
/// when no chunk was recorded.
pub ttft_ms: Option<u64>,
/// Mean of `inter_chunk_ms`; `None` when there are no gaps (fewer than two
/// chunks). "itl" presumably stands for inter-token latency; the value is
/// measured between chunks.
pub mean_itl_ms: Option<f32>,
/// 95th-percentile entry of `inter_chunk_ms` as selected by `itl_stats`;
/// `None` when there are no gaps.
pub p95_itl_ms: Option<u32>,
/// Number of chunk timestamps; `compute_streaming_fields` always sets `Some`.
pub emitted_chunks: Option<u32>,
/// Gap between each pair of consecutive chunk timestamps, in whole
/// milliseconds.
pub inter_chunk_ms: Vec<u32>,
/// `1000.0 / mean_itl_ms` when the mean is greater than zero, otherwise `None`.
pub decode_tps: Option<f32>,
/// `prompt_token_count * 1000.0 / ttft_ms` when both the prompt token count
/// and `ttft_ms` are greater than zero, otherwise `None`.
pub prefill_tps: Option<f32>,
}
/// Derives the streaming telemetry fields from raw timestamps.
///
/// # Arguments
/// * `start` - reference instant; total elapsed time is measured from it to the
///   moment this function runs (`start.elapsed()`).
/// * `chunk_timestamps` - one instant per emitted chunk; consecutive entries are
///   treated as successive chunks when computing gaps.
/// * `prompt_token_count` - prompt size, used only for `prefill_tps`.
/// * `tokens_generated` - generated token count, used only for
///   `tokens_per_second`.
///
/// # Returns
/// A [`StreamingTelemetryFields`] where:
/// * `ttft_ms` is the time from `start` to the first chunk (`None` without chunks);
/// * `inter_chunk_ms` holds the gap between each consecutive pair of chunks;
/// * `mean_itl_ms` / `p95_itl_ms` come from [`itl_stats`] over those gaps;
/// * `decode_tps` is `1000 / mean_itl_ms` when the mean is positive;
/// * `prefill_tps` is `prompt_token_count * 1000 / ttft_ms` when both are positive;
/// * `tokens_per_second` is `tokens_generated / elapsed seconds`, or `0.0` when
///   no time has elapsed.
///
/// Millisecond values and the chunk count saturate at the maximum of their
/// integer type instead of wrapping, so an out-of-range measurement can never
/// show up as a small, plausible-looking number.
pub(crate) fn compute_streaming_fields(
    start: Instant,
    chunk_timestamps: &[Instant],
    prompt_token_count: usize,
    tokens_generated: usize,
) -> StreamingTelemetryFields {
    let elapsed = start.elapsed();

    // `as_millis` yields u128; clamp before narrowing so the cast cannot truncate.
    let ttft_ms = chunk_timestamps.first().map(|first| {
        let millis = first.duration_since(start).as_millis();
        millis.min(u128::from(u64::MAX)) as u64
    });

    let inter_chunk_ms: Vec<u32> = chunk_timestamps
        .windows(2)
        .map(|pair| {
            let millis = pair[1].duration_since(pair[0]).as_millis();
            millis.min(u128::from(u32::MAX)) as u32
        })
        .collect();

    let (mean_itl_ms, p95_itl_ms) = itl_stats(&inter_chunk_ms);

    // A zero mean gap would divide by zero, so no decode rate is reported then.
    let decode_tps = mean_itl_ms
        .filter(|&mean| mean > 0.0)
        .map(|mean| 1000.0 / mean);

    // Prefill rate needs both a non-zero TTFT and a non-empty prompt.
    let prefill_tps = ttft_ms
        .filter(|&ttft| ttft > 0 && prompt_token_count > 0)
        .map(|ttft| prompt_token_count as f32 * 1000.0 / ttft as f32);

    let elapsed_secs = elapsed.as_secs_f32();
    let tokens_per_second = if elapsed_secs > 0.0 {
        tokens_generated as f32 / elapsed_secs
    } else {
        0.0
    };

    StreamingTelemetryFields {
        generation_time_ms: elapsed.as_millis().min(u128::from(u64::MAX)) as u64,
        tokens_per_second,
        ttft_ms,
        mean_itl_ms,
        p95_itl_ms,
        emitted_chunks: Some(chunk_timestamps.len().min(u32::MAX as usize) as u32),
        inter_chunk_ms,
        decode_tps,
        prefill_tps,
    }
}
/// Records timing for one streamed generation: the instant the recorder was
/// created, one timestamp per recorded chunk, and the prompt token count.
pub(crate) struct StreamingTelemetry {
    /// Instant captured when the recorder was created.
    start: Instant,
    /// One timestamp per `record_chunk` call, in call order.
    chunk_timestamps: Vec<Instant>,
    /// Prompt token count supplied at construction.
    prompt_token_count: usize,
}

impl StreamingTelemetry {
    /// Creates a recorder whose clock starts now and which has no chunks yet.
    pub fn new(prompt_token_count: usize) -> Self {
        let start = Instant::now();
        Self {
            start,
            chunk_timestamps: Vec::new(),
            prompt_token_count,
        }
    }

    /// Appends the current instant as the timestamp of a newly emitted chunk.
    pub fn record_chunk(&mut self) {
        let now = Instant::now();
        self.chunk_timestamps.push(now);
    }

    /// Computes the telemetry fields from the data recorded so far.
    ///
    /// Takes `&self`, so the recorder is left unchanged.
    pub fn finalize(&self, tokens_generated: usize) -> StreamingTelemetryFields {
        let Self {
            start,
            chunk_timestamps,
            prompt_token_count,
        } = self;
        compute_streaming_fields(*start, chunk_timestamps, *prompt_token_count, tokens_generated)
    }
}
#[cfg(test)]
mod tests {
    use super::{compute_streaming_fields, itl_stats};
    use std::time::{Duration, Instant};

    /// Builds chunk timestamps lying `offsets_ms` milliseconds after `origin`.
    fn chunks_at(origin: Instant, offsets_ms: &[u64]) -> Vec<Instant> {
        offsets_ms
            .iter()
            .map(|&ms| origin + Duration::from_millis(ms))
            .collect()
    }

    // ---- itl_stats ----

    /// No samples means no statistics.
    #[test]
    fn empty_input_returns_none() {
        let stats = itl_stats(&[]);
        assert_eq!(stats, (None, None));
    }

    /// A lone sample is both the mean and the p95.
    #[test]
    fn single_value() {
        assert_eq!(itl_stats(&[10]), (Some(10.0), Some(10)));
    }

    /// With four ascending samples the p95 rank lands on the maximum.
    #[test]
    fn multiple_values_sorted() {
        assert_eq!(itl_stats(&[10, 20, 30, 40]), (Some(25.0), Some(40)));
    }

    /// Input order must not affect the result.
    #[test]
    fn multiple_values_unsorted() {
        assert_eq!(itl_stats(&[30, 10, 40, 20]), (Some(25.0), Some(40)));
    }

    // ---- compute_streaming_fields ----

    /// Without chunks every chunk-derived value is absent and the rate is zero.
    #[test]
    fn streaming_fields_empty_stream() {
        let origin = Instant::now();
        let out = compute_streaming_fields(origin, &[], 0, 0);

        assert_eq!(out.ttft_ms, None);
        assert_eq!(out.mean_itl_ms, None);
        assert_eq!(out.p95_itl_ms, None);
        assert_eq!(out.emitted_chunks, Some(0));
        assert!(out.inter_chunk_ms.is_empty());
        assert_eq!(out.decode_tps, None);
        assert_eq!(out.prefill_tps, None);
        assert_eq!(out.tokens_per_second, 0.0);
    }

    /// Four chunks at 40/60/80/100 ms: TTFT 40 ms and three 20 ms gaps.
    #[test]
    fn streaming_fields_derives_ttft_itl_decode_tps() {
        let origin = Instant::now();
        let stamps = chunks_at(origin, &[40, 60, 80, 100]);
        let out = compute_streaming_fields(origin, &stamps, 16, 4);

        assert_eq!(out.ttft_ms, Some(40));
        assert_eq!(out.mean_itl_ms, Some(20.0));
        assert_eq!(out.p95_itl_ms, Some(20));
        assert_eq!(out.decode_tps, Some(50.0));
        assert_eq!(out.prefill_tps, Some(400.0));
        assert_eq!(out.emitted_chunks, Some(4));
        assert_eq!(out.inter_chunk_ms, vec![20, 20, 20]);
    }

    /// A zero-token prompt yields no prefill rate even when TTFT is known.
    #[test]
    fn streaming_fields_zero_prompt_suppresses_prefill_tps() {
        let origin = Instant::now();
        let stamps = chunks_at(origin, &[50]);
        let out = compute_streaming_fields(origin, &stamps, 0, 1);

        assert_eq!(out.ttft_ms, Some(50));
        assert_eq!(out.prefill_tps, None);
    }
}