iridium-db 0.2.0

A high-performance vector-graph hybrid storage and indexing engine
use std::env;
use std::fs;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::{Duration, Instant};

use iridium::features::query;
use iridium::features::runtime::{self, ExecuteParams};
use iridium::features::storage::api as storage_api;

/// Profiles one runtime query case against an already-populated store.
///
/// Sequence:
/// 1. Read the configuration from the environment and the case name from the
///    first command-line argument.
/// 2. Open the store under `<data_root>/<case>` and parse, validate and plan
///    the case's query.
/// 3. Execute the plan once as a warm-up and remember its row count.
/// 4. Optionally write the ready file, then block until the start file exists.
/// 5. Re-execute the plan until the configured number of seconds (at least
///    one) has elapsed, failing if any run returns a different row count.
/// 6. Print a single-line JSON summary of the collected measurements.
///
/// # Errors
///
/// Returns a `String` describing the first failure: bad configuration, a
/// missing or unknown case name, a storage/query/runtime error, an I/O error
/// on the ready file, or row-count drift between runs.
fn main() -> Result<(), String> {
    let cfg = ProfileConfig::from_env()?;

    // The first CLI argument names the case to profile.
    let Some(case_name) = env::args().nth(1) else {
        return Err("missing profile case".to_string());
    };
    let Some(case) = ProfileCase::from_str(&case_name) else {
        return Err(format!("unknown profile case '{case_name}'"));
    };

    let store_dir = cfg.data_root.join(case.dir_name());
    let query_text = case.query_text(&cfg);
    let mut handle = open_store_in(&store_dir)?;

    // Parse -> validate -> plan happens once; only execution is repeated.
    let ast = query::parse(&query_text).map_err(debug_err)?;
    let typed = query::validate(&ast, &query::Catalog).map_err(debug_err)?;
    let plan = runtime::explain(&typed).map_err(debug_err)?;
    let params = ExecuteParams {
        scan_start: 0,
        scan_end_exclusive: cfg.nodes + 8,
        morsel_size: cfg.morsel_size,
        parallel_workers: cfg.parallel_workers,
    };

    // Warm-up run: its row count is the baseline every timed run must match.
    let warmup = runtime::execute(&plan, &params, &mut handle).map_err(debug_err)?;
    let expected_rows = warmup.rows.len();

    // Tell an external driver that warm-up is done, then wait for its go-ahead.
    if let Some(ready_path) = &cfg.ready_file {
        if let Some(parent_dir) = ready_path.parent() {
            fs::create_dir_all(parent_dir).map_err(|err| err.to_string())?;
        }
        fs::write(ready_path, b"ready\n").map_err(|err| err.to_string())?;
    }
    wait_for_start_signal(cfg.start_file.as_deref())?;

    let run_for = Duration::from_secs(cfg.profile_seconds.max(1));
    let deadline = Instant::now() + run_for;
    let mut latencies = Vec::new();
    let mut scanned_nodes = Vec::new();
    let mut rerank_batches = Vec::new();
    while Instant::now() < deadline {
        let stream = runtime::execute(&plan, &params, &mut handle).map_err(debug_err)?;
        let observed_rows = stream.rows.len();
        if observed_rows != expected_rows {
            return Err(format!(
                "row count drift in {}: expected {}, got {}",
                case.as_str(),
                expected_rows,
                observed_rows
            ));
        }
        latencies.push(stream.latency_micros);
        scanned_nodes.push(stream.scanned_nodes);
        rerank_batches.push(stream.rerank_batches);
    }

    // One latency sample is recorded per completed run, so the sample count
    // is the iteration count.
    let iterations = latencies.len();
    let avg_latency = average_u128(&latencies);
    let p50_latency = percentile_u128(&latencies, 50.0);
    let p95_latency = percentile_u128(&latencies, 95.0);
    let avg_scanned = average_u64(&scanned_nodes);
    let avg_rerank = average_u64(&rerank_batches);

    println!(
        concat!(
            "{{",
            "\"case\":\"{}\",",
            "\"iterations\":{},",
            "\"avg_latency_micros\":{:.2},",
            "\"p50_latency_micros\":{},",
            "\"p95_latency_micros\":{},",
            "\"avg_scanned_nodes\":{:.2},",
            "\"rerank_batches_avg\":{:.2}",
            "}}"
        ),
        case.as_str(),
        iterations,
        avg_latency,
        p50_latency,
        p95_latency,
        avg_scanned,
        avg_rerank,
    );
    Ok(())
}

/// Profiling scenario, selected by the first command-line argument.
///
/// Each variant has a fixed string identifier (see `as_str`) that is accepted
/// on the command line, echoed in the JSON report, and used as the name of the
/// data sub-directory the store is opened from.
#[derive(Clone, Copy)]
enum ProfileCase {
    /// `runtime_unique_ann`: issues the `vector.cosine` query, limited to the
    /// configured `limit`.
    RuntimeUniqueAnn,
    /// `runtime_ambiguous_fallback`: issues the same `vector.cosine` query as
    /// `RuntimeUniqueAnn`; only the data directory differs.
    RuntimeAmbiguousFallback,
    /// `runtime_scan_f32`: issues the `vector.euclidean` query, limited to the
    /// configured node count.
    RuntimeScanF32,
    /// `runtime_scan_quantized_i8`: issues the same `vector.euclidean` query as
    /// `RuntimeScanF32`; only the data directory differs.
    RuntimeScanQuantizedI8,
}

impl ProfileCase {
    fn from_str(value: &str) -> Option<Self> {
        match value {
            "runtime_unique_ann" => Some(Self::RuntimeUniqueAnn),
            "runtime_ambiguous_fallback" => Some(Self::RuntimeAmbiguousFallback),
            "runtime_scan_f32" => Some(Self::RuntimeScanF32),
            "runtime_scan_quantized_i8" => Some(Self::RuntimeScanQuantizedI8),
            _ => None,
        }
    }

    fn as_str(self) -> &'static str {
        match self {
            Self::RuntimeUniqueAnn => "runtime_unique_ann",
            Self::RuntimeAmbiguousFallback => "runtime_ambiguous_fallback",
            Self::RuntimeScanF32 => "runtime_scan_f32",
            Self::RuntimeScanQuantizedI8 => "runtime_scan_quantized_i8",
        }
    }

    fn dir_name(self) -> &'static str {
        self.as_str()
    }

    fn query_text(self, cfg: &ProfileConfig) -> String {
        match self {
            Self::RuntimeUniqueAnn | Self::RuntimeAmbiguousFallback => format!(
                "MATCH (n) WHERE vector.cosine(n.embedding, {}) > -1 RETURN n LIMIT {}",
                inline_vector_param("q", &generate_vector(cfg.seed, 0, cfg.dimension)),
                cfg.limit
            ),
            Self::RuntimeScanF32 | Self::RuntimeScanQuantizedI8 => format!(
                "MATCH (n) WHERE vector.euclidean(n.embedding, {}) < 1000000 RETURN n LIMIT {}",
                inline_vector_param("q", &generate_vector(cfg.seed, 0, cfg.dimension)),
                cfg.nodes
            ),
        }
    }
}

/// Settings for one profiling run, populated from environment variables by
/// `ProfileConfig::from_env`.
struct ProfileConfig {
    /// `BENCH_NODES` (default 20_000). Sets the upper scan bound
    /// (`nodes + 8`) and the `LIMIT` of the scan queries.
    nodes: u64,
    /// `BENCH_DIMENSION` (default 64). Number of components in the generated
    /// probe vector.
    dimension: usize,
    /// `BENCH_LIMIT` (default 50). `LIMIT` of the cosine queries.
    limit: usize,
    /// `BENCH_MORSEL_SIZE` (default 256). Passed through to `ExecuteParams`.
    morsel_size: usize,
    /// `BENCH_PARALLEL_WORKERS` (default 0). Passed through to `ExecuteParams`.
    parallel_workers: usize,
    /// `BENCH_SEED` (default 7). Seed for the generated probe vector.
    seed: u64,
    /// `BENCH_STEADY_STATE_SECONDS` (default 10). Length of the timed loop;
    /// `main` raises values below 1 to 1.
    profile_seconds: u64,
    /// `BENCH_DATA_ROOT` (default: `iridium-vector-ann-bench` under the system
    /// temp directory). Parent of the per-case store directories.
    data_root: PathBuf,
    /// `PROFILE_READY_FILE` (optional). When set, `main` writes `ready\n` to
    /// this path after the warm-up run.
    ready_file: Option<PathBuf>,
    /// `PROFILE_START_FILE` (optional). When set, the timed loop does not
    /// begin until this path exists.
    start_file: Option<PathBuf>,
}

impl ProfileConfig {
    /// Builds the configuration from environment variables, applying the
    /// built-in default for every variable that is not set.
    ///
    /// Numeric settings: `BENCH_NODES` (20_000), `BENCH_DIMENSION` (64),
    /// `BENCH_LIMIT` (50), `BENCH_MORSEL_SIZE` (256),
    /// `BENCH_PARALLEL_WORKERS` (0), `BENCH_SEED` (7),
    /// `BENCH_STEADY_STATE_SECONDS` (10).
    ///
    /// Path settings: `BENCH_DATA_ROOT` (defaults to
    /// `iridium-vector-ann-bench` under the system temp directory),
    /// `PROFILE_READY_FILE` and `PROFILE_START_FILE` (both optional).
    ///
    /// # Errors
    ///
    /// Returns the message produced by the numeric parser when a numeric
    /// variable is set to a value that does not parse.
    fn from_env() -> Result<Self, String> {
        // Paths are read as raw OS strings: a path need not be valid UTF-8,
        // and `env::var` would report such a value as an error, which would
        // silently select the default root or disable the ready/start
        // handshake.
        let path_var = |name: &str| env::var_os(name).map(PathBuf::from);

        Ok(Self {
            nodes: parse_env_u64("BENCH_NODES", 20_000)?,
            dimension: parse_env_usize("BENCH_DIMENSION", 64)?,
            limit: parse_env_usize("BENCH_LIMIT", 50)?,
            morsel_size: parse_env_usize("BENCH_MORSEL_SIZE", 256)?,
            parallel_workers: parse_env_usize("BENCH_PARALLEL_WORKERS", 0)?,
            seed: parse_env_u64("BENCH_SEED", 7)?,
            profile_seconds: parse_env_u64("BENCH_STEADY_STATE_SECONDS", 10)?,
            data_root: path_var("BENCH_DATA_ROOT")
                .unwrap_or_else(|| env::temp_dir().join("iridium-vector-ann-bench")),
            ready_file: path_var("PROFILE_READY_FILE"),
            start_file: path_var("PROFILE_START_FILE"),
        })
    }
}

/// Blocks until `start_file` exists, checking every 10 ms.
///
/// Returns immediately when `start_file` is `None`. There is no timeout: the
/// call does not return until the path exists. The result is always `Ok(())`.
fn wait_for_start_signal(start_file: Option<&Path>) -> Result<(), String> {
    if let Some(marker) = start_file {
        let poll_interval = Duration::from_millis(10);
        while !marker.exists() {
            thread::sleep(poll_interval);
        }
    }
    Ok(())
}

/// Opens the store whose files live under `base_dir`.
///
/// The WAL directory (`wal`), manifest (`ir.manifest`) and SSTable directory
/// (`sst`) are all resolved relative to `base_dir`; the buffer pool is fixed
/// at 128 pages and WAL segments at `1 << 20` bytes.
///
/// # Errors
///
/// Returns the `Debug` rendering of the storage error when opening fails.
fn open_store_in(base_dir: &Path) -> Result<storage_api::StorageHandle, String> {
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 128,
        wal_dir: base_dir.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base_dir.join("ir.manifest"),
        sstable_dir: base_dir.join("sst"),
    };
    storage_api::open_store(config).map_err(debug_err)
}

/// Reads environment variable `name` as a `u64`.
///
/// Returns `default` only when the variable is not set at all.
///
/// # Errors
///
/// Returns `"invalid {name}: expected u64"` when the variable is set but its
/// value does not parse as a `u64`, including when the value is not valid
/// Unicode. A set-but-unreadable value is reported rather than silently
/// replaced by the default, so a typo cannot change the run unnoticed.
fn parse_env_u64(name: &str, default: u64) -> Result<u64, String> {
    match env::var(name) {
        Ok(value) => value
            .parse::<u64>()
            .map_err(|_| format!("invalid {name}: expected u64")),
        Err(env::VarError::NotPresent) => Ok(default),
        Err(env::VarError::NotUnicode(_)) => Err(format!("invalid {name}: expected u64")),
    }
}

/// Reads environment variable `name` as a `usize`.
///
/// Returns `default` only when the variable is not set at all.
///
/// # Errors
///
/// Returns `"invalid {name}: expected usize"` when the variable is set but
/// its value does not parse as a `usize`, including when the value is not
/// valid Unicode. A set-but-unreadable value is reported rather than silently
/// replaced by the default, so a typo cannot change the run unnoticed.
fn parse_env_usize(name: &str, default: usize) -> Result<usize, String> {
    match env::var(name) {
        Ok(value) => value
            .parse::<usize>()
            .map_err(|_| format!("invalid {name}: expected usize")),
        Err(env::VarError::NotPresent) => Ok(default),
        Err(env::VarError::NotUnicode(_)) => Err(format!("invalid {name}: expected usize")),
    }
}

/// Deterministically generates `dimension` pseudo-random `f32` components
/// from `seed` and `salt`.
///
/// The generator state starts at `seed ^ (salt * 0x9E37_79B9_7F4A_7C15)`
/// (wrapping) and is advanced once per component with a wrapping
/// multiply-then-add step. Each component is derived from the state shifted
/// right by 40 bits, divided by `u32::MAX`, then mapped through `x * 2 - 1`.
///
/// NOTE(review): the 40-bit shift leaves only 24 significant bits while the
/// divisor is `u32::MAX`, so every component falls in roughly
/// `[-1.0, -0.992]` rather than spanning `[-1.0, 1.0]`. Confirm whether that
/// is intended before changing it: the output presumably has to match vectors
/// produced by whatever populated the store.
fn generate_vector(seed: u64, salt: u64, dimension: usize) -> Vec<f32> {
    const MULTIPLIER: u64 = 6364136223846793005;
    const INCREMENT: u64 = 1442695040888963407;

    let mut lcg = seed ^ salt.wrapping_mul(0x9E37_79B9_7F4A_7C15);
    (0..dimension)
        .map(|_| {
            lcg = lcg.wrapping_mul(MULTIPLIER).wrapping_add(INCREMENT);
            let unit = ((lcg >> 40) as u32) as f32 / (u32::MAX as f32);
            (unit * 2.0) - 1.0
        })
        .collect()
}

/// Renders a vector as an inline query parameter.
///
/// The result is `$` followed by `name`, then each value formatted with six
/// decimal places and preceded by `:`, e.g. `$q:0.500000:-1.000000`. An empty
/// slice yields just `$name`.
fn inline_vector_param(name: &str, values: &[f32]) -> String {
    let head = format!("${name}");
    let components = values.iter().map(|component| format!("{component:.6}"));
    std::iter::once(head)
        .chain(components)
        .collect::<Vec<String>>()
        .join(":")
}

/// Arithmetic mean of `values` as an `f64`; `0.0` for an empty slice.
///
/// Each value is converted to `f64` before summation, so very large inputs
/// lose precision.
fn average_u128(values: &[u128]) -> f64 {
    match values.len() {
        0 => 0.0,
        count => {
            let total = values
                .iter()
                .fold(0.0_f64, |running, &sample| running + sample as f64);
            total / count as f64
        }
    }
}

/// Arithmetic mean of `values` as an `f64`; `0.0` for an empty slice.
///
/// Each value is converted to `f64` before summation, so very large inputs
/// lose precision.
fn average_u64(values: &[u64]) -> f64 {
    match values.len() {
        0 => 0.0,
        count => {
            let total = values
                .iter()
                .fold(0.0_f64, |running, &sample| running + sample as f64);
            total / count as f64
        }
    }
}

/// Returns the `pct`-th percentile of `values`, or `0` for an empty slice.
///
/// Works on a sorted copy of the input. The chosen element is the one at
/// rank `ceil(pct / 100 * len)` (1-based); the resulting index is clamped to
/// the valid range, so `pct <= 0` selects the minimum and `pct >= 100` the
/// maximum.
fn percentile_u128(values: &[u128], pct: f64) -> u128 {
    let count = values.len();
    if count == 0 {
        return 0;
    }

    let mut ordered: Vec<u128> = values.to_vec();
    ordered.sort_unstable();

    // 1-based rank; the float-to-usize cast saturates at 0 for negatives.
    let rank = ((pct / 100.0) * count as f64).ceil() as usize;
    let index = rank.saturating_sub(1).min(count - 1);
    ordered[index]
}

/// Converts any `Debug` value into its `{:?}` text, for use as a `String`
/// error (typically via `map_err(debug_err)`).
fn debug_err(err: impl std::fmt::Debug) -> String {
    format!("{:?}", err)
}