// Source: lance_graph/datafusion_planner/mod.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! DataFusion-based physical planner for graph queries.
//!
//! Translates graph logical plans into DataFusion logical plans using a two-phase approach:
//!
//! ## Phase 1: Analysis
//! - Assigns unique IDs to relationship instances to avoid column conflicts
//! - Collects variable-to-label mappings and required datasets
//!
//! ## Phase 2: Plan Building
//! - Nodes -> table scans, Relationships -> linking-table scans, Traversals -> joins
//! - Variable-length paths (`*1..3`) are unrolled: one fixed-length plan per hop count,
//!   combined with UNION
//! - All columns are qualified as `{variable}__{column}` to avoid ambiguity

17pub mod analysis;
18mod builder;
19mod config_helpers;
20mod expression;
21mod join_ops;
22mod scan_ops;
23mod udf;
24pub mod vector_ops;
25
26#[cfg(test)]
27mod test_fixtures;
28
29// Re-export public types
30pub use analysis::{PlanningContext, QueryAnalysis, RelationshipInstance};
31
32use crate::config::GraphConfig;
33use crate::error::Result;
34use crate::logical_plan::LogicalOperator;
35use crate::source_catalog::GraphSourceCatalog;
36use datafusion::logical_expr::LogicalPlan;
37use std::sync::Arc;
38
39/// Planner abstraction for graph-to-physical planning
40pub trait GraphPhysicalPlanner {
41    fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan>;
42}
43
44/// DataFusion-based physical planner
45pub struct DataFusionPlanner {
46    pub(crate) config: GraphConfig,
47    pub(crate) catalog: Option<Arc<dyn GraphSourceCatalog>>,
48}
49
50impl DataFusionPlanner {
51    pub fn new(config: GraphConfig) -> Self {
52        Self {
53            config,
54            catalog: None,
55        }
56    }
57
58    pub fn with_catalog(config: GraphConfig, catalog: Arc<dyn GraphSourceCatalog>) -> Self {
59        Self {
60            config,
61            catalog: Some(catalog),
62        }
63    }
64
65    pub fn plan_with_context(
66        &self,
67        logical_plan: &LogicalOperator,
68        datasets: &std::collections::HashMap<String, arrow::record_batch::RecordBatch>,
69    ) -> Result<LogicalPlan> {
70        use crate::source_catalog::{InMemoryCatalog, SimpleTableSource};
71
72        // Use the analyze() method to extract metadata
73        let analysis = analysis::analyze(logical_plan)?;
74
75        // Build an in-memory catalog from provided datasets (nodes and relationships)
76        let mut catalog = InMemoryCatalog::new();
77
78        // Register node sources from required datasets
79        for label in &analysis.required_datasets {
80            if self.config.node_mappings.contains_key(label) {
81                if let Some(batch) = datasets.get(label) {
82                    let src = Arc::new(SimpleTableSource::new(batch.schema()));
83                    catalog = catalog.with_node_source(label, src);
84                }
85            }
86        }
87
88        // Register relationship sources from required datasets
89        for rel_type in &analysis.required_datasets {
90            if self.config.relationship_mappings.contains_key(rel_type) {
91                if let Some(batch) = datasets.get(rel_type) {
92                    let src = Arc::new(SimpleTableSource::new(batch.schema()));
93                    catalog = catalog.with_relationship_source(rel_type, src);
94                }
95            }
96        }
97
98        // Plan using a planner bound to this catalog so scans get qualified projections
99        let planner_with_cat =
100            DataFusionPlanner::with_catalog(self.config.clone(), Arc::new(catalog));
101        planner_with_cat.plan(logical_plan)
102    }
103
104    /// Helper to convert DataFusion builder errors into GraphError::PlanError with context
105    pub(crate) fn plan_error<E: std::fmt::Display>(
106        &self,
107        context: &str,
108        error: E,
109    ) -> crate::error::GraphError {
110        crate::error::GraphError::PlanError {
111            message: format!("{}: {}", context, error),
112            location: snafu::Location::new(file!(), line!(), column!()),
113        }
114    }
115}
116
117impl GraphPhysicalPlanner for DataFusionPlanner {
118    fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan> {
119        // Phase 1: Analyze query structure
120        let analysis = analysis::analyze(logical_plan)?;
121
122        // Phase 2: Build execution plan with context
123        let mut ctx = PlanningContext::new(&analysis);
124        self.build_operator(&mut ctx, logical_plan)
125    }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{
        BooleanExpression, ComparisonOperator, PropertyRef, PropertyValue, RelationshipDirection,
        ValueExpression,
    };
    use crate::logical_plan::LogicalOperator;
    use test_fixtures::{make_catalog, person_config, person_knows_config, person_scan};

    /// Shorthand for the property reference `variable.property`.
    fn prop(variable: &str, property: &str) -> PropertyRef {
        PropertyRef {
            variable: variable.to_string(),
            property: property.to_string(),
        }
    }

    /// Builds `(a:Person)-[r:KNOWS]->(b:Person)` over a `Person` scan bound to `a`.
    fn knows_expand_from_a() -> LogicalOperator {
        LogicalOperator::Expand {
            input: Box::new(person_scan("a")),
            source_variable: "a".to_string(),
            target_variable: "b".to_string(),
            target_label: "Person".to_string(),
            relationship_types: vec!["KNOWS".to_string()],
            direction: RelationshipDirection::Outgoing,
            relationship_variable: Some("r".to_string()),
            properties: Default::default(),
            target_properties: Default::default(),
        }
    }

    /// Plans `op` (panicking on failure) and returns the plan's `Debug` rendering.
    fn debug_plan(planner: &DataFusionPlanner, op: &LogicalOperator) -> String {
        let df_plan = planner.plan(op).unwrap();
        format!("{:?}", df_plan)
    }

    #[test]
    fn test_filter_preserves_error_context() {
        // A well-formed comparison on a node property must plan successfully.
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());
        let filter = LogicalOperator::Filter {
            input: Box::new(person_scan("p")),
            predicate: BooleanExpression::Comparison {
                left: ValueExpression::Property(prop("p", "age")),
                operator: ComparisonOperator::GreaterThan,
                right: ValueExpression::Literal(PropertyValue::Integer(30)),
            },
        };

        assert!(planner.plan(&filter).is_ok(), "Valid filter should succeed");
    }

    #[test]
    fn test_exists_on_relationship_property_is_qualified() {
        // EXISTS on a relationship property should reference the qualified column.
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());
        let filter = LogicalOperator::Filter {
            input: Box::new(knows_expand_from_a()),
            predicate: BooleanExpression::Exists(prop("r", "src_person_id")),
        };

        let s = debug_plan(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("r__src_person_id") || s.contains("IsNotNull"),
            "missing qualified rel column or IsNotNull in filter: {}",
            s
        );
    }

    #[test]
    fn test_in_list_on_relationship_property_is_qualified() {
        // IN lists on a relationship property should reference the qualified column.
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());
        let filter = LogicalOperator::Filter {
            input: Box::new(knows_expand_from_a()),
            predicate: BooleanExpression::In {
                expression: ValueExpression::Property(prop("r", "src_person_id")),
                list: vec![
                    ValueExpression::Literal(PropertyValue::Integer(1)),
                    ValueExpression::Literal(PropertyValue::Integer(2)),
                ],
            },
        };

        let s = debug_plan(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("r__src_person_id"),
            "missing qualified rel column in IN list filter: {}",
            s
        );
    }

    #[test]
    fn test_exists_and_in_on_node_props_materialized() {
        // EXISTS and IN over node properties, combined with AND, should both plan.
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());
        let exists_name = BooleanExpression::Exists(prop("a", "name"));
        let age_in_list = BooleanExpression::In {
            expression: ValueExpression::Property(prop("a", "age")),
            list: vec![
                ValueExpression::Literal(PropertyValue::Integer(20)),
                ValueExpression::Literal(PropertyValue::Integer(30)),
            ],
        };
        let filter = LogicalOperator::Filter {
            input: Box::new(person_scan("a")),
            predicate: BooleanExpression::And(Box::new(exists_name), Box::new(age_in_list)),
        };

        let s = debug_plan(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("a__name") || s.contains("IsNotNull"),
            "missing EXISTS on a__name: {}",
            s
        );
        // NOTE(review): "a__age" contains "age", so this disjunction effectively only
        // checks for "age"; consider tightening to the qualified name.
        assert!(
            s.contains("a__age") || s.contains("age"),
            "missing IN on a.age: {}",
            s
        );
    }
}