use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant};
use chrono::{DateTime, Utc};
use netsky_db::{Db, Direction, McpToolCallRecord, MessageRecord};
use tempfile::TempDir;
use turso::{Builder, Connection, Value, params_from_iter};
// Wall-clock duration of each writer-heavy rate step.
const WRITE_SECONDS: u64 = 3;
// Wall-clock duration of each reader-heavy rate step.
const READ_SECONDS: u64 = 5;
// Concurrent writer threads sharing one global op counter.
const WRITE_WORKERS: usize = 16;
// Concurrent reader threads sharing one global query counter.
const READ_WORKERS: usize = 4;
// Rows seeded into each analytics table before the read workload runs.
const READ_ROWS_PER_TABLE: usize = 20_000;
// Analytics queries the read workers rotate through; each aggregates one of
// the four seeded tables.
const READ_QUERIES: [&str; 4] = [
    "SELECT source, direction, COUNT(*) AS n FROM messages GROUP BY source, direction",
    "SELECT runtime, SUM(input_tokens) AS ins, SUM(output_tokens) AS outs FROM token_usage GROUP BY runtime",
    "SELECT event_type, COUNT(*) AS n FROM iroh_events GROUP BY event_type",
    "SELECT source, tool, COUNT(*) AS n, AVG(duration_ms) AS avg_ms FROM mcp_tool_calls GROUP BY source, tool",
];
/// End-to-end load bench for `netsky-db`: a writer-heavy phase across
/// increasing target op rates, then a reader-heavy phase across increasing
/// query rates, each printing a Markdown table of throughput and latency.
///
/// Marked `#[ignore]`; run explicitly, e.g.
/// `cargo test load_profile -- --ignored --nocapture`.
#[test]
#[ignore = "load bench"]
fn load_profile() -> Result<(), Box<dyn std::error::Error>> {
    println!("# netsky-db load bench");
    println!();
    println!("write duration seconds: {WRITE_SECONDS}");
    println!("read duration seconds: {READ_SECONDS}");
    println!("read rows per populated table: {READ_ROWS_PER_TABLE}");
    println!();
    println!("## writer-heavy");
    println!();
    println!(
        "| target ops/s | persisted ops/s | rows persisted | rows attempted | spool rows | p50 ms | p99 ms | failure |"
    );
    println!("| --- | --- | --- | --- | --- | --- | --- | --- |");
    // Writer phase: every rate step runs even if an earlier step reported a
    // failure, so the table always shows all rates.
    for target_rate in [1_000_u64, 10_000] {
        let sample = run_write_workload(target_rate)?;
        println!(
            "| {} | {:.1} | {} | {} | {} | {:.3} | {:.3} | {} |",
            target_rate,
            sample.persisted_throughput,
            sample.persisted_rows,
            sample.attempted_rows,
            sample.spool_rows,
            sample.p50_ms,
            sample.p99_ms,
            sample.failure.as_deref().unwrap_or("none")
        );
    }
    println!();
    println!("## reader-heavy");
    println!();
    println!("queries: mixed analytics over messages, token_usage, iroh_events, mcp_tool_calls");
    println!();
    println!("| target qps | achieved qps | queries attempted | p50 ms | p99 ms | failure |");
    println!("| --- | --- | --- | --- | --- | --- |");
    // Reader phase: stop escalating once a rate step fails (unlike the write
    // phase above, which records every rate regardless).
    for target_qps in [1_u64, 2, 4, 8] {
        let sample = run_read_workload(target_qps)?;
        println!(
            "| {} | {:.2} | {} | {:.3} | {:.3} | {} |",
            target_qps,
            sample.achieved_qps,
            sample.query_count,
            sample.p50_ms,
            sample.p99_ms,
            sample.failure.as_deref().unwrap_or("none")
        );
        if sample.failure.is_some() {
            break;
        }
    }
    Ok(())
}
// Aggregated result of one writer-heavy rate step.
struct WriteSample {
    // Ops the workers actually issued (one latency sample each).
    attempted_rows: usize,
    // Net rows observed in the database after the step (durable writes).
    persisted_rows: usize,
    // Lines found in the JSONL error spool under the sandbox HOME.
    spool_rows: usize,
    // persisted_rows divided by elapsed wall-clock seconds.
    persisted_throughput: f64,
    // Latency percentiles measured from each op's scheduled slot, in ms.
    p50_ms: f64,
    p99_ms: f64,
    // Human-readable failure classification, if the step failed.
    failure: Option<String>,
}
// Aggregated result of one reader-heavy rate step.
struct ReadSample {
    // Queries that completed successfully (one latency sample each).
    query_count: usize,
    // query_count divided by elapsed wall-clock seconds.
    achieved_qps: f64,
    // Latency percentiles measured from each query's scheduled slot, in ms.
    p50_ms: f64,
    p99_ms: f64,
    // First worker error, or a tail-latency breach; None on success.
    failure: Option<String>,
}
/// Runs one writer-heavy step at `target_rate` ops/s against a fresh database
/// in a HOME-sandboxed temp dir, and reports attempted vs. persisted rows,
/// spool fallout, and latency percentiles.
fn run_write_workload(target_rate: u64) -> Result<WriteSample, Box<dyn std::error::Error>> {
    let temp = TempEnv::new()?;
    let db_path = temp.path().join("meta.db");
    let db = Db::open_path(&db_path)?;
    db.migrate()?;
    // Baseline row count so persisted_rows reflects only this step's writes.
    let start_count = persisted_write_rows(&db_path)?;
    let total_ops = (target_rate * WRITE_SECONDS) as usize;
    let start = Instant::now();
    let next_op = Arc::new(AtomicU64::new(0));
    let mut workers = Vec::with_capacity(WRITE_WORKERS);
    for worker_id in 0..WRITE_WORKERS {
        let path = db_path.clone();
        let next_op = Arc::clone(&next_op);
        workers.push(thread::spawn(move || {
            write_worker(path, worker_id, total_ops, target_rate, start, next_op)
        }));
    }
    let mut latencies_ns = Vec::with_capacity(total_ops);
    for worker in workers {
        let worker_latencies = worker
            .join()
            .map_err(|_| "write worker panicked")?
            .map_err(|error| error.to_string())?;
        latencies_ns.extend(worker_latencies);
    }
    let elapsed = start.elapsed();
    let end_count = persisted_write_rows(&db_path)?;
    let persisted_rows = end_count.saturating_sub(start_count);
    let attempted_rows = latencies_ns.len();
    let spool_rows = spool_rows(temp.home());
    // Compute each percentile exactly once: every percentile_ms call clones
    // and sorts the whole sample vector, so the original's second p99 call
    // repeated that work.
    let p50_ms = percentile_ms(&latencies_ns, 0.50);
    let p99_ms = percentile_ms(&latencies_ns, 0.99);
    // A persisted shortfall means writes fell back to the JSONL spool.
    let failure = if persisted_rows < attempted_rows {
        Some("silent write loss to JSONL spool".to_string())
    } else if p99_ms > 1_000.0 {
        Some("tail latency above 1s".to_string())
    } else {
        None
    };
    Ok(WriteSample {
        attempted_rows,
        persisted_rows,
        spool_rows,
        persisted_throughput: persisted_rows as f64 / elapsed.as_secs_f64(),
        p50_ms,
        p99_ms,
        failure,
    })
}
/// One writer thread: claims op sequence numbers from the shared counter,
/// paces each op to its scheduled slot, performs a mixed write, and records
/// per-op latency in nanoseconds (measured from the scheduled slot, so
/// queueing delay counts against the op).
///
/// `_worker_id` is accepted only so all workers share one signature with the
/// read side; it does not influence the write mix.
fn write_worker(
    db_path: PathBuf,
    _worker_id: usize,
    total_ops: usize,
    target_rate: u64,
    start: Instant,
    next_op: Arc<AtomicU64>,
) -> Result<Vec<u64>, Box<dyn std::error::Error + Send + Sync>> {
    // Each worker opens its own connection to the shared database file.
    let db = Db::open_path(&db_path)?;
    let mut latencies = Vec::new();
    loop {
        let seq = next_op.fetch_add(1, Ordering::Relaxed) as usize;
        if seq >= total_ops {
            return Ok(latencies);
        }
        let scheduled = pace(start, seq as u64, target_rate);
        let ts = sample_ts(seq);
        // Op mix per 10 sequence numbers: 6 ticks, 3 messages, 1 tool call.
        match seq % 10 {
            0..=5 => {
                db.record_tick(ts, "ticker", r#"{"beat":1}"#)?;
            }
            6..=8 => {
                db.record_message(MessageRecord {
                    ts_utc: ts,
                    source: "agent",
                    direction: if seq.is_multiple_of(2) {
                        Direction::Inbound
                    } else {
                        Direction::Outbound
                    },
                    chat_id: Some("bench-chat"),
                    from_agent: Some("agent1"),
                    to_agent: Some("agent0"),
                    body: Some("bench message"),
                    raw_json: None,
                })?;
            }
            _ => {
                db.record_mcp_tool_call(McpToolCallRecord {
                    ts_utc_start: ts,
                    ts_utc_end: Some(ts + chrono::Duration::milliseconds(12)),
                    source: "drive",
                    tool: "list",
                    agent: Some("agent0"),
                    duration_ms: Some(12),
                    success: true,
                    error: None,
                    timeout_race: false,
                    request_json: Some(r#"{"page":1}"#),
                    response_json: Some(r#"{"count":1}"#),
                })?;
            }
        }
        latencies.push(Instant::now().duration_since(scheduled).as_nanos() as u64);
    }
}
/// Runs one reader-heavy step at `target_qps` against a fresh, pre-seeded
/// database and reports achieved throughput plus latency percentiles.
fn run_read_workload(target_qps: u64) -> Result<ReadSample, Box<dyn std::error::Error>> {
    let temp = TempEnv::new()?;
    let db_path = temp.path().join("meta.db");
    let db = Db::open_path(&db_path)?;
    db.migrate()?;
    // Seed all four analytics tables so the aggregate queries scan real data.
    let runtime = tokio::runtime::Runtime::new()?;
    runtime.block_on(seed_read_dataset(&db_path, READ_ROWS_PER_TABLE))?;
    let total_queries = (target_qps * READ_SECONDS) as usize;
    let start = Instant::now();
    let next_query = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..READ_WORKERS)
        .map(|worker_id| {
            let path = db_path.clone();
            let counter = Arc::clone(&next_query);
            thread::spawn(move || {
                read_worker(path, worker_id, total_queries, target_qps, start, counter)
            })
        })
        .collect();
    let mut latencies_ns = Vec::with_capacity(total_queries);
    let mut first_error: Option<String> = None;
    for handle in handles {
        // A panic aborts the whole step; a worker Err is recorded (first one
        // wins) while successful workers still contribute their samples.
        match handle.join().map_err(|_| "read worker panicked")? {
            Ok(samples) => latencies_ns.extend(samples),
            Err(error) => {
                if first_error.is_none() {
                    first_error = Some(error.to_string());
                }
            }
        }
    }
    let elapsed = start.elapsed();
    let p99_ms = percentile_ms(&latencies_ns, 0.99);
    // Worker errors take precedence over the tail-latency threshold.
    let failure = match first_error {
        Some(error) => Some(error),
        None if p99_ms > 5_000.0 => Some("p99 above 5s".to_string()),
        None => None,
    };
    Ok(ReadSample {
        query_count: latencies_ns.len(),
        achieved_qps: latencies_ns.len() as f64 / elapsed.as_secs_f64(),
        p50_ms: percentile_ms(&latencies_ns, 0.50),
        p99_ms,
        failure,
    })
}
/// One reader thread: claims query slots from the shared counter, paces each
/// to its scheduled time, runs a rotating analytics query, and records per-
/// query latency in nanoseconds measured from the scheduled slot.
fn read_worker(
    db_path: PathBuf,
    worker_id: usize,
    total_queries: usize,
    target_qps: u64,
    start: Instant,
    next_query: Arc<AtomicU64>,
) -> Result<Vec<u64>, Box<dyn std::error::Error + Send + Sync>> {
    let db = Db::open_path(&db_path)?;
    let mut samples = Vec::new();
    loop {
        let seq = next_query.fetch_add(1, Ordering::Relaxed) as usize;
        if seq >= total_queries {
            break;
        }
        let slot = pace(start, seq as u64, target_qps);
        // Offset by worker id so concurrent workers tend to hit different
        // queries (and therefore different tables) at the same instant.
        let sql = READ_QUERIES[(seq + worker_id) % READ_QUERIES.len()];
        db.query_batches(sql)?;
        samples.push(slot.elapsed().as_nanos() as u64);
    }
    Ok(samples)
}
/// Sleeps until op `seq`'s slot on an absolute schedule of `rate` ops/s
/// anchored at `start`, then returns that scheduled instant (so callers can
/// measure latency from the slot, not from wake-up).
fn pace(start: Instant, seq: u64, rate: u64) -> Instant {
    let target = start + Duration::from_secs_f64(seq as f64 / rate as f64);
    // If the slot is already in the past, run immediately without sleeping.
    match target.checked_duration_since(Instant::now()) {
        Some(delay) if !delay.is_zero() => thread::sleep(delay),
        _ => {}
    }
    target
}
/// Deterministic, strictly increasing per-op timestamp: one second per
/// sequence number from a fixed epoch anchor. Falls back to `Utc::now()` only
/// if the computed epoch second is out of chrono's representable range.
fn sample_ts(seq: usize) -> DateTime<Utc> {
    let epoch_secs = 1_710_000_000 + seq as i64;
    match DateTime::from_timestamp(epoch_secs, 0) {
        Some(ts) => ts,
        None => Utc::now(),
    }
}
/// Nearest-rank percentile of the samples, converted from nanoseconds to
/// milliseconds. Returns 0.0 for an empty slice; `percentile` is a fraction
/// in [0, 1] (e.g. 0.99 for p99).
fn percentile_ms(values_ns: &[u64], percentile: f64) -> f64 {
    if values_ns.is_empty() {
        return 0.0;
    }
    // Sort a copy so the caller's sample order is untouched.
    let mut sorted = values_ns.to_vec();
    sorted.sort_unstable();
    let last = sorted.len() - 1;
    let rank = (last as f64 * percentile).round() as usize;
    let nanos = sorted[rank] as f64;
    nanos / 1_000_000.0
}
/// Total row count across every table the write workload inserts into, read
/// over a fresh connection so only durable state is observed.
fn persisted_write_rows(path: &Path) -> Result<usize, Box<dyn std::error::Error>> {
    // The three tables touched by `write_worker`'s op mix.
    const WRITE_TABLES: [&str; 3] = ["ticks", "messages", "mcp_tool_calls"];
    let runtime = tokio::runtime::Runtime::new()?;
    runtime.block_on(async move {
        let conn = open_conn(path).await?;
        let mut sum = 0_usize;
        for table in WRITE_TABLES {
            sum += count_rows(&conn, table).await?;
        }
        Ok(sum)
    })
}
// Seeds the four analytics tables with `rows_per_table` deterministic rows
// each, so the read workload's aggregate queries scan realistic data volumes.
// Row payloads are stored as JSON (id + row_json) — see `insert_rows`.
async fn seed_read_dataset(
    path: &Path,
    rows_per_table: usize,
) -> Result<(), Box<dyn std::error::Error>> {
    let conn = open_conn(path).await?;
    // messages: alternating sources, 1/3 inbound, spread over 128 chats and
    // 8 sending agents.
    insert_rows(&conn, "messages", rows_per_table, |id| {
        serde_json::json!({
            "id": id,
            "ts_utc": format!("2026-04-17T{:02}:00:00Z", id % 24),
            "source": if id % 2 == 0 { "agent" } else { "imessage" },
            "direction": if id % 3 == 0 { "inbound" } else { "outbound" },
            "chat_id": format!("chat-{}", id % 128),
            "from_agent": format!("agent{}", id % 8),
            "to_agent": "agent0",
            "body": format!("message body {}", id),
            "raw_json": null,
        })
    })
    .await?;
    // token_usage: two runtimes/models with token counts varied by id so the
    // SUM aggregates are non-trivial.
    insert_rows(&conn, "token_usage", rows_per_table, |id| {
        serde_json::json!({
            "id": id,
            "ts_utc": format!("2026-04-17T{:02}:00:00Z", id % 24),
            "session_id": format!("session-{}", id % 4096),
            "agent": format!("agent{}", id % 8),
            "runtime": if id % 2 == 0 { "claude" } else { "codex" },
            "model": if id % 2 == 0 { "claude-opus" } else { "gpt-5.4" },
            "input_tokens": 10_000 + (id % 1000),
            "output_tokens": 700 + (id % 250),
            "cached_input_tokens": 4_000 + (id % 1000),
            "cost_usd_micros": 25_000 + (id % 500),
            "detail_json": "{\"source\":\"bench\"}",
        })
    })
    .await?;
    // iroh_events: four event types cycled by id, 32 distinct peer labels.
    insert_rows(&conn, "iroh_events", rows_per_table, |id| {
        serde_json::json!({
            "id": id,
            "ts_utc": format!("2026-04-17T{:02}:00:00Z", id % 24),
            "event_type": match id % 4 {
                0 => "connect",
                1 => "evict",
                2 => "reconnect",
                _ => "handshake_refused",
            },
            "peer_id_hash": format!("{:016x}", id),
            "peer_label": format!("peer-{}", id % 32),
            "detail_json": "{\"source\":\"bench\"}",
        })
    })
    .await?;
    // mcp_tool_calls: two source/tool pairs; 1 in 16 rows is a timeout
    // failure and 1 in 32 flags a timeout race.
    insert_rows(&conn, "mcp_tool_calls", rows_per_table, |id| {
        serde_json::json!({
            "id": id,
            "ts_utc_start": format!("2026-04-17T{:02}:00:00Z", id % 24),
            "ts_utc_end": format!("2026-04-17T{:02}:00:01Z", id % 24),
            "source": if id % 2 == 0 { "drive" } else { "calendar" },
            "tool": if id % 2 == 0 { "list" } else { "insert" },
            "agent": format!("agent{}", id % 8),
            "duration_ms": 50 + (id % 300),
            "success": id % 16 != 0,
            "error": if id % 16 == 0 { "timeout" } else { "" },
            "timeout_race": id % 32 == 0,
            "request_json": "{\"page\":1}",
            "response_json": "{\"items\":1}",
        })
    })
    .await?;
    Ok(())
}
// Inserts `count` rows into `table` with ids 1..=count, serializing each
// generated JSON payload into the row_json column.
//
// NOTE(review): rows are inserted one statement at a time with no enclosing
// transaction; fine for seeding, but slow for very large counts.
async fn insert_rows<F>(
    conn: &Connection,
    table: &str,
    count: usize,
    row_json: F,
) -> Result<(), Box<dyn std::error::Error>>
where
    F: Fn(i64) -> serde_json::Value,
{
    // Hoist the loop-invariant statement text: the original re-formatted the
    // same SQL string on every iteration.
    let sql = format!("INSERT INTO {table} (id, row_json) VALUES (?1, ?2)");
    for id in 1..=count as i64 {
        conn.execute(
            &sql,
            params_from_iter([Value::from(id), Value::from(row_json(id).to_string())]),
        )
        .await?;
    }
    Ok(())
}
// Returns the result of `SELECT COUNT(*)` on `table` as usize.
async fn count_rows(conn: &Connection, table: &str) -> Result<usize, Box<dyn std::error::Error>> {
    let mut rows = conn
        .query(&format!("SELECT COUNT(*) FROM {table}"), ())
        .await?;
    // COUNT(*) always yields exactly one row; treat absence as an error
    // rather than defaulting to zero.
    let row = rows.next().await?.ok_or("missing count row")?;
    let value = row.get_value(0)?;
    let count = value.as_integer().copied().ok_or("count not integer")?;
    Ok(count as usize)
}
/// Opens the local database file at `path` and returns one connection with a
/// 10s busy timeout, so concurrent workers queue on locks instead of failing.
async fn open_conn(path: &Path) -> Result<Connection, Box<dyn std::error::Error>> {
    let database = Builder::new_local(&path.to_string_lossy()).build().await?;
    let connection = database.connect()?;
    connection.busy_timeout(Duration::from_secs(10))?;
    Ok(connection)
}
fn spool_rows(home: &Path) -> usize {
let dir = home.join(".netsky").join("logs");
let Ok(entries) = fs::read_dir(dir) else {
return 0;
};
let mut total = 0;
for entry in entries.flatten() {
let path = entry.path();
if !path
.file_name()
.and_then(|name| name.to_str())
.is_some_and(|name| name.starts_with("meta-db-errors-") && name.ends_with(".jsonl"))
{
continue;
}
let Ok(text) = fs::read_to_string(path) else {
continue;
};
total += text.lines().count();
}
total
}
// Temp sandbox that redirects the process-wide HOME for the duration of a
// workload run, so `~/.netsky` paths resolve inside the sandbox.
struct TempEnv {
    // Owns the temp directory; deleting it on drop removes everything beneath.
    root: TempDir,
    // The `home` subdirectory that HOME points at while this env is alive.
    home: PathBuf,
    // The previous HOME value (None if it was unset), for restoration on drop.
    old_home: Option<String>,
}
impl TempEnv {
    // Creates the sandbox: a tempdir with a `home/` subdirectory, then points
    // HOME at it after remembering the previous value for `Drop` to restore.
    fn new() -> Result<Self, Box<dyn std::error::Error>> {
        let root = tempfile::tempdir()?;
        let home = root.path().join("home");
        fs::create_dir_all(&home)?;
        let old_home = std::env::var("HOME").ok();
        // SAFETY: `set_var` mutates process-global state and is not thread-
        // safe; this assumes no other thread reads HOME concurrently —
        // confirm if these benches ever run in parallel with other tests.
        unsafe {
            std::env::set_var("HOME", &home);
        }
        Ok(Self {
            root,
            home,
            old_home,
        })
    }
    // Root of the temp sandbox (where the bench database file is created).
    fn path(&self) -> &Path {
        self.root.path()
    }
    // The sandboxed HOME directory; `spool_rows` scans beneath it.
    fn home(&self) -> &Path {
        &self.home
    }
}
impl Drop for TempEnv {
    // Restores HOME to its exact pre-bench state. The original only handled
    // the Some case, leaving HOME pointing at the (soon-deleted) temp dir
    // whenever HOME had been unset; removing the variable in that case
    // restores the true prior state.
    fn drop(&mut self) {
        // SAFETY: process-env mutation is not thread-safe; this mirrors the
        // single-threaded-HOME assumption made in `TempEnv::new`.
        unsafe {
            match self.old_home.as_deref() {
                Some(home) => std::env::set_var("HOME", home),
                None => std::env::remove_var("HOME"),
            }
        }
    }
}