use super::*;
/// Executes a serialized plan of `scan_nodes()` -> `Sort` (key 0, descending)
/// -> `Limit` (count 2, skip 0) -> `Return` over five stored nodes and expects
/// the resulting rows to carry node ids `5` and `4`, in that order.
#[test]
fn execute_serialized_plan_sorts_desc_and_applies_limit() {
    let root = temp_dir("runtime_plexus_sort_limit");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    // Seed node ids 1..=5; the last argument is a one-element slice holding `id + 1`.
    for id in 1..=5 {
        storage_api::put_full_node(&mut store, id, 1, &[id + 1]).unwrap();
    }
    storage_api::flush(&mut store).unwrap();

    // NOTE(review): each `input` presumably indexes an earlier entry of this
    // list — confirm against the serialized plan format.
    let ops = vec![
        scan_nodes(),
        Op::Sort {
            input: 0,
            keys: vec![0],
            dirs: vec![SortDir::Desc],
        },
        Op::Limit {
            input: 1,
            count: 2,
            skip: 0,
        },
        Op::Return { input: 2 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 3);

    // The scan window [1, 6) covers every node written above.
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 6,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    let returned_ids = stream
        .rows
        .iter()
        .map(|row| row.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(returned_ids, vec![5, 4]);
}
/// Executes a plan whose `Project` holds a single `ColRef { idx: 0 }`
/// expression and expects both stored nodes to come back, ids `1` then `2`.
#[test]
fn execute_serialized_plan_accepts_identity_project_exprs() {
    let root = temp_dir("runtime_plexus_identity_project");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    // Two nodes are written and flushed before the plan runs.
    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3]).unwrap();
    storage_api::flush(&mut store).unwrap();

    let identity_project = Op::Project {
        input: 0,
        exprs: vec![Expr::ColRef { idx: 0 }],
        schema: Vec::new(),
    };
    let ops = vec![scan_nodes(), identity_project, Op::Return { input: 1 }];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 2);

    // The scan window [1, 3) covers both node ids written above.
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 3,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    let returned_ids = stream
        .rows
        .iter()
        .map(|row| row.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(returned_ids, vec![1, 2]);
}
/// Executes a plan whose `Project` starts with `ColRef { idx: 0 }` and is
/// followed by a `PropAccess`, an `IntLiteral` and a `BoolLiteral`, and expects
/// execution to succeed with node ids `1` then `2`.
///
/// Only the returned node ids are asserted; the values produced by the
/// trailing expressions are not inspected by this test.
#[test]
fn execute_serialized_plan_accepts_project_with_supported_trailing_exprs() {
    let root = temp_dir("runtime_plexus_project_trailing_exprs");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    // Two nodes are written and flushed before the plan runs.
    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3]).unwrap();
    storage_api::flush(&mut store).unwrap();

    // Leading column reference first, then the three trailing expressions.
    let projections = vec![
        Expr::ColRef { idx: 0 },
        Expr::PropAccess {
            col: 0,
            prop: "adjacency_degree".to_string(),
        },
        Expr::IntLiteral(42),
        Expr::BoolLiteral(true),
    ];
    let ops = vec![
        scan_nodes(),
        Op::Project {
            input: 0,
            exprs: projections,
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 2);

    // The scan window [1, 3) covers both node ids written above.
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 3,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    let returned_ids = stream
        .rows
        .iter()
        .map(|row| row.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(returned_ids, vec![1, 2]);
}
/// Executes a plan whose `Project` has a `PropAccess` as its only (and thus
/// first) expression and expects `ExplainError::UnsupportedSerializedOperator`
/// with a message mentioning `first expression ColRef(idx=0)`.
///
/// The store is opened but no nodes are written; default `ExecuteParams` are used.
#[test]
fn execute_serialized_plan_rejects_project_without_leading_colref() {
    let root = temp_dir("runtime_plexus_project_lead_reject");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    let projections = vec![Expr::PropAccess {
        col: 0,
        prop: "adjacency_degree".to_string(),
    }];
    let ops = vec![
        scan_nodes(),
        Op::Project {
            input: 0,
            exprs: projections,
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 2);

    let params = ExecuteParams::default();
    let err = execute_serialized_plan(&plan, &params, &mut store).unwrap_err();

    // Any other error variant is a test failure and is reported via its Debug form.
    match &err {
        ExplainError::UnsupportedSerializedOperator(message) => {
            assert!(message.contains("first expression ColRef(idx=0)"));
        }
        other => panic!(
            "expected unsupported project-leading-expression error, got {:?}",
            other
        ),
    }
}
/// Executes a plan whose `Project` starts with `ColRef { idx: 0 }` but then
/// carries a `Cmp` expression, and expects
/// `ExplainError::UnsupportedSerializedOperator` with a message containing
/// `Project currently supports only`.
///
/// The store is opened but no nodes are written; default `ExecuteParams` are used.
#[test]
fn execute_serialized_plan_rejects_project_with_unsupported_trailing_expr() {
    let root = temp_dir("runtime_plexus_project_trailing_reject");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    // The trailing comparison `1 == 1` is the expression expected to be rejected.
    let comparison = Expr::Cmp {
        op: CmpOp::Eq,
        lhs: Box::new(Expr::IntLiteral(1)),
        rhs: Box::new(Expr::IntLiteral(1)),
    };
    let ops = vec![
        scan_nodes(),
        Op::Project {
            input: 0,
            exprs: vec![Expr::ColRef { idx: 0 }, comparison],
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 2);

    let params = ExecuteParams::default();
    let err = execute_serialized_plan(&plan, &params, &mut store).unwrap_err();

    // Any other error variant is a test failure and is reported via its Debug form.
    match &err {
        ExplainError::UnsupportedSerializedOperator(message) => {
            assert!(message.contains("Project currently supports only"));
        }
        other => panic!(
            "expected unsupported project-trailing-expression error, got {:?}",
            other
        ),
    }
}