#![expect(
dead_code,
reason = "bench helper module; each bench binary exercises a different subset"
)]
use std::fmt::Write as _;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use sysinfo::{ProcessesToUpdate, System};
/// SQL column list for the measurements table used by the benchmarks: a non-null
/// integer `id`, an integer `sensor_id`, a double-precision `value` and a 64-bit
/// integer `timestamp`.
pub(crate) const MEASUREMENTS_DDL: &str =
"id INT NOT NULL, sensor_id INT, value DOUBLE PRECISION, timestamp BIGINT";
/// Nominal size of one row in bytes.
///
/// NOTE(review): 24 equals 4 + 4 + 8 + 8, which is what fixed-width INT, INT,
/// DOUBLE PRECISION and BIGINT columns (see `MEASUREMENTS_DDL`) would occupy — confirm
/// that this is the intended derivation.
pub(crate) const BYTES_PER_ROW: usize = 24;
/// Computes the row id for offset `i` within a batch that begins at `start_id`.
///
/// The 64-bit sum is narrowed with an `as` cast, so a result outside the `i32`
/// range wraps (two's-complement truncation) rather than being rejected.
#[inline]
pub(crate) fn gen_id(start_id: i64, i: i64) -> i32 {
    let wide = start_id + i;
    wide as i32
}
/// Derives the sensor id for a row: the remainder of `id` divided by ten.
///
/// Rust's `%` truncates toward zero, so a negative `id` yields a non-positive result.
#[inline]
pub(crate) fn gen_sensor_id(id: i32) -> i32 {
    const SENSOR_COUNT: i32 = 10;
    id % SENSOR_COUNT
}
/// Derives the measurement value for a row: `id` converted to `f64` and scaled by 0.1.
#[inline]
pub(crate) fn gen_value(id: i32) -> f64 {
    let base = f64::from(id);
    base * 0.1
}
/// Derives the timestamp for a row: a fixed base plus 1000 per unit of `id`.
///
/// NOTE(review): the base looks like a Unix epoch in milliseconds (one second per
/// id step) — confirm against the consumers of this column.
#[inline]
pub(crate) fn gen_timestamp(id: i32) -> i64 {
    const BASE: i64 = 1_700_000_000_000;
    const STEP: i64 = 1000;
    let offset = i64::from(id) * STEP;
    BASE + offset
}
/// Pause between two consecutive resource samples taken by `ResourceMonitor`,
/// in milliseconds.
pub(crate) const SAMPLE_INTERVAL_MS: u64 = 100;
/// Raw resource samples collected over one monitored run, plus summary accessors.
#[derive(Debug, Clone, Default)]
pub(crate) struct ResourceStats {
    /// CPU usage readings, one entry per sample.
    pub cpu_samples: Vec<f32>,
    /// Memory readings, one entry per sample; the `*_mb` accessors treat them as bytes.
    pub memory_samples: Vec<u64>,
    /// Number of samples recorded.
    pub sample_count: usize,
}

impl ResourceStats {
    /// Divisor that converts a byte count into megabytes (1024 * 1024 bytes each).
    const BYTES_PER_MB: f64 = 1024.0 * 1024.0;

    /// Arithmetic mean of the CPU samples, or `0.0` when nothing was sampled.
    pub(crate) fn cpu_avg(&self) -> f32 {
        let samples = &self.cpu_samples;
        if samples.is_empty() {
            return 0.0;
        }
        let total: f32 = samples.iter().sum();
        total / samples.len() as f32
    }

    /// Largest CPU sample, with a floor of `0.0` (also the result for no samples).
    pub(crate) fn cpu_max(&self) -> f32 {
        let mut peak = 0.0f32;
        for &sample in &self.cpu_samples {
            peak = peak.max(sample);
        }
        peak
    }

    /// Mean memory reading in megabytes, or `0.0` when nothing was sampled.
    pub(crate) fn memory_avg_mb(&self) -> f64 {
        let samples = &self.memory_samples;
        if samples.is_empty() {
            return 0.0;
        }
        let total: u64 = samples.iter().sum();
        let mean_bytes = total as f64 / samples.len() as f64;
        mean_bytes / Self::BYTES_PER_MB
    }

    /// Largest memory reading in megabytes, or `0.0` when nothing was sampled.
    pub(crate) fn memory_max_mb(&self) -> f64 {
        let peak_bytes = self.memory_samples.iter().max().copied().unwrap_or(0);
        peak_bytes as f64 / Self::BYTES_PER_MB
    }

    /// Smallest memory reading in megabytes, or `0.0` when nothing was sampled.
    pub(crate) fn memory_min_mb(&self) -> f64 {
        let low_bytes = self.memory_samples.iter().min().copied().unwrap_or(0);
        low_bytes as f64 / Self::BYTES_PER_MB
    }
}
/// Background sampler that records the current process's CPU and memory usage
/// every `SAMPLE_INTERVAL_MS` milliseconds until it is stopped or dropped.
pub(crate) struct ResourceMonitor {
    /// Cleared to ask the sampler thread to exit its loop.
    running: Arc<AtomicBool>,
    /// Samples shared between the sampler thread and this handle.
    stats: Arc<Mutex<ResourceStats>>,
    /// Join handle of the sampler thread; `None` once it has been joined.
    handle: Option<thread::JoinHandle<()>>,
}

impl ResourceMonitor {
    /// Spawns the sampler thread and returns a handle to it.
    ///
    /// If the current pid cannot be determined the thread exits immediately and
    /// the monitor simply yields empty statistics.
    pub(crate) fn start() -> Self {
        let running = Arc::new(AtomicBool::new(true));
        let stats = Arc::new(Mutex::new(ResourceStats::default()));
        let running_c = Arc::clone(&running);
        let stats_c = Arc::clone(&stats);
        let handle = thread::spawn(move || {
            let mut sys = System::new_all();
            let Ok(pid) = sysinfo::get_current_pid() else {
                return;
            };
            while running_c.load(Ordering::Relaxed) {
                sys.refresh_processes(ProcessesToUpdate::Some(&[pid]), true);
                if let Some(proc_) = sys.process(pid) {
                    let cpu = proc_.cpu_usage();
                    let mem = proc_.memory();
                    // Best effort: a poisoned lock only costs us this sample.
                    if let Ok(mut s) = stats_c.lock() {
                        s.cpu_samples.push(cpu);
                        s.memory_samples.push(mem);
                        s.sample_count += 1;
                    }
                }
                thread::sleep(Duration::from_millis(SAMPLE_INTERVAL_MS));
            }
        });
        ResourceMonitor {
            running,
            stats,
            handle: Some(handle),
        }
    }

    /// Stops the sampler, waits for its thread to finish and returns a copy of
    /// the collected samples (empty statistics if the lock is poisoned).
    pub(crate) fn stop(mut self) -> ResourceStats {
        self.shutdown();
        self.stats.lock().map(|s| s.clone()).unwrap_or_default()
    }

    /// Signals the sampler thread to exit and joins it; safe to call repeatedly.
    fn shutdown(&mut self) {
        self.running.store(false, Ordering::Relaxed);
        if let Some(h) = self.handle.take() {
            // A panic inside the sampler is not worth propagating to the benchmark.
            let _ = h.join();
        }
    }
}

impl Drop for ResourceMonitor {
    /// Ensures the sampler thread does not outlive the monitor when `stop` was
    /// never called (early return, panic); otherwise it would keep sampling and
    /// growing its sample vectors for the rest of the process lifetime.
    fn drop(&mut self) {
        self.shutdown();
    }
}
/// Formats a count compactly: plain digits below one thousand, otherwise scaled
/// with a `K` (one decimal), `M` or `B` (two decimals) suffix.
pub(crate) fn fmt_count(n: u64) -> String {
    let value = n as f64;
    match n {
        1_000_000_000.. => format!("{:.2}B", value / 1_000_000_000.0),
        1_000_000.. => format!("{:.2}M", value / 1_000_000.0),
        1_000.. => format!("{:.1}K", value / 1_000.0),
        _ => n.to_string(),
    }
}
/// Formats a throughput figure: values of at least one thousand are scaled to
/// `K/s`, `M/s` or `B/s` with two decimals; anything smaller (including negative
/// or NaN input) is printed unscaled with no decimals.
pub(crate) fn fmt_rate(rows_per_sec: f64) -> String {
    // Largest tier first so the first match is the right one.
    const TIERS: [(f64, &str); 3] = [(1e9, "B"), (1e6, "M"), (1e3, "K")];

    let tier = TIERS.iter().find(|(scale, _)| rows_per_sec >= *scale);
    match tier {
        Some((scale, unit)) => format!("{:.2} {}/s", rows_per_sec / scale, unit),
        None => format!("{rows_per_sec:.0}/s"),
    }
}
/// Formats `bytes` transferred in `elapsed_secs` as megabytes per second
/// (1 MB = 1024 * 1024 bytes, one decimal).
///
/// Returns an em dash when the elapsed time is zero or negative.
pub(crate) fn fmt_mb(bytes: usize, elapsed_secs: f64) -> String {
    const BYTES_PER_MB: f64 = 1024.0 * 1024.0;

    if elapsed_secs <= 0.0 {
        return String::from("—");
    }
    let megabytes = bytes as f64 / BYTES_PER_MB;
    let rate = megabytes / elapsed_secs;
    format!("{rate:.1} MB/s")
}
/// Formats a byte count using decimal (power-of-1000) units: `B`, `KB`, `MB`
/// or `GB`, with two decimals for the scaled units.
pub(crate) fn fmt_size(bytes: usize) -> String {
    let value = bytes as f64;
    match bytes {
        1_000_000_000.. => format!("{:.2} GB", value / 1_000_000_000.0),
        1_000_000.. => format!("{:.2} MB", value / 1_000_000.0),
        1_000.. => format!("{:.2} KB", value / 1_000.0),
        _ => format!("{bytes} B"),
    }
}
/// Snapshot of the machine and toolchain a benchmark ran on.
#[derive(Debug, Clone)]
pub(crate) struct HostEnv {
    /// Operating system name, or `"unknown"` when it cannot be determined.
    pub os: String,
    /// Operating system version, or `"unknown"` when it cannot be determined.
    pub os_version: String,
    /// Target architecture, taken from `std::env::consts::ARCH`.
    pub arch: String,
    /// Brand string of the first reported CPU (empty if none is reported).
    pub cpu_brand: String,
    /// Physical core count; falls back to the logical count when unavailable.
    pub cpu_cores_physical: usize,
    /// Number of CPUs reported by the system.
    pub cpu_cores_logical: usize,
    /// Total memory, expressed in units of 1024^3 bytes.
    pub total_memory_gb: f64,
    /// Output of `rustc --version`, or a placeholder if it could not be obtained.
    pub rustc_version: String,
    /// `CARGO_PKG_VERSION` of the package this module is compiled into.
    pub hyperdb_api_version: &'static str,
}

impl HostEnv {
    /// Queries the system for OS, CPU, memory and toolchain information.
    pub(crate) fn detect() -> Self {
        const BYTES_PER_GB: f64 = 1024.0 * 1024.0 * 1024.0;

        let mut system = System::new_all();
        system.refresh_all();

        let cpu_list = system.cpus();
        let logical = cpu_list.len();
        let physical = sysinfo::System::physical_core_count().unwrap_or(logical);
        let cpu_brand = match cpu_list.first() {
            Some(cpu) => cpu.brand().to_string(),
            None => String::new(),
        };
        let total_memory_gb = system.total_memory() as f64 / BYTES_PER_GB;

        Self {
            os: System::name().unwrap_or_else(|| "unknown".to_string()),
            os_version: System::os_version().unwrap_or_else(|| "unknown".to_string()),
            arch: std::env::consts::ARCH.to_string(),
            cpu_brand,
            cpu_cores_physical: physical,
            cpu_cores_logical: logical,
            total_memory_gb,
            rustc_version: rustc_version_runtime(),
            hyperdb_api_version: env!("CARGO_PKG_VERSION"),
        }
    }

    /// One-line summary: OS, CPU brand, core counts, memory and rustc version.
    pub(crate) fn short(&self) -> String {
        format!(
            "{} {} · {} · {}p/{}l · {:.1} GB · {}",
            self.os,
            self.os_version,
            self.cpu_brand,
            self.cpu_cores_physical,
            self.cpu_cores_logical,
            self.total_memory_gb,
            self.rustc_version,
        )
    }

    /// Markdown bullet list describing the host, one newline-terminated bullet per line.
    pub(crate) fn markdown(&self) -> String {
        format!(
            concat!(
                "- **OS:** {} {} ({})\n",
                "- **CPU:** {} ({} physical / {} logical cores)\n",
                "- **Memory:** {:.1} GB\n",
                "- **Rust:** {}\n",
                "- **hyperdb-api version:** {}\n",
            ),
            self.os,
            self.os_version,
            self.arch,
            self.cpu_brand,
            self.cpu_cores_physical,
            self.cpu_cores_logical,
            self.total_memory_gb,
            self.rustc_version,
            self.hyperdb_api_version,
        )
    }
}
/// Returns the trimmed output of `rustc --version` for the `rustc` found on `PATH`
/// when this runs, or `"rustc (unknown)"` if the command cannot be spawned, exits
/// unsuccessfully or prints non-UTF-8 output.
fn rustc_version_runtime() -> String {
    std::process::Command::new("rustc")
        .arg("--version")
        .output()
        .ok()
        .filter(|output| output.status.success())
        .and_then(|output| String::from_utf8(output.stdout).ok())
        .map(|text| text.trim().to_string())
        .unwrap_or_else(|| "rustc (unknown)".to_string())
}
/// One benchmark measurement: what was run and how long it took.
#[derive(Debug, Clone)]
pub(crate) struct BenchRecord {
    /// Name of the workload that was measured.
    pub workload: String,
    /// Flavor label of the run (report code special-cases `"sync"` and `"async"`).
    pub flavor: &'static str,
    /// Variant of the workload.
    pub variant: String,
    /// Number of rows processed.
    pub rows: u64,
    /// Number of bytes processed.
    pub bytes: usize,
    /// Wall-clock duration of the run in seconds.
    pub elapsed_secs: f64,
}

impl BenchRecord {
    /// Rows processed per second, or `0.0` when the elapsed time is zero or negative.
    pub(crate) fn rows_per_sec(&self) -> f64 {
        if self.elapsed_secs <= 0.0 {
            return 0.0;
        }
        let rows = self.rows as f64;
        rows / self.elapsed_secs
    }

    /// Megabytes (1024 * 1024 bytes) processed per second, or `0.0` when the
    /// elapsed time is zero or negative.
    pub(crate) fn mb_per_sec(&self) -> f64 {
        const BYTES_PER_MB: f64 = 1024.0 * 1024.0;

        if self.elapsed_secs <= 0.0 {
            return 0.0;
        }
        let megabytes = self.bytes as f64 / BYTES_PER_MB;
        megabytes / self.elapsed_secs
    }
}
/// Renders benchmark records as a Markdown table.
///
/// Rows are grouped by `(workload, variant)` in lexicographic order. Within a
/// group, `"sync"` rows come first, then `"async"`, then any other flavor;
/// records with the same flavor keep their input order.
///
/// Every row ends in exactly one newline. Markdown tables must be contiguous, so
/// a blank line between rows would terminate the table after the first row.
pub(crate) fn records_to_markdown(records: &[BenchRecord]) -> String {
    use std::collections::BTreeMap;

    // Borrow the key strings from the records; the map never outlives them.
    let mut grouped: BTreeMap<(&str, &str), Vec<&BenchRecord>> = BTreeMap::new();
    for r in records {
        grouped
            .entry((r.workload.as_str(), r.variant.as_str()))
            .or_default()
            .push(r);
    }

    let mut out = String::from(
        "| Workload | Variant | Flavor | Rows | Time (s) | Rows/sec | MB/sec |\n\
         |---|---|---|---:|---:|---:|---:|\n",
    );
    for ((workload, variant), mut recs) in grouped {
        // Stable sort: equal flavors stay in the order they were recorded.
        recs.sort_by_key(|r| match r.flavor {
            "sync" => 0,
            "async" => 1,
            _ => 2,
        });
        for r in recs {
            // `writeln!` appends the line terminator itself; writing to a `String`
            // cannot fail, so the result is deliberately ignored.
            let _ = writeln!(
                out,
                "| {} | {} | {} | {} | {:.3} | {} | {:.1} |",
                workload,
                variant,
                r.flavor,
                fmt_count(r.rows),
                r.elapsed_secs,
                fmt_rate(r.rows_per_sec()),
                r.mb_per_sec()
            );
        }
    }
    out
}
/// Serialises the host description and benchmark records as a JSON document of
/// the form `{"host": {...}, "records": [...]}`, one record per line.
///
/// String values are escaped per the JSON grammar rather than with Rust's `{:?}`
/// formatting, whose `\u{..}` escapes for control characters are not valid JSON.
/// Numeric fields are written as-is, so a non-finite float (e.g. a NaN elapsed
/// time) would still yield a token JSON parsers reject.
pub(crate) fn records_to_json(records: &[BenchRecord], env: &HostEnv) -> String {
    /// Encodes `s` as a JSON string literal, including the surrounding quotes.
    fn json_str(s: &str) -> String {
        let mut quoted = String::with_capacity(s.len() + 2);
        quoted.push('"');
        for ch in s.chars() {
            match ch {
                '"' => quoted.push_str("\\\""),
                '\\' => quoted.push_str("\\\\"),
                '\n' => quoted.push_str("\\n"),
                '\r' => quoted.push_str("\\r"),
                '\t' => quoted.push_str("\\t"),
                // Remaining control characters must use the \uXXXX form.
                c if u32::from(c) < 0x20 => {
                    let _ = write!(quoted, "\\u{:04x}", u32::from(c));
                }
                c => quoted.push(c),
            }
        }
        quoted.push('"');
        quoted
    }

    let mut out = String::new();
    out.push_str("{\n \"host\": {");
    let _ = write!(out, "\"os\": {}, ", json_str(&env.os));
    let _ = write!(out, "\"os_version\": {}, ", json_str(&env.os_version));
    let _ = write!(out, "\"arch\": {}, ", json_str(&env.arch));
    let _ = write!(out, "\"cpu\": {}, ", json_str(&env.cpu_brand));
    let _ = write!(out, "\"physical_cores\": {}, ", env.cpu_cores_physical);
    let _ = write!(out, "\"logical_cores\": {}, ", env.cpu_cores_logical);
    let _ = write!(out, "\"memory_gb\": {:.2}, ", env.total_memory_gb);
    let _ = write!(out, "\"rustc\": {}, ", json_str(&env.rustc_version));
    let _ = write!(
        out,
        "\"hyperdb_api_version\": {}",
        json_str(env.hyperdb_api_version)
    );
    out.push_str("},\n \"records\": [\n");
    for (i, r) in records.iter().enumerate() {
        // Every record except the last is followed by a comma.
        let separator = if i + 1 < records.len() { "," } else { "" };
        // `writeln!` supplies the single trailing newline for the record line.
        let _ = writeln!(
            out,
            " {{\"workload\": {}, \"variant\": {}, \"flavor\": {}, \"rows\": {}, \"bytes\": {}, \"elapsed_secs\": {:.6}, \"rows_per_sec\": {:.3}, \"mb_per_sec\": {:.3}}}{}",
            json_str(&r.workload),
            json_str(&r.variant),
            json_str(r.flavor),
            r.rows,
            r.bytes,
            r.elapsed_secs,
            r.rows_per_sec(),
            r.mb_per_sec(),
            separator
        );
    }
    out.push_str(" ]\n}\n");
    out
}