use super::*;
#[test]
fn execute_serialized_plan_executes_one_hop_expand_out() {
    // Graph under test: 1 -> {2, 3}, 2 -> {3}, 3 -> {}.
    let base = temp_dir("runtime_plexus_expand_out");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    storage_api::put_full_node(&mut store, 1, 1, &[2, 3]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();

    // Plan: Scan -> Expand(Out) -> Return.
    let expand = Op::Expand {
        input: 0,
        src_col: 0,
        types: Vec::new(),
        dir: ExpandDir::Out,
        schema: Vec::new(),
        src_var: "n".to_string(),
        rel_var: "r".to_string(),
        dst_var: "m".to_string(),
        legal_src_labels: Vec::new(),
        legal_dst_labels: Vec::new(),
        est_degree: 1.0,
        graph_ref: None,
    };
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![scan_nodes(), expand, Op::Return { input: 1 }],
        2,
    );

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    // Out-neighbors of 1 (2, 3), then of 2 (3); node 3 contributes nothing.
    let ids = stream
        .rows
        .iter()
        .map(|row| row.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(ids, vec![2, 3, 3]);
}
#[test]
fn execute_serialized_plan_executes_filter_before_expand() {
    // Graph under test: 1 -> {2}, 2 -> {3, 4}, 3 -> {}, 4 -> {}.
    let base = temp_dir("runtime_plexus_filter_before_expand");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3, 4]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[]).unwrap();
    storage_api::put_full_node(&mut store, 4, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();

    // Keep only source nodes with adjacency_degree >= 2 (just node 2 here).
    let predicate = Expr::Cmp {
        op: CmpOp::Ge,
        lhs: Box::new(Expr::PropAccess {
            col: 0,
            prop: "adjacency_degree".to_string(),
        }),
        rhs: Box::new(Expr::IntLiteral(2)),
    };
    // Plan: Scan -> Filter -> Expand(Out) -> Return.
    let expand = Op::Expand {
        input: 1,
        src_col: 0,
        types: Vec::new(),
        dir: ExpandDir::Out,
        schema: Vec::new(),
        src_var: "n".to_string(),
        rel_var: "r".to_string(),
        dst_var: "m".to_string(),
        legal_src_labels: Vec::new(),
        legal_dst_labels: Vec::new(),
        est_degree: 1.0,
        graph_ref: None,
    };
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![
            scan_nodes(),
            Op::Filter {
                input: 0,
                predicate,
            },
            expand,
            Op::Return { input: 2 },
        ],
        3,
    );

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 5,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    // Only node 2 survives the filter; expanding it yields 3 and 4.
    let ids = stream
        .rows
        .iter()
        .map(|row| row.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(ids, vec![3, 4]);
}
#[test]
fn execute_serialized_plan_rejects_filter_after_expand() {
    // A Filter placed downstream of Expand must be rejected by the executor.
    let base = temp_dir("runtime_plexus_filter_after_expand_rejected");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    storage_api::put_full_node(&mut store, 1, 1, &[2, 3]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();

    // Plan: Scan -> Expand(Out) -> Filter -> Return (invalid ordering).
    let expand = Op::Expand {
        input: 0,
        src_col: 0,
        types: Vec::new(),
        dir: ExpandDir::Out,
        schema: Vec::new(),
        src_var: "n".to_string(),
        rel_var: "r".to_string(),
        dst_var: "m".to_string(),
        legal_src_labels: Vec::new(),
        legal_dst_labels: Vec::new(),
        est_degree: 1.0,
        graph_ref: None,
    };
    let post_filter = Op::Filter {
        input: 1,
        predicate: Expr::Cmp {
            op: CmpOp::Ge,
            lhs: Box::new(Expr::PropAccess {
                col: 0,
                prop: "adjacency_degree".to_string(),
            }),
            rhs: Box::new(Expr::IntLiteral(1)),
        },
    };
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![scan_nodes(), expand, post_filter, Op::Return { input: 2 }],
        3,
    );

    let err = execute_serialized_plan(&plan, &ExecuteParams::default(), &mut store).unwrap_err();
    let message = match err {
        ExplainError::UnsupportedSerializedOperator(message) => message,
        other => panic!(
            "expected unsupported filter-after-expand error, got {:?}",
            other
        ),
    };
    assert!(message.contains("post-expand filtering"));
}
#[test]
fn execute_serialized_plan_rejects_expand_direction_in() {
    // ExpandDir::In is not supported; the executor must refuse the plan.
    // No node data is needed — rejection happens before any scan.
    let base = temp_dir("runtime_plexus_expand_in_rejected");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();

    let inward_expand = Op::Expand {
        input: 0,
        src_col: 0,
        types: Vec::new(),
        dir: ExpandDir::In,
        schema: Vec::new(),
        src_var: "n".to_string(),
        rel_var: "r".to_string(),
        dst_var: "m".to_string(),
        legal_src_labels: Vec::new(),
        legal_dst_labels: Vec::new(),
        est_degree: 1.0,
        graph_ref: None,
    };
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![scan_nodes(), inward_expand, Op::Return { input: 1 }],
        2,
    );

    let err = execute_serialized_plan(&plan, &ExecuteParams::default(), &mut store).unwrap_err();
    let message = match err {
        ExplainError::UnsupportedSerializedOperator(message) => message,
        other => panic!(
            "expected unsupported expand direction error, got {:?}",
            other
        ),
    };
    assert!(message.contains("Out direction"));
}
#[test]
fn execute_serialized_plan_executes_optional_expand_out_with_fallback_rows() {
    // Graph under test: 1 -> {2}, 2 -> {}, 3 -> {}.
    let base = temp_dir("runtime_plexus_optional_expand_out");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();

    // Plan: Scan -> OptionalExpand(Out) -> Return.
    let optional_expand = Op::OptionalExpand {
        input: 0,
        src_col: 0,
        types: Vec::new(),
        dir: ExpandDir::Out,
        schema: Vec::new(),
        src_var: "n".to_string(),
        rel_var: "r".to_string(),
        dst_var: "m".to_string(),
        legal_src_labels: Vec::new(),
        legal_dst_labels: Vec::new(),
        graph_ref: None,
    };
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![scan_nodes(), optional_expand, Op::Return { input: 1 }],
        2,
    );

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    // Node 1 expands to 2; edgeless nodes 2 and 3 still produce fallback rows.
    let ids = stream
        .rows
        .iter()
        .map(|row| row.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(ids, vec![2, 2, 3]);
}
#[test]
fn execute_serialized_plan_executes_semi_expand_out() {
    // Graph under test: 1 -> {2}, 2 -> {}, 3 -> {1}.
    let base = temp_dir("runtime_plexus_semi_expand_out");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[1]).unwrap();
    storage_api::flush(&mut store).unwrap();

    // Plan: Scan -> SemiExpand(Out) -> Return.
    let semi_expand = Op::SemiExpand {
        input: 0,
        src_col: 0,
        types: Vec::new(),
        dir: ExpandDir::Out,
        schema: Vec::new(),
        legal_src_labels: Vec::new(),
        legal_dst_labels: Vec::new(),
    };
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![scan_nodes(), semi_expand, Op::Return { input: 1 }],
        2,
    );

    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    // Semi-expand keeps sources with at least one out-edge: 1 and 3, not 2.
    let ids = stream
        .rows
        .iter()
        .map(|row| row.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(ids, vec![1, 3]);
}
#[test]
fn execute_serialized_plan_executes_expand_var_len_out() {
    // Chain graph: 1 -> 2 -> 3 -> 4.
    let base = temp_dir("runtime_plexus_expand_varlen");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: base.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: base.join("ir.manifest"),
        sstable_dir: base.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[4]).unwrap();
    storage_api::put_full_node(&mut store, 4, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();

    // Plan: Scan -> ExpandVarLen(Out, 2..=3 hops) -> Return.
    let var_len = Op::ExpandVarLen {
        input: 0,
        src_col: 0,
        types: Vec::new(),
        dir: ExpandDir::Out,
        min_hops: 2,
        max_hops: 3,
        schema: Vec::new(),
        src_var: "n".to_string(),
        path_var: "p".to_string(),
        dst_var: "m".to_string(),
        graph_ref: None,
    };
    let plan = serialized_plan_bytes(
        (1, 0, 0),
        vec![scan_nodes(), var_len, Op::Return { input: 1 }],
        2,
    );

    // Scan only node 1 so all paths start from the head of the chain.
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 2,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();

    // From node 1: 2 hops reach 3, 3 hops reach 4; the 1-hop target 2 is excluded.
    let ids = stream
        .rows
        .iter()
        .map(|row| row.node_id)
        .collect::<Vec<u64>>();
    assert_eq!(ids, vec![3, 4]);
}