Skip to main content

grafeo_engine/query/planner/
mod.rs

1//! Converts logical plans into physical execution trees.
2//!
3//! The optimizer produces a logical plan (what data you want), but the planner
4//! converts it to a physical plan (how to actually get it). This means choosing
5//! hash joins vs nested loops, picking index scans vs full scans, etc.
6//!
7//! This module contains shared infrastructure used by both the LPG and RDF planners:
8//! - [`PhysicalPlan`] - the output of planning
9//! - Expression and operator conversion functions
10//! - Reusable operator builders (in the common submodule)
11//!
12//! Model-specific planning lives in [`lpg`] and [`rdf`].
13
14pub(crate) mod common;
15pub mod lpg;
16
17#[cfg(feature = "rdf")]
18pub mod rdf;
19
20// Re-export the LPG planner as the default `Planner` for backwards compatibility.
21pub use lpg::Planner;
22
23use crate::query::plan::{
24    AggregateFunction as LogicalAggregateFunction, BinaryOp, LogicalExpression, UnaryOp,
25};
26use grafeo_common::types::LogicalType;
27use grafeo_common::utils::error::{Error, Result};
28use grafeo_core::execution::AdaptiveContext;
29use grafeo_core::execution::operators::{
30    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, FilterExpression, Operator,
31    UnaryFilterOp,
32};
33
34/// A physical plan ready for execution.
35pub struct PhysicalPlan {
36    /// The root physical operator.
37    pub operator: Box<dyn Operator>,
38    /// Column names for the result.
39    pub columns: Vec<String>,
40    /// Adaptive execution context with cardinality estimates.
41    ///
42    /// When adaptive execution is enabled, this context contains estimated
43    /// cardinalities at various checkpoints in the plan. During execution,
44    /// actual row counts are recorded and compared against estimates.
45    pub adaptive_context: Option<AdaptiveContext>,
46}
47
48impl PhysicalPlan {
49    /// Returns the column names.
50    #[must_use]
51    pub fn columns(&self) -> &[String] {
52        &self.columns
53    }
54
55    /// Consumes the plan and returns the operator.
56    pub fn into_operator(self) -> Box<dyn Operator> {
57        self.operator
58    }
59
60    /// Returns the adaptive context, if adaptive execution is enabled.
61    #[must_use]
62    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
63        self.adaptive_context.as_ref()
64    }
65
66    /// Takes ownership of the adaptive context.
67    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
68        self.adaptive_context.take()
69    }
70}
71
72// ---------------------------------------------------------------------------
73// Shared conversion functions (used by both LPG and RDF planners)
74// ---------------------------------------------------------------------------
75
76/// Converts a logical binary operator to a filter binary operator.
77pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
78    match op {
79        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
80        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
81        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
82        BinaryOp::Le => Ok(BinaryFilterOp::Le),
83        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
84        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
85        BinaryOp::And => Ok(BinaryFilterOp::And),
86        BinaryOp::Or => Ok(BinaryFilterOp::Or),
87        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
88        BinaryOp::Add => Ok(BinaryFilterOp::Add),
89        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
90        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
91        BinaryOp::Div => Ok(BinaryFilterOp::Div),
92        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
93        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
94        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
95        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
96        BinaryOp::In => Ok(BinaryFilterOp::In),
97        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
98        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
99        BinaryOp::Concat => Ok(BinaryFilterOp::Concat),
100        BinaryOp::Like => Ok(BinaryFilterOp::Like),
101    }
102}
103
104/// Converts a logical unary operator to a filter unary operator.
105pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
106    match op {
107        UnaryOp::Not => Ok(UnaryFilterOp::Not),
108        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
109        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
110        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
111    }
112}
113
114/// Converts a logical aggregate function to a physical aggregate function.
115pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
116    match func {
117        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
118        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
119        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
120        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
121        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
122        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
123        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
124        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
125        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
126        LogicalAggregateFunction::Variance => PhysicalAggregateFunction::Variance,
127        LogicalAggregateFunction::VariancePop => PhysicalAggregateFunction::VariancePop,
128        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
129        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
130        LogicalAggregateFunction::GroupConcat => PhysicalAggregateFunction::GroupConcat,
131        LogicalAggregateFunction::Sample => PhysicalAggregateFunction::Sample,
132        LogicalAggregateFunction::CovarSamp => PhysicalAggregateFunction::CovarSamp,
133        LogicalAggregateFunction::CovarPop => PhysicalAggregateFunction::CovarPop,
134        LogicalAggregateFunction::Corr => PhysicalAggregateFunction::Corr,
135        LogicalAggregateFunction::RegrSlope => PhysicalAggregateFunction::RegrSlope,
136        LogicalAggregateFunction::RegrIntercept => PhysicalAggregateFunction::RegrIntercept,
137        LogicalAggregateFunction::RegrR2 => PhysicalAggregateFunction::RegrR2,
138        LogicalAggregateFunction::RegrCount => PhysicalAggregateFunction::RegrCount,
139        LogicalAggregateFunction::RegrSxx => PhysicalAggregateFunction::RegrSxx,
140        LogicalAggregateFunction::RegrSyy => PhysicalAggregateFunction::RegrSyy,
141        LogicalAggregateFunction::RegrSxy => PhysicalAggregateFunction::RegrSxy,
142        LogicalAggregateFunction::RegrAvgx => PhysicalAggregateFunction::RegrAvgx,
143        LogicalAggregateFunction::RegrAvgy => PhysicalAggregateFunction::RegrAvgy,
144    }
145}
146
147/// Converts a logical expression to a filter expression.
148///
149/// This is a standalone function used by both LPG and RDF planners.
150pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
151    match expr {
152        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
153        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
154        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
155            variable: variable.clone(),
156            property: property.clone(),
157        }),
158        LogicalExpression::Binary { left, op, right } => {
159            let left_expr = convert_filter_expression(left)?;
160            let right_expr = convert_filter_expression(right)?;
161            let filter_op = convert_binary_op(*op)?;
162            Ok(FilterExpression::Binary {
163                left: Box::new(left_expr),
164                op: filter_op,
165                right: Box::new(right_expr),
166            })
167        }
168        LogicalExpression::Unary { op, operand } => {
169            let operand_expr = convert_filter_expression(operand)?;
170            let filter_op = convert_unary_op(*op)?;
171            Ok(FilterExpression::Unary {
172                op: filter_op,
173                operand: Box::new(operand_expr),
174            })
175        }
176        LogicalExpression::FunctionCall { name, args, .. } => {
177            let filter_args: Vec<FilterExpression> = args
178                .iter()
179                .map(convert_filter_expression)
180                .collect::<Result<Vec<_>>>()?;
181            Ok(FilterExpression::FunctionCall {
182                name: name.clone(),
183                args: filter_args,
184            })
185        }
186        LogicalExpression::Case {
187            operand,
188            when_clauses,
189            else_clause,
190        } => {
191            let filter_operand = operand
192                .as_ref()
193                .map(|e| convert_filter_expression(e))
194                .transpose()?
195                .map(Box::new);
196            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
197                .iter()
198                .map(|(cond, result)| {
199                    Ok((
200                        convert_filter_expression(cond)?,
201                        convert_filter_expression(result)?,
202                    ))
203                })
204                .collect::<Result<Vec<_>>>()?;
205            let filter_else = else_clause
206                .as_ref()
207                .map(|e| convert_filter_expression(e))
208                .transpose()?
209                .map(Box::new);
210            Ok(FilterExpression::Case {
211                operand: filter_operand,
212                when_clauses: filter_when_clauses,
213                else_clause: filter_else,
214            })
215        }
216        LogicalExpression::List(items) => {
217            let filter_items: Vec<FilterExpression> = items
218                .iter()
219                .map(convert_filter_expression)
220                .collect::<Result<Vec<_>>>()?;
221            Ok(FilterExpression::List(filter_items))
222        }
223        LogicalExpression::Map(pairs) => {
224            let filter_pairs: Vec<(String, FilterExpression)> = pairs
225                .iter()
226                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
227                .collect::<Result<Vec<_>>>()?;
228            Ok(FilterExpression::Map(filter_pairs))
229        }
230        LogicalExpression::IndexAccess { base, index } => {
231            let base_expr = convert_filter_expression(base)?;
232            let index_expr = convert_filter_expression(index)?;
233            Ok(FilterExpression::IndexAccess {
234                base: Box::new(base_expr),
235                index: Box::new(index_expr),
236            })
237        }
238        LogicalExpression::SliceAccess { base, start, end } => {
239            let base_expr = convert_filter_expression(base)?;
240            let start_expr = start
241                .as_ref()
242                .map(|s| convert_filter_expression(s))
243                .transpose()?
244                .map(Box::new);
245            let end_expr = end
246                .as_ref()
247                .map(|e| convert_filter_expression(e))
248                .transpose()?
249                .map(Box::new);
250            Ok(FilterExpression::SliceAccess {
251                base: Box::new(base_expr),
252                start: start_expr,
253                end: end_expr,
254            })
255        }
256        LogicalExpression::Parameter(_) => Err(Error::Internal(
257            "Parameters not yet supported in filters".to_string(),
258        )),
259        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
260        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
261        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
262        LogicalExpression::ListComprehension {
263            variable,
264            list_expr,
265            filter_expr,
266            map_expr,
267        } => {
268            let list = convert_filter_expression(list_expr)?;
269            let filter = filter_expr
270                .as_ref()
271                .map(|f| convert_filter_expression(f))
272                .transpose()?
273                .map(Box::new);
274            let map = convert_filter_expression(map_expr)?;
275            Ok(FilterExpression::ListComprehension {
276                variable: variable.clone(),
277                list_expr: Box::new(list),
278                filter_expr: filter,
279                map_expr: Box::new(map),
280            })
281        }
282        LogicalExpression::ListPredicate {
283            kind,
284            variable,
285            list_expr,
286            predicate,
287        } => {
288            use crate::query::plan::ListPredicateKind as LPK;
289            let filter_kind = match kind {
290                LPK::All => grafeo_core::execution::operators::ListPredicateKind::All,
291                LPK::Any => grafeo_core::execution::operators::ListPredicateKind::Any,
292                LPK::None => grafeo_core::execution::operators::ListPredicateKind::None,
293                LPK::Single => grafeo_core::execution::operators::ListPredicateKind::Single,
294            };
295            let list = convert_filter_expression(list_expr)?;
296            let pred = convert_filter_expression(predicate)?;
297            Ok(FilterExpression::ListPredicate {
298                kind: filter_kind,
299                variable: variable.clone(),
300                list_expr: Box::new(list),
301                predicate: Box::new(pred),
302            })
303        }
304        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
305            // Complex subqueries are handled at the plan_filter level via semi-join
306            // or Apply rewrites. If we reach here, the subquery is in a position that
307            // cannot be rewritten (e.g., nested inside a CASE expression). Return a
308            // literal false/zero as a safe fallback.
309            Err(Error::Internal(
310                "Subquery expressions in this position require the semi-join or Apply rewrite; \
311                 move the EXISTS/COUNT subquery to a top-level WHERE predicate"
312                    .to_string(),
313            ))
314        }
315        LogicalExpression::MapProjection { base, entries } => {
316            let physical_entries: Vec<(String, FilterExpression)> = entries
317                .iter()
318                .map(|entry| match entry {
319                    crate::query::plan::MapProjectionEntry::PropertySelector(name) => Ok((
320                        name.clone(),
321                        FilterExpression::Property {
322                            variable: base.clone(),
323                            property: name.clone(),
324                        },
325                    )),
326                    crate::query::plan::MapProjectionEntry::LiteralEntry(key, expr) => {
327                        Ok((key.clone(), convert_filter_expression(expr)?))
328                    }
329                    crate::query::plan::MapProjectionEntry::AllProperties => Ok((
330                        "*".to_string(),
331                        FilterExpression::FunctionCall {
332                            name: "properties".to_string(),
333                            args: vec![FilterExpression::Variable(base.clone())],
334                        },
335                    )),
336                })
337                .collect::<Result<Vec<_>>>()?;
338            Ok(FilterExpression::Map(physical_entries))
339        }
340        LogicalExpression::Reduce {
341            accumulator,
342            initial,
343            variable,
344            list,
345            expression,
346        } => Ok(FilterExpression::Reduce {
347            accumulator: accumulator.clone(),
348            initial: Box::new(convert_filter_expression(initial)?),
349            variable: variable.clone(),
350            list: Box::new(convert_filter_expression(list)?),
351            expression: Box::new(convert_filter_expression(expression)?),
352        }),
353        LogicalExpression::PatternComprehension { projection, .. } => {
354            let proj = convert_filter_expression(projection)?;
355            Ok(FilterExpression::FunctionCall {
356                name: "collect".to_string(),
357                args: vec![proj],
358            })
359        }
360    }
361}
362
363/// Infers the logical type from a value.
364pub(crate) fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
365    use grafeo_common::types::Value;
366    match value {
367        Value::Null => LogicalType::String,
368        Value::Bool(_) => LogicalType::Bool,
369        Value::Int64(_) => LogicalType::Int64,
370        Value::Float64(_) => LogicalType::Float64,
371        Value::String(_) => LogicalType::String,
372        Value::Bytes(_) => LogicalType::String,
373        Value::Timestamp(_) => LogicalType::Timestamp,
374        Value::Date(_) => LogicalType::Date,
375        Value::Time(_) => LogicalType::Time,
376        Value::Duration(_) => LogicalType::Duration,
377        Value::ZonedDatetime(_) => LogicalType::ZonedDatetime,
378        Value::List(_) => LogicalType::String,
379        Value::Map(_) => LogicalType::String,
380        Value::Vector(v) => LogicalType::Vector(v.len()),
381        Value::Path { .. } => LogicalType::Any,
382    }
383}
384
385/// Evaluates a constant logical expression to a Value.
386///
387/// Only handles literals, unary minus on numeric literals, and simple expressions.
388/// Returns an error for runtime-dependent expressions (variables, property accesses, etc.).
389pub(crate) fn eval_constant_expression(
390    expr: &crate::query::plan::LogicalExpression,
391) -> Result<grafeo_common::types::Value> {
392    use crate::query::plan::LogicalExpression;
393    use grafeo_common::types::Value;
394
395    match expr {
396        LogicalExpression::Literal(val) => Ok(val.clone()),
397        LogicalExpression::Unary {
398            op: crate::query::plan::UnaryOp::Neg,
399            operand,
400        } => {
401            let val = eval_constant_expression(operand)?;
402            match val {
403                Value::Int64(n) => Ok(Value::Int64(-n)),
404                Value::Float64(f) => Ok(Value::Float64(-f)),
405                _ => Err(Error::Internal("Cannot negate non-numeric value".into())),
406            }
407        }
408        _ => Err(Error::Internal(
409            "Procedure argument must be a constant value".into(),
410        )),
411    }
412}