iridium-db 0.4.0

A high-performance vector-graph hybrid storage and indexing engine
use std::path::{Path, PathBuf};
use std::time::Instant;

use iridium::features::runtime;
use iridium::features::storage::api as storage_api;

/// Creates a fresh scratch directory under the system temp dir and returns its path.
///
/// The directory name is `<prefix>_<pid>_<nanoseconds since the UNIX epoch>`, so
/// separate processes and separate calls get distinct directories.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch, or if the
/// directory cannot be created.
fn temp_dir(prefix: &str) -> PathBuf {
    let pid = std::process::id();
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("time")
        .as_nanos();

    let dir = std::env::temp_dir().join(format!("{}_{}_{}", prefix, pid, nanos));
    std::fs::create_dir_all(&dir).expect("create temp dir");
    dir
}

/// Returns the `p`-th percentile of `values`, or `0` when `values` is empty.
///
/// The samples are sorted ascending and the element at index
/// `ceil((len - 1) * p)` is returned, clamped to the last element. Because the
/// float-to-`usize` cast saturates, a negative or NaN `p` selects the smallest
/// sample and a `p` above `1.0` selects the largest.
fn percentile(mut values: Vec<u128>, p: f64) -> u128 {
    // `checked_sub` doubles as the emptiness check: there is no last index in an empty vec.
    let last = match values.len().checked_sub(1) {
        Some(last) => last,
        None => return 0,
    };

    values.sort_unstable();
    let rank = (last as f64 * p).ceil() as usize;
    values[rank.min(last)]
}

/// Reads environment variable `name` as a `u64`.
///
/// Falls back to `default` when the variable is unset, is not valid Unicode,
/// or does not parse as a `u64`.
fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}

/// Reads environment variable `name` as an `f64`.
///
/// Falls back to `default` when the variable is unset, is not valid Unicode,
/// or does not parse as an `f64`.
fn env_f64(name: &str, default: f64) -> f64 {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<f64>().unwrap_or(default),
        Err(_) => default,
    }
}

/// Writes `content` to `path`, creating any missing parent directories first.
///
/// An existing file at `path` is replaced.
///
/// # Panics
///
/// Panics if the parent directory cannot be created or the file cannot be written.
fn write_artifact(path: &Path, content: &str) {
    use std::fs;

    // Make sure the destination directory exists before attempting the write.
    if let Some(dir) = path.parent() {
        fs::create_dir_all(dir).expect("create artifact dir");
    }
    fs::write(path, content.as_bytes()).expect("write artifact");
}

/// Removes the wrapped directory tree when dropped (best effort).
///
/// Lets the gate clean up its scratch storage on the normal exit path and when
/// a panic unwinds through the test, instead of leaving a directory behind in
/// the system temp dir on every run.
struct ScratchDirCleanup(PathBuf);

impl Drop for ScratchDirCleanup {
    fn drop(&mut self) {
        // Errors are ignored on purpose: failing to delete scratch data must not
        // mask (or replace) the actual outcome of the gate.
        let _ = std::fs::remove_dir_all(&self.0);
    }
}

/// Performance gate for two-shard execution.
///
/// Steps:
/// 1. Opens two stores (core 0 and core 1 of a 2-shard layout) in a scratch
///    directory and loads node ids `1..=NODES`, routing each node to the shard
///    that `request_owns_node` reports for it.
/// 2. Runs `MATCH (n) RETURN n LIMIT <LIMIT>` `ITERATIONS` times against shard 0
///    alone, then `ITERATIONS` times as a fan-out over both shards, recording
///    the per-call `latency_micros` and the wall-clock throughput of each loop.
/// 3. Runs the fan-out twice more with the shards listed in opposite order and
///    checks that the returned node ids are identical.
/// 4. Writes a JSON and a Markdown report, then asserts that every check passed.
///
/// Environment variables (defaults in parentheses): `NODES` (4000),
/// `ITERATIONS` (20), `LIMIT` (500), `MAX_SHARD_LOCAL_P95_US` (50000),
/// `MAX_CROSS_SHARD_P95_US` (100000), `MIN_FANOUT_THROUGHPUT_RATIO` (0.4),
/// `OUT_JSON` (`artifacts/thread_per_core_gate_report.json`),
/// `OUT_MD` (`artifacts/thread_per_core_gate_report.md`).
#[test]
fn thread_per_core_gate() {
    // Workload size and pass/fail thresholds, all overridable from the environment.
    let nodes = env_u64("NODES", 4000);
    let iterations = env_u64("ITERATIONS", 20);
    let limit = env_u64("LIMIT", 500);
    let max_shard_local_p95_us = env_u64("MAX_SHARD_LOCAL_P95_US", 50_000);
    let max_cross_shard_p95_us = env_u64("MAX_CROSS_SHARD_P95_US", 100_000);
    let min_fanout_throughput_ratio = env_f64("MIN_FANOUT_THROUGHPUT_RATIO", 0.4);
    let out_json = std::env::var("OUT_JSON")
        .unwrap_or_else(|_| "artifacts/thread_per_core_gate_report.json".to_string());
    let out_md = std::env::var("OUT_MD")
        .unwrap_or_else(|_| "artifacts/thread_per_core_gate_report.md".to_string());

    let base = temp_dir("thread_per_core_gate");
    // Declared before the store handles so it is dropped after them (locals are
    // dropped in reverse declaration order): the stores are gone before the
    // directory they were opened on is deleted.
    let _scratch_cleanup = ScratchDirCleanup(base.clone());

    let request0 = storage_api::ThreadCoreRequest {
        core_id: 0,
        shard_count: 2,
    };
    let request1 = storage_api::ThreadCoreRequest {
        core_id: 1,
        shard_count: 2,
    };
    let lanes = storage_api::ThreadCoreLaneConfig::default();

    // NOTE(review): both shards are opened with identical WAL / manifest /
    // SSTable paths, exactly as before. Presumably `open_store_for_request`
    // separates per-core state itself — confirm against the storage API.
    let shard_config = || storage_api::StorageConfig {
        buffer_pool_pages: 64,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };

    let mut handle0 = storage_api::open_store_for_request(shard_config(), &request0, &lanes)
        .expect("open shard 0");
    let mut handle1 = storage_api::open_store_for_request(shard_config(), &request1, &lanes)
        .expect("open shard 1");

    // Load phase: each node goes to shard 0 if that request owns it, otherwise to shard 1.
    // The trailing arguments (`1`, `&[node_id + 1]`) are passed through unchanged;
    // presumably a fixed value plus a single link to the next node id — confirm.
    for node_id in 1..=nodes {
        if storage_api::request_owns_node(&request0, node_id) {
            storage_api::put_full_node(&mut handle0, node_id, 1, &[node_id + 1])
                .expect("put shard0");
        } else {
            storage_api::put_full_node(&mut handle1, node_id, 1, &[node_id + 1])
                .expect("put shard1");
        }
    }
    storage_api::flush(&mut handle0).expect("flush shard0");
    storage_api::flush(&mut handle1).expect("flush shard1");

    // Parse, validate and plan the query once; the same plan is reused by every run below.
    let typed = iridium::features::query::validate(
        &iridium::features::query::parse(&format!("MATCH (n) RETURN n LIMIT {}", limit))
            .expect("parse"),
        &iridium::features::query::Catalog,
    )
    .expect("validate");
    let plan = runtime::explain(&typed).expect("explain");
    let params = runtime::ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: nodes + 1,
        morsel_size: 128,
        parallel_workers: 1,
    };

    let mut shard_local_lat = Vec::new();
    let mut cross_shard_lat = Vec::new();

    // Shard-local phase: shard 0 only.
    let start_local = Instant::now();
    for _ in 0..iterations {
        let out = runtime::execute_with_request(&plan, &params, &mut handle0, &request0)
            .expect("local execute");
        shard_local_lat.push(out.latency_micros);
    }
    let local_elapsed = start_local.elapsed().as_secs_f64();
    // Guard against a zero elapsed time so the rate never becomes infinite.
    let local_ops_per_sec = if local_elapsed > 0.0 {
        iterations as f64 / local_elapsed
    } else {
        0.0
    };

    // Cross-shard phase: fan the same plan out over both shards.
    let start_fanout = Instant::now();
    for _ in 0..iterations {
        let mut shards = vec![
            runtime::FanoutShardExecution {
                request: request0.clone(),
                handle: &mut handle0,
            },
            runtime::FanoutShardExecution {
                request: request1.clone(),
                handle: &mut handle1,
            },
        ];
        let out = runtime::execute_fanout(&plan, &params, &mut shards).expect("fanout execute");
        cross_shard_lat.push(out.latency_micros);
    }
    let fanout_elapsed = start_fanout.elapsed().as_secs_f64();
    let fanout_ops_per_sec = if fanout_elapsed > 0.0 {
        iterations as f64 / fanout_elapsed
    } else {
        0.0
    };

    // Determinism check: the merged result must not depend on the order in
    // which the shards are handed to the fan-out (shard 0 first vs shard 1 first).
    let ids_a = {
        let mut shards = vec![
            runtime::FanoutShardExecution {
                request: request0.clone(),
                handle: &mut handle0,
            },
            runtime::FanoutShardExecution {
                request: request1.clone(),
                handle: &mut handle1,
            },
        ];
        runtime::execute_fanout(&plan, &params, &mut shards)
            .expect("fanout order a")
            .rows
            .into_iter()
            .map(|row| row.node_id)
            .collect::<Vec<u64>>()
    };
    let ids_b = {
        let mut shards = vec![
            runtime::FanoutShardExecution {
                request: request1.clone(),
                handle: &mut handle1,
            },
            runtime::FanoutShardExecution {
                request: request0.clone(),
                handle: &mut handle0,
            },
        ];
        runtime::execute_fanout(&plan, &params, &mut shards)
            .expect("fanout order b")
            .rows
            .into_iter()
            .map(|row| row.node_id)
            .collect::<Vec<u64>>()
    };

    // Evaluate every check against its threshold.
    let deterministic_merge_pass = ids_a == ids_b;
    let shard_local_p95 = percentile(shard_local_lat, 0.95);
    let cross_shard_p95 = percentile(cross_shard_lat, 0.95);
    let shard_local_p95_pass = shard_local_p95 <= max_shard_local_p95_us as u128;
    let cross_shard_p95_pass = cross_shard_p95 <= max_cross_shard_p95_us as u128;
    let throughput_ratio = if local_ops_per_sec > 0.0 {
        fanout_ops_per_sec / local_ops_per_sec
    } else {
        0.0
    };
    let throughput_pass = throughput_ratio >= min_fanout_throughput_ratio;
    let passed =
        deterministic_merge_pass && shard_local_p95_pass && cross_shard_p95_pass && throughput_pass;

    // Render the reports. They are written before the final assertion so that
    // a failing gate still leaves its metrics on disk.
    let json = format!(
        concat!(
            "{{\n",
            "  \"gate\": \"thread_per_core_gate\",\n",
            "  \"passed\": {},\n",
            "  \"config\": {{\"nodes\": {}, \"iterations\": {}, \"limit\": {}}},\n",
            "  \"metrics\": {{\n",
            "    \"shard_local_p95_us\": {},\n",
            "    \"cross_shard_p95_us\": {},\n",
            "    \"local_ops_per_sec\": {:.3},\n",
            "    \"fanout_ops_per_sec\": {:.3},\n",
            "    \"fanout_throughput_ratio\": {:.4}\n",
            "  }},\n",
            "  \"checks\": {{\n",
            "    \"deterministic_merge_pass\": {},\n",
            "    \"shard_local_p95_pass\": {},\n",
            "    \"cross_shard_p95_pass\": {},\n",
            "    \"throughput_pass\": {}\n",
            "  }}\n",
            "}}\n"
        ),
        passed,
        nodes,
        iterations,
        limit,
        shard_local_p95,
        cross_shard_p95,
        local_ops_per_sec,
        fanout_ops_per_sec,
        throughput_ratio,
        deterministic_merge_pass,
        shard_local_p95_pass,
        cross_shard_p95_pass,
        throughput_pass
    );
    let md = format!(
        concat!(
            "# Thread-Per-Core Gate\n\n",
            "- passed: {}\n",
            "- nodes: {}\n",
            "- iterations: {}\n",
            "- limit: {}\n\n",
            "## Metrics\n",
            "- shard_local_p95_us: {}\n",
            "- cross_shard_p95_us: {}\n",
            "- local_ops_per_sec: {:.3}\n",
            "- fanout_ops_per_sec: {:.3}\n",
            "- fanout_throughput_ratio: {:.4}\n\n",
            "## Checks\n",
            "- deterministic_merge_pass: {}\n",
            "- shard_local_p95_pass: {}\n",
            "- cross_shard_p95_pass: {}\n",
            "- throughput_pass: {}\n"
        ),
        passed,
        nodes,
        iterations,
        limit,
        shard_local_p95,
        cross_shard_p95,
        local_ops_per_sec,
        fanout_ops_per_sec,
        throughput_ratio,
        deterministic_merge_pass,
        shard_local_p95_pass,
        cross_shard_p95_pass,
        throughput_pass
    );
    write_artifact(Path::new(&out_json), &json);
    write_artifact(Path::new(&out_md), &md);

    assert!(passed, "thread-per-core gate failed; see {}", out_json);
}