use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use anyhow::{Context, Result};
use arrow::array::{
Array, BooleanArray, Float64Array, RecordBatch, StringArray, TimestampMicrosecondArray,
UInt32Array,
};
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;
use rusqlite::{params, Connection};
use uuid::Uuid;
use super::{schema, BenchFilter, Warehouse};
use crate::bench::{BenchResult, BenchRun, TestOutcome};
/// Warehouse backed by the local filesystem.
///
/// Parquet files are written beneath `root`, and every written file is recorded in the
/// SQLite catalog `catalog.db` that lives in the same directory.
pub struct LocalWarehouse {
/// Directory holding `catalog.db` and the per-table Parquet directories.
root: PathBuf,
/// Connection to the SQLite catalog, wrapped in a `Mutex` so `&self` methods can use it.
catalog: Mutex<Connection>,
}
impl LocalWarehouse {
pub fn open(root: &Path) -> Result<Self> {
fs::create_dir_all(root)
.with_context(|| format!("create warehouse root {}", root.display()))?;
let db_path = root.join("catalog.db");
let conn = Connection::open(&db_path)
.with_context(|| format!("open catalog {}", db_path.display()))?;
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS warehouse_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL,
repo TEXT NOT NULL,
machine TEXT NOT NULL,
partition_path TEXT NOT NULL,
file_path TEXT NOT NULL UNIQUE,
row_count INTEGER NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_files_table ON warehouse_files(table_name);
CREATE INDEX IF NOT EXISTS idx_files_repo ON warehouse_files(table_name, repo);
CREATE INDEX IF NOT EXISTS idx_files_mach ON warehouse_files(table_name, repo, machine);",
)?;
Ok(Self { root: root.to_path_buf(), catalog: Mutex::new(conn) })
}
fn partition_path(repo: &str, machine: &str, ts: DateTime<Utc>) -> String {
format!(
"repo={repo}/machine={machine}/{}",
ts.format("%Y-%m")
)
}
fn write_parquet(
&self,
table: &str,
repo: &str,
machine: &str,
partition: &str,
run_id: Uuid,
batch: &RecordBatch,
) -> Result<()> {
let dir = self.root.join(table).join(partition);
fs::create_dir_all(&dir)
.with_context(|| format!("mkdir {}", dir.display()))?;
let file_path = dir.join(format!("{run_id}.parquet"));
let file = File::create(&file_path)
.with_context(|| format!("create {}", file_path.display()))?;
let mut writer = ArrowWriter::try_new(file, batch.schema(), None)
.with_context(|| format!("open parquet writer {}", file_path.display()))?;
writer.write(batch)
.with_context(|| format!("write batch {}", file_path.display()))?;
writer.close()
.with_context(|| format!("finalise {}", file_path.display()))?;
let rel = file_path.strip_prefix(&self.root)
.unwrap_or(&file_path)
.to_string_lossy()
.to_string();
let conn = self.catalog.lock().unwrap();
conn.execute(
"INSERT INTO warehouse_files
(table_name, repo, machine, partition_path, file_path, row_count, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)",
params![
table,
repo,
machine,
partition,
rel,
batch.num_rows() as i64,
Utc::now().to_rfc3339(),
],
)?;
Ok(())
}
fn list_files(&self, table: &str, filter: &BenchFilter) -> Result<Vec<PathBuf>> {
let conn = self.catalog.lock().unwrap();
let mut q = String::from(
"SELECT file_path FROM warehouse_files WHERE table_name = ?",
);
let mut args: Vec<String> = vec![table.to_string()];
if let Some(r) = &filter.repo { q.push_str(" AND repo = ?"); args.push(r.clone()); }
if let Some(m) = &filter.machine { q.push_str(" AND machine = ?"); args.push(m.clone()); }
q.push_str(" ORDER BY id ASC");
let mut stmt = conn.prepare(&q)?;
let params_dyn: Vec<&dyn rusqlite::ToSql> =
args.iter().map(|s| s as &dyn rusqlite::ToSql).collect();
let rows = stmt
.query_map(params_dyn.as_slice(), |r| r.get::<_, String>(0))?
.collect::<rusqlite::Result<Vec<_>>>()?;
Ok(rows.into_iter().map(|s| self.root.join(s)).collect())
}
}
impl Warehouse for LocalWarehouse {
    /// Stores `run` for `repo` under a freshly generated run id and returns that id.
    ///
    /// All record batches are built before anything is written. The detail tables
    /// (`bench_results`, `test_outcomes`) are written first and the `bench_runs` header
    /// last, so a failure part-way through never leaves a header that
    /// `query_bench_runs` would return with missing details.
    ///
    /// # Errors
    /// Fails if `run.machine` is blank, if the timestamp/date cannot be parsed, or if
    /// building or writing any batch fails.
    fn append_bench_run(&self, repo: &str, run: &BenchRun) -> Result<Uuid> {
        if run.machine.trim().is_empty() {
            anyhow::bail!("BenchRun.machine is required");
        }
        let run_id = Uuid::new_v4();
        let ts = resolve_timestamp(run)?;
        let partition = Self::partition_path(repo, &run.machine, ts);

        let header = build_bench_runs_batch(run_id, repo, ts, run)?;
        let results = build_bench_results_batch(run_id, run)?;
        let tests = build_test_outcomes_batch(run_id, run);

        // Empty detail tables are skipped entirely rather than written as empty files.
        for (table, batch) in [("bench_results", &results), ("test_outcomes", &tests)] {
            if batch.num_rows() > 0 {
                self.write_parquet(table, repo, &run.machine, &partition, run_id, batch)?;
            }
        }
        // The header makes the run visible to queries, so it goes in last.
        self.write_parquet("bench_runs", repo, &run.machine, &partition, run_id, &header)?;
        Ok(run_id)
    }

    /// Loads the runs matching `filter`, with their results and test outcomes attached.
    ///
    /// Runs are returned sorted by timestamp; runs with equal timestamps keep catalog
    /// insertion order, so the result is deterministic. When `filter.limit` is set only
    /// the last `limit` runs of that ordering are returned.
    ///
    /// # Errors
    /// Fails if the catalog cannot be queried or any listed Parquet file cannot be read.
    fn query_bench_runs(&self, filter: &BenchFilter) -> Result<Vec<BenchRun>> {
        // `runs` keeps catalog order; `slot_of` maps a run id to its index in `runs`.
        let mut runs: Vec<BenchRun> = Vec::new();
        let mut slot_of: std::collections::HashMap<Uuid, usize> =
            std::collections::HashMap::new();
        for path in self.list_files("bench_runs", filter)? {
            for (run_id, run) in read_bench_runs_file(&path)? {
                match slot_of.get(&run_id).copied() {
                    // A repeated id replaces the earlier header, as a map insert would.
                    Some(slot) => runs[slot] = run,
                    None => {
                        slot_of.insert(run_id, runs.len());
                        runs.push(run);
                    }
                }
            }
        }

        for path in self.list_files("bench_results", filter)? {
            for (run_id, name, metric, value) in read_bench_results_file(&path)? {
                // Rows whose header was not loaded are ignored.
                if let Some(slot) = slot_of.get(&run_id).copied() {
                    let results = &mut runs[slot].results;
                    let pos = match results.iter().position(|r| r.name == name) {
                        Some(pos) => pos,
                        None => {
                            results.push(BenchResult {
                                name,
                                metrics: Default::default(),
                            });
                            results.len() - 1
                        }
                    };
                    results[pos]
                        .metrics
                        .insert(metric, serde_json::Value::from(value));
                }
            }
        }

        for path in self.list_files("test_outcomes", filter)? {
            for (run_id, outcome) in read_test_outcomes_file(&path)? {
                if let Some(slot) = slot_of.get(&run_id).copied() {
                    runs[slot].tests.push(outcome);
                }
            }
        }

        // Stable sort: ties on timestamp stay in catalog order.
        runs.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
        if let Some(n) = filter.limit {
            let drop_n = runs.len().saturating_sub(n);
            runs.drain(..drop_n);
        }
        Ok(runs)
    }
}
/// Determines the UTC instant a run belongs to.
///
/// `run.timestamp` (RFC 3339) wins when present; otherwise `run.date` (`YYYY-MM-DD`) is
/// interpreted as midnight UTC of that day.
fn resolve_timestamp(run: &BenchRun) -> Result<DateTime<Utc>> {
    match run.timestamp.as_deref() {
        Some(raw) => {
            let parsed = DateTime::parse_from_rfc3339(raw)
                .with_context(|| format!("parse timestamp {raw}"))?;
            Ok(parsed.with_timezone(&Utc))
        }
        None => {
            let day = NaiveDate::parse_from_str(&run.date, "%Y-%m-%d")
                .with_context(|| format!("parse date {}", run.date))?;
            let midnight: NaiveDateTime = day
                .and_hms_opt(0, 0, 0)
                .expect("00:00:00 is always a valid time of day");
            Ok(Utc.from_utc_datetime(&midnight))
        }
    }
}
/// Builds the single-row `bench_runs` header batch for one run.
///
/// Column order: run id, repo, timestamp (microseconds, tagged `UTC`), date, version,
/// machine, cores.
fn build_bench_runs_batch(
    run_id: Uuid,
    repo: &str,
    ts: DateTime<Utc>,
    run: &BenchRun,
) -> Result<RecordBatch> {
    let layout = schema::bench_runs();

    let run_id_col = StringArray::from(vec![run_id.to_string()]);
    let repo_col = StringArray::from(vec![repo.to_string()]);
    let ts_col =
        TimestampMicrosecondArray::from(vec![ts.timestamp_micros()]).with_timezone("UTC");
    let date_col = StringArray::from(vec![run.date.clone()]);
    let version_col = StringArray::from(vec![run.version.clone()]);
    let machine_col = StringArray::from(vec![run.machine.clone()]);
    let cores_col = UInt32Array::from(vec![run.cores]);

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(run_id_col),
        Arc::new(repo_col),
        Arc::new(ts_col),
        Arc::new(date_col),
        Arc::new(version_col),
        Arc::new(machine_col),
        Arc::new(cores_col),
    ];
    let batch = RecordBatch::try_new(layout, columns)?;
    Ok(batch)
}
/// Flattens the run's results into one `bench_results` row per numeric metric.
///
/// Columns are run id, result name, metric name and value. Metrics whose JSON value is
/// not a number representable as `f64` are skipped.
fn build_bench_results_batch(run_id: Uuid, run: &BenchRun) -> Result<RecordBatch> {
    let layout = schema::bench_results();
    let run_key = run_id.to_string();

    let mut id_col: Vec<String> = Vec::new();
    let mut name_col: Vec<String> = Vec::new();
    let mut metric_col: Vec<String> = Vec::new();
    let mut value_col: Vec<f64> = Vec::new();

    for result in &run.results {
        for (metric, raw) in &result.metrics {
            // `as_f64` yields `None` for every non-number JSON value.
            if let Some(number) = raw.as_f64() {
                id_col.push(run_key.clone());
                name_col.push(result.name.clone());
                metric_col.push(metric.clone());
                value_col.push(number);
            }
        }
    }

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(id_col)),
        Arc::new(StringArray::from(name_col)),
        Arc::new(StringArray::from(metric_col)),
        Arc::new(Float64Array::from(value_col)),
    ];
    let batch = RecordBatch::try_new(layout, columns)?;
    Ok(batch)
}
/// Builds the `test_outcomes` batch with one row per test of the run.
///
/// Columns are run id, test name, passed flag, optional duration in milliseconds and
/// optional message.
///
/// # Panics
/// Panics if the assembled columns do not match `schema::test_outcomes()`.
fn build_test_outcomes_batch(run_id: Uuid, run: &BenchRun) -> RecordBatch {
    let layout = schema::test_outcomes();
    let count = run.tests.len();

    let id_col: Vec<String> = vec![run_id.to_string(); count];
    let mut name_col: Vec<String> = Vec::with_capacity(count);
    let mut passed_col: Vec<bool> = Vec::with_capacity(count);
    let mut duration_col: Vec<Option<f64>> = Vec::with_capacity(count);
    let mut message_col: Vec<Option<String>> = Vec::with_capacity(count);
    for outcome in &run.tests {
        name_col.push(outcome.name.clone());
        passed_col.push(outcome.passed);
        duration_col.push(outcome.duration_ms);
        message_col.push(outcome.message.clone());
    }

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(id_col)),
        Arc::new(StringArray::from(name_col)),
        Arc::new(BooleanArray::from(passed_col)),
        Arc::new(Float64Array::from(duration_col)),
        Arc::new(StringArray::from(message_col)),
    ];
    RecordBatch::try_new(layout, columns).expect("schema matches")
}
/// Opens the Parquet file at `path` and returns a reader over its record batches.
///
/// # Errors
/// Fails if the file cannot be opened or the Parquet reader cannot be constructed.
fn open_reader(path: &Path) -> Result<parquet::arrow::arrow_reader::ParquetRecordBatchReader> {
    let handle = File::open(path).with_context(|| format!("open {}", path.display()))?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(handle)
        .with_context(|| format!("read parquet {}", path.display()))?
        .build()?;
    Ok(reader)
}
fn read_bench_runs_file(path: &Path) -> Result<Vec<(Uuid, BenchRun)>> {
let mut out = Vec::new();
for batch in open_reader(path)? {
let batch = batch?;
let ids = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
let _repos = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
let ts = batch.column(2).as_any().downcast_ref::<TimestampMicrosecondArray>().unwrap();
let dates = batch.column(3).as_any().downcast_ref::<StringArray>().unwrap();
let versions = batch.column(4).as_any().downcast_ref::<StringArray>().unwrap();
let machines = batch.column(5).as_any().downcast_ref::<StringArray>().unwrap();
let cores = batch.column(6).as_any().downcast_ref::<UInt32Array>().unwrap();
for i in 0..batch.num_rows() {
let uid = Uuid::parse_str(ids.value(i))?;
let micros = ts.value(i);
let ts_dt = Utc.timestamp_micros(micros).single()
.context("invalid micro timestamp")?;
out.push((
uid,
BenchRun {
date: dates.value(i).to_string(),
timestamp: Some(ts_dt.to_rfc3339()),
version: versions.value(i).to_string(),
machine: machines.value(i).to_string(),
cores: cores.value(i),
results: Vec::new(),
tests: Vec::new(),
},
));
}
}
Ok(out)
}
fn read_bench_results_file(path: &Path) -> Result<Vec<(Uuid, String, String, f64)>> {
let mut out = Vec::new();
for batch in open_reader(path)? {
let batch = batch?;
let ids = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
let names = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
let metrics = batch.column(2).as_any().downcast_ref::<StringArray>().unwrap();
let values = batch.column(3).as_any().downcast_ref::<Float64Array>().unwrap();
for i in 0..batch.num_rows() {
out.push((
Uuid::parse_str(ids.value(i))?,
names.value(i).to_string(),
metrics.value(i).to_string(),
values.value(i),
));
}
}
Ok(out)
}
fn read_test_outcomes_file(path: &Path) -> Result<Vec<(Uuid, TestOutcome)>> {
let mut out = Vec::new();
for batch in open_reader(path)? {
let batch = batch?;
let ids = batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
let names = batch.column(1).as_any().downcast_ref::<StringArray>().unwrap();
let passed = batch.column(2).as_any().downcast_ref::<BooleanArray>().unwrap();
let durations = batch.column(3).as_any().downcast_ref::<Float64Array>().unwrap();
let messages = batch.column(4).as_any().downcast_ref::<StringArray>().unwrap();
for i in 0..batch.num_rows() {
out.push((
Uuid::parse_str(ids.value(i))?,
TestOutcome {
name: names.value(i).to_string(),
passed: passed.value(i),
duration_ms: if durations.is_null(i) { None } else { Some(durations.value(i)) },
message: if messages.is_null(i) { None } else { Some(messages.value(i).to_string()) },
},
));
}
}
Ok(out)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bench::{BenchResult, BenchRun, TestOutcome};

    /// Builds a run for `machine` with one result `x` (metric `ops_sec` = `ops`) and one
    /// passing test `smoke`.
    fn sample_run(machine: &str, ops: f64) -> BenchRun {
        let mut metrics = serde_json::Map::new();
        metrics.insert("ops_sec".into(), serde_json::json!(ops));
        let bench = BenchResult {
            name: "x".into(),
            metrics,
        };
        let smoke = TestOutcome {
            name: "smoke".into(),
            passed: true,
            duration_ms: Some(1.5),
            message: None,
        };
        BenchRun {
            date: "2026-05-30".into(),
            timestamp: Some("2026-05-30T21:00:00Z".into()),
            version: "0.1.0".into(),
            machine: machine.into(),
            cores: 32,
            results: vec![bench],
            tests: vec![smoke],
        }
    }

    /// A run written to a fresh warehouse comes back with its results and tests.
    #[test]
    fn roundtrip_bench_run() {
        let tmp = tempfile::tempdir().unwrap();
        let warehouse = LocalWarehouse::open(tmp.path()).unwrap();
        let _id = warehouse
            .append_bench_run("holger", &sample_run("ryzen", 123.0))
            .unwrap();

        let stored = warehouse
            .query_bench_runs(&BenchFilter::for_repo("holger"))
            .unwrap();
        assert_eq!(stored.len(), 1);

        let run = &stored[0];
        assert_eq!(run.machine, "ryzen");
        assert_eq!(run.results.len(), 1);

        let bench = &run.results[0];
        assert_eq!(bench.name, "x");
        let ops = bench.metrics.get("ops_sec").unwrap().as_f64().unwrap();
        assert!((ops - 123.0).abs() < 1e-9);

        assert_eq!(run.tests.len(), 1);
        assert!(run.tests[0].passed);
    }

    /// Filtering on a machine returns only the runs of that machine.
    #[test]
    fn filter_by_machine() {
        let tmp = tempfile::tempdir().unwrap();
        let warehouse = LocalWarehouse::open(tmp.path()).unwrap();
        for (machine, ops) in [("ryzen", 1.0), ("threadripper", 2.0)] {
            warehouse
                .append_bench_run("holger", &sample_run(machine, ops))
                .unwrap();
        }

        let only_threadripper = BenchFilter {
            repo: Some("holger".into()),
            machine: Some("threadripper".into()),
            limit: None,
        };
        let matched = warehouse.query_bench_runs(&only_threadripper).unwrap();
        assert_eq!(matched.len(), 1);
        assert_eq!(matched[0].machine, "threadripper");
    }
}