1pub mod arrivals;
27pub mod env;
28pub mod profile;
29pub mod report;
30pub mod stats;
31pub mod trace;
32
33pub use env::{Env, EnvHash};
34pub use profile::{
35 configure_global_profile, flush_global_profile, global_profile, parse_profile_event_value,
36 parse_profile_jsonl_str, profile_fields_from_json, ProfileEvent, ProfileJsonlWriter,
37 ProfileMetadata, ProfileSinkConfig,
38};
39pub use stats::{ci95_half_width, percentile, student_t_975, PercentileStats, ScalarStats};
40
41use serde::{Deserialize, Serialize};
42
/// Benchmark scenario recorded in a [`BenchReport`].
///
/// Variants are serialized with `snake_case` names (for example `closed_loop`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Scenario {
    /// NOTE(review): presumably a fixed number of in-flight requests (the tests in this file
    /// pair it with `concurrency`) — confirm against the load generator.
    ClosedLoop,
    /// NOTE(review): presumably requests issued at a target rate (the tests in this file pair
    /// it with `request_rate`) — confirm against the load generator.
    OpenLoop,
    /// NOTE(review): presumably requests that share a common prompt prefix — confirm.
    SharedPrefix,
    /// NOTE(review): presumably a run driven through a command-line front end — confirm.
    Cli,
}
56
/// Latency thresholds, in milliseconds, used to decide whether a request counts as "good".
///
/// Although the field names mention p99, [`RequestRecord::meets_slo`] compares each threshold
/// against the values of a single request.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct Slo {
    /// Upper bound for a request's `ttft_ms`.
    pub ttft_p99_ms: f64,
    /// Upper bound for a request's `tpot_ms()`; not applied when that value is undefined.
    pub tpot_p99_ms: f64,
    /// Upper bound for a request's `e2e_ms`.
    pub e2e_p99_ms: f64,
}
68
69impl Default for Slo {
70 fn default() -> Self {
71 Self {
73 ttft_p99_ms: 500.0,
74 tpot_p99_ms: 50.0,
75 e2e_p99_ms: 30_000.0,
76 }
77 }
78}
79
/// The p50/p75/p95/p99 summaries of one latency metric.
///
/// `compute_metrics` fills each field by computing that percentile once per run and then
/// summarizing the per-run values with `ScalarStats::from_samples`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricSet {
    /// Summary of the per-run 50th percentiles.
    pub p50: PercentileStats,
    /// Summary of the per-run 75th percentiles.
    pub p75: PercentileStats,
    /// Summary of the per-run 95th percentiles.
    pub p95: PercentileStats,
    /// Summary of the per-run 99th percentiles.
    pub p99: PercentileStats,
}
89
/// Aggregated result of one benchmark configuration, as built by `compute_metrics`.
///
/// The `Option` fields are omitted from the serialized form when `None`, and every field
/// marked `#[serde(default)]` may be missing from the input when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BenchReport {
    /// Model identifier, copied unchanged from the caller.
    pub model: String,
    /// Backend identifier, copied unchanged from the caller.
    pub backend: String,
    /// Scenario the runs were executed under.
    pub scenario: Scenario,

    /// Caller-supplied concurrency level, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub concurrency: Option<u32>,
    /// Caller-supplied request rate, if any.
    /// NOTE(review): the unit (presumably requests per second) is not visible here — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub request_rate: Option<f64>,

    /// Caller-supplied prompt size (presumably in tokens — confirm).
    pub n_prompt: u32,
    /// Caller-supplied generation size (presumably in tokens — confirm).
    pub n_gen: u32,
    /// Left as `None` by `compute_metrics`; any value is filled in elsewhere.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub actual_input_tokens: Option<TokenLengthStats>,
    /// Left as `None` by `compute_metrics`; any value is filled in elsewhere.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub actual_input_tokens_per_request: Option<Vec<Vec<u32>>>,
    /// Left as `None` by `compute_metrics`; any value is filled in elsewhere.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub output_token_count_source: Option<String>,
    /// Number of runs that were aggregated.
    pub n_repeats: u32,
    /// Number of request records (successful or not) in the first run.
    pub n_requests_per_run: u32,
    /// Caller-supplied warm-up request count, copied unchanged.
    pub warmup_requests: u32,

    /// `ttft_ms` percentiles over the successful requests of each run.
    pub ttft_ms: MetricSet,
    /// `tpot_ms()` percentiles over the successful requests for which it is defined.
    pub tpot_ms: MetricSet,
    /// Percentiles over all `itl_ms` samples of the successful requests of each run.
    pub itl_ms: MetricSet,
    /// `e2e_ms` percentiles over the successful requests of each run.
    pub e2e_ms: MetricSet,

    /// Output tokens of successful requests divided by the run duration, per run.
    pub output_throughput_tps: ScalarStats,
    /// Input plus output tokens of successful requests divided by the run duration, per run.
    pub total_throughput_tps: ScalarStats,
    /// Successful requests divided by the run duration, per run.
    pub request_throughput_rps: ScalarStats,
    /// Successful requests that also meet `slo`, divided by the run duration, per run.
    pub goodput_rps: ScalarStats,

    /// Thresholds that were used for `goodput_rps`.
    pub slo: Slo,

    /// Number of successful requests in each run.
    pub completed_per_run: Vec<u32>,
    /// Number of unsuccessful requests in each run.
    pub errored_per_run: Vec<u32>,
    /// Per-run sum of `QualityIssueCounts::bad_output` over all records of the run.
    #[serde(default)]
    pub bad_output_per_run: Vec<u32>,
    /// Per-run sum of `QualityIssueCounts::malformed_stream`.
    #[serde(default)]
    pub malformed_stream_per_run: Vec<u32>,
    /// Per-run sum of `QualityIssueCounts::missing_done`.
    #[serde(default)]
    pub missing_done_per_run: Vec<u32>,
    /// Per-run sum of `QualityIssueCounts::duplicate_done`.
    #[serde(default)]
    pub duplicate_done_per_run: Vec<u32>,
    /// Per-run sum of `QualityIssueCounts::zero_output_tokens`.
    #[serde(default)]
    pub zero_output_tokens_per_run: Vec<u32>,
    /// Per-run sum of `QualityIssueCounts::stream_bulk_flush`.
    #[serde(default)]
    pub stream_bulk_flush_per_run: Vec<u32>,
    /// Per-run sum of `QualityIssueCounts::http_500`.
    #[serde(default)]
    pub http_500_per_run: Vec<u32>,
    /// Per-run sum of `QualityIssueCounts::panic`.
    #[serde(default)]
    pub panic_per_run: Vec<u32>,
    /// The complete per-run issue totals that the `*_per_run` counters above are taken from.
    #[serde(default)]
    pub quality_issues_per_run: Vec<QualityIssueCounts>,

    /// Environment description supplied by the caller.
    pub env: Env,
    /// Result of `env.hash()` at the time the report was built.
    pub env_hash: EnvHash,
}
154
/// Summary of token lengths, used as the type of `BenchReport::actual_input_tokens`.
///
/// Nothing in this file constructs it, so the field meanings below are inferred from the names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenLengthStats {
    /// NOTE(review): presumably the length that was asked for — confirm against the producer.
    pub requested: u32,
    /// Smallest observed length (presumed).
    pub min: u32,
    /// Largest observed length (presumed).
    pub max: u32,
    /// Mean observed length (presumed).
    pub mean: f64,
}
162
/// Measurements for a single request within a run.
#[derive(Debug, Clone)]
pub struct RequestRecord {
    /// Whether the request succeeded; `compute_metrics` only uses successful records for
    /// latency and throughput figures.
    pub success: bool,
    /// Time to first token, in milliseconds.
    pub ttft_ms: f64,
    /// End-to-end latency, in milliseconds.
    pub e2e_ms: f64,
    /// Number of input tokens counted towards total throughput.
    pub input_tokens: u32,
    /// Number of output tokens; also the divisor basis for `tpot_ms()`.
    pub output_tokens: u32,
    /// Where the `output_tokens` value came from.
    pub output_token_count_source: OutputTokenCountSource,
    /// Quality issues attributed to this request; summed per run by `compute_metrics`.
    pub quality_issues: QualityIssueCounts,
    /// Inter-token latency samples, in milliseconds (per the field name).
    pub itl_ms: Vec<f64>,
}
177
/// Counters for the kinds of quality issue tracked per request and, summed, per run.
///
/// All counters start at zero via `Default`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct QualityIssueCounts {
    /// Occurrences of bad output.
    pub bad_output: u32,
    /// Occurrences of a malformed stream.
    pub malformed_stream: u32,
    /// Occurrences of a missing "done" marker (per the field name).
    pub missing_done: u32,
    /// Occurrences of a duplicated "done" marker (per the field name).
    pub duplicate_done: u32,
    /// Occurrences of a response with zero output tokens.
    pub zero_output_tokens: u32,
    /// Occurrences of a bulk-flushed stream; not included in `request_error_count()`.
    pub stream_bulk_flush: u32,
    /// Occurrences of an HTTP 500 response.
    pub http_500: u32,
    /// Occurrences of a panic.
    pub panic: u32,
}
189
190impl QualityIssueCounts {
191 pub fn add_assign(&mut self, other: &Self) {
192 self.bad_output += other.bad_output;
193 self.malformed_stream += other.malformed_stream;
194 self.missing_done += other.missing_done;
195 self.duplicate_done += other.duplicate_done;
196 self.zero_output_tokens += other.zero_output_tokens;
197 self.stream_bulk_flush += other.stream_bulk_flush;
198 self.http_500 += other.http_500;
199 self.panic += other.panic;
200 }
201
202 pub fn request_error_count(&self) -> u32 {
203 self.bad_output
204 + self.malformed_stream
205 + self.missing_done
206 + self.duplicate_done
207 + self.zero_output_tokens
208 + self.http_500
209 + self.panic
210 }
211}
212
/// Origin of a request's output token count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputTokenCountSource {
    /// Labelled `"usage"`.
    /// NOTE(review): presumably the count reported in the response's usage data — confirm.
    Usage,
    /// Labelled `"stream_chunks"`.
    /// NOTE(review): presumably the count derived from streamed chunks — confirm.
    StreamChunks,
    /// Labelled `"none"`; the tests in this file use it for records with zero output tokens.
    None,
}

impl OutputTokenCountSource {
    /// Returns the fixed lowercase label for this source.
    pub fn as_str(self) -> &'static str {
        let label = match self {
            OutputTokenCountSource::Usage => "usage",
            OutputTokenCountSource::StreamChunks => "stream_chunks",
            OutputTokenCountSource::None => "none",
        };
        label
    }
}
229
230impl RequestRecord {
231 pub fn tpot_ms(&self) -> Option<f64> {
233 if self.output_tokens < 2 {
234 return None;
235 }
236 Some((self.e2e_ms - self.ttft_ms) / (self.output_tokens - 1) as f64)
237 }
238
239 pub fn meets_slo(&self, slo: &Slo) -> bool {
242 if !self.success {
243 return false;
244 }
245 let ttft_ok = self.ttft_ms <= slo.ttft_p99_ms;
246 let e2e_ok = self.e2e_ms <= slo.e2e_p99_ms;
247 let tpot_ok = self.tpot_ms().map(|t| t <= slo.tpot_p99_ms).unwrap_or(true);
248 ttft_ok && e2e_ok && tpot_ok
249 }
250}
251
/// All request records of one benchmark run together with the run's duration.
#[derive(Debug, Clone)]
pub struct RunRecord {
    /// One entry per request, successful or not.
    pub records: Vec<RequestRecord>,
    /// Duration of the run in seconds; `compute_metrics` divides by it for throughput figures.
    pub duration_s: f64,
}
260
261impl RunRecord {
262 pub fn n_completed(&self) -> u32 {
263 self.records.iter().filter(|r| r.success).count() as u32
264 }
265 pub fn n_errored(&self) -> u32 {
266 self.records.iter().filter(|r| !r.success).count() as u32
267 }
268}
269
270#[allow(clippy::too_many_arguments)]
281pub fn compute_metrics(
282 model: String,
283 backend: String,
284 scenario: Scenario,
285 concurrency: Option<u32>,
286 request_rate: Option<f64>,
287 n_prompt: u32,
288 n_gen: u32,
289 warmup_requests: u32,
290 slo: Slo,
291 runs: Vec<RunRecord>,
292 env: Env,
293) -> BenchReport {
294 assert!(!runs.is_empty(), "compute_metrics: n_repeats must be ≥ 1");
295 let n_repeats = runs.len() as u32;
296 let n_requests_per_run = runs[0].records.len() as u32;
297
298 let mut ttft_p50 = Vec::with_capacity(runs.len());
299 let mut ttft_p75 = Vec::with_capacity(runs.len());
300 let mut ttft_p95 = Vec::with_capacity(runs.len());
301 let mut ttft_p99 = Vec::with_capacity(runs.len());
302 let mut tpot_p50 = Vec::with_capacity(runs.len());
303 let mut tpot_p75 = Vec::with_capacity(runs.len());
304 let mut tpot_p95 = Vec::with_capacity(runs.len());
305 let mut tpot_p99 = Vec::with_capacity(runs.len());
306 let mut itl_p50 = Vec::with_capacity(runs.len());
307 let mut itl_p75 = Vec::with_capacity(runs.len());
308 let mut itl_p95 = Vec::with_capacity(runs.len());
309 let mut itl_p99 = Vec::with_capacity(runs.len());
310 let mut e2e_p50 = Vec::with_capacity(runs.len());
311 let mut e2e_p75 = Vec::with_capacity(runs.len());
312 let mut e2e_p95 = Vec::with_capacity(runs.len());
313 let mut e2e_p99 = Vec::with_capacity(runs.len());
314
315 let mut output_thr = Vec::with_capacity(runs.len());
316 let mut total_thr = Vec::with_capacity(runs.len());
317 let mut req_thr = Vec::with_capacity(runs.len());
318 let mut good_thr = Vec::with_capacity(runs.len());
319
320 let mut completed_per_run = Vec::with_capacity(runs.len());
321 let mut errored_per_run = Vec::with_capacity(runs.len());
322 let mut quality_issues_per_run = Vec::with_capacity(runs.len());
323 let mut bad_output_per_run = Vec::with_capacity(runs.len());
324 let mut malformed_stream_per_run = Vec::with_capacity(runs.len());
325 let mut missing_done_per_run = Vec::with_capacity(runs.len());
326 let mut duplicate_done_per_run = Vec::with_capacity(runs.len());
327 let mut zero_output_tokens_per_run = Vec::with_capacity(runs.len());
328 let mut stream_bulk_flush_per_run = Vec::with_capacity(runs.len());
329 let mut http_500_per_run = Vec::with_capacity(runs.len());
330 let mut panic_per_run = Vec::with_capacity(runs.len());
331
332 for run in &runs {
333 let success: Vec<&RequestRecord> = run.records.iter().filter(|r| r.success).collect();
334 completed_per_run.push(success.len() as u32);
335 errored_per_run.push((run.records.len() - success.len()) as u32);
336 let mut quality = QualityIssueCounts::default();
337 for record in &run.records {
338 quality.add_assign(&record.quality_issues);
339 }
340 bad_output_per_run.push(quality.bad_output);
341 malformed_stream_per_run.push(quality.malformed_stream);
342 missing_done_per_run.push(quality.missing_done);
343 duplicate_done_per_run.push(quality.duplicate_done);
344 zero_output_tokens_per_run.push(quality.zero_output_tokens);
345 stream_bulk_flush_per_run.push(quality.stream_bulk_flush);
346 http_500_per_run.push(quality.http_500);
347 panic_per_run.push(quality.panic);
348 quality_issues_per_run.push(quality);
349
350 let ttfts: Vec<f64> = success.iter().map(|r| r.ttft_ms).collect();
351 let tpots: Vec<f64> = success.iter().filter_map(|r| r.tpot_ms()).collect();
352 let e2es: Vec<f64> = success.iter().map(|r| r.e2e_ms).collect();
353 let itls: Vec<f64> = success
354 .iter()
355 .flat_map(|r| r.itl_ms.iter().copied())
356 .collect();
357
358 ttft_p50.push(percentile(&ttfts, 0.50));
359 ttft_p75.push(percentile(&ttfts, 0.75));
360 ttft_p95.push(percentile(&ttfts, 0.95));
361 ttft_p99.push(percentile(&ttfts, 0.99));
362 tpot_p50.push(percentile(&tpots, 0.50));
363 tpot_p75.push(percentile(&tpots, 0.75));
364 tpot_p95.push(percentile(&tpots, 0.95));
365 tpot_p99.push(percentile(&tpots, 0.99));
366 itl_p50.push(percentile(&itls, 0.50));
367 itl_p75.push(percentile(&itls, 0.75));
368 itl_p95.push(percentile(&itls, 0.95));
369 itl_p99.push(percentile(&itls, 0.99));
370 e2e_p50.push(percentile(&e2es, 0.50));
371 e2e_p75.push(percentile(&e2es, 0.75));
372 e2e_p95.push(percentile(&e2es, 0.95));
373 e2e_p99.push(percentile(&e2es, 0.99));
374
375 let total_in: u64 = success.iter().map(|r| r.input_tokens as u64).sum();
376 let total_out: u64 = success.iter().map(|r| r.output_tokens as u64).sum();
377 let dur = run.duration_s.max(f64::EPSILON);
378 output_thr.push(total_out as f64 / dur);
379 total_thr.push((total_in + total_out) as f64 / dur);
380 req_thr.push(success.len() as f64 / dur);
381
382 let good = success.iter().filter(|r| r.meets_slo(&slo)).count();
383 good_thr.push(good as f64 / dur);
384 }
385
386 let env_hash = env.hash();
387 BenchReport {
388 model,
389 backend,
390 scenario,
391 concurrency,
392 request_rate,
393 n_prompt,
394 n_gen,
395 actual_input_tokens: None,
396 actual_input_tokens_per_request: None,
397 output_token_count_source: None,
398 n_repeats,
399 n_requests_per_run,
400 warmup_requests,
401 ttft_ms: MetricSet {
402 p50: ScalarStats::from_samples(&ttft_p50),
403 p75: ScalarStats::from_samples(&ttft_p75),
404 p95: ScalarStats::from_samples(&ttft_p95),
405 p99: ScalarStats::from_samples(&ttft_p99),
406 },
407 tpot_ms: MetricSet {
408 p50: ScalarStats::from_samples(&tpot_p50),
409 p75: ScalarStats::from_samples(&tpot_p75),
410 p95: ScalarStats::from_samples(&tpot_p95),
411 p99: ScalarStats::from_samples(&tpot_p99),
412 },
413 itl_ms: MetricSet {
414 p50: ScalarStats::from_samples(&itl_p50),
415 p75: ScalarStats::from_samples(&itl_p75),
416 p95: ScalarStats::from_samples(&itl_p95),
417 p99: ScalarStats::from_samples(&itl_p99),
418 },
419 e2e_ms: MetricSet {
420 p50: ScalarStats::from_samples(&e2e_p50),
421 p75: ScalarStats::from_samples(&e2e_p75),
422 p95: ScalarStats::from_samples(&e2e_p95),
423 p99: ScalarStats::from_samples(&e2e_p99),
424 },
425 output_throughput_tps: ScalarStats::from_samples(&output_thr),
426 total_throughput_tps: ScalarStats::from_samples(&total_thr),
427 request_throughput_rps: ScalarStats::from_samples(&req_thr),
428 goodput_rps: ScalarStats::from_samples(&good_thr),
429 slo,
430 completed_per_run,
431 errored_per_run,
432 bad_output_per_run,
433 malformed_stream_per_run,
434 missing_done_per_run,
435 duplicate_done_per_run,
436 zero_output_tokens_per_run,
437 stream_bulk_flush_per_run,
438 http_500_per_run,
439 panic_per_run,
440 quality_issues_per_run,
441 env,
442 env_hash,
443 }
444}
445
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record without quality issues or inter-token latencies.
    ///
    /// The token-count source is `Usage` when any output tokens exist and `None` otherwise.
    fn req(success: bool, ttft: f64, e2e: f64, in_tok: u32, out_tok: u32) -> RequestRecord {
        let output_token_count_source = match out_tok {
            0 => OutputTokenCountSource::None,
            _ => OutputTokenCountSource::Usage,
        };
        RequestRecord {
            success,
            ttft_ms: ttft,
            e2e_ms: e2e,
            input_tokens: in_tok,
            output_tokens: out_tok,
            output_token_count_source,
            quality_issues: QualityIssueCounts::default(),
            itl_ms: Vec::new(),
        }
    }

    /// Wraps `records` in a run that lasted `duration_s` seconds.
    fn make_run(records: Vec<RequestRecord>, duration_s: f64) -> RunRecord {
        RunRecord {
            records,
            duration_s,
        }
    }

    /// Asserts that `actual` is within `1e-9` of `expected`.
    #[track_caller]
    fn assert_close(actual: f64, expected: f64) {
        assert!((actual - expected).abs() < 1e-9);
    }

    #[test]
    fn tpot_undefined_for_short_response() {
        let single_token = req(true, 100.0, 100.0, 5, 1);
        assert_eq!(single_token.tpot_ms(), None);

        // Two tokens: the 100 ms after the first token are spread over one interval.
        let two_tokens = req(true, 100.0, 200.0, 5, 2);
        assert_eq!(two_tokens.tpot_ms(), Some(100.0));
    }

    #[test]
    fn slo_short_response_treated_as_tpot_ok() {
        // A single output token has no TPOT, so only TTFT and e2e are checked.
        let record = req(true, 100.0, 200.0, 5, 1);
        assert!(record.meets_slo(&Slo::default()));
    }

    #[test]
    fn slo_failure_modes() {
        let slo = Slo::default();

        // TTFT of 1000 ms exceeds the 500 ms default.
        let slow_first_token = req(true, 1000.0, 1100.0, 5, 10);
        assert!(!slow_first_token.meets_slo(&slo));

        // End-to-end latency of 40 s exceeds the 30 s default.
        let slow_overall = req(true, 100.0, 40_000.0, 5, 10);
        assert!(!slow_overall.meets_slo(&slo));

        // A failed request never meets the SLO, whatever its timings.
        let failed = req(false, 100.0, 200.0, 5, 10);
        assert!(!failed.meets_slo(&slo));

        let healthy = req(true, 100.0, 200.0, 5, 10);
        assert!(healthy.meets_slo(&slo));
    }

    #[test]
    fn aggregate_three_repeats() {
        // Four successful requests per run; end-to-end latency is exactly twice the TTFT.
        let records: Vec<RequestRecord> = [100.0, 120.0, 140.0, 160.0]
            .iter()
            .map(|&ttft| req(true, ttft, 2.0 * ttft, 10, 10))
            .collect();
        let run = make_run(records, 10.0);

        let report = compute_metrics(
            "test".into(),
            "cpu".into(),
            Scenario::ClosedLoop,
            Some(4),
            None,
            10,
            10,
            0,
            Slo::default(),
            vec![run; 3],
            Env::default(),
        );

        assert_eq!(report.n_repeats, 3);
        assert_eq!(report.n_requests_per_run, 4);
        assert_eq!(report.bad_output_per_run, vec![0, 0, 0]);
        assert_eq!(report.malformed_stream_per_run, vec![0, 0, 0]);
        // Identical runs give identical per-run medians, hence no spread.
        assert_eq!(report.ttft_ms.p50.stddev, 0.0);
        assert_close(report.ttft_ms.p50.mean, 130.0);
        // 4 requests x 10 output tokens over 10 s.
        assert_close(report.output_throughput_tps.mean, 4.0);
        // 4 requests over 10 s, all of them within the SLO.
        assert_close(report.request_throughput_rps.mean, 0.4);
        assert_close(report.goodput_rps.mean, 0.4);
        assert!(report.env_hash.as_str().starts_with("sha256:"));
    }

    #[test]
    fn goodput_excludes_slo_violators() {
        let records = vec![
            // Meets every threshold.
            req(true, 100.0, 200.0, 10, 10),
            // Violates the TTFT threshold.
            req(true, 1000.0, 1100.0, 10, 10),
            // Violates the end-to-end threshold.
            req(true, 100.0, 40_000.0, 10, 10),
            // Failed request.
            req(false, 100.0, 200.0, 10, 10),
        ];

        let report = compute_metrics(
            "test".into(),
            "cpu".into(),
            Scenario::OpenLoop,
            None,
            Some(10.0),
            10,
            10,
            0,
            Slo::default(),
            vec![make_run(records, 10.0)],
            Env::default(),
        );

        // Three successful requests over 10 s, of which only one is within the SLO.
        assert_close(report.request_throughput_rps.mean, 0.3);
        assert_close(report.goodput_rps.mean, 0.1);
    }

    #[test]
    fn json_round_trip() {
        let records = vec![
            req(true, 100.0, 200.0, 10, 10),
            req(true, 120.0, 240.0, 10, 10),
        ];
        let original = compute_metrics(
            "qwen3:0.6b".into(),
            "metal".into(),
            Scenario::ClosedLoop,
            Some(2),
            None,
            256,
            128,
            10,
            Slo::default(),
            vec![make_run(records, 5.0); 3],
            Env::default(),
        );

        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let decoded: BenchReport = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.model, "qwen3:0.6b");
        assert_eq!(decoded.backend, "metal");
        assert_eq!(decoded.n_repeats, 3);
        assert_eq!(decoded.concurrency, Some(2));
        assert_eq!(decoded.request_rate, None);
        assert_eq!(decoded.quality_issues_per_run.len(), 3);
    }

    #[test]
    fn aggregates_quality_issues_per_run() {
        let bad = RequestRecord {
            quality_issues: QualityIssueCounts {
                bad_output: 1,
                missing_done: 1,
                ..QualityIssueCounts::default()
            },
            ..req(false, 100.0, 200.0, 10, 0)
        };
        let malformed = RequestRecord {
            quality_issues: QualityIssueCounts {
                malformed_stream: 1,
                http_500: 1,
                ..QualityIssueCounts::default()
            },
            ..req(false, 100.0, 200.0, 10, 0)
        };

        let report = compute_metrics(
            "test".into(),
            "cpu".into(),
            Scenario::ClosedLoop,
            Some(2),
            None,
            10,
            10,
            0,
            Slo::default(),
            vec![make_run(vec![bad], 1.0), make_run(vec![malformed], 1.0)],
            Env::default(),
        );

        // The first run holds the `bad` record, the second run the `malformed` one.
        assert_eq!(report.bad_output_per_run, vec![1, 0]);
        assert_eq!(report.malformed_stream_per_run, vec![0, 1]);
        assert_eq!(report.missing_done_per_run, vec![1, 0]);
        assert_eq!(report.http_500_per_run, vec![0, 1]);
    }
}