Skip to main content

grafeo_engine/query/planner/
mod.rs

//! Converts logical plans into physical execution trees.
//!
//! The optimizer produces a logical plan (what data you want), but the planner
//! converts it to a physical plan (how to actually get it). This means choosing
//! hash joins vs nested loops, picking index scans vs full scans, etc.
//!
//! This module contains shared infrastructure used by both the LPG and RDF planners:
//! - [`PhysicalPlan`] - the output of planning
//! - Expression and operator conversion functions
//! - Reusable operator builders (in the common submodule)
//!
//! Model-specific planning lives in [`lpg`] and [`rdf`].
13
14pub(crate) mod common;
15pub mod lpg;
16
17#[cfg(feature = "triple-store")]
18pub mod rdf;
19
20// Re-export the LPG planner as the default `Planner` for backwards compatibility.
21pub use lpg::Planner;
22
23use crate::query::plan::{
24    AggregateFunction as LogicalAggregateFunction, BinaryOp, LogicalExpression, UnaryOp,
25};
26use grafeo_common::types::LogicalType;
27use grafeo_common::utils::error::{Error, Result};
28use grafeo_core::execution::AdaptiveContext;
29use grafeo_core::execution::operators::{
30    AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, FilterExpression, Operator,
31    UnaryFilterOp,
32};
33
34/// A physical plan ready for execution.
35pub struct PhysicalPlan {
36    /// The root physical operator.
37    pub operator: Box<dyn Operator>,
38    /// Column names for the result.
39    pub columns: Vec<String>,
40    /// Adaptive execution context with cardinality estimates.
41    ///
42    /// When adaptive execution is enabled, this context contains estimated
43    /// cardinalities at various checkpoints in the plan. During execution,
44    /// actual row counts are recorded and compared against estimates.
45    pub adaptive_context: Option<AdaptiveContext>,
46}
47
48impl PhysicalPlan {
49    /// Returns the column names.
50    #[must_use]
51    pub fn columns(&self) -> &[String] {
52        &self.columns
53    }
54
55    /// Consumes the plan and returns the operator.
56    pub fn into_operator(self) -> Box<dyn Operator> {
57        self.operator
58    }
59
60    /// Returns the adaptive context, if adaptive execution is enabled.
61    #[must_use]
62    pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
63        self.adaptive_context.as_ref()
64    }
65
66    /// Takes ownership of the adaptive context.
67    pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
68        self.adaptive_context.take()
69    }
70}
71
// ---------------------------------------------------------------------------
// Shared conversion functions (used by both LPG and RDF planners)
// ---------------------------------------------------------------------------
75
76/// Converts a logical binary operator to a filter binary operator.
77///
78/// # Errors
79///
80/// Currently infallible for all `BinaryOp` variants, but returns `Result`
81/// for forward compatibility.
82pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
83    match op {
84        BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
85        BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
86        BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
87        BinaryOp::Le => Ok(BinaryFilterOp::Le),
88        BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
89        BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
90        BinaryOp::And => Ok(BinaryFilterOp::And),
91        BinaryOp::Or => Ok(BinaryFilterOp::Or),
92        BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
93        BinaryOp::Add => Ok(BinaryFilterOp::Add),
94        BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
95        BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
96        BinaryOp::Div => Ok(BinaryFilterOp::Div),
97        BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
98        BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
99        BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
100        BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
101        BinaryOp::In => Ok(BinaryFilterOp::In),
102        BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
103        BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
104        BinaryOp::Concat => Ok(BinaryFilterOp::Concat),
105        BinaryOp::Like => Ok(BinaryFilterOp::Like),
106    }
107}
108
109/// Converts a logical unary operator to a filter unary operator.
110///
111/// # Errors
112///
113/// Currently infallible for all `UnaryOp` variants, but returns `Result`
114/// for forward compatibility.
115pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
116    match op {
117        UnaryOp::Not => Ok(UnaryFilterOp::Not),
118        UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
119        UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
120        UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
121    }
122}
123
124/// Converts a logical aggregate function to a physical aggregate function.
125pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
126    match func {
127        LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
128        LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
129        LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
130        LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
131        LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
132        LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
133        LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
134        LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
135        LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
136        LogicalAggregateFunction::Variance => PhysicalAggregateFunction::Variance,
137        LogicalAggregateFunction::VariancePop => PhysicalAggregateFunction::VariancePop,
138        LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
139        LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
140        LogicalAggregateFunction::GroupConcat => PhysicalAggregateFunction::GroupConcat,
141        LogicalAggregateFunction::Sample => PhysicalAggregateFunction::Sample,
142        LogicalAggregateFunction::CovarSamp => PhysicalAggregateFunction::CovarSamp,
143        LogicalAggregateFunction::CovarPop => PhysicalAggregateFunction::CovarPop,
144        LogicalAggregateFunction::Corr => PhysicalAggregateFunction::Corr,
145        LogicalAggregateFunction::RegrSlope => PhysicalAggregateFunction::RegrSlope,
146        LogicalAggregateFunction::RegrIntercept => PhysicalAggregateFunction::RegrIntercept,
147        LogicalAggregateFunction::RegrR2 => PhysicalAggregateFunction::RegrR2,
148        LogicalAggregateFunction::RegrCount => PhysicalAggregateFunction::RegrCount,
149        LogicalAggregateFunction::RegrSxx => PhysicalAggregateFunction::RegrSxx,
150        LogicalAggregateFunction::RegrSyy => PhysicalAggregateFunction::RegrSyy,
151        LogicalAggregateFunction::RegrSxy => PhysicalAggregateFunction::RegrSxy,
152        LogicalAggregateFunction::RegrAvgx => PhysicalAggregateFunction::RegrAvgx,
153        LogicalAggregateFunction::RegrAvgy => PhysicalAggregateFunction::RegrAvgy,
154    }
155}
156
157/// Converts a logical expression to a filter expression.
158///
159/// This is a standalone function used by both LPG and RDF planners.
160///
161/// # Errors
162///
163/// Returns an error if the expression contains an unsupported operator or
164/// function that cannot be translated to a filter expression.
165pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
166    match expr {
167        LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
168        LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
169        LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
170            variable: variable.clone(),
171            property: property.clone(),
172        }),
173        LogicalExpression::Binary { left, op, right } => {
174            let left_expr = convert_filter_expression(left)?;
175            let right_expr = convert_filter_expression(right)?;
176            let filter_op = convert_binary_op(*op)?;
177            Ok(FilterExpression::Binary {
178                left: Box::new(left_expr),
179                op: filter_op,
180                right: Box::new(right_expr),
181            })
182        }
183        LogicalExpression::Unary { op, operand } => {
184            let operand_expr = convert_filter_expression(operand)?;
185            let filter_op = convert_unary_op(*op)?;
186            Ok(FilterExpression::Unary {
187                op: filter_op,
188                operand: Box::new(operand_expr),
189            })
190        }
191        LogicalExpression::FunctionCall { name, args, .. } => {
192            let filter_args: Vec<FilterExpression> = args
193                .iter()
194                .map(convert_filter_expression)
195                .collect::<Result<Vec<_>>>()?;
196            Ok(FilterExpression::FunctionCall {
197                name: name.clone(),
198                args: filter_args,
199            })
200        }
201        LogicalExpression::Case {
202            operand,
203            when_clauses,
204            else_clause,
205        } => {
206            let filter_operand = operand
207                .as_ref()
208                .map(|e| convert_filter_expression(e))
209                .transpose()?
210                .map(Box::new);
211            let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
212                .iter()
213                .map(|(cond, result)| {
214                    Ok((
215                        convert_filter_expression(cond)?,
216                        convert_filter_expression(result)?,
217                    ))
218                })
219                .collect::<Result<Vec<_>>>()?;
220            let filter_else = else_clause
221                .as_ref()
222                .map(|e| convert_filter_expression(e))
223                .transpose()?
224                .map(Box::new);
225            Ok(FilterExpression::Case {
226                operand: filter_operand,
227                when_clauses: filter_when_clauses,
228                else_clause: filter_else,
229            })
230        }
231        LogicalExpression::List(items) => {
232            let filter_items: Vec<FilterExpression> = items
233                .iter()
234                .map(convert_filter_expression)
235                .collect::<Result<Vec<_>>>()?;
236            Ok(FilterExpression::List(filter_items))
237        }
238        LogicalExpression::Map(pairs) => {
239            let filter_pairs: Vec<(String, FilterExpression)> = pairs
240                .iter()
241                .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
242                .collect::<Result<Vec<_>>>()?;
243            Ok(FilterExpression::Map(filter_pairs))
244        }
245        LogicalExpression::IndexAccess { base, index } => {
246            let base_expr = convert_filter_expression(base)?;
247            let index_expr = convert_filter_expression(index)?;
248            Ok(FilterExpression::IndexAccess {
249                base: Box::new(base_expr),
250                index: Box::new(index_expr),
251            })
252        }
253        LogicalExpression::SliceAccess { base, start, end } => {
254            let base_expr = convert_filter_expression(base)?;
255            let start_expr = start
256                .as_ref()
257                .map(|s| convert_filter_expression(s))
258                .transpose()?
259                .map(Box::new);
260            let end_expr = end
261                .as_ref()
262                .map(|e| convert_filter_expression(e))
263                .transpose()?
264                .map(Box::new);
265            Ok(FilterExpression::SliceAccess {
266                base: Box::new(base_expr),
267                start: start_expr,
268                end: end_expr,
269            })
270        }
271        LogicalExpression::Parameter(_) => Err(Error::Internal(
272            "Parameters not yet supported in filters".to_string(),
273        )),
274        LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
275        LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
276        LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
277        LogicalExpression::ListComprehension {
278            variable,
279            list_expr,
280            filter_expr,
281            map_expr,
282        } => {
283            let list = convert_filter_expression(list_expr)?;
284            let filter = filter_expr
285                .as_ref()
286                .map(|f| convert_filter_expression(f))
287                .transpose()?
288                .map(Box::new);
289            let map = convert_filter_expression(map_expr)?;
290            Ok(FilterExpression::ListComprehension {
291                variable: variable.clone(),
292                list_expr: Box::new(list),
293                filter_expr: filter,
294                map_expr: Box::new(map),
295            })
296        }
297        LogicalExpression::ListPredicate {
298            kind,
299            variable,
300            list_expr,
301            predicate,
302        } => {
303            use crate::query::plan::ListPredicateKind as LPK;
304            let filter_kind = match kind {
305                LPK::All => grafeo_core::execution::operators::ListPredicateKind::All,
306                LPK::Any => grafeo_core::execution::operators::ListPredicateKind::Any,
307                LPK::None => grafeo_core::execution::operators::ListPredicateKind::None,
308                LPK::Single => grafeo_core::execution::operators::ListPredicateKind::Single,
309            };
310            let list = convert_filter_expression(list_expr)?;
311            let pred = convert_filter_expression(predicate)?;
312            Ok(FilterExpression::ListPredicate {
313                kind: filter_kind,
314                variable: variable.clone(),
315                list_expr: Box::new(list),
316                predicate: Box::new(pred),
317            })
318        }
319        LogicalExpression::ExistsSubquery(_)
320        | LogicalExpression::CountSubquery(_)
321        | LogicalExpression::ValueSubquery(_) => {
322            // Complex subqueries are handled at the plan_filter level via semi-join
323            // or Apply rewrites. If we reach here, the subquery is in a position that
324            // cannot be rewritten (e.g., nested inside a CASE expression). Return a
325            // literal false/zero as a safe fallback.
326            Err(Error::Internal(
327                "Subquery expressions in this position require the semi-join or Apply rewrite; \
328                 move the EXISTS/COUNT/VALUE subquery to a top-level WHERE predicate or RETURN"
329                    .to_string(),
330            ))
331        }
332        LogicalExpression::MapProjection { base, entries } => {
333            let physical_entries: Vec<(String, FilterExpression)> = entries
334                .iter()
335                .map(|entry| match entry {
336                    crate::query::plan::MapProjectionEntry::PropertySelector(name) => Ok((
337                        name.clone(),
338                        FilterExpression::Property {
339                            variable: base.clone(),
340                            property: name.clone(),
341                        },
342                    )),
343                    crate::query::plan::MapProjectionEntry::LiteralEntry(key, expr) => {
344                        Ok((key.clone(), convert_filter_expression(expr)?))
345                    }
346                    crate::query::plan::MapProjectionEntry::AllProperties => Ok((
347                        "*".to_string(),
348                        FilterExpression::FunctionCall {
349                            name: "properties".to_string(),
350                            args: vec![FilterExpression::Variable(base.clone())],
351                        },
352                    )),
353                })
354                .collect::<Result<Vec<_>>>()?;
355            Ok(FilterExpression::Map(physical_entries))
356        }
357        LogicalExpression::Reduce {
358            accumulator,
359            initial,
360            variable,
361            list,
362            expression,
363        } => Ok(FilterExpression::Reduce {
364            accumulator: accumulator.clone(),
365            initial: Box::new(convert_filter_expression(initial)?),
366            variable: variable.clone(),
367            list: Box::new(convert_filter_expression(list)?),
368            expression: Box::new(convert_filter_expression(expression)?),
369        }),
370        LogicalExpression::PatternComprehension { projection, .. } => {
371            let proj = convert_filter_expression(projection)?;
372            Ok(FilterExpression::FunctionCall {
373                name: "collect".to_string(),
374                args: vec![proj],
375            })
376        }
377    }
378}
379
380/// Infers the logical type from a value.
381pub(crate) fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
382    use grafeo_common::types::Value;
383    match value {
384        Value::Null => LogicalType::String,
385        Value::Bool(_) => LogicalType::Bool,
386        Value::Int64(_) => LogicalType::Int64,
387        Value::Float64(_) => LogicalType::Float64,
388        Value::String(_) => LogicalType::String,
389        Value::Bytes(_) => LogicalType::String,
390        Value::Timestamp(_) => LogicalType::Timestamp,
391        Value::Date(_) => LogicalType::Date,
392        Value::Time(_) => LogicalType::Time,
393        Value::Duration(_) => LogicalType::Duration,
394        Value::ZonedDatetime(_) => LogicalType::ZonedDatetime,
395        Value::List(_) => LogicalType::String,
396        Value::Map(_) => LogicalType::String,
397        Value::Vector(v) => LogicalType::Vector(v.len()),
398        Value::Path { .. } => LogicalType::Any,
399        Value::GCounter(_) | Value::OnCounter { .. } => LogicalType::Any,
400        _ => LogicalType::Any,
401    }
402}
403
/// Evaluates a constant logical expression to a `Value`.
///
/// Two forms are supported: a literal, which is returned as a clone, and
/// unary negation of an operand that itself evaluates (recursively) to an
/// `Int64` or `Float64` constant.
///
/// # Errors
///
/// Returns [`Error::Internal`] when:
/// - the expression is anything other than a literal or a unary negation
///   (variables, property accesses, function calls, ...),
/// - the negated operand is not an `Int64` or `Float64`, or
/// - negating an `Int64` would overflow (the operand is `i64::MIN`).
#[cfg(feature = "algos")]
pub(crate) fn eval_constant_expression(
    expr: &crate::query::plan::LogicalExpression,
) -> Result<grafeo_common::types::Value> {
    use crate::query::plan::LogicalExpression;
    use grafeo_common::types::Value;

    match expr {
        LogicalExpression::Literal(val) => Ok(val.clone()),
        LogicalExpression::Unary {
            op: crate::query::plan::UnaryOp::Neg,
            operand,
        } => match eval_constant_expression(operand)? {
            // `-i64::MIN` is not representable: plain negation would panic in
            // debug builds and wrap silently in release builds.
            Value::Int64(n) => n.checked_neg().map(Value::Int64).ok_or_else(|| {
                Error::Internal("Integer overflow while negating constant".into())
            }),
            Value::Float64(f) => Ok(Value::Float64(-f)),
            _ => Err(Error::Internal("Cannot negate non-numeric value".into())),
        },
        _ => Err(Error::Internal(
            "Procedure argument must be a constant value".into(),
        )),
    }
}
432}