pub mod arrivals;
pub mod env;
pub mod profile;
pub mod report;
pub mod stats;
pub mod trace;
pub use env::{Env, EnvHash};
pub use profile::{
configure_global_profile, flush_global_profile, global_profile, parse_profile_event_value,
parse_profile_jsonl_str, profile_fields_from_json, ProfileEvent, ProfileJsonlWriter,
ProfileMetadata, ProfileSinkConfig,
};
pub use stats::{ci95_half_width, percentile, student_t_975, PercentileStats, ScalarStats};
use serde::{Deserialize, Serialize};
/// Benchmark scenario tag stored in `BenchReport::scenario`.
///
/// Because of `rename_all = "snake_case"`, the variants are serialized as
/// `closed_loop`, `open_loop`, `shared_prefix` and `cli`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Scenario {
/// Serialized as `closed_loop`.
/// NOTE(review): presumably a fixed number of in-flight requests (see
/// `BenchReport::concurrency`) — confirm against the load generator.
ClosedLoop,
/// Serialized as `open_loop`.
/// NOTE(review): presumably requests issued at a target rate (see
/// `BenchReport::request_rate`) — confirm against the load generator.
OpenLoop,
/// Serialized as `shared_prefix`.
SharedPrefix,
/// Serialized as `cli`.
Cli,
}
/// Latency thresholds used by `RequestRecord::meets_slo` to decide whether a
/// request counts towards goodput.
///
/// Although the field names mention `p99`, `meets_slo` compares every
/// individual request against these limits (inclusive, `<=`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct Slo {
/// Upper bound for `RequestRecord::ttft_ms`.
pub ttft_p99_ms: f64,
/// Upper bound for `RequestRecord::tpot_ms()`; not applied when that
/// method returns `None`.
pub tpot_p99_ms: f64,
/// Upper bound for `RequestRecord::e2e_ms`.
pub e2e_p99_ms: f64,
}
impl Default for Slo {
fn default() -> Self {
Self {
ttft_p99_ms: 500.0,
tpot_p99_ms: 50.0,
e2e_p99_ms: 30_000.0,
}
}
}
/// Cross-run statistics for four percentiles of a single latency metric.
///
/// `compute_metrics` evaluates each percentile once per run and then
/// summarizes the per-run values with `ScalarStats::from_samples`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricSet {
/// Statistics over the per-run 0.50 percentile.
pub p50: PercentileStats,
/// Statistics over the per-run 0.75 percentile.
pub p75: PercentileStats,
/// Statistics over the per-run 0.95 percentile.
pub p95: PercentileStats,
/// Statistics over the per-run 0.99 percentile.
pub p99: PercentileStats,
}
/// Serializable summary of a benchmark, as produced by `compute_metrics`.
///
/// `Option` fields are omitted from the serialized output when `None` and
/// default to `None` when absent on input. The `#[serde(default)]` vectors
/// default to empty when absent on input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BenchReport {
/// Model identifier, passed through from the caller.
pub model: String,
/// Backend identifier, passed through from the caller.
pub backend: String,
/// Scenario tag, passed through from the caller.
pub scenario: Scenario,
/// Passed through from the caller; `None` when not applicable.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub concurrency: Option<u32>,
/// Passed through from the caller; `None` when not applicable.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub request_rate: Option<f64>,
/// Passed through from the caller.
/// NOTE(review): presumably the requested prompt length in tokens — confirm.
pub n_prompt: u32,
/// Passed through from the caller.
/// NOTE(review): presumably the requested generation length in tokens — confirm.
pub n_gen: u32,
/// Always `None` when built by `compute_metrics`.
/// NOTE(review): presumably filled in by the caller afterwards — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub actual_input_tokens: Option<TokenLengthStats>,
/// Always `None` when built by `compute_metrics`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub actual_input_tokens_per_request: Option<Vec<Vec<u32>>>,
/// Always `None` when built by `compute_metrics`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output_token_count_source: Option<String>,
/// Number of runs that were aggregated.
pub n_repeats: u32,
/// Number of records in the first run (other runs are not inspected for
/// this value).
pub n_requests_per_run: u32,
/// Passed through from the caller.
pub warmup_requests: u32,
/// Percentiles of `RequestRecord::ttft_ms` over successful requests.
pub ttft_ms: MetricSet,
/// Percentiles of `RequestRecord::tpot_ms()` over successful requests for
/// which it is defined.
pub tpot_ms: MetricSet,
/// Percentiles of the pooled `RequestRecord::itl_ms` samples of successful
/// requests.
pub itl_ms: MetricSet,
/// Percentiles of `RequestRecord::e2e_ms` over successful requests.
pub e2e_ms: MetricSet,
/// Output tokens of successful requests divided by the run duration.
pub output_throughput_tps: ScalarStats,
/// Input plus output tokens of successful requests divided by the run
/// duration.
pub total_throughput_tps: ScalarStats,
/// Successful requests divided by the run duration.
pub request_throughput_rps: ScalarStats,
/// Successful requests that satisfy `slo`, divided by the run duration.
pub goodput_rps: ScalarStats,
/// Thresholds used for `goodput_rps`.
pub slo: Slo,
/// Per run: number of records with `success == true`.
pub completed_per_run: Vec<u32>,
/// Per run: number of records with `success == false`.
pub errored_per_run: Vec<u32>,
/// Per run: sum of `QualityIssueCounts::bad_output` over all records.
#[serde(default)]
pub bad_output_per_run: Vec<u32>,
/// Per run: sum of `QualityIssueCounts::malformed_stream` over all records.
#[serde(default)]
pub malformed_stream_per_run: Vec<u32>,
/// Per run: sum of `QualityIssueCounts::missing_done` over all records.
#[serde(default)]
pub missing_done_per_run: Vec<u32>,
/// Per run: sum of `QualityIssueCounts::duplicate_done` over all records.
#[serde(default)]
pub duplicate_done_per_run: Vec<u32>,
/// Per run: sum of `QualityIssueCounts::zero_output_tokens` over all records.
#[serde(default)]
pub zero_output_tokens_per_run: Vec<u32>,
/// Per run: sum of `QualityIssueCounts::stream_bulk_flush` over all records.
#[serde(default)]
pub stream_bulk_flush_per_run: Vec<u32>,
/// Per run: sum of `QualityIssueCounts::http_500` over all records.
#[serde(default)]
pub http_500_per_run: Vec<u32>,
/// Per run: sum of `QualityIssueCounts::panic` over all records.
#[serde(default)]
pub panic_per_run: Vec<u32>,
/// Per run: all quality counters summed over every record of the run
/// (successful or not).
#[serde(default)]
pub quality_issues_per_run: Vec<QualityIssueCounts>,
/// Environment description, passed through from the caller.
pub env: Env,
/// Result of `env.hash()` at report construction time.
pub env_hash: EnvHash,
}
/// Token-length summary carried in `BenchReport::actual_input_tokens`.
///
/// NOTE(review): nothing in this file constructs this type; the field
/// meanings below are inferred from the names only — confirm with the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenLengthStats {
/// Presumably the token length that was asked for — TODO confirm.
pub requested: u32,
/// Presumably the smallest observed token length — TODO confirm.
pub min: u32,
/// Presumably the largest observed token length — TODO confirm.
pub max: u32,
/// Presumably the mean observed token length — TODO confirm.
pub mean: f64,
}
/// Measurements collected for a single request; the input unit of
/// `compute_metrics`.
#[derive(Debug, Clone)]
pub struct RequestRecord {
/// Whether the request counts as completed. Only successful records feed
/// latency percentiles, throughput and goodput.
pub success: bool,
/// Latency sample compared against `Slo::ttft_p99_ms` and subtracted from
/// `e2e_ms` in `tpot_ms()`.
pub ttft_ms: f64,
/// Latency sample compared against `Slo::e2e_p99_ms`.
pub e2e_ms: f64,
/// Input token count; contributes to total throughput.
pub input_tokens: u32,
/// Output token count; contributes to output/total throughput and is the
/// divisor basis of `tpot_ms()`.
pub output_tokens: u32,
/// Origin of `output_tokens`. Not read by `compute_metrics`.
pub output_token_count_source: OutputTokenCountSource,
/// Quality issues attributed to this request; summed per run regardless of
/// `success`.
pub quality_issues: QualityIssueCounts,
/// Latency samples pooled across successful requests for the `itl_ms`
/// percentiles.
/// NOTE(review): presumably the gaps between consecutive streamed tokens — confirm.
pub itl_ms: Vec<f64>,
}
/// Counters for each category of quality issue.
///
/// Used per request (`RequestRecord::quality_issues`) and, summed with
/// `add_assign`, per run (`BenchReport::quality_issues_per_run`). All counters
/// default to zero.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct QualityIssueCounts {
/// Included in `request_error_count`.
pub bad_output: u32,
/// Included in `request_error_count`.
pub malformed_stream: u32,
/// Included in `request_error_count`.
pub missing_done: u32,
/// Included in `request_error_count`.
pub duplicate_done: u32,
/// Included in `request_error_count`.
pub zero_output_tokens: u32,
/// The only counter that `request_error_count` leaves out.
pub stream_bulk_flush: u32,
/// Included in `request_error_count`.
pub http_500: u32,
/// Included in `request_error_count`.
pub panic: u32,
}
impl QualityIssueCounts {
    /// Adds every counter of `other` onto the matching counter of `self`.
    pub fn add_assign(&mut self, other: &Self) {
        // Exhaustive destructuring: adding a field to the struct without
        // handling it here becomes a compile error.
        let Self {
            bad_output,
            malformed_stream,
            missing_done,
            duplicate_done,
            zero_output_tokens,
            stream_bulk_flush,
            http_500,
            panic,
        } = other;
        self.bad_output += *bad_output;
        self.malformed_stream += *malformed_stream;
        self.missing_done += *missing_done;
        self.duplicate_done += *duplicate_done;
        self.zero_output_tokens += *zero_output_tokens;
        self.stream_bulk_flush += *stream_bulk_flush;
        self.http_500 += *http_500;
        self.panic += *panic;
    }

    /// Returns the sum of all counters except `stream_bulk_flush`.
    pub fn request_error_count(&self) -> u32 {
        let error_counters = [
            self.bad_output,
            self.malformed_stream,
            self.missing_done,
            self.duplicate_done,
            self.zero_output_tokens,
            self.http_500,
            self.panic,
        ];
        error_counters.iter().sum()
    }
}
/// Where a request's output token count came from; see `as_str` for the
/// string label of each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputTokenCountSource {
/// Labelled `"usage"`.
/// NOTE(review): presumably a server-reported usage figure — confirm.
Usage,
/// Labelled `"stream_chunks"`.
/// NOTE(review): presumably counted from streamed chunks — confirm.
StreamChunks,
/// Labelled `"none"`. This is a variant of this enum, not `Option::None`.
None,
}
impl OutputTokenCountSource {
pub fn as_str(self) -> &'static str {
match self {
Self::Usage => "usage",
Self::StreamChunks => "stream_chunks",
Self::None => "none",
}
}
}
impl RequestRecord {
    /// Time per output token after the first one:
    /// `(e2e_ms - ttft_ms) / (output_tokens - 1)`.
    ///
    /// Returns `None` when fewer than two output tokens were produced, since
    /// there is then no interval to average over.
    pub fn tpot_ms(&self) -> Option<f64> {
        match self.output_tokens {
            0 | 1 => None,
            n_tokens => {
                let decode_ms = self.e2e_ms - self.ttft_ms;
                Some(decode_ms / f64::from(n_tokens - 1))
            }
        }
    }

    /// Returns `true` when the request succeeded and its TTFT, end-to-end
    /// latency and (if defined) TPOT are all within `slo` (inclusive).
    ///
    /// A request whose TPOT is undefined is treated as passing the TPOT check.
    pub fn meets_slo(&self, slo: &Slo) -> bool {
        let tpot_within_limit = match self.tpot_ms() {
            Some(tpot) => tpot <= slo.tpot_p99_ms,
            None => true,
        };
        self.success
            && self.ttft_ms <= slo.ttft_p99_ms
            && self.e2e_ms <= slo.e2e_p99_ms
            && tpot_within_limit
    }
}
/// All request records of one benchmark run plus the run's duration.
#[derive(Debug, Clone)]
pub struct RunRecord {
/// Every request issued in the run, successful or not.
pub records: Vec<RequestRecord>,
/// Run duration used as the divisor for throughput and goodput in
/// `compute_metrics` (clamped there to at least `f64::EPSILON`).
/// NOTE(review): seconds, going by the `_s` suffix — confirm with the caller.
pub duration_s: f64,
}
impl RunRecord {
    /// Number of records whose `success` flag is set.
    pub fn n_completed(&self) -> u32 {
        self.count_with_outcome(true)
    }

    /// Number of records whose `success` flag is not set.
    pub fn n_errored(&self) -> u32 {
        self.count_with_outcome(false)
    }

    /// Counts the records whose `success` flag equals `succeeded`.
    fn count_with_outcome(&self, succeeded: bool) -> u32 {
        let matching = self
            .records
            .iter()
            .filter(|rec| rec.success == succeeded);
        matching.count() as u32
    }
}
#[allow(clippy::too_many_arguments)]
pub fn compute_metrics(
model: String,
backend: String,
scenario: Scenario,
concurrency: Option<u32>,
request_rate: Option<f64>,
n_prompt: u32,
n_gen: u32,
warmup_requests: u32,
slo: Slo,
runs: Vec<RunRecord>,
env: Env,
) -> BenchReport {
assert!(!runs.is_empty(), "compute_metrics: n_repeats must be ≥ 1");
let n_repeats = runs.len() as u32;
let n_requests_per_run = runs[0].records.len() as u32;
let mut ttft_p50 = Vec::with_capacity(runs.len());
let mut ttft_p75 = Vec::with_capacity(runs.len());
let mut ttft_p95 = Vec::with_capacity(runs.len());
let mut ttft_p99 = Vec::with_capacity(runs.len());
let mut tpot_p50 = Vec::with_capacity(runs.len());
let mut tpot_p75 = Vec::with_capacity(runs.len());
let mut tpot_p95 = Vec::with_capacity(runs.len());
let mut tpot_p99 = Vec::with_capacity(runs.len());
let mut itl_p50 = Vec::with_capacity(runs.len());
let mut itl_p75 = Vec::with_capacity(runs.len());
let mut itl_p95 = Vec::with_capacity(runs.len());
let mut itl_p99 = Vec::with_capacity(runs.len());
let mut e2e_p50 = Vec::with_capacity(runs.len());
let mut e2e_p75 = Vec::with_capacity(runs.len());
let mut e2e_p95 = Vec::with_capacity(runs.len());
let mut e2e_p99 = Vec::with_capacity(runs.len());
let mut output_thr = Vec::with_capacity(runs.len());
let mut total_thr = Vec::with_capacity(runs.len());
let mut req_thr = Vec::with_capacity(runs.len());
let mut good_thr = Vec::with_capacity(runs.len());
let mut completed_per_run = Vec::with_capacity(runs.len());
let mut errored_per_run = Vec::with_capacity(runs.len());
let mut quality_issues_per_run = Vec::with_capacity(runs.len());
let mut bad_output_per_run = Vec::with_capacity(runs.len());
let mut malformed_stream_per_run = Vec::with_capacity(runs.len());
let mut missing_done_per_run = Vec::with_capacity(runs.len());
let mut duplicate_done_per_run = Vec::with_capacity(runs.len());
let mut zero_output_tokens_per_run = Vec::with_capacity(runs.len());
let mut stream_bulk_flush_per_run = Vec::with_capacity(runs.len());
let mut http_500_per_run = Vec::with_capacity(runs.len());
let mut panic_per_run = Vec::with_capacity(runs.len());
for run in &runs {
let success: Vec<&RequestRecord> = run.records.iter().filter(|r| r.success).collect();
completed_per_run.push(success.len() as u32);
errored_per_run.push((run.records.len() - success.len()) as u32);
let mut quality = QualityIssueCounts::default();
for record in &run.records {
quality.add_assign(&record.quality_issues);
}
bad_output_per_run.push(quality.bad_output);
malformed_stream_per_run.push(quality.malformed_stream);
missing_done_per_run.push(quality.missing_done);
duplicate_done_per_run.push(quality.duplicate_done);
zero_output_tokens_per_run.push(quality.zero_output_tokens);
stream_bulk_flush_per_run.push(quality.stream_bulk_flush);
http_500_per_run.push(quality.http_500);
panic_per_run.push(quality.panic);
quality_issues_per_run.push(quality);
let ttfts: Vec<f64> = success.iter().map(|r| r.ttft_ms).collect();
let tpots: Vec<f64> = success.iter().filter_map(|r| r.tpot_ms()).collect();
let e2es: Vec<f64> = success.iter().map(|r| r.e2e_ms).collect();
let itls: Vec<f64> = success
.iter()
.flat_map(|r| r.itl_ms.iter().copied())
.collect();
ttft_p50.push(percentile(&ttfts, 0.50));
ttft_p75.push(percentile(&ttfts, 0.75));
ttft_p95.push(percentile(&ttfts, 0.95));
ttft_p99.push(percentile(&ttfts, 0.99));
tpot_p50.push(percentile(&tpots, 0.50));
tpot_p75.push(percentile(&tpots, 0.75));
tpot_p95.push(percentile(&tpots, 0.95));
tpot_p99.push(percentile(&tpots, 0.99));
itl_p50.push(percentile(&itls, 0.50));
itl_p75.push(percentile(&itls, 0.75));
itl_p95.push(percentile(&itls, 0.95));
itl_p99.push(percentile(&itls, 0.99));
e2e_p50.push(percentile(&e2es, 0.50));
e2e_p75.push(percentile(&e2es, 0.75));
e2e_p95.push(percentile(&e2es, 0.95));
e2e_p99.push(percentile(&e2es, 0.99));
let total_in: u64 = success.iter().map(|r| r.input_tokens as u64).sum();
let total_out: u64 = success.iter().map(|r| r.output_tokens as u64).sum();
let dur = run.duration_s.max(f64::EPSILON);
output_thr.push(total_out as f64 / dur);
total_thr.push((total_in + total_out) as f64 / dur);
req_thr.push(success.len() as f64 / dur);
let good = success.iter().filter(|r| r.meets_slo(&slo)).count();
good_thr.push(good as f64 / dur);
}
let env_hash = env.hash();
BenchReport {
model,
backend,
scenario,
concurrency,
request_rate,
n_prompt,
n_gen,
actual_input_tokens: None,
actual_input_tokens_per_request: None,
output_token_count_source: None,
n_repeats,
n_requests_per_run,
warmup_requests,
ttft_ms: MetricSet {
p50: ScalarStats::from_samples(&ttft_p50),
p75: ScalarStats::from_samples(&ttft_p75),
p95: ScalarStats::from_samples(&ttft_p95),
p99: ScalarStats::from_samples(&ttft_p99),
},
tpot_ms: MetricSet {
p50: ScalarStats::from_samples(&tpot_p50),
p75: ScalarStats::from_samples(&tpot_p75),
p95: ScalarStats::from_samples(&tpot_p95),
p99: ScalarStats::from_samples(&tpot_p99),
},
itl_ms: MetricSet {
p50: ScalarStats::from_samples(&itl_p50),
p75: ScalarStats::from_samples(&itl_p75),
p95: ScalarStats::from_samples(&itl_p95),
p99: ScalarStats::from_samples(&itl_p99),
},
e2e_ms: MetricSet {
p50: ScalarStats::from_samples(&e2e_p50),
p75: ScalarStats::from_samples(&e2e_p75),
p95: ScalarStats::from_samples(&e2e_p95),
p99: ScalarStats::from_samples(&e2e_p99),
},
output_throughput_tps: ScalarStats::from_samples(&output_thr),
total_throughput_tps: ScalarStats::from_samples(&total_thr),
request_throughput_rps: ScalarStats::from_samples(&req_thr),
goodput_rps: ScalarStats::from_samples(&good_thr),
slo,
completed_per_run,
errored_per_run,
bad_output_per_run,
malformed_stream_per_run,
missing_done_per_run,
duplicate_done_per_run,
zero_output_tokens_per_run,
stream_bulk_flush_per_run,
http_500_per_run,
panic_per_run,
quality_issues_per_run,
env,
env_hash,
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a request record with no quality issues and no ITL samples.
    /// The token-count source is `Usage` when `out_tok > 0`, `None` otherwise.
    fn req(success: bool, ttft: f64, e2e: f64, in_tok: u32, out_tok: u32) -> RequestRecord {
        let output_token_count_source = match out_tok {
            0 => OutputTokenCountSource::None,
            _ => OutputTokenCountSource::Usage,
        };
        RequestRecord {
            success,
            ttft_ms: ttft,
            e2e_ms: e2e,
            input_tokens: in_tok,
            output_tokens: out_tok,
            output_token_count_source,
            quality_issues: QualityIssueCounts::default(),
            itl_ms: Vec::new(),
        }
    }

    /// Wraps `records` and `duration_s` into a `RunRecord`.
    fn make_run(records: Vec<RequestRecord>, duration_s: f64) -> RunRecord {
        RunRecord {
            records,
            duration_s,
        }
    }

    #[test]
    fn tpot_undefined_for_short_response() {
        // A single output token leaves no interval to average over.
        let single_token = req(true, 100.0, 100.0, 5, 1);
        assert_eq!(single_token.tpot_ms(), None);

        // Two tokens: one interval spanning e2e - ttft.
        let two_tokens = req(true, 100.0, 200.0, 5, 2);
        assert_eq!(two_tokens.tpot_ms(), Some(100.0));
    }

    #[test]
    fn slo_short_response_treated_as_tpot_ok() {
        let thresholds = Slo::default();
        let single_token = req(true, 100.0, 200.0, 5, 1);
        assert!(single_token.meets_slo(&thresholds));
    }

    #[test]
    fn slo_failure_modes() {
        let thresholds = Slo::default();

        let slow_first_token = req(true, 1000.0, 1100.0, 5, 10);
        let slow_end_to_end = req(true, 100.0, 40_000.0, 5, 10);
        let failed = req(false, 100.0, 200.0, 5, 10);
        let healthy = req(true, 100.0, 200.0, 5, 10);

        assert!(!slow_first_token.meets_slo(&thresholds));
        assert!(!slow_end_to_end.meets_slo(&thresholds));
        assert!(!failed.meets_slo(&thresholds));
        assert!(healthy.meets_slo(&thresholds));
    }

    #[test]
    fn aggregate_three_repeats() {
        // Three identical runs, so every cross-run stddev must be zero.
        let runs: Vec<RunRecord> = (0..3)
            .map(|_| {
                make_run(
                    vec![
                        req(true, 100.0, 200.0, 10, 10),
                        req(true, 120.0, 240.0, 10, 10),
                        req(true, 140.0, 280.0, 10, 10),
                        req(true, 160.0, 320.0, 10, 10),
                    ],
                    10.0,
                )
            })
            .collect();

        let report = compute_metrics(
            "test".into(),
            "cpu".into(),
            Scenario::ClosedLoop,
            Some(4),
            None,
            10,
            10,
            0,
            Slo::default(),
            runs,
            Env::default(),
        );

        assert_eq!(report.n_repeats, 3);
        assert_eq!(report.n_requests_per_run, 4);
        assert_eq!(report.bad_output_per_run, vec![0, 0, 0]);
        assert_eq!(report.malformed_stream_per_run, vec![0, 0, 0]);
        assert_eq!(report.ttft_ms.p50.stddev, 0.0);
        assert!((report.ttft_ms.p50.mean - 130.0).abs() < 1e-9);
        assert!((report.output_throughput_tps.mean - 4.0).abs() < 1e-9);
        assert!((report.request_throughput_rps.mean - 0.4).abs() < 1e-9);
        assert!((report.goodput_rps.mean - 0.4).abs() < 1e-9);
        assert!(report.env_hash.as_str().starts_with("sha256:"));
    }

    #[test]
    fn goodput_excludes_slo_violators() {
        let records = vec![
            // Within every threshold.
            req(true, 100.0, 200.0, 10, 10),
            // TTFT above the default limit.
            req(true, 1000.0, 1100.0, 10, 10),
            // End-to-end latency above the default limit.
            req(true, 100.0, 40_000.0, 10, 10),
            // Failed request: counts for neither throughput nor goodput.
            req(false, 100.0, 200.0, 10, 10),
        ];
        let single_run = make_run(records, 10.0);

        let report = compute_metrics(
            "test".into(),
            "cpu".into(),
            Scenario::OpenLoop,
            None,
            Some(10.0),
            10,
            10,
            0,
            Slo::default(),
            vec![single_run],
            Env::default(),
        );

        assert!((report.request_throughput_rps.mean - 0.3).abs() < 1e-9);
        assert!((report.goodput_rps.mean - 0.1).abs() < 1e-9);
    }

    #[test]
    fn json_round_trip() {
        let template = make_run(
            vec![
                req(true, 100.0, 200.0, 10, 10),
                req(true, 120.0, 240.0, 10, 10),
            ],
            5.0,
        );

        let report = compute_metrics(
            "qwen3:0.6b".into(),
            "metal".into(),
            Scenario::ClosedLoop,
            Some(2),
            None,
            256,
            128,
            10,
            Slo::default(),
            vec![template; 3],
            Env::default(),
        );

        let encoded = serde_json::to_string_pretty(&report).unwrap();
        let decoded: BenchReport = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.model, "qwen3:0.6b");
        assert_eq!(decoded.backend, "metal");
        assert_eq!(decoded.n_repeats, 3);
        assert_eq!(decoded.concurrency, Some(2));
        assert_eq!(decoded.request_rate, None);
        assert_eq!(decoded.quality_issues_per_run.len(), 3);
    }

    #[test]
    fn aggregates_quality_issues_per_run() {
        let bad = RequestRecord {
            quality_issues: QualityIssueCounts {
                bad_output: 1,
                missing_done: 1,
                ..QualityIssueCounts::default()
            },
            ..req(false, 100.0, 200.0, 10, 0)
        };
        let malformed = RequestRecord {
            quality_issues: QualityIssueCounts {
                malformed_stream: 1,
                http_500: 1,
                ..QualityIssueCounts::default()
            },
            ..req(false, 100.0, 200.0, 10, 0)
        };

        let first_run = make_run(vec![bad], 1.0);
        let second_run = make_run(vec![malformed], 1.0);

        let report = compute_metrics(
            "test".into(),
            "cpu".into(),
            Scenario::ClosedLoop,
            Some(2),
            None,
            10,
            10,
            0,
            Slo::default(),
            vec![first_run, second_run],
            Env::default(),
        );

        assert_eq!(report.bad_output_per_run, vec![1, 0]);
        assert_eq!(report.malformed_stream_per_run, vec![0, 1]);
        assert_eq!(report.missing_done_per_run, vec![1, 0]);
        assert_eq!(report.http_500_per_run, vec![0, 1]);
    }
}