Skip to main content

lance_graph/datafusion_planner/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
//! DataFusion-based physical planner for graph queries.
//!
//! Translates graph logical plans into DataFusion logical plans in two phases:
//!
//! ## Phase 1: Analysis
//! - Assigns a unique ID to each relationship instance to avoid column-name conflicts
//! - Collects variable-to-label mappings and the datasets the query requires
//!
//! ## Phase 2: Plan Building
//! - Nodes -> table scans, relationships -> linking tables, traversals -> joins
//! - Variable-length paths (`*1..3`) are unrolled into fixed-length plans combined with UNION
//! - All columns are qualified as `{variable}__{column}` to avoid ambiguity
16
17pub mod analysis;
18mod builder;
19mod config_helpers;
20mod expression;
21mod join_ops;
22mod scan_ops;
23mod udf;
24pub mod vector_ops;
25
26#[cfg(test)]
27mod test_fixtures;
28
29// Re-export public types
30pub use analysis::{PlanningContext, QueryAnalysis, RelationshipInstance};
31
32use crate::config::GraphConfig;
33use crate::error::Result;
34use crate::logical_plan::LogicalOperator;
35use crate::source_catalog::GraphSourceCatalog;
36use datafusion::logical_expr::LogicalPlan;
37use std::sync::Arc;
38
39/// Planner abstraction for graph-to-physical planning
40pub trait GraphPhysicalPlanner {
41    fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan>;
42}
43
44/// DataFusion-based physical planner
45pub struct DataFusionPlanner {
46    pub(crate) config: GraphConfig,
47    pub(crate) catalog: Option<Arc<dyn GraphSourceCatalog>>,
48}
49
50impl DataFusionPlanner {
51    pub fn new(config: GraphConfig) -> Self {
52        Self {
53            config,
54            catalog: None,
55        }
56    }
57
58    pub fn with_catalog(config: GraphConfig, catalog: Arc<dyn GraphSourceCatalog>) -> Self {
59        Self {
60            config,
61            catalog: Some(catalog),
62        }
63    }
64
65    /// Helper to convert DataFusion builder errors into GraphError::PlanError with context
66    pub(crate) fn plan_error<E: std::fmt::Display>(
67        &self,
68        context: &str,
69        error: E,
70    ) -> crate::error::GraphError {
71        crate::error::GraphError::PlanError {
72            message: format!("{}: {}", context, error),
73            location: snafu::Location::new(file!(), line!(), column!()),
74        }
75    }
76}
77
78impl GraphPhysicalPlanner for DataFusionPlanner {
79    fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan> {
80        // Phase 1: Analyze query structure
81        let analysis = analysis::analyze(logical_plan)?;
82
83        // Phase 2: Build execution plan with context
84        let mut ctx = PlanningContext::new(&analysis);
85        self.build_operator(&mut ctx, logical_plan)
86    }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{
        BooleanExpression, ComparisonOperator, PropertyRef, PropertyValue, RelationshipDirection,
        ValueExpression,
    };
    use crate::logical_plan::LogicalOperator;
    use test_fixtures::{make_catalog, person_config, person_knows_config, person_scan};

    /// Builds an outgoing `KNOWS` expand from the `a` person scan to `b:Person`,
    /// binding the relationship as `r`.
    fn knows_expand() -> LogicalOperator {
        LogicalOperator::Expand {
            input: Box::new(person_scan("a")),
            source_variable: "a".to_string(),
            target_variable: "b".to_string(),
            target_label: "Person".to_string(),
            relationship_types: vec!["KNOWS".to_string()],
            direction: RelationshipDirection::Outgoing,
            relationship_variable: Some("r".to_string()),
            properties: Default::default(),
            target_properties: Default::default(),
        }
    }

    /// Plans `operator` and returns the `Filter:` line of the indented plan
    /// rendering. Panics with the planning error, or with the full plan if it
    /// has no filter node.
    ///
    /// Assertions look only at this line because qualified columns can also
    /// appear in scan/projection nodes. Searching the whole plan would then
    /// pass even if the predicate referenced the wrong column.
    fn planned_filter_line(planner: &DataFusionPlanner, operator: &LogicalOperator) -> String {
        let plan = planner.plan(operator).expect("planning should succeed");
        let rendered = plan.display_indent().to_string();
        rendered
            .lines()
            .find(|line| line.trim_start().starts_with("Filter:"))
            .unwrap_or_else(|| panic!("missing Filter node in plan:\n{}", rendered))
            .to_string()
    }

    #[test]
    fn test_valid_filter_plans_successfully() {
        // A simple comparison on a node property should plan, and the filter
        // predicate should reference the qualified `p__age` column.
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());
        let filter = LogicalOperator::Filter {
            input: Box::new(person_scan("p")),
            predicate: BooleanExpression::Comparison {
                left: ValueExpression::Property(PropertyRef {
                    variable: "p".to_string(),
                    property: "age".to_string(),
                }),
                operator: ComparisonOperator::GreaterThan,
                right: ValueExpression::Literal(PropertyValue::Integer(30)),
            },
        };

        let line = planned_filter_line(&planner, &filter);
        assert!(
            line.contains("p__age"),
            "filter should use qualified p__age: {}",
            line
        );
    }

    #[test]
    fn test_exists_on_relationship_property_is_qualified() {
        // EXISTS on a relationship property must reference the qualified
        // `r__src_person_id` column inside the filter predicate itself.
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());
        let filter = LogicalOperator::Filter {
            input: Box::new(knows_expand()),
            predicate: BooleanExpression::Exists(PropertyRef {
                variable: "r".into(),
                property: "src_person_id".into(),
            }),
        };

        let line = planned_filter_line(&planner, &filter);
        assert!(
            line.contains("r__src_person_id"),
            "EXISTS filter should use qualified r__src_person_id: {}",
            line
        );
    }

    #[test]
    fn test_in_list_on_relationship_property_is_qualified() {
        // IN lists on relationship properties must reference the qualified
        // column inside the filter predicate itself.
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());
        let filter = LogicalOperator::Filter {
            input: Box::new(knows_expand()),
            predicate: BooleanExpression::In {
                expression: ValueExpression::Property(PropertyRef {
                    variable: "r".into(),
                    property: "src_person_id".into(),
                }),
                list: vec![
                    ValueExpression::Literal(PropertyValue::Integer(1)),
                    ValueExpression::Literal(PropertyValue::Integer(2)),
                ],
            },
        };

        let line = planned_filter_line(&planner, &filter);
        assert!(
            line.contains("r__src_person_id"),
            "IN filter should use qualified r__src_person_id: {}",
            line
        );
    }

    #[test]
    fn test_exists_and_in_on_node_props_materialized() {
        // EXISTS and IN on node properties, combined with AND, must both
        // appear in the filter predicate with qualified column names.
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());
        let predicate = BooleanExpression::And(
            Box::new(BooleanExpression::Exists(PropertyRef {
                variable: "a".into(),
                property: "name".into(),
            })),
            Box::new(BooleanExpression::In {
                expression: ValueExpression::Property(PropertyRef {
                    variable: "a".into(),
                    property: "age".into(),
                }),
                list: vec![
                    ValueExpression::Literal(PropertyValue::Integer(20)),
                    ValueExpression::Literal(PropertyValue::Integer(30)),
                ],
            }),
        );
        let filter = LogicalOperator::Filter {
            input: Box::new(person_scan("a")),
            predicate,
        };

        let line = planned_filter_line(&planner, &filter);
        assert!(
            line.contains("a__name"),
            "EXISTS should use qualified a__name: {}",
            line
        );
        assert!(
            line.contains("a__age"),
            "IN should use qualified a__age: {}",
            line
        );
    }
}