lance_graph/datafusion_planner/mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! DataFusion-based physical planner for graph queries
5//!
6//! Translates graph logical plans into DataFusion logical plans using a two-phase approach:
7//!
8//! ## Phase 1: Analysis
9//! - Assigns unique IDs to relationship instances to avoid column conflicts
10//! - Collects variable-to-label mappings and required datasets
11//!
12//! ## Phase 2: Plan Building
13//! - Nodes -> Table scans, Relationships -> Linking tables, Traversals -> Joins
14//! - Variable-length paths (`*1..3`) use unrolling: generate fixed-length plans + UNION
15//! - All columns qualified as `{variable}__{column}` to avoid ambiguity
16
17pub mod analysis;
18mod builder;
19mod config_helpers;
20mod expression;
21mod join_ops;
22mod scan_ops;
23
24#[cfg(test)]
25mod test_fixtures;
26
27// Re-export public types
28pub use analysis::{PlanningContext, QueryAnalysis, RelationshipInstance};
29
30use crate::config::GraphConfig;
31use crate::error::Result;
32use crate::logical_plan::LogicalOperator;
33use crate::source_catalog::GraphSourceCatalog;
34use datafusion::logical_expr::LogicalPlan;
35use std::sync::Arc;
36
37/// Planner abstraction for graph-to-physical planning
38pub trait GraphPhysicalPlanner {
39    fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan>;
40}
41
42/// DataFusion-based physical planner
43pub struct DataFusionPlanner {
44    pub(crate) config: GraphConfig,
45    pub(crate) catalog: Option<Arc<dyn GraphSourceCatalog>>,
46}
47
48impl DataFusionPlanner {
49    pub fn new(config: GraphConfig) -> Self {
50        Self {
51            config,
52            catalog: None,
53        }
54    }
55
56    pub fn with_catalog(config: GraphConfig, catalog: Arc<dyn GraphSourceCatalog>) -> Self {
57        Self {
58            config,
59            catalog: Some(catalog),
60        }
61    }
62
63    pub fn plan_with_context(
64        &self,
65        logical_plan: &LogicalOperator,
66        datasets: &std::collections::HashMap<String, arrow::record_batch::RecordBatch>,
67    ) -> Result<LogicalPlan> {
68        use crate::source_catalog::{InMemoryCatalog, SimpleTableSource};
69
70        // Use the analyze() method to extract metadata
71        let analysis = analysis::analyze(logical_plan)?;
72
73        // Build an in-memory catalog from provided datasets (nodes and relationships)
74        let mut catalog = InMemoryCatalog::new();
75
76        // Register node sources from required datasets
77        for label in &analysis.required_datasets {
78            if self.config.node_mappings.contains_key(label) {
79                if let Some(batch) = datasets.get(label) {
80                    let src = Arc::new(SimpleTableSource::new(batch.schema()));
81                    catalog = catalog.with_node_source(label, src);
82                }
83            }
84        }
85
86        // Register relationship sources from required datasets
87        for rel_type in &analysis.required_datasets {
88            if self.config.relationship_mappings.contains_key(rel_type) {
89                if let Some(batch) = datasets.get(rel_type) {
90                    let src = Arc::new(SimpleTableSource::new(batch.schema()));
91                    catalog = catalog.with_relationship_source(rel_type, src);
92                }
93            }
94        }
95
96        // Plan using a planner bound to this catalog so scans get qualified projections
97        let planner_with_cat =
98            DataFusionPlanner::with_catalog(self.config.clone(), Arc::new(catalog));
99        planner_with_cat.plan(logical_plan)
100    }
101
102    /// Helper to convert DataFusion builder errors into GraphError::PlanError with context
103    pub(crate) fn plan_error<E: std::fmt::Display>(
104        &self,
105        context: &str,
106        error: E,
107    ) -> crate::error::GraphError {
108        crate::error::GraphError::PlanError {
109            message: format!("{}: {}", context, error),
110            location: snafu::Location::new(file!(), line!(), column!()),
111        }
112    }
113}
114
115impl GraphPhysicalPlanner for DataFusionPlanner {
116    fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan> {
117        // Phase 1: Analyze query structure
118        let analysis = analysis::analyze(logical_plan)?;
119
120        // Phase 2: Build execution plan with context
121        let mut ctx = PlanningContext::new(&analysis);
122        self.build_operator(&mut ctx, logical_plan)
123    }
124}
125
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{
        BooleanExpression, ComparisonOperator, PropertyRef, PropertyValue, RelationshipDirection,
        ValueExpression,
    };
    use crate::logical_plan::LogicalOperator;
    use test_fixtures::{make_catalog, person_config, person_knows_config, person_scan};

    /// Builds `(a)-[r:KNOWS]->(b:Person)` on top of a scan bound to `a`.
    fn knows_expand() -> LogicalOperator {
        LogicalOperator::Expand {
            input: Box::new(person_scan("a")),
            source_variable: "a".to_string(),
            target_variable: "b".to_string(),
            target_label: "Person".to_string(),
            relationship_types: vec!["KNOWS".to_string()],
            direction: RelationshipDirection::Outgoing,
            relationship_variable: Some("r".to_string()),
            properties: Default::default(),
            target_properties: Default::default(),
        }
    }

    /// Plans `op` and returns the plan's `Debug` rendering, panicking with the error on failure.
    fn plan_to_string(planner: &DataFusionPlanner, op: &LogicalOperator) -> String {
        match planner.plan(op) {
            Ok(plan) => format!("{:?}", plan),
            Err(e) => panic!("planning failed: {:?}", e),
        }
    }

    #[test]
    fn test_filter_preserves_error_context() {
        // A valid filter must plan successfully; if it does not, surface the
        // underlying error instead of a bare `is_ok` failure.
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());

        let filter = LogicalOperator::Filter {
            input: Box::new(person_scan("p")),
            predicate: BooleanExpression::Comparison {
                left: ValueExpression::Property(PropertyRef {
                    variable: "p".to_string(),
                    property: "age".to_string(),
                }),
                operator: ComparisonOperator::GreaterThan,
                right: ValueExpression::Literal(PropertyValue::Integer(30)),
            },
        };

        let result = planner.plan(&filter);
        assert!(
            result.is_ok(),
            "Valid filter should succeed: {:?}",
            result.err()
        );
    }

    #[test]
    fn test_plan_error_formats_context() {
        // plan_error must prefix the underlying error with the caller-supplied context.
        let planner = DataFusionPlanner::new(person_config());
        let err = planner.plan_error("Failed to build filter", "boom");
        assert!(
            matches!(
                &err,
                crate::error::GraphError::PlanError { message, .. }
                    if message == "Failed to build filter: boom"
            ),
            "unexpected error: {:?}",
            err
        );
    }

    #[test]
    fn test_exists_on_relationship_property_is_qualified() {
        // EXISTS on a relationship property must reference the qualified column.
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());

        let filter = LogicalOperator::Filter {
            input: Box::new(knows_expand()),
            predicate: BooleanExpression::Exists(PropertyRef {
                variable: "r".into(),
                property: "src_person_id".into(),
            }),
        };
        let s = plan_to_string(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("r__src_person_id"),
            "missing qualified rel column in EXISTS filter: {}",
            s
        );
    }

    #[test]
    fn test_in_list_on_relationship_property_is_qualified() {
        // IN lists on relationship properties must reference the qualified column.
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());

        let filter = LogicalOperator::Filter {
            input: Box::new(knows_expand()),
            predicate: BooleanExpression::In {
                expression: ValueExpression::Property(PropertyRef {
                    variable: "r".into(),
                    property: "src_person_id".into(),
                }),
                list: vec![
                    ValueExpression::Literal(PropertyValue::Integer(1)),
                    ValueExpression::Literal(PropertyValue::Integer(2)),
                ],
            },
        };
        let s = plan_to_string(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("r__src_person_id"),
            "missing qualified rel column in IN list filter: {}",
            s
        );
    }

    #[test]
    fn test_exists_and_in_on_node_props_materialized() {
        // EXISTS and IN on node properties must both resolve to qualified columns.
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());

        let pred = BooleanExpression::And(
            Box::new(BooleanExpression::Exists(PropertyRef {
                variable: "a".into(),
                property: "name".into(),
            })),
            Box::new(BooleanExpression::In {
                expression: ValueExpression::Property(PropertyRef {
                    variable: "a".into(),
                    property: "age".into(),
                }),
                list: vec![
                    ValueExpression::Literal(PropertyValue::Integer(20)),
                    ValueExpression::Literal(PropertyValue::Integer(30)),
                ],
            }),
        );
        let filter = LogicalOperator::Filter {
            input: Box::new(person_scan("a")),
            predicate: pred,
        };
        let s = plan_to_string(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        // The previous `|| s.contains("age")` fallback was vacuous ("age" is a substring of
        // "a__age"), so assert the qualified names directly.
        assert!(s.contains("a__name"), "missing EXISTS on a__name: {}", s);
        assert!(s.contains("a__age"), "missing IN on a__age: {}", s);
    }
}
277}