use super::{execute_serialized_plan, temp_dir, ExecuteParams};
use crate::features::storage::api as storage_api;
use ::plexus_conformance::{run_conformance_suite, ConformanceCase, ConformanceSetup};
use ::plexus_engine::{QueryResult, Value};
use plexus_serde::{
current_plan_version, serialize_plan, CmpOp, ColDef, ColKind, Expr, LogicalType, Op, SortDir,
};
/// Builds the `Op::ScanNodes` leaf operator used by the conformance plans.
///
/// The scan has empty `labels`, `must_labels` and `forbidden_labels` lists and
/// declares a single output column named `n` of kind `ColKind::Node` /
/// `LogicalType::NodeRef`. The estimate fields are fixed at 64 rows with a
/// selectivity of 1.0, and no `graph_ref` is set.
fn scan_nodes() -> Op {
    // The only column the scan produces: the matched node itself.
    let node_column = ColDef {
        name: String::from("n"),
        kind: ColKind::Node,
        logical_type: LogicalType::NodeRef,
    };

    Op::ScanNodes {
        labels: vec![],
        schema: vec![node_column],
        must_labels: vec![],
        forbidden_labels: vec![],
        est_rows: 64,
        selectivity: 1.0,
        graph_ref: None,
    }
}
/// Wraps `ops` in a `plexus_serde::Plan` rooted at `root_op` and returns the
/// serialized bytes.
///
/// The plan version is obtained from `current_plan_version` using the
/// producer string `"iridium-conformance-tests"`.
///
/// # Panics
///
/// Panics with the message `serialize conformance plan` if `serialize_plan`
/// returns an error.
fn serialize_ops(ops: Vec<Op>, root_op: u32) -> Vec<u8> {
    let version = current_plan_version("iridium-conformance-tests");
    let encoded = serialize_plan(&plexus_serde::Plan {
        version,
        ops,
        root_op,
    });
    encoded.expect("serialize conformance plan")
}
/// Opens a store rooted in `temp_dir(prefix)`, writes three seed nodes, flushes,
/// and returns the open handle.
///
/// The WAL directory, manifest file and SSTable directory are all placed under
/// the directory returned by `temp_dir(prefix)`.
///
/// # Panics
///
/// Panics if opening the store, any of the three `put_full_node` calls, or the
/// final flush returns an error.
fn seeded_handle(prefix: &str) -> storage_api::StorageHandle {
    let root = temp_dir(prefix);
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).expect("open store");

    // NOTE(review): arguments look like (node id, label/version, adjacency list),
    // giving nodes 1, 2, 3 one, two and three neighbours respectively — confirm
    // against `put_full_node`'s signature.
    storage_api::put_full_node(&mut store, 1, 1, &[2]).expect("seed node 1");
    storage_api::put_full_node(&mut store, 2, 1, &[3, 4]).expect("seed node 2");
    storage_api::put_full_node(&mut store, 3, 1, &[4, 5, 6]).expect("seed node 3");

    storage_api::flush(&mut store).expect("flush seed");
    store
}
fn run_case(case: &ConformanceCase) -> std::result::Result<QueryResult, String> {
let (plan, params) = match case.name.as_str() {
"readonly_scan_limit" => (
serialize_ops(
vec![
scan_nodes(),
Op::Limit {
input: 0,
count: 2,
skip: 0,
},
Op::Return { input: 1 },
],
2,
),
ExecuteParams {
scan_start: 1,
scan_end_exclusive: 4,
morsel_size: 8,
parallel_workers: 1,
},
),
"readonly_union_sort_limit" => (
serialize_ops(
vec![
scan_nodes(),
Op::Filter {
input: 0,
predicate: Expr::Cmp {
op: CmpOp::Ge,
lhs: Box::new(Expr::PropAccess {
col: 0,
prop: "adjacency_degree".to_string(),
}),
rhs: Box::new(Expr::IntLiteral(2)),
},
},
Op::Filter {
input: 0,
predicate: Expr::Cmp {
op: CmpOp::Ge,
lhs: Box::new(Expr::PropAccess {
col: 0,
prop: "adjacency_degree".to_string(),
}),
rhs: Box::new(Expr::IntLiteral(1)),
},
},
Op::Union {
lhs: 1,
rhs: 2,
all: true,
schema: Vec::new(),
},
Op::Sort {
input: 3,
keys: vec![0],
dirs: vec![SortDir::Desc],
},
Op::Limit {
input: 4,
count: 2,
skip: 0,
},
Op::Return { input: 5 },
],
6,
),
ExecuteParams {
scan_start: 1,
scan_end_exclusive: 4,
morsel_size: 8,
parallel_workers: 1,
},
),
"unsupported_post_expand_filter" => (
serialize_ops(
vec![
scan_nodes(),
Op::Expand {
input: 0,
src_col: 0,
types: Vec::new(),
dir: plexus_serde::ExpandDir::Out,
schema: Vec::new(),
src_var: "n".to_string(),
rel_var: "r".to_string(),
dst_var: "m".to_string(),
legal_src_labels: Vec::new(),
legal_dst_labels: Vec::new(),
est_degree: 1.0,
graph_ref: None,
},
Op::Filter {
input: 1,
predicate: Expr::Cmp {
op: CmpOp::Ge,
lhs: Box::new(Expr::PropAccess {
col: 0,
prop: "adjacency_degree".to_string(),
}),
rhs: Box::new(Expr::IntLiteral(1)),
},
},
Op::Return { input: 2 },
],
3,
),
ExecuteParams::default(),
),
"readonly_optional_expand_limit" => (
serialize_ops(
vec![
scan_nodes(),
Op::OptionalExpand {
input: 0,
src_col: 0,
types: Vec::new(),
dir: plexus_serde::ExpandDir::Out,
schema: Vec::new(),
src_var: "n".to_string(),
rel_var: "r".to_string(),
dst_var: "m".to_string(),
legal_src_labels: Vec::new(),
legal_dst_labels: Vec::new(),
graph_ref: None,
},
Op::Limit {
input: 1,
count: 2,
skip: 0,
},
Op::Return { input: 2 },
],
3,
),
ExecuteParams {
scan_start: 1,
scan_end_exclusive: 4,
morsel_size: 8,
parallel_workers: 1,
},
),
"readonly_grouped_count_star" => (
serialize_ops(
vec![
scan_nodes(),
Op::Aggregate {
input: 0,
keys: vec![0],
aggs: vec![Expr::Agg {
fn_: plexus_serde::AggFn::CountStar,
expr: None,
}],
schema: Vec::new(),
},
Op::Return { input: 1 },
],
2,
),
ExecuteParams {
scan_start: 1,
scan_end_exclusive: 4,
morsel_size: 8,
parallel_workers: 1,
},
),
other => return Err(format!("unknown conformance case '{}'", other)),
};
let mut handle = seeded_handle("runtime_plexus_conformance_subset");
let out = execute_serialized_plan(&plan, ¶ms, &mut handle)
.map_err(|error| format!("{:?}", error))?;
let rows = out
.rows
.into_iter()
.map(|row| vec![Value::Int(row.node_id as i64)])
.collect::<Vec<_>>();
Ok(QueryResult { rows })
}
/// Feeds five hand-written conformance cases through `run_conformance_suite`
/// with `run_case` as the executor and asserts that all five are reported as
/// passed.
#[test]
fn plexus_conformance_subset_runner_reports_pass() {
    // Assembles a `ConformanceCase` with the fields every case in this test
    // shares: no MLIR plan, ordered comparison, and the default setup.
    fn case(
        name: &str,
        query: &str,
        expected_rows: Vec<Vec<Value>>,
        expected_error: Option<&str>,
        tags: [&str; 3],
    ) -> ConformanceCase {
        ConformanceCase {
            name: name.to_string(),
            query: query.to_string(),
            plan_mlir: None,
            expected_rows,
            expected_error_contains: expected_error.map(String::from),
            any_order: false,
            tags: tags.iter().map(|tag| (*tag).to_string()).collect(),
            setup: ConformanceSetup::default(),
        }
    }

    // Turns a list of node ids into single-column expected rows.
    let node_rows = |ids: &[i64]| -> Vec<Vec<Value>> {
        ids.iter().map(|&id| vec![Value::Int(id)]).collect()
    };

    let cases = vec![
        case(
            "readonly_scan_limit",
            "MATCH (n) RETURN n LIMIT 2",
            node_rows(&[1, 2]),
            None,
            ["readonly", "match", "return"],
        ),
        case(
            "readonly_union_sort_limit",
            "MATCH (n) ... UNION ALL ... RETURN ...",
            node_rows(&[3, 3]),
            None,
            ["readonly", "union", "return"],
        ),
        case(
            "unsupported_post_expand_filter",
            "MATCH (n)-->(m) WHERE ... RETURN m",
            Vec::new(),
            Some("post-expand filtering"),
            ["readonly", "optional", "where"],
        ),
        case(
            "readonly_optional_expand_limit",
            "MATCH (n) OPTIONAL MATCH (n)-->(m) RETURN n LIMIT 2",
            node_rows(&[2, 3]),
            None,
            ["readonly", "optional", "limit"],
        ),
        case(
            "readonly_grouped_count_star",
            "MATCH (n) RETURN n, count(*)",
            node_rows(&[1, 2, 3]),
            None,
            ["readonly", "aggregate", "return"],
        ),
    ];

    let report = run_conformance_suite(&cases, run_case);
    assert_eq!(report.total, 5);
    assert_eq!(report.passed, 5);
    assert_eq!(report.failed, 0);
}