use super::*;
use crate::features::runtime;
use crate::features::storage::api as storage_api;
use plexus_serde::{current_plan_version, serialize_plan, ColDef, ColKind, LogicalType, Op, Plan};
fn temp_driver() -> RustDriver {
temp_driver_with_mode(runtime::ExecutionMode::Native)
}
fn temp_driver_with_mode(mode: runtime::ExecutionMode) -> RustDriver {
static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let base = std::env::temp_dir().join(format!(
"iridium-driver-test-{}-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("clock before unix epoch")
.as_nanos(),
COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
));
let data_dir = base.join("data");
std::fs::create_dir_all(&data_dir).expect("create data dir");
let mut config = DriverConfig::embedded_data_dir(data_dir);
config.execution_mode = mode;
embedded_driver(config).expect("create driver")
}
#[cfg(feature = "rhodium-backend")]
fn temp_driver_with_blob_backend(
mode: runtime::ExecutionMode,
blob_backend: storage_api::BlobBackend,
) -> RustDriver {
static COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let base = std::env::temp_dir().join(format!(
"iridium-driver-test-blob-{}-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("clock before unix epoch")
.as_nanos(),
COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
));
let data_dir = base.join("data");
std::fs::create_dir_all(&data_dir).expect("create data dir");
let mut config = DriverConfig::embedded_data_dir(data_dir);
config.execution_mode = mode;
config.blob_backend = blob_backend;
embedded_driver(config).expect("create driver")
}
fn serialized_plan_bytes(limit: i64) -> Vec<u8> {
let plan = Plan {
version: current_plan_version("iridium-tests"),
ops: vec![
Op::ScanNodes {
labels: Vec::new(),
schema: vec![ColDef {
name: "n".to_string(),
kind: ColKind::Node,
logical_type: LogicalType::NodeRef,
}],
must_labels: Vec::new(),
forbidden_labels: Vec::new(),
est_rows: 64,
selectivity: 1.0,
graph_ref: None,
},
Op::Project {
input: 0,
exprs: Vec::new(),
schema: Vec::new(),
},
Op::Limit {
input: 1,
count: limit,
skip: 0,
cursor: None,
emit_cursor: false,
},
Op::Return { input: 2 },
],
root_op: 3,
};
serialize_plan(&plan).expect("serialize plan")
}
#[test]
fn grpc_contract_metadata_is_stable() {
assert_eq!(GRPC_CONTRACT.service_name, "IridiumService");
assert_eq!(GRPC_CONTRACT.package, "iridium.v1");
assert_eq!(GRPC_CONTRACT.version, "v1");
assert!(std::path::Path::new(GRPC_CONTRACT.proto_path).exists());
}
#[test]
fn driver_ingest_explain_and_query_round_trip() {
let driver = temp_driver();
driver.ingest_node(1, 1, &[2, 3]).expect("ingest node");
driver.ingest_node(2, 1, &[4]).expect("ingest node 2");
let plan = driver
.explain("MATCH (n) RETURN n LIMIT 10")
.expect("explain query");
assert!(plan.estimated_cost > 0);
let rows = driver
.query("MATCH (n) RETURN n LIMIT 10")
.expect("query execution");
assert!(!rows.rows.is_empty());
}
#[test]
fn driver_query_stream_exposes_rows_iterator() {
let driver = temp_driver();
for node_id in 1..=4 {
driver
.ingest_node(node_id, 1, &[node_id + 1])
.expect("ingest node");
}
let cursor = driver
.query_stream("MATCH (n) RETURN n LIMIT 10")
.expect("query stream");
assert!(!cursor.is_empty());
let count = cursor.count();
assert!(count >= 4);
}
#[test]
fn driver_batch_ingest_with_session_round_trip() {
let driver = temp_driver();
driver.begin_ingest().expect("begin ingest");
driver
.ingest_nodes_batch(&[(1, 1, vec![2, 3]), (2, 1, vec![4])])
.expect("ingest node batch");
driver
.ingest_edges_batch(&[(1, 2, b"edge-a".to_vec()), (2, 2, b"edge-b".to_vec())])
.expect("ingest edge batch");
driver.finish_ingest().expect("finish ingest");
let rows = driver
.query("MATCH (n) RETURN n LIMIT 10")
.expect("query execution");
assert!(!rows.rows.is_empty());
assert!(rows.rows.iter().any(|row| row.node_id == 1));
}
#[test]
fn driver_ingest_vector_supports_vector_query() {
let driver = temp_driver();
driver.ingest_node(1, 1, &[2]).expect("ingest node 1");
driver.ingest_node(2, 1, &[3]).expect("ingest node 2");
driver
.ingest_vector(1, 2, &[1.0, 0.0])
.expect("ingest vector 1");
driver
.ingest_vector(2, 2, &[0.0, 1.0])
.expect("ingest vector 2");
let rows = driver
.query("MATCH (n) WHERE vector.cosine(n.embedding, $q:1:0) > 0.8 RETURN n LIMIT 10")
.expect("vector query");
let ids: Vec<u64> = rows.rows.into_iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![1]);
}
#[test]
fn driver_bitmap_index_round_trip() {
let driver = temp_driver();
driver
.create_bitmap_index("idx_country", "n.country")
.expect("create bitmap index");
driver.ingest_node(1, 1, &[2]).expect("ingest node 1");
driver.ingest_node(2, 1, &[3]).expect("ingest node 2");
driver
.bitmap_add_posting("idx_country", "US", 2)
.expect("add posting");
let indexes = driver.list_bitmap_indexes().expect("list indexes");
assert_eq!(
indexes,
vec![("idx_country".to_string(), "n.country".to_string())]
);
let out = driver
.query("MATCH (n) WHERE bitmap.contains(idx_country, US) = 1 RETURN n LIMIT 10")
.expect("bitmap query");
let ids: Vec<u64> = out.rows.into_iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![2]);
}
#[test]
fn idempotency_key_suppresses_duplicate_mutations() {
let driver = temp_driver();
let options = MutationOptions {
idempotency_key: Some("op-1".to_string()),
};
driver
.ingest_edge_with_options(11, 1, b"edge", &options)
.expect("first ingest");
driver
.ingest_edge_with_options(11, 1, b"edge", &options)
.expect("duplicate ingest");
let mut guard = driver.state_guard().expect("state guard");
let handle = driver.ensure_handle(&mut guard).expect("store handle");
let logical = storage_api::get_logical_node(handle, 11).expect("logical");
assert_eq!(logical.deltas.len(), 1);
}
#[test]
fn idempotency_key_persists_across_driver_restart() {
let base = std::env::temp_dir().join(format!(
"iridium-driver-idempotency-restart-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("clock before unix epoch")
.as_nanos(),
));
let data_dir = base.join("data");
std::fs::create_dir_all(&data_dir).expect("create data dir");
let config = DriverConfig::embedded_data_dir(data_dir.clone());
let driver = embedded_driver(config.clone()).expect("create driver");
let options = MutationOptions {
idempotency_key: Some("op-restart".to_string()),
};
driver
.ingest_edge_with_options(12, 1, b"edge", &options)
.expect("first ingest");
drop(driver);
let reopened = embedded_driver(config).expect("reopen driver");
{
let mut guard = reopened.state_guard().expect("state guard");
let _ = reopened.ensure_handle(&mut guard).expect("ensure handle");
assert!(guard.seen_mutations.contains("op-restart"));
}
let before = {
let mut guard = reopened.state_guard().expect("state guard");
let handle = reopened.ensure_handle(&mut guard).expect("store handle");
let logical = storage_api::get_logical_node(handle, 12).expect("logical");
logical.deltas.len()
};
reopened
.ingest_edge_with_options(12, 1, b"edge", &options)
.expect("duplicate ingest");
let mut guard = reopened.state_guard().expect("state guard");
let handle = reopened.ensure_handle(&mut guard).expect("store handle");
let logical = storage_api::get_logical_node(handle, 12).expect("logical");
assert_eq!(logical.deltas.len(), before);
}
#[test]
fn driver_error_retry_classification_contract() {
let io_err = DriverError::Storage(storage_api::StorageError::Io(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"timeout",
)));
assert!(matches!(io_err.retry_class(), RetryClass::Retryable));
let invalid = DriverError::Storage(storage_api::StorageError::InvalidInput(
"bad input".to_string(),
));
assert!(matches!(invalid.retry_class(), RetryClass::NonRetryable));
}
#[test]
fn driver_rejects_query_text_in_plexus_mode() {
let driver = temp_driver_with_mode(runtime::ExecutionMode::Plexus);
let err = driver.query("MATCH (n) RETURN n LIMIT 10").unwrap_err();
match err {
DriverError::InvalidConfig(message) => {
assert!(message.contains("native mode"));
}
other => panic!("expected invalid config error, got {:?}", other),
}
}
#[test]
fn driver_executes_serialized_plan_in_plexus_mode() {
let driver = temp_driver_with_mode(runtime::ExecutionMode::Plexus);
for node_id in 1..=6 {
driver
.ingest_node(node_id, 1, &[node_id + 1])
.expect("ingest node");
}
let serialized = serialized_plan_bytes(2);
let stream = driver
.query_serialized_plan(&serialized)
.expect("serialized query");
let ids: Vec<u64> = stream.rows.into_iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![1, 2]);
}
#[test]
fn driver_blob_optioned_and_batch_prefix_ops_local() {
let driver = temp_driver();
let first = driver
.put_blob_with_options(
"ns-a1",
b"v1",
storage_api::BlobPutOptions {
idempotent: true,
..storage_api::BlobPutOptions::default()
},
)
.expect("first put");
assert!(first.inserted);
let second = driver
.put_blob_with_options(
"ns-a1",
b"v1",
storage_api::BlobPutOptions {
idempotent: true,
..storage_api::BlobPutOptions::default()
},
)
.expect("second put");
assert!(second.idempotent_noop);
driver.put_blob("ns-a2", b"v2").expect("put a2");
driver.put_blob("ns-b1", b"v3").expect("put b1");
let got = driver
.get_blob_with_options(
"ns-a1",
storage_api::BlobReadOptions {
tier_policy: storage_api::BlobReadTierPolicy::Default,
rehydrate_local: false,
},
)
.expect("get with options")
.expect("blob exists");
assert_eq!(got.data, b"v1".to_vec());
let has = driver
.has_blobs(&[
"ns-a1".to_string(),
"ns-a2".to_string(),
"missing".to_string(),
])
.expect("has_blobs");
assert_eq!(has, vec![true, true, false]);
let listed = driver
.list_blob_prefix("ns-", "a", 10)
.expect("list prefix");
assert_eq!(listed, vec!["ns-a1".to_string(), "ns-a2".to_string()]);
let del_prefix = driver
.delete_blob_prefix("ns-", "a", 1)
.expect("delete prefix");
assert_eq!(del_prefix.deleted, 1);
assert!(del_prefix.truncated);
let deleted = driver
.delete_blobs(&["ns-b1".to_string(), "missing".to_string()])
.expect("delete blobs");
assert_eq!(deleted, 1);
}
#[test]
#[cfg(feature = "rhodium-backend")]
fn driver_blob_optioned_and_batch_prefix_ops_rhodium() {
let driver = temp_driver_with_blob_backend(
runtime::ExecutionMode::Native,
storage_api::BlobBackend::Rhodium,
);
driver
.put_blob_with_options(
"manifest/a/1",
b"rv1",
storage_api::BlobPutOptions {
ack_mode: storage_api::BlobAckMode::Flush,
durability_target: storage_api::BlobDurabilityTarget::Disk,
timeout_ms: 5_000,
idempotent: true,
deny_if_exists: false,
verify_content_hash: false,
},
)
.expect("put with options");
driver.put_blob("manifest/a/2", b"rv2").expect("put a2");
driver.put_blob("manifest/b/1", b"rv3").expect("put b1");
let got = driver
.get_blob_with_options(
"manifest/a/1",
storage_api::BlobReadOptions {
tier_policy: storage_api::BlobReadTierPolicy::LocalFirst,
rehydrate_local: true,
},
)
.expect("get with options")
.expect("exists");
assert_eq!(got.data, b"rv1".to_vec());
let has = driver
.has_blobs(&[
"manifest/a/1".to_string(),
"manifest/a/2".to_string(),
"manifest/missing".to_string(),
])
.expect("has blobs");
assert_eq!(has, vec![true, true, false]);
let listed = driver
.list_blob_prefix("manifest", "/a", 10)
.expect("list prefix");
assert_eq!(listed.len(), 2);
assert!(listed.contains(&"manifest/a/1".to_string()));
assert!(listed.contains(&"manifest/a/2".to_string()));
let del_prefix = driver
.delete_blob_prefix("manifest", "/a", 1)
.expect("delete prefix");
assert_eq!(del_prefix.deleted, 1);
assert!(del_prefix.truncated);
let deleted = driver
.delete_blobs(&["manifest/b/1".to_string(), "manifest/missing".to_string()])
.expect("delete blobs");
assert_eq!(deleted, 1);
}