use super::*;
#[test]
fn execute_serialized_plan_rejects_invalid_magic() {
let base = temp_dir("runtime_plexus_invalid_magic");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
let err = execute_serialized_plan(b"not-a-flatbuffer", &ExecuteParams::default(), &mut handle)
.unwrap_err();
match err {
ExplainError::SerializedPlanMalformed(message) => {
assert!(message.contains("invalid flatbuffer"));
}
other => panic!("expected malformed plan error, got {:?}", other),
}
}
#[test]
fn execute_serialized_plan_rejects_major_version_mismatch() {
let base = temp_dir("runtime_plexus_version_gate");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
let plan = serialized_plan_bytes(
(2, 0, 0),
vec![
scan_nodes(),
Op::Limit {
input: 0,
count: 2,
skip: 0,
},
Op::Return { input: 1 },
],
2,
);
let err = execute_serialized_plan(
&plan,
&ExecuteParams {
scan_start: 0,
scan_end_exclusive: 16,
morsel_size: 4,
parallel_workers: 1,
},
&mut handle,
)
.unwrap_err();
match err {
ExplainError::UnsupportedSerializedPlanVersion {
found_version,
supported_major,
max_supported_minor,
} => {
assert_eq!(found_version, "2.0.0");
assert_eq!(supported_major, 0_u32);
assert_eq!(max_supported_minor, 3_u32);
}
other => panic!("expected unsupported version error, got {:?}", other),
}
}
#[test]
fn execute_serialized_plan_rejects_invalid_op_reference() {
let base = temp_dir("runtime_plexus_invalid_ref");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
let plan = serialized_plan_bytes((1, 0, 0), vec![scan_nodes(), Op::Return { input: 3 }], 1);
let err = execute_serialized_plan(
&plan,
&ExecuteParams {
scan_start: 0,
scan_end_exclusive: 16,
morsel_size: 4,
parallel_workers: 1,
},
&mut handle,
)
.unwrap_err();
match err {
ExplainError::SerializedPlanMalformed(message) => {
assert!(message.contains("invalid edge reference"));
}
other => panic!("expected invalid edge reference error, got {:?}", other),
}
}
#[test]
fn execute_serialized_plan_executes_minimal_readonly_subset() {
let base = temp_dir("runtime_plexus_readonly_subset");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
for node_id in 1..=8 {
storage_api::put_full_node(&mut handle, node_id, 1, &[node_id + 1]).unwrap();
}
storage_api::flush(&mut handle).unwrap();
let plan = serialized_plan_bytes(
(1, 0, 0),
vec![
scan_nodes(),
Op::Project {
input: 0,
exprs: Vec::new(),
schema: Vec::new(),
},
Op::Limit {
input: 1,
count: 3,
skip: 0,
},
Op::Return { input: 2 },
],
3,
);
let stream = execute_serialized_plan(
&plan,
&ExecuteParams {
scan_start: 1,
scan_end_exclusive: 9,
morsel_size: 4,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
assert_eq!(stream.rows.len(), 3);
let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![1, 2, 3]);
}
#[cfg(feature = "plexus-mlir-opt")]
#[test]
fn execute_serialized_plan_core_optimizer_pipeline_executes_minimal_readonly_subset() {
let base = temp_dir("runtime_plexus_opt_core");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
for node_id in 1..=8 {
storage_api::put_full_node(&mut handle, node_id, 1, &[node_id + 1]).unwrap();
}
storage_api::flush(&mut handle).unwrap();
let plan = serialized_plan_bytes(
(1, 0, 0),
vec![
scan_nodes(),
Op::Limit {
input: 0,
count: 3,
skip: 0,
},
Op::Return { input: 1 },
],
2,
);
let stream = execute_serialized_plan_with_pipeline_override(
&plan,
Some(PIPELINE_CORE),
&ExecuteParams {
scan_start: 1,
scan_end_exclusive: 9,
morsel_size: 4,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
assert_eq!(stream.rows.len(), 3);
let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![1, 2, 3]);
}
#[cfg(feature = "plexus-mlir-opt")]
#[test]
fn execute_serialized_plan_rejects_unknown_optimizer_pipeline_name() {
let base = temp_dir("runtime_plexus_opt_bad_pipeline");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
let plan = serialized_plan_bytes((1, 0, 0), vec![scan_nodes(), Op::Return { input: 0 }], 1);
let err = execute_serialized_plan_with_pipeline_name_override(
&plan,
"banana",
&ExecuteParams::default(),
&mut handle,
)
.unwrap_err();
match err {
ExplainError::InvalidPlan(message) => {
assert!(message.contains("IR_PLEXUS_OPT_PIPELINE"));
}
other => panic!("expected invalid plan error, got {:?}", other),
}
}
#[test]
fn plexus_iridium_engine_executes_deserialized_plan_directly() {
let base = temp_dir("runtime_plexus_engine_direct");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
for node_id in 1..=8 {
storage_api::put_full_node(&mut handle, node_id, 1, &[node_id + 1]).unwrap();
}
storage_api::flush(&mut handle).unwrap();
let bytes = serialized_plan_bytes(
(1, 0, 0),
vec![
scan_nodes(),
Op::Limit {
input: 0,
count: 3,
skip: 0,
},
Op::Return { input: 1 },
],
2,
);
let plan = deserialize_plan(&bytes).unwrap();
let mut engine = PlexusIridiumEngine::new(
&ExecuteParams {
scan_start: 1,
scan_end_exclusive: 9,
morsel_size: 4,
parallel_workers: 1,
},
&mut handle,
);
let out = PlanEngine::execute_plan(&mut engine, &plan).unwrap();
let rows: Vec<u64> = out
.rows
.into_iter()
.map(|row| match row.first() {
Some(::plexus_engine::Value::Int(value)) => *value as u64,
other => panic!("unexpected row shape: {:?}", other),
})
.collect();
assert_eq!(rows, vec![1, 2, 3]);
}
#[test]
fn execute_serialized_plan_applies_scalar_filter_predicate() {
let base = temp_dir("runtime_plexus_scalar_filter");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
storage_api::put_full_node(&mut handle, 1, 1, &[2]).unwrap();
storage_api::put_full_node(&mut handle, 2, 1, &[3, 4]).unwrap();
storage_api::put_full_node(&mut handle, 3, 1, &[5, 6, 7]).unwrap();
storage_api::flush(&mut handle).unwrap();
let plan = serialized_plan_bytes(
(1, 0, 0),
vec![
scan_nodes(),
Op::Filter {
input: 0,
predicate: Expr::Cmp {
op: CmpOp::Ge,
lhs: Box::new(Expr::PropAccess {
col: 0,
prop: "adjacency_degree".to_string(),
}),
rhs: Box::new(Expr::IntLiteral(2)),
},
},
Op::Return { input: 1 },
],
2,
);
let stream = execute_serialized_plan(
&plan,
&ExecuteParams {
scan_start: 1,
scan_end_exclusive: 4,
morsel_size: 4,
parallel_workers: 1,
},
&mut handle,
)
.unwrap();
let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
assert_eq!(ids, vec![2, 3]);
}
#[test]
fn execute_serialized_plan_rejects_unsupported_op_capability() {
let base = temp_dir("runtime_plexus_unsupported_capability");
let mut handle = storage_api::open_store(storage_api::StorageConfig {
buffer_pool_pages: 8,
wal_dir: base.join("wal"),
wal_segment_max_bytes: 1 << 20,
manifest_path: base.join("ir.manifest"),
sstable_dir: base.join("sst"),
})
.unwrap();
let plan = serialized_plan_bytes(
(1, 0, 0),
vec![
scan_nodes(),
Op::CreateNode {
input: 0,
labels: Vec::new(),
props: Expr::NullLiteral,
schema: Vec::new(),
out_var: "n".to_string(),
},
Op::Return { input: 1 },
],
2,
);
let err = execute_serialized_plan(&plan, &ExecuteParams::default(), &mut handle).unwrap_err();
match err {
ExplainError::UnsupportedSerializedOperator(message) => {
assert!(message.contains("CreateNode"));
}
other => panic!("expected unsupported capability error, got {:?}", other),
}
}