mod aggregate;
mod expand;
mod expression;
mod filter;
mod join;
mod mutation;
mod project;
mod scan;
#[cfg(feature = "algos")]
use crate::query::plan::CallProcedureOp;
use crate::query::plan::{
AddLabelOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, ApplyOp,
BinaryOp, CreateEdgeOp, CreateNodeOp, DeleteEdgeOp, DeleteNodeOp, DistinctOp,
EntityKind as LogicalEntityKind, ExceptOp, ExpandDirection, ExpandOp, FilterOp,
HorizontalAggregateOp, IntersectOp, JoinOp, JoinType, LeftJoinOp, LimitOp, LogicalExpression,
LogicalOperator, LogicalPlan, MapCollectOp, MergeOp, MergeRelationshipOp, MultiWayJoinOp,
NodeScanOp, OtherwiseOp, PathMode, RemoveLabelOp, ReturnOp, SetPropertyOp, ShortestPathOp,
SkipOp, SortOp, SortOrder, UnaryOp, UnionOp, UnwindOp,
};
use grafeo_common::grafeo_debug_span;
use grafeo_common::types::{EpochId, TransactionId};
use grafeo_common::types::{LogicalType, Value};
use grafeo_common::utils::error::{Error, Result};
use grafeo_core::execution::AdaptiveContext;
use grafeo_core::execution::operators::{
AddLabelOperator, AggregateExpr as PhysicalAggregateExpr, ApplyOperator, ConstraintValidator,
CreateEdgeOperator, CreateNodeOperator, DeleteEdgeOperator, DeleteNodeOperator,
DistinctOperator, EmptyOperator, EntityKind, ExecutionPathMode, ExpandOperator, ExpandStep,
ExpressionPredicate, FactorizedAggregate, FactorizedAggregateOperator, FilterExpression,
FilterOperator, HashAggregateOperator, HashJoinOperator, HorizontalAggregateOperator,
JoinType as PhysicalJoinType, LazyFactorizedChainOperator, LeapfrogJoinOperator,
LoadDataOperator, MapCollectOperator, MergeConfig, MergeOperator, MergeRelationshipConfig,
MergeRelationshipOperator, NestedLoopJoinOperator, NodeListOperator, NullOrder, Operator,
ParameterScanOperator, ProjectExpr, ProjectOperator, PropertySource, RemoveLabelOperator,
ScanOperator, SetPropertyOperator, ShortestPathOperator, SimpleAggregateOperator,
SortDirection, SortKey as PhysicalSortKey, SortOperator, UnionOperator, UnwindOperator,
VariableLengthExpandOperator,
};
use grafeo_core::graph::{Direction, GraphStore, GraphStoreMut};
use std::collections::HashMap;
use std::sync::Arc;
use crate::query::planner::common;
use crate::query::planner::common::expression_to_string;
use crate::query::planner::{
PhysicalPlan, convert_aggregate_function, convert_binary_op, convert_filter_expression,
convert_unary_op, value_to_logical_type,
};
use crate::transaction::TransactionManager;
struct RangeBounds<'a> {
min: Option<&'a Value>,
max: Option<&'a Value>,
min_inclusive: bool,
max_inclusive: bool,
}
pub struct Planner {
pub(super) store: Arc<dyn GraphStore>,
pub(super) write_store: Option<Arc<dyn GraphStoreMut>>,
pub(super) transaction_manager: Option<Arc<TransactionManager>>,
pub(super) transaction_id: Option<TransactionId>,
pub(super) viewing_epoch: EpochId,
pub(super) anon_edge_counter: std::cell::Cell<u32>,
pub(super) factorized_execution: bool,
pub(super) scalar_columns: std::cell::RefCell<std::collections::HashSet<String>>,
pub(super) edge_columns: std::cell::RefCell<std::collections::HashSet<String>>,
pub(super) validator: Option<Arc<dyn ConstraintValidator>>,
pub(super) catalog: Option<Arc<crate::catalog::Catalog>>,
pub(super) correlated_param_state:
std::cell::RefCell<Option<Arc<grafeo_core::execution::operators::ParameterState>>>,
pub(super) group_list_variables: std::cell::RefCell<std::collections::HashSet<String>>,
profiling: std::cell::Cell<bool>,
profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker>,
pub(super) session_context: grafeo_core::execution::operators::SessionContext,
pub(super) read_only: bool,
}
impl Planner {
#[must_use]
pub fn new(store: Arc<dyn GraphStore>) -> Self {
let epoch = store.current_epoch();
Self {
store,
write_store: None,
transaction_manager: None,
transaction_id: None,
viewing_epoch: epoch,
anon_edge_counter: std::cell::Cell::new(0),
factorized_execution: true,
scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
validator: None,
catalog: None,
correlated_param_state: std::cell::RefCell::new(None),
group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
profiling: std::cell::Cell::new(false),
profile_entries: std::cell::RefCell::new(Vec::new()),
write_tracker: None,
session_context: grafeo_core::execution::operators::SessionContext::default(),
read_only: false,
}
}
#[must_use]
pub fn with_context(
store: Arc<dyn GraphStore>,
write_store: Option<Arc<dyn GraphStoreMut>>,
transaction_manager: Arc<TransactionManager>,
transaction_id: Option<TransactionId>,
viewing_epoch: EpochId,
) -> Self {
use crate::transaction::TransactionWriteTracker;
let write_tracker: Option<grafeo_core::execution::operators::SharedWriteTracker> =
if transaction_id.is_some() {
Some(Arc::new(TransactionWriteTracker::new(Arc::clone(
&transaction_manager,
))))
} else {
None
};
Self {
store,
write_store,
transaction_manager: Some(transaction_manager),
transaction_id,
viewing_epoch,
anon_edge_counter: std::cell::Cell::new(0),
factorized_execution: true,
scalar_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
edge_columns: std::cell::RefCell::new(std::collections::HashSet::new()),
validator: None,
catalog: None,
correlated_param_state: std::cell::RefCell::new(None),
group_list_variables: std::cell::RefCell::new(std::collections::HashSet::new()),
profiling: std::cell::Cell::new(false),
profile_entries: std::cell::RefCell::new(Vec::new()),
write_tracker,
session_context: grafeo_core::execution::operators::SessionContext::default(),
read_only: false,
}
}
#[must_use]
pub fn with_read_only(mut self, read_only: bool) -> Self {
self.read_only = read_only;
self
}
fn write_store(&self) -> Result<Arc<dyn GraphStoreMut>> {
self.write_store
.as_ref()
.map(Arc::clone)
.ok_or(Error::Transaction(
grafeo_common::utils::error::TransactionError::ReadOnly,
))
}
#[must_use]
pub fn viewing_epoch(&self) -> EpochId {
self.viewing_epoch
}
#[must_use]
pub fn transaction_id(&self) -> Option<TransactionId> {
self.transaction_id
}
#[must_use]
pub fn transaction_manager(&self) -> Option<&Arc<TransactionManager>> {
self.transaction_manager.as_ref()
}
#[must_use]
pub fn with_factorized_execution(mut self, enabled: bool) -> Self {
self.factorized_execution = enabled;
self
}
#[must_use]
pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
self.validator = Some(validator);
self
}
#[must_use]
pub fn with_catalog(mut self, catalog: Arc<crate::catalog::Catalog>) -> Self {
self.catalog = Some(catalog);
self
}
#[must_use]
pub fn with_session_context(
mut self,
context: grafeo_core::execution::operators::SessionContext,
) -> Self {
self.session_context = context;
self
}
fn count_expand_chain(op: &LogicalOperator) -> (usize, &LogicalOperator) {
match op {
LogicalOperator::Expand(expand) => {
let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
if is_single_hop {
let (inner_count, base) = Self::count_expand_chain(&expand.input);
(inner_count + 1, base)
} else {
(0, op)
}
}
_ => (0, op),
}
}
fn collect_expand_chain(op: &LogicalOperator) -> Vec<&ExpandOp> {
let mut chain = Vec::new();
let mut current = op;
while let LogicalOperator::Expand(expand) = current {
let is_single_hop = expand.min_hops == 1 && expand.max_hops == Some(1);
if !is_single_hop {
break;
}
chain.push(expand);
current = &expand.input;
}
chain.reverse();
chain
}
pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
let _span = grafeo_debug_span!("grafeo::query::plan");
let (operator, columns) = self.plan_operator(&logical_plan.root)?;
Ok(PhysicalPlan {
operator,
columns,
adaptive_context: None,
})
}
pub fn plan_profiled(
&self,
logical_plan: &LogicalPlan,
) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
self.profiling.set(true);
self.profile_entries.borrow_mut().clear();
let result = self.plan_operator(&logical_plan.root);
self.profiling.set(false);
let (operator, columns) = result?;
let entries = self.profile_entries.borrow_mut().drain(..).collect();
Ok((
PhysicalPlan {
operator,
columns,
adaptive_context: None,
},
entries,
))
}
pub fn plan_adaptive(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
let (operator, columns) = self.plan_operator(&logical_plan.root)?;
let mut adaptive_context = AdaptiveContext::new();
self.collect_cardinality_estimates(&logical_plan.root, &mut adaptive_context, 0);
Ok(PhysicalPlan {
operator,
columns,
adaptive_context: Some(adaptive_context),
})
}
fn collect_cardinality_estimates(
&self,
op: &LogicalOperator,
ctx: &mut AdaptiveContext,
depth: usize,
) {
match op {
LogicalOperator::NodeScan(scan) => {
let estimate = if let Some(label) = &scan.label {
self.store.nodes_by_label(label).len() as f64
} else {
self.store.node_count() as f64
};
let id = format!("scan_{}", scan.variable);
ctx.set_estimate(&id, estimate);
if let Some(input) = &scan.input {
self.collect_cardinality_estimates(input, ctx, depth + 1);
}
}
LogicalOperator::Filter(filter) => {
let input_estimate = self.estimate_cardinality(&filter.input);
let estimate = input_estimate * 0.3;
let id = format!("filter_{depth}");
ctx.set_estimate(&id, estimate);
self.collect_cardinality_estimates(&filter.input, ctx, depth + 1);
}
LogicalOperator::Expand(expand) => {
let input_estimate = self.estimate_cardinality(&expand.input);
let stats = self.store.statistics();
let avg_degree = self.estimate_expand_degree(&stats, expand);
let estimate = input_estimate * avg_degree;
let id = format!("expand_{}", expand.to_variable);
ctx.set_estimate(&id, estimate);
self.collect_cardinality_estimates(&expand.input, ctx, depth + 1);
}
LogicalOperator::Join(join) => {
let left_est = self.estimate_cardinality(&join.left);
let right_est = self.estimate_cardinality(&join.right);
let estimate = (left_est * right_est).sqrt();
let id = format!("join_{depth}");
ctx.set_estimate(&id, estimate);
self.collect_cardinality_estimates(&join.left, ctx, depth + 1);
self.collect_cardinality_estimates(&join.right, ctx, depth + 1);
}
LogicalOperator::Aggregate(agg) => {
let input_estimate = self.estimate_cardinality(&agg.input);
let estimate = if agg.group_by.is_empty() {
1.0
} else {
(input_estimate * 0.1).max(1.0)
};
let id = format!("aggregate_{depth}");
ctx.set_estimate(&id, estimate);
self.collect_cardinality_estimates(&agg.input, ctx, depth + 1);
}
LogicalOperator::Distinct(distinct) => {
let input_estimate = self.estimate_cardinality(&distinct.input);
let estimate = (input_estimate * 0.5).max(1.0);
let id = format!("distinct_{depth}");
ctx.set_estimate(&id, estimate);
self.collect_cardinality_estimates(&distinct.input, ctx, depth + 1);
}
LogicalOperator::Return(ret) => {
self.collect_cardinality_estimates(&ret.input, ctx, depth + 1);
}
LogicalOperator::Limit(limit) => {
let input_estimate = self.estimate_cardinality(&limit.input);
let estimate = (input_estimate).min(limit.count.estimate());
let id = format!("limit_{depth}");
ctx.set_estimate(&id, estimate);
self.collect_cardinality_estimates(&limit.input, ctx, depth + 1);
}
LogicalOperator::Skip(skip) => {
let input_estimate = self.estimate_cardinality(&skip.input);
let estimate = (input_estimate - skip.count.estimate()).max(0.0);
let id = format!("skip_{depth}");
ctx.set_estimate(&id, estimate);
self.collect_cardinality_estimates(&skip.input, ctx, depth + 1);
}
LogicalOperator::Sort(sort) => {
self.collect_cardinality_estimates(&sort.input, ctx, depth + 1);
}
LogicalOperator::Union(union) => {
let estimate: f64 = union
.inputs
.iter()
.map(|input| self.estimate_cardinality(input))
.sum();
let id = format!("union_{depth}");
ctx.set_estimate(&id, estimate);
for input in &union.inputs {
self.collect_cardinality_estimates(input, ctx, depth + 1);
}
}
_ => {
}
}
}
fn estimate_cardinality(&self, op: &LogicalOperator) -> f64 {
match op {
LogicalOperator::NodeScan(scan) => {
if let Some(label) = &scan.label {
self.store.nodes_by_label(label).len() as f64
} else {
self.store.node_count() as f64
}
}
LogicalOperator::Filter(filter) => self.estimate_cardinality(&filter.input) * 0.3,
LogicalOperator::Expand(expand) => {
let stats = self.store.statistics();
let avg_degree = self.estimate_expand_degree(&stats, expand);
self.estimate_cardinality(&expand.input) * avg_degree
}
LogicalOperator::Join(join) => {
let left = self.estimate_cardinality(&join.left);
let right = self.estimate_cardinality(&join.right);
(left * right).sqrt()
}
LogicalOperator::Aggregate(agg) => {
if agg.group_by.is_empty() {
1.0
} else {
(self.estimate_cardinality(&agg.input) * 0.1).max(1.0)
}
}
LogicalOperator::Distinct(distinct) => {
(self.estimate_cardinality(&distinct.input) * 0.5).max(1.0)
}
LogicalOperator::Return(ret) => self.estimate_cardinality(&ret.input),
LogicalOperator::Limit(limit) => self
.estimate_cardinality(&limit.input)
.min(limit.count.estimate()),
LogicalOperator::Skip(skip) => {
(self.estimate_cardinality(&skip.input) - skip.count.estimate()).max(0.0)
}
LogicalOperator::Sort(sort) => self.estimate_cardinality(&sort.input),
LogicalOperator::Union(union) => union
.inputs
.iter()
.map(|input| self.estimate_cardinality(input))
.sum(),
LogicalOperator::Except(except) => {
let left = self.estimate_cardinality(&except.left);
let right = self.estimate_cardinality(&except.right);
(left - right).max(0.0)
}
LogicalOperator::Intersect(intersect) => {
let left = self.estimate_cardinality(&intersect.left);
let right = self.estimate_cardinality(&intersect.right);
left.min(right)
}
LogicalOperator::Otherwise(otherwise) => self
.estimate_cardinality(&otherwise.left)
.max(self.estimate_cardinality(&otherwise.right)),
_ => 1000.0,
}
}
fn estimate_expand_degree(
&self,
stats: &grafeo_core::statistics::Statistics,
expand: &ExpandOp,
) -> f64 {
let outgoing = !matches!(expand.direction, ExpandDirection::Incoming);
if expand.edge_types.len() == 1 {
stats.estimate_avg_degree(&expand.edge_types[0], outgoing)
} else if stats.total_nodes > 0 {
(stats.total_edges as f64 / stats.total_nodes as f64).max(1.0)
} else {
10.0
}
}
fn maybe_profile(
&self,
result: Result<(Box<dyn Operator>, Vec<String>)>,
op: &LogicalOperator,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
if self.profiling.get() {
let (physical, columns) = result?;
let (entry, stats) =
crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
self.profile_entries.borrow_mut().push(entry);
Ok((Box::new(profiled), columns))
} else {
result
}
}
fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
let result = match op {
LogicalOperator::NodeScan(scan) => self.plan_node_scan(scan),
LogicalOperator::Expand(expand) => {
if self.factorized_execution {
let (chain_len, _base) = Self::count_expand_chain(op);
if chain_len >= 2 {
return self.maybe_profile(self.plan_expand_chain(op), op);
}
}
self.plan_expand(expand)
}
LogicalOperator::Return(ret) => self.plan_return(ret),
LogicalOperator::Filter(filter) => self.plan_filter(filter),
LogicalOperator::Project(project) => self.plan_project(project),
LogicalOperator::Limit(limit) => self.plan_limit(limit),
LogicalOperator::Skip(skip) => self.plan_skip(skip),
LogicalOperator::Sort(sort) => self.plan_sort(sort),
LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
LogicalOperator::Join(join) => self.plan_join(join),
LogicalOperator::Union(union) => self.plan_union(union),
LogicalOperator::Except(except) => self.plan_except(except),
LogicalOperator::Intersect(intersect) => self.plan_intersect(intersect),
LogicalOperator::Otherwise(otherwise) => self.plan_otherwise(otherwise),
LogicalOperator::Apply(apply) => self.plan_apply(apply),
LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
LogicalOperator::CreateNode(create) => self.plan_create_node(create),
LogicalOperator::CreateEdge(create) => self.plan_create_edge(create),
LogicalOperator::DeleteNode(delete) => self.plan_delete_node(delete),
LogicalOperator::DeleteEdge(delete) => self.plan_delete_edge(delete),
LogicalOperator::LeftJoin(left_join) => self.plan_left_join(left_join),
LogicalOperator::AntiJoin(anti_join) => self.plan_anti_join(anti_join),
LogicalOperator::Unwind(unwind) => self.plan_unwind(unwind),
LogicalOperator::Merge(merge) => self.plan_merge(merge),
LogicalOperator::MergeRelationship(merge_rel) => {
self.plan_merge_relationship(merge_rel)
}
LogicalOperator::AddLabel(add_label) => self.plan_add_label(add_label),
LogicalOperator::RemoveLabel(remove_label) => self.plan_remove_label(remove_label),
LogicalOperator::SetProperty(set_prop) => self.plan_set_property(set_prop),
LogicalOperator::ShortestPath(sp) => self.plan_shortest_path(sp),
LogicalOperator::MapCollect(mc) => self.plan_map_collect(mc),
#[cfg(feature = "algos")]
LogicalOperator::CallProcedure(call) => self.plan_call_procedure(call),
#[cfg(not(feature = "algos"))]
LogicalOperator::CallProcedure(_) => Err(Error::Internal(
"CALL procedures require the 'algos' feature".to_string(),
)),
LogicalOperator::ParameterScan(_param_scan) => {
let state = self
.correlated_param_state
.borrow()
.clone()
.ok_or_else(|| {
Error::Internal(
"ParameterScan without correlated Apply context".to_string(),
)
})?;
let columns = state.columns.clone();
let operator: Box<dyn Operator> = Box::new(ParameterScanOperator::new(state));
Ok((operator, columns))
}
LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
LogicalOperator::HorizontalAggregate(ha) => self.plan_horizontal_aggregate(ha),
LogicalOperator::LoadData(load) => {
let operator: Box<dyn Operator> = Box::new(LoadDataOperator::new(
load.path.clone(),
load.format,
load.with_headers,
load.field_terminator,
load.variable.clone(),
));
Ok((operator, vec![load.variable.clone()]))
}
LogicalOperator::Empty => Err(Error::Internal("Empty plan".to_string())),
LogicalOperator::VectorScan(_) => Err(Error::Internal(
"VectorScan requires vector-index feature".to_string(),
)),
LogicalOperator::VectorJoin(_) => Err(Error::Internal(
"VectorJoin requires vector-index feature".to_string(),
)),
_ => Err(Error::Internal(format!(
"Unsupported operator: {:?}",
std::mem::discriminant(op)
))),
};
self.maybe_profile(result, op)
}
fn plan_horizontal_aggregate(
&self,
ha: &HorizontalAggregateOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (child_op, child_columns) = self.plan_operator(&ha.input)?;
let list_col_idx = child_columns
.iter()
.position(|c| c == &ha.list_column)
.ok_or_else(|| {
Error::Internal(format!(
"HorizontalAggregate list column '{}' not found in {:?}",
ha.list_column, child_columns
))
})?;
let entity_kind = match ha.entity_kind {
LogicalEntityKind::Edge => EntityKind::Edge,
LogicalEntityKind::Node => EntityKind::Node,
};
let function = convert_aggregate_function(ha.function);
let input_column_count = child_columns.len();
let operator: Box<dyn Operator> = Box::new(HorizontalAggregateOperator::new(
child_op,
list_col_idx,
entity_kind,
function,
ha.property.clone(),
Arc::clone(&self.store) as Arc<dyn GraphStore>,
input_column_count,
));
let mut columns = child_columns;
columns.push(ha.alias.clone());
self.scalar_columns.borrow_mut().insert(ha.alias.clone());
Ok((operator, columns))
}
fn plan_map_collect(&self, mc: &MapCollectOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (child_op, child_columns) = self.plan_operator(&mc.input)?;
let key_idx = child_columns
.iter()
.position(|c| c == &mc.key_var)
.ok_or_else(|| {
Error::Internal(format!(
"MapCollect key '{}' not in columns {:?}",
mc.key_var, child_columns
))
})?;
let value_idx = child_columns
.iter()
.position(|c| c == &mc.value_var)
.ok_or_else(|| {
Error::Internal(format!(
"MapCollect value '{}' not in columns {:?}",
mc.value_var, child_columns
))
})?;
let operator = Box::new(MapCollectOperator::new(child_op, key_idx, value_idx));
self.scalar_columns.borrow_mut().insert(mc.alias.clone());
Ok((operator, vec![mc.alias.clone()]))
}
}
#[cfg(feature = "algos")]
struct StaticResultOperator {
rows: Vec<Vec<Value>>,
column_indices: Vec<usize>,
row_index: usize,
}
#[cfg(feature = "algos")]
impl Operator for StaticResultOperator {
fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
use grafeo_core::execution::DataChunk;
if self.row_index >= self.rows.len() {
return Ok(None);
}
let remaining = self.rows.len() - self.row_index;
let chunk_rows = remaining.min(1024);
let col_count = self.column_indices.len();
let col_types: Vec<LogicalType> = vec![LogicalType::Any; col_count];
let mut chunk = DataChunk::with_capacity(&col_types, chunk_rows);
for row_offset in 0..chunk_rows {
let row = &self.rows[self.row_index + row_offset];
for (col_idx, &src_idx) in self.column_indices.iter().enumerate() {
let value = row.get(src_idx).cloned().unwrap_or(Value::Null);
if let Some(col) = chunk.column_mut(col_idx) {
col.push_value(value);
}
}
}
chunk.set_count(chunk_rows);
self.row_index += chunk_rows;
Ok(Some(chunk))
}
fn reset(&mut self) {
self.row_index = 0;
}
fn name(&self) -> &'static str {
"StaticResult"
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::query::plan::{
AggregateExpr as LogicalAggregateExpr, CreateEdgeOp, CreateNodeOp, DeleteNodeOp,
DistinctOp as LogicalDistinctOp, ExpandOp, FilterOp, JoinCondition, JoinOp,
LimitOp as LogicalLimitOp, NodeScanOp, PathMode, ReturnItem, ReturnOp,
SkipOp as LogicalSkipOp, SortKey, SortOp,
};
use grafeo_common::types::Value;
use grafeo_core::execution::operators::AggregateFunction as PhysicalAggregateFunction;
use grafeo_core::graph::GraphStoreMut;
use grafeo_core::graph::lpg::LpgStore;
fn create_test_store() -> Arc<LpgStore> {
let store = Arc::new(LpgStore::new().unwrap());
store.create_node(&["Person"]);
store.create_node(&["Person"]);
store.create_node(&["Company"]);
store
}
#[test]
fn test_plan_simple_scan() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: Some("Person".to_string()),
input: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_scan_without_label() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_return_with_alias() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: Some("person".to_string()),
}],
distinct: false,
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: Some("Person".to_string()),
input: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["person"]);
}
#[test]
fn test_plan_return_property() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Property {
variable: "n".to_string(),
property: "name".to_string(),
},
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: Some("Person".to_string()),
input: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n.name"]);
}
#[test]
fn test_plan_return_literal() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Literal(Value::Int64(42)),
alias: Some("answer".to_string()),
}],
distinct: false,
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["answer"]);
}
#[test]
fn test_plan_filter_equality() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Filter(FilterOp {
predicate: LogicalExpression::Binary {
left: Box::new(LogicalExpression::Property {
variable: "n".to_string(),
property: "age".to_string(),
}),
op: BinaryOp::Eq,
right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
},
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: Some("Person".to_string()),
input: None,
})),
pushdown_hint: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_filter_compound_and() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Filter(FilterOp {
predicate: LogicalExpression::Binary {
left: Box::new(LogicalExpression::Binary {
left: Box::new(LogicalExpression::Property {
variable: "n".to_string(),
property: "age".to_string(),
}),
op: BinaryOp::Gt,
right: Box::new(LogicalExpression::Literal(Value::Int64(20))),
}),
op: BinaryOp::And,
right: Box::new(LogicalExpression::Binary {
left: Box::new(LogicalExpression::Property {
variable: "n".to_string(),
property: "age".to_string(),
}),
op: BinaryOp::Lt,
right: Box::new(LogicalExpression::Literal(Value::Int64(40))),
}),
},
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
pushdown_hint: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_filter_unary_not() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Filter(FilterOp {
predicate: LogicalExpression::Unary {
op: UnaryOp::Not,
operand: Box::new(LogicalExpression::Property {
variable: "n".to_string(),
property: "active".to_string(),
}),
},
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
pushdown_hint: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_filter_is_null() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Filter(FilterOp {
predicate: LogicalExpression::Unary {
op: UnaryOp::IsNull,
operand: Box::new(LogicalExpression::Property {
variable: "n".to_string(),
property: "email".to_string(),
}),
},
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
pushdown_hint: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_filter_function_call() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Filter(FilterOp {
predicate: LogicalExpression::Binary {
left: Box::new(LogicalExpression::FunctionCall {
name: "size".to_string(),
args: vec![LogicalExpression::Property {
variable: "n".to_string(),
property: "friends".to_string(),
}],
distinct: false,
}),
op: BinaryOp::Gt,
right: Box::new(LogicalExpression::Literal(Value::Int64(0))),
},
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
pushdown_hint: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_expand_outgoing() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: LogicalExpression::Variable("a".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("b".to_string()),
alias: None,
},
],
distinct: false,
input: Box::new(LogicalOperator::Expand(ExpandOp {
from_variable: "a".to_string(),
to_variable: "b".to_string(),
edge_variable: None,
direction: ExpandDirection::Outgoing,
edge_types: vec!["KNOWS".to_string()],
min_hops: 1,
max_hops: Some(1),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: Some("Person".to_string()),
input: None,
})),
path_alias: None,
path_mode: PathMode::Walk,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"a".to_string()));
assert!(physical.columns().contains(&"b".to_string()));
}
#[test]
fn test_plan_expand_with_edge_variable() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: LogicalExpression::Variable("a".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("r".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("b".to_string()),
alias: None,
},
],
distinct: false,
input: Box::new(LogicalOperator::Expand(ExpandOp {
from_variable: "a".to_string(),
to_variable: "b".to_string(),
edge_variable: Some("r".to_string()),
direction: ExpandDirection::Outgoing,
edge_types: vec!["KNOWS".to_string()],
min_hops: 1,
max_hops: Some(1),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: None,
input: None,
})),
path_alias: None,
path_mode: PathMode::Walk,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"a".to_string()));
assert!(physical.columns().contains(&"r".to_string()));
assert!(physical.columns().contains(&"b".to_string()));
}
#[test]
fn test_plan_limit() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
count: 10.into(),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_skip() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
count: 5.into(),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_sort() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Sort(SortOp {
keys: vec![SortKey {
expression: LogicalExpression::Variable("n".to_string()),
order: SortOrder::Ascending,
nulls: None,
}],
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_sort_descending() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Sort(SortOp {
keys: vec![SortKey {
expression: LogicalExpression::Variable("n".to_string()),
order: SortOrder::Descending,
nulls: None,
}],
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_distinct() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
columns: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_distinct_with_columns() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
columns: Some(vec!["n".to_string()]),
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_distinct_with_nonexistent_columns() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
columns: Some(vec!["nonexistent".to_string()]),
})),
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
}
#[test]
fn test_plan_aggregate_count() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("cnt".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Aggregate(AggregateOp {
group_by: vec![],
aggregates: vec![LogicalAggregateExpr {
function: LogicalAggregateFunction::Count,
expression: Some(LogicalExpression::Variable("n".to_string())),
expression2: None,
distinct: false,
alias: Some("cnt".to_string()),
percentile: None,
separator: None,
}],
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
having: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"cnt".to_string()));
}
#[test]
fn test_plan_aggregate_with_group_by() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
group_by: vec![LogicalExpression::Property {
variable: "n".to_string(),
property: "city".to_string(),
}],
aggregates: vec![LogicalAggregateExpr {
function: LogicalAggregateFunction::Count,
expression: Some(LogicalExpression::Variable("n".to_string())),
expression2: None,
distinct: false,
alias: Some("cnt".to_string()),
percentile: None,
separator: None,
}],
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: Some("Person".to_string()),
input: None,
})),
having: None,
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns().len(), 2);
}
#[test]
fn test_plan_aggregate_sum() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
group_by: vec![],
aggregates: vec![LogicalAggregateExpr {
function: LogicalAggregateFunction::Sum,
expression: Some(LogicalExpression::Property {
variable: "n".to_string(),
property: "value".to_string(),
}),
expression2: None,
distinct: false,
alias: Some("total".to_string()),
percentile: None,
separator: None,
}],
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
having: None,
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"total".to_string()));
}
#[test]
fn test_plan_aggregate_avg() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
group_by: vec![],
aggregates: vec![LogicalAggregateExpr {
function: LogicalAggregateFunction::Avg,
expression: Some(LogicalExpression::Property {
variable: "n".to_string(),
property: "score".to_string(),
}),
expression2: None,
distinct: false,
alias: Some("average".to_string()),
percentile: None,
separator: None,
}],
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
having: None,
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"average".to_string()));
}
#[test]
fn test_plan_aggregate_min_max() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
group_by: vec![],
aggregates: vec![
LogicalAggregateExpr {
function: LogicalAggregateFunction::Min,
expression: Some(LogicalExpression::Property {
variable: "n".to_string(),
property: "age".to_string(),
}),
expression2: None,
distinct: false,
alias: Some("youngest".to_string()),
percentile: None,
separator: None,
},
LogicalAggregateExpr {
function: LogicalAggregateFunction::Max,
expression: Some(LogicalExpression::Property {
variable: "n".to_string(),
property: "age".to_string(),
}),
expression2: None,
distinct: false,
alias: Some("oldest".to_string()),
percentile: None,
separator: None,
},
],
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
having: None,
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"youngest".to_string()));
assert!(physical.columns().contains(&"oldest".to_string()));
}
#[test]
fn test_plan_inner_join() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: LogicalExpression::Variable("a".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("b".to_string()),
alias: None,
},
],
distinct: false,
input: Box::new(LogicalOperator::Join(JoinOp {
left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: Some("Person".to_string()),
input: None,
})),
right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "b".to_string(),
label: Some("Company".to_string()),
input: None,
})),
join_type: JoinType::Inner,
conditions: vec![JoinCondition {
left: LogicalExpression::Variable("a".to_string()),
right: LogicalExpression::Variable("b".to_string()),
}],
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"a".to_string()));
assert!(physical.columns().contains(&"b".to_string()));
}
#[test]
fn test_plan_cross_join() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: None,
input: None,
})),
right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "b".to_string(),
label: None,
input: None,
})),
join_type: JoinType::Cross,
conditions: vec![],
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns().len(), 2);
}
#[test]
fn test_plan_left_join() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Join(JoinOp {
left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: None,
input: None,
})),
right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "b".to_string(),
label: None,
input: None,
})),
join_type: JoinType::Left,
conditions: vec![],
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns().len(), 2);
}
fn create_writable_planner(store: &Arc<LpgStore>) -> Planner {
let mut p = Planner::new(Arc::clone(store) as Arc<dyn GraphStore>);
p.write_store = Some(Arc::clone(store) as Arc<dyn GraphStoreMut>);
p
}
#[test]
fn test_plan_create_node() {
let store = create_test_store();
let planner = create_writable_planner(&store);
let logical = LogicalPlan::new(LogicalOperator::CreateNode(CreateNodeOp {
variable: "n".to_string(),
labels: vec!["Person".to_string()],
properties: vec![(
"name".to_string(),
LogicalExpression::Literal(Value::String("Alix".into())),
)],
input: None,
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"n".to_string()));
}
#[test]
fn test_plan_create_edge() {
let store = create_test_store();
let planner = create_writable_planner(&store);
let logical = LogicalPlan::new(LogicalOperator::CreateEdge(CreateEdgeOp {
variable: Some("r".to_string()),
from_variable: "a".to_string(),
to_variable: "b".to_string(),
edge_type: "KNOWS".to_string(),
properties: vec![],
input: Box::new(LogicalOperator::Join(JoinOp {
left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: None,
input: None,
})),
right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "b".to_string(),
label: None,
input: None,
})),
join_type: JoinType::Cross,
conditions: vec![],
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"r".to_string()));
}
#[test]
fn test_plan_delete_node() {
let store = create_test_store();
let planner = create_writable_planner(&store);
let logical = LogicalPlan::new(LogicalOperator::DeleteNode(DeleteNodeOp {
variable: "n".to_string(),
detach: false,
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"n".to_string()));
}
#[test]
fn test_plan_empty_errors() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Empty);
let result = planner.plan(&logical);
assert!(result.is_err());
}
#[test]
fn test_plan_missing_variable_in_return() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("missing".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
}));
let result = planner.plan(&logical);
assert!(result.is_err());
}
#[test]
fn test_convert_binary_ops() {
assert!(convert_binary_op(BinaryOp::Eq).is_ok());
assert!(convert_binary_op(BinaryOp::Ne).is_ok());
assert!(convert_binary_op(BinaryOp::Lt).is_ok());
assert!(convert_binary_op(BinaryOp::Le).is_ok());
assert!(convert_binary_op(BinaryOp::Gt).is_ok());
assert!(convert_binary_op(BinaryOp::Ge).is_ok());
assert!(convert_binary_op(BinaryOp::And).is_ok());
assert!(convert_binary_op(BinaryOp::Or).is_ok());
assert!(convert_binary_op(BinaryOp::Add).is_ok());
assert!(convert_binary_op(BinaryOp::Sub).is_ok());
assert!(convert_binary_op(BinaryOp::Mul).is_ok());
assert!(convert_binary_op(BinaryOp::Div).is_ok());
}
#[test]
fn test_convert_unary_ops() {
assert!(convert_unary_op(UnaryOp::Not).is_ok());
assert!(convert_unary_op(UnaryOp::IsNull).is_ok());
assert!(convert_unary_op(UnaryOp::IsNotNull).is_ok());
assert!(convert_unary_op(UnaryOp::Neg).is_ok());
}
#[test]
fn test_convert_aggregate_functions() {
assert!(matches!(
convert_aggregate_function(LogicalAggregateFunction::Count),
PhysicalAggregateFunction::Count
));
assert!(matches!(
convert_aggregate_function(LogicalAggregateFunction::Sum),
PhysicalAggregateFunction::Sum
));
assert!(matches!(
convert_aggregate_function(LogicalAggregateFunction::Avg),
PhysicalAggregateFunction::Avg
));
assert!(matches!(
convert_aggregate_function(LogicalAggregateFunction::Min),
PhysicalAggregateFunction::Min
));
assert!(matches!(
convert_aggregate_function(LogicalAggregateFunction::Max),
PhysicalAggregateFunction::Max
));
}
#[test]
fn test_planner_accessors() {
let store = create_test_store();
let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>);
assert!(planner.transaction_id().is_none());
assert!(planner.transaction_manager().is_none());
let _ = planner.viewing_epoch(); }
#[test]
fn test_physical_plan_accessors() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
}));
let physical = planner.plan(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
let _ = physical.into_operator();
}
#[test]
fn test_plan_adaptive_with_scan() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: Some("Person".to_string()),
input: None,
})),
}));
let physical = planner.plan_adaptive(&logical).unwrap();
assert_eq!(physical.columns(), &["n"]);
assert!(physical.adaptive_context.is_some());
}
#[test]
fn test_plan_adaptive_with_filter() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Filter(FilterOp {
predicate: LogicalExpression::Binary {
left: Box::new(LogicalExpression::Property {
variable: "n".to_string(),
property: "age".to_string(),
}),
op: BinaryOp::Gt,
right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
},
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
pushdown_hint: None,
})),
}));
let physical = planner.plan_adaptive(&logical).unwrap();
assert!(physical.adaptive_context.is_some());
}
#[test]
fn test_plan_adaptive_with_expand() {
let store = create_test_store();
let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
.with_factorized_execution(false);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: LogicalExpression::Variable("a".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("b".to_string()),
alias: None,
},
],
distinct: false,
input: Box::new(LogicalOperator::Expand(ExpandOp {
from_variable: "a".to_string(),
to_variable: "b".to_string(),
edge_variable: None,
direction: ExpandDirection::Outgoing,
edge_types: vec!["KNOWS".to_string()],
min_hops: 1,
max_hops: Some(1),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: None,
input: None,
})),
path_alias: None,
path_mode: PathMode::Walk,
})),
}));
let physical = planner.plan_adaptive(&logical).unwrap();
assert!(physical.adaptive_context.is_some());
}
#[test]
fn test_plan_adaptive_with_join() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: LogicalExpression::Variable("a".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("b".to_string()),
alias: None,
},
],
distinct: false,
input: Box::new(LogicalOperator::Join(JoinOp {
left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: None,
input: None,
})),
right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "b".to_string(),
label: None,
input: None,
})),
join_type: JoinType::Cross,
conditions: vec![],
})),
}));
let physical = planner.plan_adaptive(&logical).unwrap();
assert!(physical.adaptive_context.is_some());
}
#[test]
fn test_plan_adaptive_with_aggregate() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Aggregate(AggregateOp {
group_by: vec![],
aggregates: vec![LogicalAggregateExpr {
function: LogicalAggregateFunction::Count,
expression: Some(LogicalExpression::Variable("n".to_string())),
expression2: None,
distinct: false,
alias: Some("cnt".to_string()),
percentile: None,
separator: None,
}],
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
having: None,
}));
let physical = planner.plan_adaptive(&logical).unwrap();
assert!(physical.adaptive_context.is_some());
}
#[test]
fn test_plan_adaptive_with_distinct() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Distinct(LogicalDistinctOp {
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
columns: None,
})),
}));
let physical = planner.plan_adaptive(&logical).unwrap();
assert!(physical.adaptive_context.is_some());
}
#[test]
fn test_plan_adaptive_with_limit() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Limit(LogicalLimitOp {
count: 10.into(),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
})),
}));
let physical = planner.plan_adaptive(&logical).unwrap();
assert!(physical.adaptive_context.is_some());
}
#[test]
fn test_plan_adaptive_with_skip() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Skip(LogicalSkipOp {
count: 5.into(),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
})),
}));
let physical = planner.plan_adaptive(&logical).unwrap();
assert!(physical.adaptive_context.is_some());
}
#[test]
fn test_plan_adaptive_with_sort() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Sort(SortOp {
keys: vec![SortKey {
expression: LogicalExpression::Variable("n".to_string()),
order: SortOrder::Ascending,
nulls: None,
}],
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
})),
}));
let physical = planner.plan_adaptive(&logical).unwrap();
assert!(physical.adaptive_context.is_some());
}
#[test]
fn test_plan_adaptive_with_union() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Union(UnionOp {
inputs: vec![
LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: Some("Person".to_string()),
input: None,
}),
LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: Some("Company".to_string()),
input: None,
}),
],
})),
}));
let physical = planner.plan_adaptive(&logical).unwrap();
assert!(physical.adaptive_context.is_some());
}
#[test]
fn test_plan_expand_variable_length() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: LogicalExpression::Variable("a".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("b".to_string()),
alias: None,
},
],
distinct: false,
input: Box::new(LogicalOperator::Expand(ExpandOp {
from_variable: "a".to_string(),
to_variable: "b".to_string(),
edge_variable: None,
direction: ExpandDirection::Outgoing,
edge_types: vec!["KNOWS".to_string()],
min_hops: 1,
max_hops: Some(3),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: None,
input: None,
})),
path_alias: None,
path_mode: PathMode::Walk,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"a".to_string()));
assert!(physical.columns().contains(&"b".to_string()));
}
#[test]
fn test_plan_expand_with_path_alias() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: LogicalExpression::Variable("a".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("b".to_string()),
alias: None,
},
],
distinct: false,
input: Box::new(LogicalOperator::Expand(ExpandOp {
from_variable: "a".to_string(),
to_variable: "b".to_string(),
edge_variable: None,
direction: ExpandDirection::Outgoing,
edge_types: vec!["KNOWS".to_string()],
min_hops: 1,
max_hops: Some(3),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: None,
input: None,
})),
path_alias: Some("p".to_string()),
path_mode: PathMode::Walk,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"a".to_string()));
assert!(physical.columns().contains(&"b".to_string()));
}
#[test]
fn test_plan_expand_incoming() {
let store = create_test_store();
let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
.with_factorized_execution(false);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: LogicalExpression::Variable("a".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("b".to_string()),
alias: None,
},
],
distinct: false,
input: Box::new(LogicalOperator::Expand(ExpandOp {
from_variable: "a".to_string(),
to_variable: "b".to_string(),
edge_variable: None,
direction: ExpandDirection::Incoming,
edge_types: vec!["KNOWS".to_string()],
min_hops: 1,
max_hops: Some(1),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: None,
input: None,
})),
path_alias: None,
path_mode: PathMode::Walk,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"a".to_string()));
assert!(physical.columns().contains(&"b".to_string()));
}
#[test]
fn test_plan_expand_both_directions() {
let store = create_test_store();
let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
.with_factorized_execution(false);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: LogicalExpression::Variable("a".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("b".to_string()),
alias: None,
},
],
distinct: false,
input: Box::new(LogicalOperator::Expand(ExpandOp {
from_variable: "a".to_string(),
to_variable: "b".to_string(),
edge_variable: None,
direction: ExpandDirection::Both,
edge_types: vec!["KNOWS".to_string()],
min_hops: 1,
max_hops: Some(1),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: None,
input: None,
})),
path_alias: None,
path_mode: PathMode::Walk,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"a".to_string()));
assert!(physical.columns().contains(&"b".to_string()));
}
#[test]
fn test_planner_with_context() {
use crate::transaction::TransactionManager;
let store = create_test_store();
let transaction_manager = Arc::new(TransactionManager::new());
let transaction_id = transaction_manager.begin();
let epoch = transaction_manager.current_epoch();
let planner = Planner::with_context(
Arc::clone(&store) as Arc<dyn GraphStore>,
Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>),
Arc::clone(&transaction_manager),
Some(transaction_id),
epoch,
);
assert_eq!(planner.transaction_id(), Some(transaction_id));
assert!(planner.transaction_manager().is_some());
assert_eq!(planner.viewing_epoch(), epoch);
}
#[test]
fn test_planner_with_factorized_execution_disabled() {
let store = create_test_store();
let planner = Planner::new(Arc::clone(&store) as Arc<dyn GraphStore>)
.with_factorized_execution(false);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: LogicalExpression::Variable("a".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("c".to_string()),
alias: None,
},
],
distinct: false,
input: Box::new(LogicalOperator::Expand(ExpandOp {
from_variable: "b".to_string(),
to_variable: "c".to_string(),
edge_variable: None,
direction: ExpandDirection::Outgoing,
edge_types: vec![],
min_hops: 1,
max_hops: Some(1),
input: Box::new(LogicalOperator::Expand(ExpandOp {
from_variable: "a".to_string(),
to_variable: "b".to_string(),
edge_variable: None,
direction: ExpandDirection::Outgoing,
edge_types: vec![],
min_hops: 1,
max_hops: Some(1),
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: None,
input: None,
})),
path_alias: None,
path_mode: PathMode::Walk,
})),
path_alias: None,
path_mode: PathMode::Walk,
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"a".to_string()));
assert!(physical.columns().contains(&"c".to_string()));
}
#[test]
fn test_plan_sort_by_property() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![ReturnItem {
expression: LogicalExpression::Variable("n".to_string()),
alias: None,
}],
distinct: false,
input: Box::new(LogicalOperator::Sort(SortOp {
keys: vec![SortKey {
expression: LogicalExpression::Property {
variable: "n".to_string(),
property: "name".to_string(),
},
order: SortOrder::Ascending,
nulls: None,
}],
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "n".to_string(),
label: None,
input: None,
})),
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"n".to_string()));
}
#[test]
fn test_plan_scan_with_input() {
let store = create_test_store();
let planner = Planner::new(store);
let logical = LogicalPlan::new(LogicalOperator::Return(ReturnOp {
items: vec![
ReturnItem {
expression: LogicalExpression::Variable("a".to_string()),
alias: None,
},
ReturnItem {
expression: LogicalExpression::Variable("b".to_string()),
alias: None,
},
],
distinct: false,
input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "b".to_string(),
label: Some("Company".to_string()),
input: Some(Box::new(LogicalOperator::NodeScan(NodeScanOp {
variable: "a".to_string(),
label: Some("Person".to_string()),
input: None,
}))),
})),
}));
let physical = planner.plan(&logical).unwrap();
assert!(physical.columns().contains(&"a".to_string()));
assert!(physical.columns().contains(&"b".to_string()));
}
}