lance_graph/datafusion_planner/
mod.rs1pub mod analysis;
18mod builder;
19mod config_helpers;
20mod expression;
21mod join_ops;
22mod scan_ops;
23
24#[cfg(test)]
25mod test_fixtures;
26
27pub use analysis::{PlanningContext, QueryAnalysis, RelationshipInstance};
29
30use crate::config::GraphConfig;
31use crate::error::Result;
32use crate::logical_plan::LogicalOperator;
33use crate::source_catalog::GraphSourceCatalog;
34use datafusion::logical_expr::LogicalPlan;
35use std::sync::Arc;
36
37pub trait GraphPhysicalPlanner {
39 fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan>;
40}
41
42pub struct DataFusionPlanner {
44 pub(crate) config: GraphConfig,
45 pub(crate) catalog: Option<Arc<dyn GraphSourceCatalog>>,
46}
47
48impl DataFusionPlanner {
49 pub fn new(config: GraphConfig) -> Self {
50 Self {
51 config,
52 catalog: None,
53 }
54 }
55
56 pub fn with_catalog(config: GraphConfig, catalog: Arc<dyn GraphSourceCatalog>) -> Self {
57 Self {
58 config,
59 catalog: Some(catalog),
60 }
61 }
62
63 pub fn plan_with_context(
64 &self,
65 logical_plan: &LogicalOperator,
66 datasets: &std::collections::HashMap<String, arrow::record_batch::RecordBatch>,
67 ) -> Result<LogicalPlan> {
68 use crate::source_catalog::{InMemoryCatalog, SimpleTableSource};
69
70 let analysis = analysis::analyze(logical_plan)?;
72
73 let mut catalog = InMemoryCatalog::new();
75
76 for label in &analysis.required_datasets {
78 if self.config.node_mappings.contains_key(label) {
79 if let Some(batch) = datasets.get(label) {
80 let src = Arc::new(SimpleTableSource::new(batch.schema()));
81 catalog = catalog.with_node_source(label, src);
82 }
83 }
84 }
85
86 for rel_type in &analysis.required_datasets {
88 if self.config.relationship_mappings.contains_key(rel_type) {
89 if let Some(batch) = datasets.get(rel_type) {
90 let src = Arc::new(SimpleTableSource::new(batch.schema()));
91 catalog = catalog.with_relationship_source(rel_type, src);
92 }
93 }
94 }
95
96 let planner_with_cat =
98 DataFusionPlanner::with_catalog(self.config.clone(), Arc::new(catalog));
99 planner_with_cat.plan(logical_plan)
100 }
101
102 pub(crate) fn plan_error<E: std::fmt::Display>(
104 &self,
105 context: &str,
106 error: E,
107 ) -> crate::error::GraphError {
108 crate::error::GraphError::PlanError {
109 message: format!("{}: {}", context, error),
110 location: snafu::Location::new(file!(), line!(), column!()),
111 }
112 }
113}
114
115impl GraphPhysicalPlanner for DataFusionPlanner {
116 fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan> {
117 let analysis = analysis::analyze(logical_plan)?;
119
120 let mut ctx = PlanningContext::new(&analysis);
122 self.build_operator(&mut ctx, logical_plan)
123 }
124}
125
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{
        BooleanExpression, ComparisonOperator, PropertyRef, PropertyValue, RelationshipDirection,
        ValueExpression,
    };
    use crate::logical_plan::LogicalOperator;
    use test_fixtures::{make_catalog, person_config, person_knows_config, person_scan};

    /// Shorthand for the property reference `variable.property`.
    fn prop(variable: &str, property: &str) -> PropertyRef {
        PropertyRef {
            variable: variable.to_string(),
            property: property.to_string(),
        }
    }

    /// `(source)-[rel:KNOWS]->(target:Person)` expanded outgoing from `input`.
    fn knows_expand(input: LogicalOperator, source: &str, rel: &str, target: &str) -> LogicalOperator {
        LogicalOperator::Expand {
            input: Box::new(input),
            source_variable: source.to_string(),
            target_variable: target.to_string(),
            target_label: "Person".to_string(),
            relationship_types: vec!["KNOWS".to_string()],
            direction: RelationshipDirection::Outgoing,
            relationship_variable: Some(rel.to_string()),
            properties: Default::default(),
            target_properties: Default::default(),
        }
    }

    /// Plans `op` (panicking if planning fails) and returns the plan's `Debug` text.
    fn debug_plan(planner: &DataFusionPlanner, op: &LogicalOperator) -> String {
        let df_plan = planner.plan(op).unwrap();
        format!("{:?}", df_plan)
    }

    // NOTE(review): despite its name, this test only checks that a valid filter plans
    // successfully; it does not exercise any error path.
    #[test]
    fn test_filter_preserves_error_context() {
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());

        let age_over_30 = BooleanExpression::Comparison {
            left: ValueExpression::Property(prop("p", "age")),
            operator: ComparisonOperator::GreaterThan,
            right: ValueExpression::Literal(PropertyValue::Integer(30)),
        };
        let filter = LogicalOperator::Filter {
            input: Box::new(person_scan("p")),
            predicate: age_over_30,
        };

        assert!(planner.plan(&filter).is_ok(), "Valid filter should succeed");
    }

    #[test]
    fn test_exists_on_relationship_property_is_qualified() {
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());

        let filter = LogicalOperator::Filter {
            input: Box::new(knows_expand(person_scan("a"), "a", "r", "b")),
            predicate: BooleanExpression::Exists(prop("r", "src_person_id")),
        };

        let s = debug_plan(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("r__src_person_id") || s.contains("IsNotNull"),
            "missing qualified rel column or IsNotNull in filter: {}",
            s
        );
    }

    #[test]
    fn test_in_list_on_relationship_property_is_qualified() {
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());

        let one_or_two = BooleanExpression::In {
            expression: ValueExpression::Property(prop("r", "src_person_id")),
            list: vec![
                ValueExpression::Literal(PropertyValue::Integer(1)),
                ValueExpression::Literal(PropertyValue::Integer(2)),
            ],
        };
        let filter = LogicalOperator::Filter {
            input: Box::new(knows_expand(person_scan("a"), "a", "r", "b")),
            predicate: one_or_two,
        };

        let s = debug_plan(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("r__src_person_id"),
            "missing qualified rel column in IN list filter: {}",
            s
        );
    }

    #[test]
    fn test_exists_and_in_on_node_props_materialized() {
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());

        let has_name = BooleanExpression::Exists(prop("a", "name"));
        let age_in_list = BooleanExpression::In {
            expression: ValueExpression::Property(prop("a", "age")),
            list: vec![
                ValueExpression::Literal(PropertyValue::Integer(20)),
                ValueExpression::Literal(PropertyValue::Integer(30)),
            ],
        };
        let filter = LogicalOperator::Filter {
            input: Box::new(person_scan("a")),
            predicate: BooleanExpression::And(Box::new(has_name), Box::new(age_in_list)),
        };

        let s = debug_plan(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("a__name") || s.contains("IsNotNull"),
            "missing EXISTS on a__name: {}",
            s
        );
        // NOTE(review): "a__age" contains "age", so this disjunction reduces to
        // `s.contains("age")`. That likely passes even if the IN predicate were dropped.
        // Consider tightening this once the exact plan rendering is confirmed.
        assert!(
            s.contains("a__age") || s.contains("age"),
            "missing IN on a.age: {}",
            s
        );
    }
}