use super::*;
#[test]
fn execute_serialized_plan_executes_union_all_and_union_distinct() {
    // Seed three nodes whose adjacency degrees are 1, 2 and 3, so a
    // degree >= 2 filter keeps {2, 3} and a degree >= 1 filter keeps {1, 2, 3}.
    let base = temp_dir("runtime_plexus_union");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();
    for (node_id, adjacency) in [(1, &[2][..]), (2, &[3, 4][..]), (3, &[4, 5, 6][..])] {
        storage_api::put_full_node(&mut handle, node_id, 1, adjacency).unwrap();
    }
    storage_api::flush(&mut handle).unwrap();
    // Filter over the scan (operator 0) keeping rows with adjacency_degree >= min_degree.
    let degree_filter = |min_degree| Op::Filter {
        input: 0,
        predicate: Expr::Cmp {
            op: CmpOp::Ge,
            lhs: Box::new(Expr::PropAccess {
                col: 0,
                prop: "adjacency_degree".to_string(),
            }),
            rhs: Box::new(Expr::IntLiteral(min_degree)),
        },
    };
    // Plan shape: scan -> (filter >= 2, filter >= 1) -> union -> return; only
    // the `all` flag on the union differs between the two executions.
    let build_plan = |all| {
        serialized_plan_bytes(
            (1, 0, 0),
            vec![
                scan_nodes(),
                degree_filter(2),
                degree_filter(1),
                Op::Union {
                    lhs: 1,
                    rhs: 2,
                    all,
                    schema: Vec::new(),
                },
                Op::Return { input: 3 },
            ],
            4,
        )
    };
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 4,
        parallel_workers: 1,
    };
    // UNION ALL preserves the duplicates produced by both branches.
    let union_all = execute_serialized_plan(&build_plan(true), &params, &mut handle).unwrap();
    let union_all_ids: Vec<u64> = union_all.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(union_all_ids, vec![2, 3, 1, 2, 3]);
    // UNION (distinct) removes cross-branch duplicates.
    let union_distinct = execute_serialized_plan(&build_plan(false), &params, &mut handle).unwrap();
    let union_distinct_ids: Vec<u64> =
        union_distinct.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(union_distinct_ids, vec![2, 3, 1]);
}
#[test]
fn execute_serialized_plan_executes_union_with_sort_and_limit_wrappers() {
    // Seed three nodes whose adjacency degrees are 1, 2 and 3.
    let base = temp_dir("runtime_plexus_union_sort_limit_wrappers");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();
    for (node_id, adjacency) in [(1, &[2][..]), (2, &[3, 4][..]), (3, &[4, 5, 6][..])] {
        storage_api::put_full_node(&mut handle, node_id, 1, adjacency).unwrap();
    }
    storage_api::flush(&mut handle).unwrap();
    // Filter over the scan (operator 0) keeping rows with adjacency_degree >= min_degree.
    let degree_filter = |min_degree| Op::Filter {
        input: 0,
        predicate: Expr::Cmp {
            op: CmpOp::Ge,
            lhs: Box::new(Expr::PropAccess {
                col: 0,
                prop: "adjacency_degree".to_string(),
            }),
            rhs: Box::new(Expr::IntLiteral(min_degree)),
        },
    };
    // scan -> filters -> union all -> sort desc on column 0 -> limit 2 -> return.
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            degree_filter(2),
            degree_filter(1),
            Op::Union {
                lhs: 1,
                rhs: 2,
                all: true,
                schema: Vec::new(),
            },
            Op::Sort {
                input: 3,
                keys: vec![0],
                dirs: vec![SortDir::Desc],
            },
            Op::Limit {
                input: 4,
                count: 2,
                skip: 0,
            },
            Op::Return { input: 5 },
        ],
        6,
    );
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut handle).unwrap();
    // Union-all output sorted descending and truncated to two rows -> [3, 3].
    let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(ids, vec![3, 3]);
}
#[test]
fn execute_serialized_plan_executes_union_with_aggregate_wrapper() {
    // Seed three nodes whose adjacency degrees are 1, 2 and 3.
    let base = temp_dir("runtime_plexus_union_aggregate_wrapper");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();
    for (node_id, adjacency) in [(1, &[2][..]), (2, &[3, 4][..]), (3, &[4, 5, 6][..])] {
        storage_api::put_full_node(&mut handle, node_id, 1, adjacency).unwrap();
    }
    storage_api::flush(&mut handle).unwrap();
    // Filter over the scan (operator 0) keeping rows with adjacency_degree >= min_degree.
    let degree_filter = |min_degree| Op::Filter {
        input: 0,
        predicate: Expr::Cmp {
            op: CmpOp::Ge,
            lhs: Box::new(Expr::PropAccess {
                col: 0,
                prop: "adjacency_degree".to_string(),
            }),
            rhs: Box::new(Expr::IntLiteral(min_degree)),
        },
    };
    // scan -> filters -> union distinct -> COUNT(*) -> return.
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            degree_filter(2),
            degree_filter(1),
            Op::Union {
                lhs: 1,
                rhs: 2,
                all: false,
                schema: Vec::new(),
            },
            Op::Aggregate {
                input: 3,
                keys: Vec::new(),
                aggs: vec![Expr::Agg {
                    fn_: plexus_serde::AggFn::CountStar,
                    expr: None,
                }],
                schema: Vec::new(),
            },
            Op::Return { input: 4 },
        ],
        5,
    );
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut handle).unwrap();
    // The distinct union yields three rows, so COUNT(*) collapses them to one
    // aggregate row valued 3.
    assert_eq!(stream.rows.len(), 1);
    assert_eq!(stream.rows[0].aggregate_value, Some(3.0));
}
#[test]
fn execute_serialized_plan_executes_unwind_list_literal() {
    // Nodes 10 and 20 exist so the unwound ids resolve; the scan itself covers
    // only node 1 (scan_end_exclusive = 2).
    let base = temp_dir("runtime_plexus_unwind_list");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();
    for (node_id, adjacency) in [(1, &[2][..]), (10, &[11][..]), (20, &[21, 22][..])] {
        storage_api::put_full_node(&mut handle, node_id, 1, adjacency).unwrap();
    }
    storage_api::flush(&mut handle).unwrap();
    // scan -> unwind the literal list [10, 20] into variable `x` -> return.
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            Op::Unwind {
                input: 0,
                list_expr: Expr::ListLiteral {
                    items: vec![Expr::IntLiteral(10), Expr::IntLiteral(20)],
                },
                out_var: "x".to_string(),
                schema: Vec::new(),
            },
            Op::Return { input: 1 },
        ],
        2,
    );
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 2,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut handle).unwrap();
    // Each unwound list element becomes its own output row.
    let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(ids, vec![10, 20]);
}
#[test]
fn execute_serialized_plan_executes_unwind_before_expand() {
    // Unwound ids 10 and 20 each have exactly one outgoing neighbor (11 and 21).
    let base = temp_dir("runtime_plexus_unwind_before_expand");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();
    for (node_id, adjacency) in [
        (1, &[99][..]),
        (10, &[11][..]),
        (11, &[][..]),
        (20, &[21][..]),
        (21, &[][..]),
    ] {
        storage_api::put_full_node(&mut handle, node_id, 1, adjacency).unwrap();
    }
    storage_api::flush(&mut handle).unwrap();
    // scan -> unwind [10, 20] into `x` -> expand outward from the unwound
    // column -> return.
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            Op::Unwind {
                input: 0,
                list_expr: Expr::ListLiteral {
                    items: vec![Expr::IntLiteral(10), Expr::IntLiteral(20)],
                },
                out_var: "x".to_string(),
                schema: Vec::new(),
            },
            Op::Expand {
                input: 1,
                src_col: 0,
                types: Vec::new(),
                dir: ExpandDir::Out,
                schema: Vec::new(),
                src_var: "n".to_string(),
                rel_var: "r".to_string(),
                dst_var: "m".to_string(),
                legal_src_labels: Vec::new(),
                legal_dst_labels: Vec::new(),
                est_degree: 1.0,
                graph_ref: None,
            },
            Op::Return { input: 2 },
        ],
        3,
    );
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 2,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut handle).unwrap();
    // Expanding from the unwound ids reaches their out-neighbors 11 and 21.
    let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(ids, vec![11, 21]);
}
#[test]
fn execute_serialized_plan_rejects_unwind_after_expand() {
    let base = temp_dir("runtime_plexus_unwind_after_expand_rejected");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();
    for (node_id, adjacency) in [(1, &[2][..]), (2, &[][..])] {
        storage_api::put_full_node(&mut handle, node_id, 1, adjacency).unwrap();
    }
    storage_api::flush(&mut handle).unwrap();
    // scan -> expand -> unwind: placing the unwind after the expand must be
    // rejected as an unsupported serialized operator.
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            Op::Expand {
                input: 0,
                src_col: 0,
                types: Vec::new(),
                dir: ExpandDir::Out,
                schema: Vec::new(),
                src_var: "n".to_string(),
                rel_var: "r".to_string(),
                dst_var: "m".to_string(),
                legal_src_labels: Vec::new(),
                legal_dst_labels: Vec::new(),
                est_degree: 1.0,
                graph_ref: None,
            },
            Op::Unwind {
                input: 1,
                list_expr: Expr::ListLiteral {
                    items: vec![Expr::IntLiteral(10)],
                },
                out_var: "x".to_string(),
                schema: Vec::new(),
            },
            Op::Return { input: 2 },
        ],
        3,
    );
    let err = execute_serialized_plan(&plan, &ExecuteParams::default(), &mut handle).unwrap_err();
    match &err {
        ExplainError::UnsupportedSerializedOperator(message) => {
            assert!(message.contains("post-expand unwind"));
        }
        other => panic!(
            "expected unsupported unwind-after-expand error, got {:?}",
            other
        ),
    }
}
#[test]
fn execute_serialized_plan_path_construct_is_passthrough() {
    let base = temp_dir("runtime_plexus_path_construct_passthrough");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut handle = storage_api::open_store(config).unwrap();
    for (node_id, adjacency) in [(1, &[2][..]), (2, &[3][..])] {
        storage_api::put_full_node(&mut handle, node_id, 1, adjacency).unwrap();
    }
    storage_api::flush(&mut handle).unwrap();
    // PathConstruct wraps the scan output without changing which rows flow through.
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            Op::PathConstruct {
                input: 0,
                rel_cols: vec![0],
                schema: Vec::new(),
            },
            Op::Return { input: 1 },
        ],
        2,
    );
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 3,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut handle).unwrap();
    // Both scanned nodes pass through unchanged.
    let ids: Vec<u64> = stream.rows.iter().map(|row| row.node_id).collect();
    assert_eq!(ids, vec![1, 2]);
}