lance_graph/datafusion_planner/
mod.rs1pub mod analysis;
18mod builder;
19mod config_helpers;
20mod expression;
21mod join_ops;
22mod scan_ops;
23mod udf;
24pub mod vector_ops;
25
26#[cfg(test)]
27mod test_fixtures;
28
29pub use analysis::{PlanningContext, QueryAnalysis, RelationshipInstance};
31
32use crate::config::GraphConfig;
33use crate::error::Result;
34use crate::logical_plan::LogicalOperator;
35use crate::source_catalog::GraphSourceCatalog;
36use datafusion::logical_expr::LogicalPlan;
37use std::sync::Arc;
38
39pub trait GraphPhysicalPlanner {
41 fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan>;
42}
43
44pub struct DataFusionPlanner {
46 pub(crate) config: GraphConfig,
47 pub(crate) catalog: Option<Arc<dyn GraphSourceCatalog>>,
48}
49
50impl DataFusionPlanner {
51 pub fn new(config: GraphConfig) -> Self {
52 Self {
53 config,
54 catalog: None,
55 }
56 }
57
58 pub fn with_catalog(config: GraphConfig, catalog: Arc<dyn GraphSourceCatalog>) -> Self {
59 Self {
60 config,
61 catalog: Some(catalog),
62 }
63 }
64
65 pub fn plan_with_context(
66 &self,
67 logical_plan: &LogicalOperator,
68 datasets: &std::collections::HashMap<String, arrow::record_batch::RecordBatch>,
69 ) -> Result<LogicalPlan> {
70 use crate::source_catalog::{InMemoryCatalog, SimpleTableSource};
71
72 let analysis = analysis::analyze(logical_plan)?;
74
75 let mut catalog = InMemoryCatalog::new();
77
78 for label in &analysis.required_datasets {
80 if self.config.node_mappings.contains_key(label) {
81 if let Some(batch) = datasets.get(label) {
82 let src = Arc::new(SimpleTableSource::new(batch.schema()));
83 catalog = catalog.with_node_source(label, src);
84 }
85 }
86 }
87
88 for rel_type in &analysis.required_datasets {
90 if self.config.relationship_mappings.contains_key(rel_type) {
91 if let Some(batch) = datasets.get(rel_type) {
92 let src = Arc::new(SimpleTableSource::new(batch.schema()));
93 catalog = catalog.with_relationship_source(rel_type, src);
94 }
95 }
96 }
97
98 let planner_with_cat =
100 DataFusionPlanner::with_catalog(self.config.clone(), Arc::new(catalog));
101 planner_with_cat.plan(logical_plan)
102 }
103
104 pub(crate) fn plan_error<E: std::fmt::Display>(
106 &self,
107 context: &str,
108 error: E,
109 ) -> crate::error::GraphError {
110 crate::error::GraphError::PlanError {
111 message: format!("{}: {}", context, error),
112 location: snafu::Location::new(file!(), line!(), column!()),
113 }
114 }
115}
116
117impl GraphPhysicalPlanner for DataFusionPlanner {
118 fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan> {
119 let analysis = analysis::analyze(logical_plan)?;
121
122 let mut ctx = PlanningContext::new(&analysis);
124 self.build_operator(&mut ctx, logical_plan)
125 }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{
        BooleanExpression, ComparisonOperator, PropertyRef, PropertyValue, RelationshipDirection,
        ValueExpression,
    };
    use crate::logical_plan::LogicalOperator;
    use test_fixtures::{make_catalog, person_config, person_knows_config, person_scan};

    /// Builds a `PropertyRef` for `variable.property`.
    fn prop(variable: &str, property: &str) -> PropertyRef {
        PropertyRef {
            variable: variable.to_string(),
            property: property.to_string(),
        }
    }

    /// Wraps `input` in a `Filter` operator with the given predicate.
    fn filter_over(input: LogicalOperator, predicate: BooleanExpression) -> LogicalOperator {
        LogicalOperator::Filter {
            input: Box::new(input),
            predicate,
        }
    }

    /// Builds `(a:Person)-[r:KNOWS]->(b:Person)` as an outgoing `Expand` over a scan of `a`.
    fn expand_a_knows_b() -> LogicalOperator {
        LogicalOperator::Expand {
            input: Box::new(person_scan("a")),
            source_variable: "a".to_string(),
            target_variable: "b".to_string(),
            target_label: "Person".to_string(),
            relationship_types: vec!["KNOWS".to_string()],
            direction: RelationshipDirection::Outgoing,
            relationship_variable: Some("r".to_string()),
            properties: Default::default(),
            target_properties: Default::default(),
        }
    }

    /// Plans `op` (panicking on failure) and returns the `Debug` rendering of the plan.
    fn planned_debug(planner: &DataFusionPlanner, op: &LogicalOperator) -> String {
        let df_plan = planner.plan(op).unwrap();
        format!("{:?}", df_plan)
    }

    #[test]
    fn test_filter_preserves_error_context() {
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());

        // p.age > 30 over a plain Person scan.
        let predicate = BooleanExpression::Comparison {
            left: ValueExpression::Property(prop("p", "age")),
            operator: ComparisonOperator::GreaterThan,
            right: ValueExpression::Literal(PropertyValue::Integer(30)),
        };
        let filter = filter_over(person_scan("p"), predicate);

        let result = planner.plan(&filter);

        assert!(result.is_ok(), "Valid filter should succeed");
    }

    #[test]
    fn test_exists_on_relationship_property_is_qualified() {
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());

        let predicate = BooleanExpression::Exists(prop("r", "src_person_id"));
        let filter = filter_over(expand_a_knows_b(), predicate);

        let s = planned_debug(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("r__src_person_id") || s.contains("IsNotNull"),
            "missing qualified rel column or IsNotNull in filter: {}",
            s
        );
    }

    #[test]
    fn test_in_list_on_relationship_property_is_qualified() {
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());

        let predicate = BooleanExpression::In {
            expression: ValueExpression::Property(prop("r", "src_person_id")),
            list: vec![
                ValueExpression::Literal(PropertyValue::Integer(1)),
                ValueExpression::Literal(PropertyValue::Integer(2)),
            ],
        };
        let filter = filter_over(expand_a_knows_b(), predicate);

        let s = planned_debug(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("r__src_person_id"),
            "missing qualified rel column in IN list filter: {}",
            s
        );
    }

    #[test]
    fn test_exists_and_in_on_node_props_materialized() {
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());

        // EXISTS(a.name) AND a.age IN [20, 30]
        let name_exists = BooleanExpression::Exists(prop("a", "name"));
        let age_in_list = BooleanExpression::In {
            expression: ValueExpression::Property(prop("a", "age")),
            list: vec![
                ValueExpression::Literal(PropertyValue::Integer(20)),
                ValueExpression::Literal(PropertyValue::Integer(30)),
            ],
        };
        let predicate = BooleanExpression::And(Box::new(name_exists), Box::new(age_in_list));
        let filter = filter_over(person_scan("a"), predicate);

        let s = planned_debug(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("a__name") || s.contains("IsNotNull"),
            "missing EXISTS on a__name: {}",
            s
        );
        assert!(
            s.contains("a__age") || s.contains("age"),
            "missing IN on a.age: {}",
            s
        );
    }
}