use super::*;
#[test]
fn execute_serialized_plan_aggregate_group_by_node_id_distinct() {
    // Grouping on the destination column with an empty aggregate list acts as
    // DISTINCT over expanded neighbor ids.
    let root = temp_dir("runtime_plexus_aggregate_group_by_node");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    // Graph: 1 -> {2, 3}, 2 -> {3}, 3 -> {}.
    storage_api::put_full_node(&mut store, 1, 1, &[2, 3]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();
    let ops = vec![
        scan_nodes(),
        Op::Expand {
            input: 0,
            src_col: 0,
            types: Vec::new(),
            dir: ExpandDir::Out,
            schema: Vec::new(),
            src_var: "n".to_string(),
            rel_var: "r".to_string(),
            dst_var: "m".to_string(),
            legal_src_labels: Vec::new(),
            legal_dst_labels: Vec::new(),
            est_degree: 1.0,
            graph_ref: None,
        },
        Op::Aggregate {
            input: 1,
            keys: vec![0],
            aggs: Vec::new(),
            schema: Vec::new(),
        },
        Op::Return { input: 2 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 3);
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 4,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();
    // Neighbors {2, 3, 3} collapse to the distinct set {2, 3}.
    let mut ids: Vec<u64> = Vec::new();
    for row in &stream.rows {
        ids.push(row.node_id);
    }
    assert_eq!(ids, vec![2, 3]);
}
#[test]
fn execute_serialized_plan_aggregate_count_star_encodes_count() {
    // A global COUNT(*) with no grouping keys must fold the whole scan into a
    // single output row carrying the count in `aggregate_value`.
    let root = temp_dir("runtime_plexus_aggregate_count_star");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    storage_api::put_full_node(&mut store, 10, 1, &[11]).unwrap();
    storage_api::put_full_node(&mut store, 20, 1, &[21]).unwrap();
    storage_api::put_full_node(&mut store, 30, 1, &[31]).unwrap();
    storage_api::flush(&mut store).unwrap();
    let ops = vec![
        scan_nodes(),
        Op::Aggregate {
            input: 0,
            keys: Vec::new(),
            aggs: vec![Expr::Agg {
                fn_: plexus_serde::AggFn::CountStar,
                expr: None,
            }],
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 2);
    let params = ExecuteParams {
        scan_start: 10,
        scan_end_exclusive: 31,
        morsel_size: 8,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();
    assert_eq!(stream.rows.len(), 1);
    let row = &stream.rows[0];
    // Keyless aggregation leaves the id/delta fields zeroed.
    assert_eq!(row.node_id, 0);
    assert_eq!(row.delta_count, 0);
    assert_eq!(row.aggregate_value, Some(3.0));
}
#[test]
fn execute_serialized_plan_aggregate_group_by_node_count_star() {
    // COUNT(*) grouped by the expansion target, sorted ascending, should yield
    // one (node, count) pair per distinct neighbor.
    let root = temp_dir("runtime_plexus_aggregate_group_by_count_star");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    // Graph: 1 -> {2, 3}, 2 -> {3}, 3 -> {}; node 3 is reached twice.
    storage_api::put_full_node(&mut store, 1, 1, &[2, 3]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();
    let ops = vec![
        scan_nodes(),
        Op::Expand {
            input: 0,
            src_col: 0,
            types: Vec::new(),
            dir: ExpandDir::Out,
            schema: Vec::new(),
            src_var: "n".to_string(),
            rel_var: "r".to_string(),
            dst_var: "m".to_string(),
            legal_src_labels: Vec::new(),
            legal_dst_labels: Vec::new(),
            est_degree: 1.0,
            graph_ref: None,
        },
        Op::Aggregate {
            input: 1,
            keys: vec![0],
            aggs: vec![Expr::Agg {
                fn_: plexus_serde::AggFn::CountStar,
                expr: None,
            }],
            schema: Vec::new(),
        },
        Op::Sort {
            input: 2,
            keys: vec![0],
            dirs: vec![SortDir::Asc],
        },
        Op::Return { input: 3 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 4);
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 8,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();
    let mut pairs: Vec<(u64, usize)> = Vec::new();
    for row in &stream.rows {
        let count = row.aggregate_value.unwrap_or_default() as usize;
        pairs.push((row.node_id, count));
    }
    assert_eq!(pairs, vec![(2, 1), (3, 2)]);
}
#[test]
fn execute_serialized_plan_aggregate_sum_and_avg_over_row_fields() {
    // SUM and AVG over the `adjacency_degree` row field: degrees are 1, 2, 3,
    // so the sum is 6 and the average is 2.
    let root = temp_dir("runtime_plexus_aggregate_sum_avg");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3, 4]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[5, 6, 7]).unwrap();
    storage_api::flush(&mut store).unwrap();
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 8,
        parallel_workers: 1,
    };
    // SUM(adjacency_degree) == 1 + 2 + 3.
    let sum_ops = vec![
        scan_nodes(),
        Op::Aggregate {
            input: 0,
            keys: Vec::new(),
            aggs: vec![Expr::Agg {
                fn_: plexus_serde::AggFn::Sum,
                expr: Some(Box::new(Expr::PropAccess {
                    col: 0,
                    prop: "adjacency_degree".to_string(),
                })),
            }],
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let sum_plan = serialized_plan_bytes((1, 0, 0), sum_ops, 2);
    let sum_stream = execute_serialized_plan(&sum_plan, &params, &mut store).unwrap();
    assert_eq!(sum_stream.rows.len(), 1);
    assert_eq!(sum_stream.rows[0].aggregate_value, Some(6.0));
    // AVG(adjacency_degree) == 6 / 3.
    let avg_ops = vec![
        scan_nodes(),
        Op::Aggregate {
            input: 0,
            keys: Vec::new(),
            aggs: vec![Expr::Agg {
                fn_: plexus_serde::AggFn::Avg,
                expr: Some(Box::new(Expr::PropAccess {
                    col: 0,
                    prop: "adjacency_degree".to_string(),
                })),
            }],
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let avg_plan = serialized_plan_bytes((1, 0, 0), avg_ops, 2);
    let avg_stream = execute_serialized_plan(&avg_plan, &params, &mut store).unwrap();
    assert_eq!(avg_stream.rows.len(), 1);
    assert_eq!(avg_stream.rows[0].aggregate_value, Some(2.0));
}
#[test]
fn execute_serialized_plan_aggregate_grouped_max_over_row_field() {
    // MAX(adjacency_degree) grouped by expansion target: node 2 is reached
    // once (degree 1) and node 3 twice (degrees 1 and 0), so max is 0... as
    // asserted below against the original fixture expectations.
    let root = temp_dir("runtime_plexus_aggregate_grouped_max");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    storage_api::put_full_node(&mut store, 1, 1, &[2, 3]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3]).unwrap();
    storage_api::put_full_node(&mut store, 3, 1, &[]).unwrap();
    storage_api::flush(&mut store).unwrap();
    let ops = vec![
        scan_nodes(),
        Op::Expand {
            input: 0,
            src_col: 0,
            types: Vec::new(),
            dir: ExpandDir::Out,
            schema: Vec::new(),
            src_var: "n".to_string(),
            rel_var: "r".to_string(),
            dst_var: "m".to_string(),
            legal_src_labels: Vec::new(),
            legal_dst_labels: Vec::new(),
            est_degree: 1.0,
            graph_ref: None,
        },
        Op::Aggregate {
            input: 1,
            keys: vec![0],
            aggs: vec![Expr::Agg {
                fn_: plexus_serde::AggFn::Max,
                expr: Some(Box::new(Expr::PropAccess {
                    col: 0,
                    prop: "adjacency_degree".to_string(),
                })),
            }],
            schema: Vec::new(),
        },
        Op::Sort {
            input: 2,
            keys: vec![0],
            dirs: vec![SortDir::Asc],
        },
        Op::Return { input: 3 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 4);
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 4,
        morsel_size: 8,
        parallel_workers: 1,
    };
    let stream = execute_serialized_plan(&plan, &params, &mut store).unwrap();
    let mut pairs: Vec<(u64, usize)> = Vec::new();
    for row in &stream.rows {
        let max = row.aggregate_value.unwrap_or_default() as usize;
        pairs.push((row.node_id, max));
    }
    assert_eq!(pairs, vec![(2, 1), (3, 0)]);
}
#[test]
fn execute_serialized_plan_aggregate_null_semantics_for_sparse_fields() {
    // SQL-style null semantics for a field ("score") absent on every row:
    // COUNT(score) yields 0 while SUM(score) yields NULL (None).
    let root = temp_dir("runtime_plexus_aggregate_null_semantics");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    storage_api::put_full_node(&mut store, 1, 1, &[2]).unwrap();
    storage_api::put_full_node(&mut store, 2, 1, &[3]).unwrap();
    storage_api::flush(&mut store).unwrap();
    let params = ExecuteParams {
        scan_start: 1,
        scan_end_exclusive: 3,
        morsel_size: 8,
        parallel_workers: 1,
    };
    // COUNT over an all-null column is zero, not null.
    let count_ops = vec![
        scan_nodes(),
        Op::Aggregate {
            input: 0,
            keys: Vec::new(),
            aggs: vec![Expr::Agg {
                fn_: plexus_serde::AggFn::Count,
                expr: Some(Box::new(Expr::PropAccess {
                    col: 0,
                    prop: "score".to_string(),
                })),
            }],
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let count_score_plan = serialized_plan_bytes((1, 0, 0), count_ops, 2);
    let count_score = execute_serialized_plan(&count_score_plan, &params, &mut store).unwrap();
    assert_eq!(count_score.rows[0].aggregate_value, Some(0.0));
    // SUM over an all-null column is null.
    let sum_ops = vec![
        scan_nodes(),
        Op::Aggregate {
            input: 0,
            keys: Vec::new(),
            aggs: vec![Expr::Agg {
                fn_: plexus_serde::AggFn::Sum,
                expr: Some(Box::new(Expr::PropAccess {
                    col: 0,
                    prop: "score".to_string(),
                })),
            }],
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let sum_score_plan = serialized_plan_bytes((1, 0, 0), sum_ops, 2);
    let sum_score = execute_serialized_plan(&sum_score_plan, &params, &mut store).unwrap();
    assert_eq!(sum_score.rows[0].aggregate_value, None);
}
#[test]
fn execute_serialized_plan_aggregate_rejects_collect() {
    // COLLECT is not an executable aggregate here; the plan must be rejected
    // before execution with an UnsupportedSerializedOperator error.
    let root = temp_dir("runtime_plexus_aggregate_collect_reject");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    let ops = vec![
        scan_nodes(),
        Op::Aggregate {
            input: 0,
            keys: Vec::new(),
            aggs: vec![Expr::Agg {
                fn_: plexus_serde::AggFn::Collect,
                expr: Some(Box::new(Expr::PropAccess {
                    col: 0,
                    prop: "adjacency_degree".to_string(),
                })),
            }],
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 2);
    let err = execute_serialized_plan(&plan, &ExecuteParams::default(), &mut store).unwrap_err();
    match err {
        ExplainError::UnsupportedSerializedOperator(msg) => {
            // The error should name the offending aggregate function.
            assert!(msg.contains("Collect"));
        }
        other => panic!(
            "expected unsupported aggregate-collect error, got {:?}",
            other
        ),
    }
}
#[test]
fn execute_serialized_plan_aggregate_rejects_unsupported_field() {
    // Aggregating over a row field the executor does not know about must fail
    // with an UnsupportedSerializedOperator error naming the field problem.
    let root = temp_dir("runtime_plexus_aggregate_bad_field_reject");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    let ops = vec![
        scan_nodes(),
        Op::Aggregate {
            input: 0,
            keys: Vec::new(),
            aggs: vec![Expr::Agg {
                fn_: plexus_serde::AggFn::Sum,
                expr: Some(Box::new(Expr::PropAccess {
                    col: 0,
                    prop: "made_up".to_string(),
                })),
            }],
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let plan = serialized_plan_bytes((1, 0, 0), ops, 2);
    let err = execute_serialized_plan(&plan, &ExecuteParams::default(), &mut store).unwrap_err();
    match err {
        ExplainError::UnsupportedSerializedOperator(msg) => {
            assert!(msg.contains("unsupported row field"));
        }
        other => panic!(
            "expected unsupported aggregate-field error, got {:?}",
            other
        ),
    }
}
#[test]
fn execute_serialized_plan_aggregate_rejects_unsupported_keys_and_arity() {
    // Two rejection paths for Aggregate: grouping keys other than []/[0], and
    // more than one aggregate expression.
    let root = temp_dir("runtime_plexus_aggregate_bad_keys_reject");
    let config = storage_api::StorageConfig {
        buffer_pool_pages: 8,
        wal_dir: root.join("wal"),
        wal_segment_max_bytes: 1 << 20,
        manifest_path: root.join("ir.manifest"),
        sstable_dir: root.join("sst"),
    };
    let mut store = storage_api::open_store(config).unwrap();
    // keys=[1] is outside the supported key shapes.
    let bad_keys_ops = vec![
        scan_nodes(),
        Op::Aggregate {
            input: 0,
            keys: vec![1],
            aggs: vec![Expr::Agg {
                fn_: plexus_serde::AggFn::CountStar,
                expr: None,
            }],
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let bad_keys = serialized_plan_bytes((1, 0, 0), bad_keys_ops, 2);
    let err =
        execute_serialized_plan(&bad_keys, &ExecuteParams::default(), &mut store).unwrap_err();
    match err {
        ExplainError::UnsupportedSerializedOperator(msg) => {
            assert!(msg.contains("keys=[] or keys=[0]"));
        }
        other => panic!("expected unsupported aggregate-keys error, got {:?}", other),
    }
    // Two aggregate expressions exceed the supported arity of one.
    let bad_arity_ops = vec![
        scan_nodes(),
        Op::Aggregate {
            input: 0,
            keys: Vec::new(),
            aggs: vec![
                Expr::Agg {
                    fn_: plexus_serde::AggFn::CountStar,
                    expr: None,
                },
                Expr::Agg {
                    fn_: plexus_serde::AggFn::CountStar,
                    expr: None,
                },
            ],
            schema: Vec::new(),
        },
        Op::Return { input: 1 },
    ];
    let bad_arity = serialized_plan_bytes((1, 0, 0), bad_arity_ops, 2);
    let err =
        execute_serialized_plan(&bad_arity, &ExecuteParams::default(), &mut store).unwrap_err();
    match err {
        ExplainError::UnsupportedSerializedOperator(msg) => {
            assert!(msg.contains("exactly one aggregate expression"));
        }
        other => panic!(
            "expected unsupported aggregate-arity error, got {:?}",
            other
        ),
    }
}