use std::cmp::Ordering;
use std::env;
use std::fs;
use std::path::{Path, PathBuf};
use iridium::features::query;
use iridium::features::runtime::{self, ExecuteParams};
use iridium::features::storage::api as storage_api;
/// Vector payload encodings exercised by the scan benchmark cases, matching
/// the two `storage_api::encode_vector_payload_*` constructors used below.
#[derive(Clone, Copy)]
enum VectorEncodingKind {
    /// Full-precision payload via `encode_vector_payload_f32`.
    F32,
    /// 8-bit quantized payload via `encode_vector_payload_quantized_i8`.
    QuantizedI8,
}
impl VectorEncodingKind {
fn as_str(self) -> &'static str {
match self {
Self::F32 => "f32",
Self::QuantizedI8 => "quantized_i8",
}
}
}
/// Benchmark settings, resolved from environment variables by `load_config`
/// (defaults apply for anything unset).
struct BenchmarkConfig {
    /// Destination path for the JSON report.
    out_json: PathBuf,
    /// Destination path for the Markdown report.
    out_md: PathBuf,
    /// Number of nodes (and vectors) inserted per benchmark case.
    nodes: u64,
    /// Dimensionality of every generated vector.
    dimension: usize,
    /// Measured executions per query case (one extra warmup run is discarded).
    iterations: usize,
    /// LIMIT clause value for the ANN query cases.
    limit: usize,
    /// Morsel size forwarded to `ExecuteParams`.
    morsel_size: usize,
    /// Worker count forwarded to `ExecuteParams`; 0 presumably selects the
    /// runtime's default — TODO confirm against the runtime's docs.
    parallel_workers: usize,
    /// Seed for the deterministic vector generator.
    seed: u64,
    /// Root directory under which each case builds its own store.
    data_root: PathBuf,
}
/// On-disk footprint of one store build, keyed by vector encoding label.
struct SizeCase {
    /// Encoding label ("f32" or "quantized_i8").
    encoding: &'static str,
    /// Total bytes of all files under the case directory after flush.
    bytes: u64,
}
/// Aggregated metrics for one benchmark query case, as written to the report.
struct QueryCaseResult {
    /// Unique case name; also used as the equality/ordering key (see the
    /// trait impls at the bottom of this file).
    name: &'static str,
    /// Execution mode label ("scan", "ann", or "scan_fallback").
    mode: &'static str,
    /// Vector encoding label for this case.
    encoding: &'static str,
    /// Mean per-run latency in microseconds.
    avg_latency_micros: f64,
    /// Nearest-rank latency percentiles in microseconds.
    p50_latency_micros: u128,
    p95_latency_micros: u128,
    p99_latency_micros: u128,
    /// Mean number of nodes scanned per run.
    avg_scanned_nodes: f64,
    /// Mean number of rerank batches per run.
    rerank_batches_avg: f64,
}
/// Raw per-iteration samples collected by `measure_query_case`, before any
/// aggregation into a `QueryCaseResult`.
struct QueryCaseStats {
    /// One latency sample (microseconds) per measured iteration.
    latencies: Vec<u128>,
    /// One scanned-node count per measured iteration.
    scanned: Vec<u64>,
    /// One rerank-batch count per measured iteration.
    rerank_batches: Vec<u64>,
}
/// Complete benchmark output: configuration plus all size and query cases.
/// Serialized by the hand-written `to_json_pretty` / `to_markdown` methods.
struct BenchmarkReport {
    config: BenchmarkConfig,
    size_cases: Vec<SizeCase>,
    query_cases: Vec<QueryCaseResult>,
}
/// Entry point: builds the two scan datasets (f32 and quantized i8), runs the
/// two ANN routing cases, then writes the combined report as JSON and Markdown
/// and prints the output paths.
fn main() -> Result<(), String> {
    let cfg = load_config()?;
    fs::create_dir_all(&cfg.data_root).map_err(|err| err.to_string())?;
    // Make sure both report destinations are writable before doing any work.
    for out_path in [&cfg.out_json, &cfg.out_md] {
        if let Some(parent) = out_path.parent() {
            fs::create_dir_all(parent).map_err(|err| err.to_string())?;
        }
    }
    let (f32_bytes, scan_f32) = run_scan_case(&cfg, VectorEncodingKind::F32)?;
    let (quantized_bytes, scan_quantized) = run_scan_case(&cfg, VectorEncodingKind::QuantizedI8)?;
    let ann_unique = run_ann_case(&cfg, false)?;
    let ann_ambiguous = run_ann_case(&cfg, true)?;
    let report = BenchmarkReport {
        config: cfg,
        size_cases: vec![
            SizeCase {
                encoding: "f32",
                bytes: f32_bytes,
            },
            SizeCase {
                encoding: "quantized_i8",
                bytes: quantized_bytes,
            },
        ],
        query_cases: vec![scan_f32, scan_quantized, ann_unique, ann_ambiguous],
    };
    fs::write(&report.config.out_json, report.to_json_pretty()).map_err(|err| err.to_string())?;
    fs::write(&report.config.out_md, report.to_markdown()).map_err(|err| err.to_string())?;
    println!(
        "vector_followup_bench_json: {}",
        report.config.out_json.display()
    );
    println!(
        "vector_followup_bench_md: {}",
        report.config.out_md.display()
    );
    Ok(())
}
/// Resolves the benchmark configuration from environment variables, falling
/// back to built-in defaults for anything unset. Only malformed numeric
/// variables produce an error.
fn load_config() -> Result<BenchmarkConfig, String> {
    let artifacts_dir =
        env::var("ARTIFACTS_DIR").unwrap_or_else(|_| String::from("./artifacts"));
    let report_prefix =
        env::var("REPORT_PREFIX").unwrap_or_else(|_| String::from("vector_followup_pass2_bench"));
    let out_json = match env::var("OUT_JSON") {
        Ok(path) => PathBuf::from(path),
        Err(_) => PathBuf::from(format!("{artifacts_dir}/{report_prefix}_report.json")),
    };
    let out_md = match env::var("OUT_MD") {
        Ok(path) => PathBuf::from(path),
        Err(_) => PathBuf::from(format!("{artifacts_dir}/{report_prefix}_report.md")),
    };
    let data_root = match env::var("DATA_ROOT") {
        Ok(path) => PathBuf::from(path),
        // Default to a per-prefix scratch directory under the system temp dir.
        Err(_) => env::temp_dir().join(&report_prefix),
    };
    Ok(BenchmarkConfig {
        out_json,
        out_md,
        nodes: parse_env_u64("NODES", 4_000)?,
        dimension: parse_env_usize("DIMENSION", 64)?,
        iterations: parse_env_usize("ITERATIONS", 25)?,
        limit: parse_env_usize("LIMIT", 50)?,
        morsel_size: parse_env_usize("MORSEL_SIZE", 256)?,
        parallel_workers: parse_env_usize("PARALLEL_WORKERS", 0)?,
        seed: parse_env_u64("SEED", 7)?,
        data_root,
    })
}
/// Reads `name` from the environment as a `u64`. Unset means `default`;
/// set-but-unparsable is an error describing the expected type.
fn parse_env_u64(name: &str, default: u64) -> Result<u64, String> {
    let Ok(raw) = env::var(name) else {
        return Ok(default);
    };
    raw.parse::<u64>()
        .map_err(|_| format!("invalid {name}: expected u64"))
}
/// Reads `name` from the environment as a `usize`. Unset means `default`;
/// set-but-unparsable is an error describing the expected type.
fn parse_env_usize(name: &str, default: usize) -> Result<usize, String> {
    match env::var(name) {
        Err(_) => Ok(default),
        Ok(raw) => raw
            .parse::<usize>()
            .map_err(|_| format!("invalid {name}: expected usize")),
    }
}
/// Builds a dedicated store under `data_root/scan_<encoding>`, loads
/// `cfg.nodes` vectors with the requested payload encoding, then measures a
/// euclidean-distance scan query over the reopened store.
/// Returns `(on-disk size in bytes, aggregated case metrics)`.
fn run_scan_case(
    cfg: &BenchmarkConfig,
    encoding: VectorEncodingKind,
) -> Result<(u64, QueryCaseResult), String> {
    let case_dir = cfg.data_root.join(format!("scan_{}", encoding.as_str()));
    reset_dir(&case_dir)?;
    let mut handle = open_store_in(&case_dir)?;
    // Salt 0 is reserved for the query vector; node salts start at 1 below.
    let query_vector = generate_vector(cfg.seed, 0, cfg.dimension);
    for node_id in 1..=cfg.nodes {
        storage_api::put_full_node(&mut handle, node_id, 1, &[]).map_err(debug_err)?;
        let vector = generate_vector(cfg.seed, node_id, cfg.dimension);
        let payload = match encoding {
            VectorEncodingKind::F32 => storage_api::encode_vector_payload_f32(
                1,
                storage_api::VectorMetric::Euclidean,
                &vector,
                false,
            ),
            // Quantized encoding is fallible (f32 encoding is not), hence `?`.
            VectorEncodingKind::QuantizedI8 => storage_api::encode_vector_payload_quantized_i8(
                1,
                storage_api::VectorMetric::Euclidean,
                &vector,
                false,
            )?,
        };
        let delta = storage_api::encode_delta(node_id, 2, &payload);
        storage_api::put_vector_delta(&mut handle, &delta).map_err(debug_err)?;
    }
    // Flush and drop the handle so the size measurement sees persisted files
    // and the measured queries run against a freshly opened store.
    storage_api::flush(&mut handle).map_err(debug_err)?;
    drop(handle);
    let size_bytes = dir_size_bytes(&case_dir)?;
    let mut reopened = open_store_in(&case_dir)?;
    // Very large distance threshold plus LIMIT = nodes: the query is intended
    // to match every row, so this measures a full scan.
    let query = format!(
        "MATCH (n) WHERE vector.euclidean(n.embedding, {}) < 1000000 RETURN n LIMIT {}",
        inline_vector_param("q", &query_vector),
        cfg.nodes
    );
    let stats = measure_query_case(
        &mut reopened,
        &query,
        ExecuteParams {
            scan_start: 0,
            // +8 pads the scan range past the highest node id — TODO confirm
            // why this particular padding is needed.
            scan_end_exclusive: cfg.nodes + 8,
            morsel_size: cfg.morsel_size,
            parallel_workers: cfg.parallel_workers,
        },
        cfg.iterations,
    )?;
    Ok((
        size_bytes,
        QueryCaseResult {
            name: match encoding {
                VectorEncodingKind::F32 => "scan_f32",
                VectorEncodingKind::QuantizedI8 => "scan_quantized_i8",
            },
            mode: "scan",
            encoding: encoding.as_str(),
            avg_latency_micros: average_u128(&stats.latencies),
            p50_latency_micros: percentile_u128(&stats.latencies, 50.0),
            p95_latency_micros: percentile_u128(&stats.latencies, 95.0),
            p99_latency_micros: percentile_u128(&stats.latencies, 99.0),
            avg_scanned_nodes: average_u64(&stats.scanned),
            rerank_batches_avg: average_u64(&stats.rerank_batches),
        },
    ))
}
/// Builds a store for the cosine ANN routing cases and measures the query.
/// When `ambiguous` is true, every other node is placed in a second vector
/// space — per the case names, this is expected to defeat single-space ANN
/// routing and force a scan fallback (TODO confirm against the runtime).
fn run_ann_case(cfg: &BenchmarkConfig, ambiguous: bool) -> Result<QueryCaseResult, String> {
    let case_dir = cfg.data_root.join(if ambiguous {
        "ann_ambiguous_spaces"
    } else {
        "ann_unique_space"
    });
    reset_dir(&case_dir)?;
    let mut handle = open_store_in(&case_dir)?;
    // Salt 0 is reserved for the query vector; node salts start at 1 below.
    let query_vector = generate_vector(cfg.seed, 0, cfg.dimension);
    for node_id in 1..=cfg.nodes {
        storage_api::put_full_node(&mut handle, node_id, 1, &[]).map_err(debug_err)?;
        let vector = generate_vector(cfg.seed, node_id, cfg.dimension);
        // Ambiguous case: even node ids go into space 2, odd ids into space 1.
        let space_id = if ambiguous && node_id % 2 == 0 { 2 } else { 1 };
        let payload = storage_api::encode_vector_payload_f32(
            space_id,
            storage_api::VectorMetric::Cosine,
            &vector,
            false,
        );
        let delta = storage_api::encode_delta(node_id, 2, &payload);
        storage_api::put_vector_delta(&mut handle, &delta).map_err(debug_err)?;
    }
    // Persist, then reopen so the measured queries hit a fresh handle.
    storage_api::flush(&mut handle).map_err(debug_err)?;
    drop(handle);
    let mut reopened = open_store_in(&case_dir)?;
    // Cosine similarity is always > -1 here, so the predicate accepts every
    // node and LIMIT governs the result size.
    let query = format!(
        "MATCH (n) WHERE vector.cosine(n.embedding, {}) > -1 RETURN n LIMIT {}",
        inline_vector_param("q", &query_vector),
        cfg.limit
    );
    let stats = measure_query_case(
        &mut reopened,
        &query,
        ExecuteParams {
            scan_start: 0,
            // +8 pads the scan range past the highest node id — TODO confirm.
            scan_end_exclusive: cfg.nodes + 8,
            morsel_size: cfg.morsel_size,
            parallel_workers: cfg.parallel_workers,
        },
        cfg.iterations,
    )?;
    Ok(QueryCaseResult {
        name: if ambiguous {
            "cosine_ambiguous_space_scan_fallback"
        } else {
            "cosine_unique_space_ann"
        },
        mode: if ambiguous { "scan_fallback" } else { "ann" },
        encoding: "f32",
        avg_latency_micros: average_u128(&stats.latencies),
        p50_latency_micros: percentile_u128(&stats.latencies, 50.0),
        p95_latency_micros: percentile_u128(&stats.latencies, 95.0),
        p99_latency_micros: percentile_u128(&stats.latencies, 99.0),
        avg_scanned_nodes: average_u64(&stats.scanned),
        rerank_batches_avg: average_u64(&stats.rerank_batches),
    })
}
/// Executes `query_text` once as a warmup, then `iterations` more times,
/// collecting per-run latency, scanned-node, and rerank-batch samples.
/// Errors if any measured run returns a different row count than the warmup
/// (guards against nondeterministic results skewing the benchmark).
fn measure_query_case(
    handle: &mut storage_api::StorageHandle,
    query_text: &str,
    params: ExecuteParams,
    iterations: usize,
) -> Result<QueryCaseStats, String> {
    // Parse -> validate -> plan once up front; only execution is repeated.
    let ast = query::parse(query_text).map_err(debug_err)?;
    let typed = query::validate(&ast, &query::Catalog).map_err(debug_err)?;
    let plan = runtime::explain(&typed).map_err(debug_err)?;
    // Warmup run establishes the expected row count; its timing is discarded.
    let warmup = runtime::execute(&plan, &params, handle).map_err(debug_err)?;
    let expected_rows = warmup.rows.len();
    let mut latencies = Vec::with_capacity(iterations);
    let mut scanned = Vec::with_capacity(iterations);
    let mut rerank_batches = Vec::with_capacity(iterations);
    for _ in 0..iterations {
        let stream = runtime::execute(&plan, &params, handle).map_err(debug_err)?;
        if stream.rows.len() != expected_rows {
            return Err(format!(
                "query row count drifted during benchmark: expected {}, got {}",
                expected_rows,
                stream.rows.len()
            ));
        }
        latencies.push(stream.latency_micros);
        scanned.push(stream.scanned_nodes);
        rerank_batches.push(stream.rerank_batches);
    }
    Ok(QueryCaseStats {
        latencies,
        scanned,
        rerank_batches,
    })
}
/// Opens (or creates) a store rooted at `base_dir`, with the WAL, manifest,
/// and SSTables laid out in fixed subpaths so every case is self-contained.
fn open_store_in(base_dir: &Path) -> Result<storage_api::StorageHandle, String> {
    storage_api::open_store(storage_api::StorageConfig {
        buffer_pool_pages: 128,
        wal_dir: base_dir.join("wal"),
        // 1 MiB WAL segments.
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base_dir.join("ir.manifest"),
        sstable_dir: base_dir.join("sst"),
    })
    .map_err(debug_err)
}
/// Deletes `path` (if it exists) and recreates it as an empty directory, so
/// each benchmark case starts from a clean slate.
fn reset_dir(path: &Path) -> Result<(), String> {
    let as_string = |err: std::io::Error| err.to_string();
    if path.exists() {
        fs::remove_dir_all(path).map_err(as_string)?;
    }
    fs::create_dir_all(path).map_err(as_string)
}
/// Recursively sums the byte sizes of all regular files under `path`,
/// saturating instead of overflowing on pathological totals.
fn dir_size_bytes(path: &Path) -> Result<u64, String> {
    let entries = fs::read_dir(path).map_err(|err| err.to_string())?;
    let mut total: u64 = 0;
    for entry in entries {
        let entry = entry.map_err(|err| err.to_string())?;
        let meta = entry.metadata().map_err(|err| err.to_string())?;
        total = if meta.is_dir() {
            total.saturating_add(dir_size_bytes(&entry.path())?)
        } else if meta.is_file() {
            total.saturating_add(meta.len())
        } else {
            // Entries that are neither files nor directories contribute nothing.
            total
        };
    }
    Ok(total)
}
/// Deterministically derives a pseudo-random vector from (`seed`, `salt`)
/// using a 64-bit LCG: each draw keeps the high 32 bits of the state,
/// normalizes them to [0, 1], and maps the result into [-1, 1].
fn generate_vector(seed: u64, salt: u64, dimension: usize) -> Vec<f32> {
    let mut state = seed ^ salt.wrapping_mul(0x9E37_79B9_7F4A_7C15);
    let mut values = Vec::with_capacity(dimension);
    for _ in 0..dimension {
        state = state
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        // Fix: normalize the same 32 bits we keep. The previous code shifted
        // by 40 (keeping only 24 bits) yet divided by u32::MAX, squeezing
        // every component into [-1.0, -0.992] instead of spanning [-1, 1].
        let scaled = ((state >> 32) as u32) as f32 / (u32::MAX as f32);
        values.push((scaled * 2.0) - 1.0);
    }
    values
}
/// Renders a named vector literal as `$name:v1:v2:...`, six decimal places
/// per component — presumably the query engine's inline-vector syntax
/// (verify against the parser).
fn inline_vector_param(name: &str, values: &[f32]) -> String {
    let components: Vec<String> = values.iter().map(|v| format!("{v:.6}")).collect();
    if components.is_empty() {
        format!("${name}")
    } else {
        format!("${name}:{}", components.join(":"))
    }
}
/// Arithmetic mean of the samples as `f64`; 0.0 for an empty slice.
fn average_u128(values: &[u128]) -> f64 {
    match values.len() {
        0 => 0.0,
        count => {
            let sum: f64 = values.iter().map(|&value| value as f64).sum();
            sum / count as f64
        }
    }
}
/// Arithmetic mean of the samples as `f64`; 0.0 for an empty slice.
fn average_u64(values: &[u64]) -> f64 {
    match values.len() {
        0 => 0.0,
        count => {
            let sum: f64 = values.iter().map(|&value| value as f64).sum();
            sum / count as f64
        }
    }
}
/// Nearest-rank percentile over an unsorted sample; returns 0 for an empty
/// slice. `pct` is in [0, 100].
fn percentile_u128(values: &[u128], pct: f64) -> u128 {
    if values.is_empty() {
        return 0;
    }
    let mut sample = values.to_vec();
    sample.sort_unstable();
    // Nearest-rank: rank = ceil(p/100 * n), clamped to a valid 0-based index.
    let rank = ((pct / 100.0) * sample.len() as f64).ceil() as usize;
    let index = rank.saturating_sub(1).min(sample.len() - 1);
    sample[index]
}
/// Converts any `Debug`-printable error into an owned `String`, used to
/// adapt project error types to this binary's `Result<_, String>` signatures.
fn debug_err(err: impl std::fmt::Debug) -> String {
    format!("{:?}", err)
}
impl BenchmarkReport {
    /// Serializes the report as pretty-printed JSON. The document is built by
    /// hand (no serde dependency), so field order matches the struct layout.
    fn to_json_pretty(&self) -> String {
        // One JSON object per size case, pre-indented for the array body.
        let size_rows = self
            .size_cases
            .iter()
            .map(|case| {
                format!(
                    " {{\"encoding\":\"{}\",\"bytes\":{}}}",
                    case.encoding, case.bytes
                )
            })
            .collect::<Vec<_>>()
            .join(",\n");
        // One JSON object per query case.
        let query_rows = self
            .query_cases
            .iter()
            .map(|case| {
                format!(
                    concat!(
                        " {{",
                        "\"name\":\"{}\",",
                        "\"mode\":\"{}\",",
                        "\"encoding\":\"{}\",",
                        "\"avg_latency_micros\":{:.2},",
                        "\"p50_latency_micros\":{},",
                        "\"p95_latency_micros\":{},",
                        "\"p99_latency_micros\":{},",
                        "\"avg_scanned_nodes\":{:.2},",
                        "\"rerank_batches_avg\":{:.2}",
                        "}}"
                    ),
                    case.name,
                    case.mode,
                    case.encoding,
                    case.avg_latency_micros,
                    case.p50_latency_micros,
                    case.p95_latency_micros,
                    case.p99_latency_micros,
                    case.avg_scanned_nodes,
                    case.rerank_batches_avg
                )
            })
            .collect::<Vec<_>>()
            .join(",\n");
        format!(
            concat!(
                "{{\n",
                " \"config\": {{\n",
                " \"nodes\": {},\n",
                " \"dimension\": {},\n",
                " \"iterations\": {},\n",
                " \"limit\": {},\n",
                " \"morsel_size\": {},\n",
                " \"parallel_workers\": {},\n",
                " \"seed\": {},\n",
                " \"data_root\": \"{}\"\n",
                " }},\n",
                " \"size_cases\": [\n",
                "{}\n",
                " ],\n",
                " \"query_cases\": [\n",
                "{}\n",
                " ]\n",
                "}}\n"
            ),
            self.config.nodes,
            self.config.dimension,
            self.config.iterations,
            self.config.limit,
            self.config.morsel_size,
            self.config.parallel_workers,
            self.config.seed,
            // NOTE(review): the path is interpolated without JSON escaping, so
            // a data_root containing quotes/backslashes would break the JSON.
            self.config.data_root.display(),
            size_rows,
            query_rows
        )
    }
    /// Renders a human-readable Markdown summary, including derived ratios
    /// (quantized/f32 size and latency, fallback/ANN latency and scan counts).
    fn to_markdown(&self) -> String {
        let f32_bytes = self
            .size_cases
            .iter()
            .find(|case| case.encoding == "f32")
            .map(|case| case.bytes)
            .unwrap_or(0);
        let quantized_bytes = self
            .size_cases
            .iter()
            .find(|case| case.encoding == "quantized_i8")
            .map(|case| case.bytes)
            .unwrap_or(0);
        // Ratio guards below report 0.0 instead of dividing by zero.
        let size_ratio = if f32_bytes > 0 {
            quantized_bytes as f64 / f32_bytes as f64
        } else {
            0.0
        };
        // The four named cases are always pushed by main(), so these unwraps
        // only fire if the report was constructed some other way.
        let scan_f32 = self
            .query_cases
            .iter()
            .find(|case| case.name == "scan_f32")
            .unwrap();
        let scan_q = self
            .query_cases
            .iter()
            .find(|case| case.name == "scan_quantized_i8")
            .unwrap();
        let ann_unique = self
            .query_cases
            .iter()
            .find(|case| case.name == "cosine_unique_space_ann")
            .unwrap();
        let ann_fallback = self
            .query_cases
            .iter()
            .find(|case| case.name == "cosine_ambiguous_space_scan_fallback")
            .unwrap();
        let quantized_scan_ratio = if scan_f32.avg_latency_micros > 0.0 {
            scan_q.avg_latency_micros / scan_f32.avg_latency_micros
        } else {
            0.0
        };
        let ann_latency_ratio = if ann_unique.avg_latency_micros > 0.0 {
            ann_fallback.avg_latency_micros / ann_unique.avg_latency_micros
        } else {
            0.0
        };
        let ann_scan_ratio = if ann_unique.avg_scanned_nodes > 0.0 {
            ann_fallback.avg_scanned_nodes / ann_unique.avg_scanned_nodes
        } else {
            0.0
        };
        format!(
            concat!(
                "# Vector Follow-Up Pass 2 Benchmark Report\n\n",
                "- nodes: {}\n",
                "- dimension: {}\n",
                "- iterations: {}\n",
                "- limit: {}\n\n",
                "## Storage Size\n",
                "- structured_f32_bytes: {}\n",
                "- structured_quantized_i8_bytes: {}\n",
                "- quantized_over_f32_ratio: {:.4}\n\n",
                "## Scan Latency\n",
                "- f32_avg_latency_us: {:.2}\n",
                "- quantized_i8_avg_latency_us: {:.2}\n",
                "- quantized_over_f32_latency_ratio: {:.4}\n",
                "- f32_p95_latency_us: {}\n",
                "- quantized_i8_p95_latency_us: {}\n\n",
                "## ANN Routing\n",
                "- unique_space_ann_avg_latency_us: {:.2}\n",
                "- ambiguous_space_scan_fallback_avg_latency_us: {:.2}\n",
                "- fallback_over_ann_latency_ratio: {:.4}\n",
                "- unique_space_ann_avg_scanned_nodes: {:.2}\n",
                "- ambiguous_space_scan_fallback_avg_scanned_nodes: {:.2}\n",
                "- fallback_over_ann_scanned_ratio: {:.4}\n"
            ),
            self.config.nodes,
            self.config.dimension,
            self.config.iterations,
            self.config.limit,
            f32_bytes,
            quantized_bytes,
            size_ratio,
            scan_f32.avg_latency_micros,
            scan_q.avg_latency_micros,
            quantized_scan_ratio,
            scan_f32.p95_latency_micros,
            scan_q.p95_latency_micros,
            ann_unique.avg_latency_micros,
            ann_fallback.avg_latency_micros,
            ann_latency_ratio,
            ann_unique.avg_scanned_nodes,
            ann_fallback.avg_scanned_nodes,
            ann_scan_ratio
        )
    }
}
// Equality and ordering for query cases are defined by `name` alone — the
// metric fields are intentionally ignored, so two results for the same case
// compare equal even with different numbers. Presumably this supports
// sorting/deduplicating cases by identity; no caller is visible in this file.
impl PartialEq for QueryCaseResult {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name
    }
}
impl Eq for QueryCaseResult {}
impl PartialOrd for QueryCaseResult {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to Ord so the two orderings can never disagree.
        Some(self.cmp(other))
    }
}
impl Ord for QueryCaseResult {
    fn cmp(&self, other: &Self) -> Ordering {
        self.name.cmp(other.name)
    }
}