use std::path::{Path, PathBuf};
use std::process::Command;
use iridium::features::client::{embedded_driver, DriverConfig};
use iridium::features::ingest;
use iridium::features::query;
use iridium::features::runtime;
use iridium::features::storage::api;
use iridium::features::storage::compaction;
use plexus_serde::{current_plan_version, serialize_plan, ColDef, ColKind, LogicalType, Op, Plan};
/// Encodes `v` as a vector payload through `api::encode_vector_payload_f32`.
///
/// The call always passes `1` as the leading argument, the cosine metric, and
/// `false` as the trailing flag. What the `1` and `false` select is defined by
/// the storage API and is not visible from this file.
fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    let metric = api::VectorMetric::Cosine;
    api::encode_vector_payload_f32(1, metric, v, false)
}
/// Creates a fresh scratch directory under the system temp dir and returns its path.
///
/// The directory is named `<prefix>_<pid>_<nanoseconds since the Unix epoch>`.
///
/// # Panics
///
/// Panics if the system clock reads earlier than the Unix epoch or if the
/// directory cannot be created.
fn temp_dir(prefix: &str) -> PathBuf {
    let nanos_since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("time")
        .as_nanos();
    let unique_name = format!("{}_{}_{}", prefix, std::process::id(), nanos_since_epoch);

    let path = std::env::temp_dir().join(unique_name);
    std::fs::create_dir_all(&path).expect("create temp dir");
    path
}
/// Builds an `api::StorageConfig` whose WAL directory, manifest file and
/// SSTable directory are all placed directly under `base_dir`.
fn storage_config(base_dir: &Path) -> api::StorageConfig {
    let wal_dir = base_dir.join("wal");
    let manifest_path = base_dir.join("ir.manifest");
    let sstable_dir = base_dir.join("sst");

    api::StorageConfig {
        buffer_pool_pages: 32,
        wal_segment_max_bytes: 1 << 20,
        wal_dir,
        manifest_path,
        sstable_dir,
    }
}
/// Serializes a four-op Plexus plan: `ScanNodes`, `Project`, `Limit`, `Return`.
///
/// `limit` becomes the `count` of the `Limit` op. Each later op names the
/// position of the op before it as its `input` (0, 1, 2), and `root_op` is 3,
/// the `Return` op.
///
/// # Panics
///
/// Panics if `serialize_plan` reports an error.
fn serialized_scan_limit_plan(limit: i64) -> Vec<u8> {
    let version = current_plan_version("iridium-storage-paths");

    let node_column = ColDef {
        name: "n".to_string(),
        kind: ColKind::Node,
        logical_type: LogicalType::NodeRef,
    };
    let scan = Op::ScanNodes {
        labels: Vec::new(),
        schema: vec![node_column],
        must_labels: Vec::new(),
        forbidden_labels: Vec::new(),
        est_rows: 64,
        selectivity: 1.0,
        graph_ref: None,
    };
    let project = Op::Project {
        input: 0,
        exprs: Vec::new(),
        schema: Vec::new(),
    };
    let limit_op = Op::Limit {
        input: 1,
        count: limit,
        skip: 0,
        cursor: None,
        emit_cursor: false,
    };
    let ret = Op::Return { input: 2 };

    let plan = Plan {
        version,
        ops: vec![scan, project, limit_op, ret],
        root_op: 3,
    };
    serialize_plan(&plan).expect("serialize plexus plan")
}
/// Locates the `ir` binary that the CLI tests spawn.
///
/// Lookup order:
/// 1. `CARGO_BIN_EXE_ir` in the process environment, so a caller can still
///    override the path at run time.
/// 2. `CARGO_BIN_EXE_ir` captured at compile time. Cargo documents this
///    variable as set while building integration tests, so this is the path
///    Cargo actually built, whatever the target directory or profile.
/// 3. `<CARGO_MANIFEST_DIR>/target/debug/ir` as a last resort.
fn ir_bin() -> PathBuf {
    if let Some(bin) = std::env::var_os("CARGO_BIN_EXE_ir") {
        return PathBuf::from(bin);
    }
    // `option_env!` yields `None` when the variable was absent at build time,
    // so this compiles even when Cargo did not provide it.
    if let Some(bin) = option_env!("CARGO_BIN_EXE_ir") {
        return PathBuf::from(bin);
    }
    PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("target")
        .join("debug")
        .join("ir")
}
/// Runs the `ir` binary with `args`, using `cwd` as its working directory,
/// and returns the captured output once the process has exited.
///
/// # Panics
///
/// Panics if the process cannot be spawned or its output cannot be collected.
fn run_ir(cwd: &Path, args: &[&str]) -> std::process::Output {
    let mut command = Command::new(ir_bin());
    command.args(args).current_dir(cwd);
    command.output().expect("run ir")
}
/// An edge delta written without an explicit flush must be reported by
/// `recover_from_wal` after the store is reopened, and must then be visible
/// on the logical node.
#[test]
fn integration_wal_recovery_path() {
    let base = temp_dir("it_wal_recovery");

    // Write a single edge delta for node 42, then drop the handle. No
    // `api::flush` is called before the drop.
    let mut writer = api::open_store(storage_config(&base)).expect("open store");
    let edge_delta = api::encode_delta(42, 1, b"edge");
    api::put_edge_delta(&mut writer, &edge_delta).expect("put edge");
    drop(writer);

    // Reopen on the same directories and replay the WAL.
    let mut recovered = api::open_store(storage_config(&base)).expect("reopen store");
    let replayed = api::recover_from_wal(&mut recovered).expect("recover");
    assert_eq!(replayed, 1);

    let logical = api::get_logical_node(&mut recovered, 42).expect("get node");
    assert_eq!(logical.deltas.len(), 1);
}
/// After one full-node write and a flush, the store must hold exactly one L0
/// run and still return the node's adjacency list.
#[test]
fn integration_flush_and_sstable_read_path() {
    let base = temp_dir("it_flush_read");
    let mut store = api::open_store(storage_config(&base)).expect("open store");

    api::put_full_node(&mut store, 7, 1, &[8, 9, 10]).expect("put full");
    api::flush(&mut store).expect("flush");
    assert_eq!(store.l0_runs.len(), 1);

    let stored = api::get_logical_node(&mut store, 7).expect("get node");
    assert_eq!(stored.adjacency(), vec![8, 9, 10]);
}
/// Follows the first adjacency entry three times along the chain
/// 1 -> 2 -> 3 -> 4 and checks the nodes reached at each hop.
#[test]
fn integration_traversal_path() {
    let base = temp_dir("it_traversal");
    let mut store = api::open_store(storage_config(&base)).expect("open store");

    // Chain of three nodes, each pointing at the next id.
    api::put_full_node(&mut store, 1, 1, &[2]).expect("put node 1");
    api::put_full_node(&mut store, 2, 1, &[3]).expect("put node 2");
    api::put_full_node(&mut store, 3, 1, &[4]).expect("put node 3");
    api::flush(&mut store).expect("flush");

    let mut current = 1_u64;
    let mut reached = Vec::new();
    for _hop in 0..3 {
        let node = api::get_logical_node(&mut store, current).expect("traversal read");
        let neighbours = node.adjacency();
        assert!(!neighbours.is_empty());
        // Always step to the first neighbour.
        current = neighbours[0];
        reached.push(current);
    }

    assert_eq!(reached, vec![2, 3, 4]);
}
/// With three level-0 runs and the given graph stats, a policy built by
/// `CompactionPolicy::new(16, 2)` must choose to compact all three runs from
/// level 0 into level 1.
#[test]
fn integration_compaction_trigger_path() {
    let policy = compaction::CompactionPolicy::new(16, 2);

    // (run_id, max_delta_count) for each level-0 run; all runs are 1024 bytes.
    let runs: Vec<compaction::RunInfo> = [(1, 8), (2, 9), (3, 10)]
        .iter()
        .map(|&(run_id, max_delta_count)| compaction::RunInfo {
            level: 0,
            run_id,
            size_bytes: 1024,
            max_delta_count,
        })
        .collect();

    let stats = compaction::GraphStats {
        hot_node_ids: vec![7, 8],
        max_pending_deltas: 3,
    };

    let decision = policy
        .select_compaction(&runs, &stats)
        .expect("compaction decision");

    assert_eq!(decision.source_level, 0);
    assert_eq!(decision.target_level, 1);
    assert_eq!(decision.run_ids.len(), 3);
}
/// Runs the main `ir` subcommands in sequence inside one scratch directory and
/// checks that each exits successfully, plus a few markers in their output.
#[test]
fn integration_cli_smoke_paths() {
    let base = temp_dir("it_cli_smoke");

    // Three writes against node 10.
    let put_edge = run_ir(&base, &["put-edge", "10", "1", "edge-payload"]);
    let put_edge_err = String::from_utf8_lossy(&put_edge.stderr);
    assert!(put_edge.status.success(), "put-edge failed: {}", put_edge_err);

    let put_vector = run_ir(&base, &["put-vector", "10", "2", "1.0,0.0,0.5"]);
    let put_vector_err = String::from_utf8_lossy(&put_vector.stderr);
    assert!(
        put_vector.status.success(),
        "put-vector failed: {}",
        put_vector_err
    );

    let put_full = run_ir(&base, &["put-full", "10", "3", "11,12,13"]);
    let put_full_err = String::from_utf8_lossy(&put_full.stderr);
    assert!(put_full.status.success(), "put-full failed: {}", put_full_err);

    // Recovery must succeed and report what it did on stdout.
    let recover = run_ir(&base, &["recover"]);
    let recover_err = String::from_utf8_lossy(&recover.stderr);
    assert!(recover.status.success(), "recover failed: {}", recover_err);
    let recover_out = String::from_utf8_lossy(&recover.stdout);
    assert!(recover_out.contains("recovered"));

    // Read paths.
    let get = run_ir(&base, &["get", "10"]);
    let get_err = String::from_utf8_lossy(&get.stderr);
    assert!(get.status.success(), "get failed: {}", get_err);
    let get_out = String::from_utf8_lossy(&get.stdout);
    assert!(get_out.contains("node 10"));

    let traverse = run_ir(&base, &["traverse", "10"]);
    let traverse_err = String::from_utf8_lossy(&traverse.stderr);
    assert!(traverse.status.success(), "traverse failed: {}", traverse_err);

    let stats = run_ir(&base, &["stats"]);
    let stats_err = String::from_utf8_lossy(&stats.stderr);
    assert!(stats.status.success(), "stats failed: {}", stats_err);
    let stats_out = String::from_utf8_lossy(&stats.stdout);
    assert!(stats_out.contains("write_amp:"));

    // The benchmark is given a relative JSON path; the file is expected to
    // appear under the working directory the command ran in.
    let bench_args = [
        "bench",
        "100",
        "--seed",
        "5",
        "--json",
        "artifacts/bench_smoke.json",
    ];
    let bench = run_ir(&base, &bench_args);
    let bench_err = String::from_utf8_lossy(&bench.stderr);
    assert!(bench.status.success(), "bench failed: {}", bench_err);

    let bench_report = base.join("artifacts/bench_smoke.json");
    assert!(bench_report.exists());
}
/// Ingests two nodes and a vector delta, flushes, and checks that the same
/// vector query returns rows both before the handle is dropped and after the
/// store is reopened and recovered from the WAL.
#[test]
fn integration_mixed_ingest_query_recovery_flow() {
    let base = temp_dir("it_mixed_iqr");
    let mut store = api::open_store(storage_config(&base)).expect("open store");

    let ingest_config = ingest::IngestConfig {
        max_queue_depth: 128,
        max_batch_size: 16,
    };
    let mut pipeline = ingest::IngestPipeline::new(ingest_config).expect("ingest config");

    let vector_payload =
        api::encode_vector_payload_f32(1, api::VectorMetric::Cosine, &[1.0, 0.0], false);
    let events = vec![
        ingest::IngestEvent::InsertNode {
            node_id: 100,
            version: 1,
            adjacency: vec![101],
            bitmap_terms: Vec::new(),
            embedding_pending: false,
        },
        ingest::IngestEvent::InsertNode {
            node_id: 101,
            version: 1,
            adjacency: vec![102],
            bitmap_terms: Vec::new(),
            embedding_pending: false,
        },
        ingest::IngestEvent::UpdateVectorDelta {
            node_id: 100,
            version: 2,
            payload: vector_payload,
        },
    ];

    let ack = ingest::ingest_batch(&mut pipeline, &mut store, &events).expect("ingest batch");
    assert_eq!(ack.accepted, 3);
    pipeline.flush_all(&mut store).expect("flush pipeline");
    api::flush(&mut store).expect("flush store");

    // Parse -> validate -> explain once; the resulting plan is reused for
    // both executions.
    let ast =
        query::parse("MATCH (n) WHERE vector.cosine(n.embedding, $q:1:0) > -1 RETURN n LIMIT 10")
            .expect("parse query");
    let typed = query::validate(&ast, &query::Catalog).expect("validate query");
    let plan = runtime::explain(&typed).expect("explain query");

    // Both executions use identical parameters.
    let params = runtime::ExecuteParams {
        scan_start: 0,
        scan_end_exclusive: 256,
        morsel_size: 32,
        parallel_workers: 1,
    };

    let before_restart =
        runtime::execute(&plan, &params, &mut store).expect("execute query before restart");
    assert!(!before_restart.rows.is_empty());
    drop(store);

    let mut reopened = api::open_store(storage_config(&base)).expect("reopen store");
    api::recover_from_wal(&mut reopened).expect("recover wal");

    let after_restart =
        runtime::execute(&plan, &params, &mut reopened).expect("execute query after restart");
    assert!(!after_restart.rows.is_empty());
}
/// Ingests three nodes through an embedded driver in Plexus execution mode and
/// checks that a serialized scan+limit plan returns the same node ids before
/// and after the driver is dropped and recreated from the same config.
#[test]
fn integration_embedded_driver_plexus_restart_round_trip() {
    let base = temp_dir("it_driver_plexus_restart");
    let data_dir = base.join("data");
    std::fs::create_dir_all(&data_dir).expect("create data dir");

    let mut config = DriverConfig::embedded_data_dir(data_dir);
    config.execution_mode = runtime::ExecutionMode::Plexus;

    // First driver instance: ingest the chain 10 -> 11 -> 12 -> 13.
    let first_driver = embedded_driver(config.clone()).expect("create plexus driver");
    first_driver.begin_ingest().expect("begin ingest");
    first_driver.ingest_node(10, 1, &[11]).expect("ingest node 10");
    first_driver.ingest_node(11, 1, &[12]).expect("ingest node 11");
    first_driver.ingest_node(12, 1, &[13]).expect("ingest node 12");
    first_driver.finish_ingest().expect("finish ingest");

    let plan_bytes = serialized_scan_limit_plan(3);

    let before = first_driver
        .query_serialized_plan(&plan_bytes)
        .expect("query serialized plan before restart");
    let before_ids: Vec<u64> = before.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(before_ids, vec![10, 11, 12]);
    drop(first_driver);

    // Second driver instance on the same configuration.
    let second_driver = embedded_driver(config).expect("reopen plexus driver");
    let after = second_driver
        .query_serialized_plan(&plan_bytes)
        .expect("query serialized plan after restart");
    let after_ids: Vec<u64> = after.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(after_ids, before_ids);
}
/// Feeds a three-line JSONL event file to `ir ingest --file`, then checks the
/// `status` and `metrics-prom` subcommands for the expected output markers.
#[test]
fn integration_cli_ingest_file_status_and_metrics_prom() {
    let base = temp_dir("it_cli_ingest_file_status");

    // One event per line: a node insert, an edge delta and a vector delta,
    // all for node 200.
    let events_jsonl = concat!(
        "{\"type\":\"InsertNode\",\"node_id\":200,\"version\":1,\"adjacency\":[201]}\n",
        "{\"type\":\"AddEdgeDelta\",\"node_id\":200,\"version\":2,\"payload\":\"edge200\"}\n",
        "{\"type\":\"UpdateVectorDelta\",\"node_id\":200,\"version\":3,\"payload\":\"1.0,2.0,3.0\"}\n"
    );
    let events_path = base.join("events.jsonl");
    std::fs::write(&events_path, events_jsonl).expect("write events");

    let events_arg = events_path.to_str().expect("events path utf8");
    let ingest_run = run_ir(
        &base,
        &["ingest", "--file", events_arg, "--batch-size", "2"],
    );
    let ingest_err = String::from_utf8_lossy(&ingest_run.stderr);
    assert!(
        ingest_run.status.success(),
        "ingest --file failed: {}",
        ingest_err
    );
    let ingest_out = String::from_utf8_lossy(&ingest_run.stdout);
    assert!(ingest_out.contains("accepted=3"));

    let status = run_ir(&base, &["status"]);
    let status_err = String::from_utf8_lossy(&status.stderr);
    assert!(status.status.success(), "status failed: {}", status_err);
    let status_out = String::from_utf8_lossy(&status.stdout);
    assert!(status_out.contains("schema_version:"));
    assert!(status_out.contains("pending_deltas_max:"));

    let metrics_prom = run_ir(&base, &["metrics-prom"]);
    let metrics_err = String::from_utf8_lossy(&metrics_prom.stderr);
    assert!(
        metrics_prom.status.success(),
        "metrics-prom failed: {}",
        metrics_err
    );
    let prom = String::from_utf8_lossy(&metrics_prom.stdout);
    assert!(prom.contains("iridium_query_total"));
    assert!(prom.contains("iridium_ingest_accepted"));
}
/// Writes 50 four-dimensional vectors, flushes and syncs, then checks that a
/// freshly opened store has a non-empty HNSW graph and that searching for
/// node 25's exact vector returns node 25 among the top 10 results.
#[test]
fn hnsw_rebuild_on_open() {
    let base = temp_dir("it_hnsw_rebuild");
    let mut store = api::open_store(storage_config(&base)).expect("open store");

    for node_id in 0_u64..50 {
        // Node `i` gets the embedding [i, i + 1, i + 2, i + 3].
        let embedding: Vec<f32> = (node_id..node_id + 4).map(|x| x as f32).collect();
        let delta = api::encode_delta(node_id, 1, &f32_to_bytes(&embedding));
        api::put_vector_delta(&mut store, &delta).expect("put vector delta");
    }
    api::flush(&mut store).expect("flush");
    api::sync(&mut store).expect("sync");
    drop(store);

    let reopened = api::open_store(storage_config(&base)).expect("reopen store");
    assert!(
        !reopened.hnsw_graph.is_empty(),
        "HNSW graph should be populated after rebuild"
    );

    // Same components as the embedding written for node 25.
    let probe = vec![25.0_f32, 26.0, 27.0, 28.0];
    let hits = api::hnsw_search(&reopened, &probe, 10);
    assert!(!hits.is_empty(), "hnsw_search should return results");

    let ids: Vec<u64> = hits.iter().map(|(id, _)| *id).collect();
    assert!(
        ids.contains(&25),
        "node 25 should be in top-10 results, got {:?}",
        ids
    );
}
/// Stores four nodes with adjacency lists of length 0, 1, 2 and 3 and checks
/// that `WHERE n.adjacency_degree > 1` keeps only the last two.
#[test]
fn scalar_where_filters_by_adjacency_degree() {
    let base = temp_dir("it_scalar_where");
    let mut store = api::open_store(storage_config(&base)).expect("open store");

    // Nodes 10..=13 have 0, 1, 2 and 3 neighbours respectively.
    api::put_full_node(&mut store, 10, 1, &[]).expect("node 10");
    api::put_full_node(&mut store, 11, 1, &[20]).expect("node 11");
    api::put_full_node(&mut store, 12, 1, &[20, 21]).expect("node 12");
    api::put_full_node(&mut store, 13, 1, &[20, 21, 22]).expect("node 13");
    api::flush(&mut store).expect("flush");

    let ast = query::parse("MATCH (n) WHERE n.adjacency_degree > 1 RETURN n").expect("parse");
    let typed = query::validate(&ast, &query::Catalog).expect("validate");
    assert!(typed.scalar_predicate.is_some());
    let plan = runtime::explain(&typed).expect("explain");

    // The scan range [10, 14) covers exactly the four nodes written above.
    let params = runtime::ExecuteParams {
        scan_start: 10,
        scan_end_exclusive: 14,
        morsel_size: 16,
        parallel_workers: 0,
    };
    let outcome = runtime::execute(&plan, &params, &mut store).expect("execute");

    let ids: Vec<u64> = outcome.rows.iter().map(|r| r.node_id).collect();
    assert!(
        !ids.contains(&10),
        "node 10 (degree 0) should be filtered out"
    );
    assert!(
        !ids.contains(&11),
        "node 11 (degree 1) should be filtered out"
    );
    assert!(ids.contains(&12), "node 12 (degree 2) should pass");
    assert!(ids.contains(&13), "node 13 (degree 3) should pass");
    assert_eq!(outcome.rows.len(), 2);
}