use std::path::{Path, PathBuf};
use std::time::Instant;
use iridium::features::runtime;
use iridium::features::storage::api as storage_api;
fn temp_dir(prefix: &str) -> PathBuf {
let mut dir = std::env::temp_dir();
let stamp = format!(
"{}_{}_{}",
prefix,
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("time")
.as_nanos()
);
dir.push(stamp);
std::fs::create_dir_all(&dir).expect("create temp dir");
dir
}
fn percentile(mut values: Vec<u128>, p: f64) -> u128 {
if values.is_empty() {
return 0;
}
values.sort_unstable();
let idx = (((values.len() - 1) as f64) * p).ceil() as usize;
values[idx.min(values.len() - 1)]
}
fn env_u64(name: &str, default: u64) -> u64 {
std::env::var(name)
.ok()
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(default)
}
fn env_f64(name: &str, default: f64) -> f64 {
std::env::var(name)
.ok()
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(default)
}
fn write_artifact(path: &Path, content: &str) {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).expect("create artifact dir");
}
std::fs::write(path, content).expect("write artifact");
}
#[test]
fn thread_per_core_gate() {
let nodes = env_u64("NODES", 4000);
let iterations = env_u64("ITERATIONS", 20);
let limit = env_u64("LIMIT", 500);
let max_shard_local_p95_us = env_u64("MAX_SHARD_LOCAL_P95_US", 50_000);
let max_cross_shard_p95_us = env_u64("MAX_CROSS_SHARD_P95_US", 100_000);
let min_fanout_throughput_ratio = env_f64("MIN_FANOUT_THROUGHPUT_RATIO", 0.4);
let out_json = std::env::var("OUT_JSON")
.unwrap_or_else(|_| "artifacts/thread_per_core_gate_report.json".to_string());
let out_md = std::env::var("OUT_MD")
.unwrap_or_else(|_| "artifacts/thread_per_core_gate_report.md".to_string());
let base = temp_dir("thread_per_core_gate");
let request0 = storage_api::ThreadCoreRequest {
core_id: 0,
shard_count: 2,
};
let request1 = storage_api::ThreadCoreRequest {
core_id: 1,
shard_count: 2,
};
let lanes = storage_api::ThreadCoreLaneConfig::default();
let config0 = storage_api::StorageConfig {
buffer_pool_pages: 64,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
};
let config1 = storage_api::StorageConfig {
buffer_pool_pages: 64,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
};
let mut handle0 =
storage_api::open_store_for_request(config0, &request0, &lanes).expect("open shard 0");
let mut handle1 =
storage_api::open_store_for_request(config1, &request1, &lanes).expect("open shard 1");
for node_id in 1..=nodes {
if storage_api::request_owns_node(&request0, node_id) {
storage_api::put_full_node(&mut handle0, node_id, 1, &[node_id + 1])
.expect("put shard0");
} else {
storage_api::put_full_node(&mut handle1, node_id, 1, &[node_id + 1])
.expect("put shard1");
}
}
storage_api::flush(&mut handle0).expect("flush shard0");
storage_api::flush(&mut handle1).expect("flush shard1");
let typed = iridium::features::query::validate(
&iridium::features::query::parse(&format!("MATCH (n) RETURN n LIMIT {}", limit))
.expect("parse"),
&iridium::features::query::Catalog,
)
.expect("validate");
let plan = runtime::explain(&typed).expect("explain");
let params = runtime::ExecuteParams {
scan_start: 1,
scan_end_exclusive: nodes + 1,
morsel_size: 128,
parallel_workers: 1,
};
let mut shard_local_lat = Vec::new();
let mut cross_shard_lat = Vec::new();
let start_local = Instant::now();
for _ in 0..iterations {
let out = runtime::execute_with_request(&plan, ¶ms, &mut handle0, &request0)
.expect("local execute");
shard_local_lat.push(out.latency_micros);
}
let local_elapsed = start_local.elapsed().as_secs_f64();
let local_ops_per_sec = if local_elapsed > 0.0 {
iterations as f64 / local_elapsed
} else {
0.0
};
let start_fanout = Instant::now();
for _ in 0..iterations {
let mut shards = vec![
runtime::FanoutShardExecution {
request: request0.clone(),
handle: &mut handle0,
},
runtime::FanoutShardExecution {
request: request1.clone(),
handle: &mut handle1,
},
];
let out = runtime::execute_fanout(&plan, ¶ms, &mut shards).expect("fanout execute");
cross_shard_lat.push(out.latency_micros);
}
let fanout_elapsed = start_fanout.elapsed().as_secs_f64();
let fanout_ops_per_sec = if fanout_elapsed > 0.0 {
iterations as f64 / fanout_elapsed
} else {
0.0
};
let ids_a = {
let mut shards = vec![
runtime::FanoutShardExecution {
request: request0.clone(),
handle: &mut handle0,
},
runtime::FanoutShardExecution {
request: request1.clone(),
handle: &mut handle1,
},
];
runtime::execute_fanout(&plan, ¶ms, &mut shards)
.expect("fanout order a")
.rows
.into_iter()
.map(|row| row.node_id)
.collect::<Vec<u64>>()
};
let ids_b = {
let mut shards = vec![
runtime::FanoutShardExecution {
request: request1.clone(),
handle: &mut handle1,
},
runtime::FanoutShardExecution {
request: request0.clone(),
handle: &mut handle0,
},
];
runtime::execute_fanout(&plan, ¶ms, &mut shards)
.expect("fanout order b")
.rows
.into_iter()
.map(|row| row.node_id)
.collect::<Vec<u64>>()
};
let deterministic_merge_pass = ids_a == ids_b;
let shard_local_p95 = percentile(shard_local_lat, 0.95);
let cross_shard_p95 = percentile(cross_shard_lat, 0.95);
let shard_local_p95_pass = shard_local_p95 <= max_shard_local_p95_us as u128;
let cross_shard_p95_pass = cross_shard_p95 <= max_cross_shard_p95_us as u128;
let throughput_ratio = if local_ops_per_sec > 0.0 {
fanout_ops_per_sec / local_ops_per_sec
} else {
0.0
};
let throughput_pass = throughput_ratio >= min_fanout_throughput_ratio;
let passed =
deterministic_merge_pass && shard_local_p95_pass && cross_shard_p95_pass && throughput_pass;
let json = format!(
concat!(
"{{\n",
" \"gate\": \"thread_per_core_gate\",\n",
" \"passed\": {},\n",
" \"config\": {{\"nodes\": {}, \"iterations\": {}, \"limit\": {}}},\n",
" \"metrics\": {{\n",
" \"shard_local_p95_us\": {},\n",
" \"cross_shard_p95_us\": {},\n",
" \"local_ops_per_sec\": {:.3},\n",
" \"fanout_ops_per_sec\": {:.3},\n",
" \"fanout_throughput_ratio\": {:.4}\n",
" }},\n",
" \"checks\": {{\n",
" \"deterministic_merge_pass\": {},\n",
" \"shard_local_p95_pass\": {},\n",
" \"cross_shard_p95_pass\": {},\n",
" \"throughput_pass\": {}\n",
" }}\n",
"}}\n"
),
passed,
nodes,
iterations,
limit,
shard_local_p95,
cross_shard_p95,
local_ops_per_sec,
fanout_ops_per_sec,
throughput_ratio,
deterministic_merge_pass,
shard_local_p95_pass,
cross_shard_p95_pass,
throughput_pass
);
let md = format!(
concat!(
"# Thread-Per-Core Gate\n\n",
"- passed: {}\n",
"- nodes: {}\n",
"- iterations: {}\n",
"- limit: {}\n\n",
"## Metrics\n",
"- shard_local_p95_us: {}\n",
"- cross_shard_p95_us: {}\n",
"- local_ops_per_sec: {:.3}\n",
"- fanout_ops_per_sec: {:.3}\n",
"- fanout_throughput_ratio: {:.4}\n\n",
"## Checks\n",
"- deterministic_merge_pass: {}\n",
"- shard_local_p95_pass: {}\n",
"- cross_shard_p95_pass: {}\n",
"- throughput_pass: {}\n"
),
passed,
nodes,
iterations,
limit,
shard_local_p95,
cross_shard_p95,
local_ops_per_sec,
fanout_ops_per_sec,
throughput_ratio,
deterministic_merge_pass,
shard_local_p95_pass,
cross_shard_p95_pass,
throughput_pass
);
write_artifact(Path::new(&out_json), &json);
write_artifact(Path::new(&out_md), &md);
assert!(passed, "thread-per-core gate failed; see {}", out_json);
}