use crate::{
config::{Config, Workload},
filesystem::backend_name,
};
use serde_json::json;
use std::time::Duration;
#[derive(Default)]
pub struct Stats {
pub ops: u64,
pub bytes: u64,
pub latency_samples: Vec<Duration>,
}
impl Stats {
#[inline(always)]
pub fn record(&mut self, bytes: u64, latency: Option<Duration>) {
self.ops += 1;
self.bytes += bytes;
if let Some(latency) = latency {
self.latency_samples.push(latency);
}
}
pub fn merge(&mut self, mut other: Self) {
self.ops += other.ops;
self.bytes += other.bytes;
self.latency_samples.append(&mut other.latency_samples);
}
}
struct OperationReport {
ops: u64,
bytes: u64,
ops_per_sec: f64,
mib_per_sec: f64,
p50_latency: Duration,
p95_latency: Duration,
p99_latency: Duration,
}
impl OperationReport {
fn new(workers: Vec<Stats>, elapsed: Duration) -> Self {
let mut merged = Stats::default();
for w in workers {
merged.merge(w);
}
merged.latency_samples.sort_unstable();
let elapsed_secs = elapsed.as_secs_f64().max(f64::EPSILON);
let percentile = |pct: usize| {
if merged.latency_samples.is_empty() {
return Duration::ZERO;
}
merged.latency_samples[(merged.latency_samples.len() - 1) * pct / 100]
};
Self {
ops: merged.ops,
bytes: merged.bytes,
ops_per_sec: merged.ops as f64 / elapsed_secs,
mib_per_sec: (merged.bytes as f64 / (1024.0 * 1024.0)) / elapsed_secs,
p50_latency: percentile(50),
p95_latency: percentile(95),
p99_latency: percentile(99),
}
}
fn print(&self, label: &str) {
println!(
"{label} ops={} bytes={} ops_per_sec={:.0} mib_per_sec={:.1} p50_us={:.1} p95_us={:.1} p99_us={:.1}",
self.ops,
self.bytes,
self.ops_per_sec,
self.mib_per_sec,
self.p50_latency.as_nanos() as f64 / 1_000.0,
self.p95_latency.as_nanos() as f64 / 1_000.0,
self.p99_latency.as_nanos() as f64 / 1_000.0,
);
}
fn to_json(&self) -> serde_json::Value {
json!({
"ops": self.ops,
"bytes": self.bytes,
"ops_per_sec": self.ops_per_sec,
"mib_per_sec": self.mib_per_sec,
"p50_latency_ns": self.p50_latency.as_nanos() as u64,
"p95_latency_ns": self.p95_latency.as_nanos() as u64,
"p99_latency_ns": self.p99_latency.as_nanos() as u64,
})
}
}
pub struct Report {
elapsed: Duration,
read: Option<OperationReport>,
write: Option<OperationReport>,
final_file_size: u64,
}
impl Report {
pub fn new(
elapsed: Duration,
read_workers: Option<Vec<Stats>>,
write_workers: Option<Vec<Stats>>,
final_file_size: u64,
) -> Self {
Self {
elapsed,
read: read_workers.map(|w| OperationReport::new(w, elapsed)),
write: write_workers.map(|w| OperationReport::new(w, elapsed)),
final_file_size,
}
}
pub fn print_human(&self, cfg: &Config) {
println!(
"backend={} workload={} elapsed_s={:.3}",
backend_name(),
cfg.workload,
self.elapsed.as_secs_f64(),
);
println!(
"io_size={} inflight={} worker_threads={} global_queue_interval={} seed={} output={}",
cfg.io_size,
cfg.inflight,
cfg.worker_threads,
cfg.global_queue_interval
.map_or_else(|| "default".to_string(), |value| value.to_string()),
cfg.seed,
cfg.output,
);
if let Some(file_size) = cfg.file_size {
println!("file_size={file_size}");
}
println!("root={}", cfg.root.display());
if let Some(cache) = cfg.cache {
println!("cache={cache}");
}
if cfg.workload.has_writes() {
println!("write_shape={}", cfg.write_shape);
if cfg.workload == Workload::WriteSync {
println!("sync_method={}", cfg.sync_method);
} else {
println!("sync_every={}", cfg.sync_mode);
}
}
if let Some(read) = &self.read {
read.print("read");
}
if let Some(write) = &self.write {
write.print("write");
}
println!("final_file_size={}", self.final_file_size);
}
pub fn print_json(&self, cfg: &Config) {
let json = json!({
"backend": backend_name(),
"workload": cfg.workload.to_string(),
"duration_seconds": cfg.duration().as_secs(),
"io_size": cfg.io_size,
"inflight": cfg.inflight,
"worker_threads": cfg.worker_threads,
"global_queue_interval": cfg.global_queue_interval,
"file_size": cfg.file_size,
"root": cfg.root,
"cache": cfg.cache.map(|mode| mode.to_string()),
"write_shape": cfg.workload.has_writes().then(|| cfg.write_shape.to_string()),
"sync_every": (cfg.workload.has_writes() && cfg.workload != Workload::WriteSync)
.then(|| cfg.sync_mode.to_string()),
"sync_method": (cfg.workload == Workload::WriteSync)
.then(|| cfg.sync_method.to_string()),
"seed": cfg.seed,
"elapsed_ns": self.elapsed.as_nanos() as u64,
"read": self.read.as_ref().map(OperationReport::to_json),
"write": self.write.as_ref().map(OperationReport::to_json),
"final_file_size": self.final_file_size,
});
println!("{json}");
}
}