lance_graph/datafusion_planner/
mod.rs1pub mod analysis;
18mod builder;
19mod config_helpers;
20mod expression;
21mod join_ops;
22mod scan_ops;
23mod udf;
24pub mod vector_ops;
25
26#[cfg(test)]
27mod test_fixtures;
28
29pub use analysis::{PlanningContext, QueryAnalysis, RelationshipInstance};
31
32use crate::config::GraphConfig;
33use crate::error::Result;
34use crate::logical_plan::LogicalOperator;
35use crate::source_catalog::GraphSourceCatalog;
36use datafusion::logical_expr::LogicalPlan;
37use std::sync::Arc;
38
39pub trait GraphPhysicalPlanner {
41 fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan>;
42}
43
44pub struct DataFusionPlanner {
46 pub(crate) config: GraphConfig,
47 pub(crate) catalog: Option<Arc<dyn GraphSourceCatalog>>,
48}
49
50impl DataFusionPlanner {
51 pub fn new(config: GraphConfig) -> Self {
52 Self {
53 config,
54 catalog: None,
55 }
56 }
57
58 pub fn with_catalog(config: GraphConfig, catalog: Arc<dyn GraphSourceCatalog>) -> Self {
59 Self {
60 config,
61 catalog: Some(catalog),
62 }
63 }
64
65 pub(crate) fn plan_error<E: std::fmt::Display>(
67 &self,
68 context: &str,
69 error: E,
70 ) -> crate::error::GraphError {
71 crate::error::GraphError::PlanError {
72 message: format!("{}: {}", context, error),
73 location: snafu::Location::new(file!(), line!(), column!()),
74 }
75 }
76}
77
78impl GraphPhysicalPlanner for DataFusionPlanner {
79 fn plan(&self, logical_plan: &LogicalOperator) -> Result<LogicalPlan> {
80 let analysis = analysis::analyze(logical_plan)?;
82
83 let mut ctx = PlanningContext::new(&analysis);
85 self.build_operator(&mut ctx, logical_plan)
86 }
87}
88
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::{
        BooleanExpression, ComparisonOperator, PropertyRef, PropertyValue, RelationshipDirection,
        ValueExpression,
    };
    use crate::logical_plan::LogicalOperator;
    use test_fixtures::{make_catalog, person_config, person_knows_config, person_scan};

    /// Builds a `PropertyRef` for `variable.property`.
    fn prop(variable: &str, property: &str) -> PropertyRef {
        PropertyRef {
            variable: variable.to_string(),
            property: property.to_string(),
        }
    }

    /// Wraps `input` in a `Filter` operator with the given predicate.
    fn filter_over(input: LogicalOperator, predicate: BooleanExpression) -> LogicalOperator {
        LogicalOperator::Filter {
            input: Box::new(input),
            predicate,
        }
    }

    /// Outgoing `KNOWS` expansion from Person `a` to Person `b`, with the
    /// relationship bound to `r`.
    fn knows_expand() -> LogicalOperator {
        LogicalOperator::Expand {
            input: Box::new(person_scan("a")),
            source_variable: "a".to_string(),
            target_variable: "b".to_string(),
            target_label: "Person".to_string(),
            relationship_types: vec!["KNOWS".to_string()],
            direction: RelationshipDirection::Outgoing,
            relationship_variable: Some("r".to_string()),
            properties: Default::default(),
            target_properties: Default::default(),
        }
    }

    /// Plans `op` and returns the `Debug` rendering of the resulting plan.
    /// Panics if planning fails.
    fn planned_debug(planner: &DataFusionPlanner, op: &LogicalOperator) -> String {
        let df_plan = planner.plan(op).unwrap();
        format!("{:?}", df_plan)
    }

    #[test]
    fn test_filter_preserves_error_context() {
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());

        // p.age > 30 over a plain Person scan.
        let predicate = BooleanExpression::Comparison {
            left: ValueExpression::Property(prop("p", "age")),
            operator: ComparisonOperator::GreaterThan,
            right: ValueExpression::Literal(PropertyValue::Integer(30)),
        };
        let filter = filter_over(person_scan("p"), predicate);

        let result = planner.plan(&filter);

        assert!(result.is_ok(), "Valid filter should succeed");
    }

    #[test]
    fn test_exists_on_relationship_property_is_qualified() {
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());

        // EXISTS(r.src_person_id) applied on top of the expansion.
        let predicate = BooleanExpression::Exists(prop("r", "src_person_id"));
        let filter = filter_over(knows_expand(), predicate);

        let s = planned_debug(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("r__src_person_id") || s.contains("IsNotNull"),
            "missing qualified rel column or IsNotNull in filter: {}",
            s
        );
    }

    #[test]
    fn test_in_list_on_relationship_property_is_qualified() {
        let planner = DataFusionPlanner::with_catalog(person_knows_config(), make_catalog());

        // r.src_person_id IN [1, 2] applied on top of the expansion.
        let predicate = BooleanExpression::In {
            expression: ValueExpression::Property(prop("r", "src_person_id")),
            list: vec![
                ValueExpression::Literal(PropertyValue::Integer(1)),
                ValueExpression::Literal(PropertyValue::Integer(2)),
            ],
        };
        let filter = filter_over(knows_expand(), predicate);

        let s = planned_debug(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("r__src_person_id"),
            "missing qualified rel column in IN list filter: {}",
            s
        );
    }

    #[test]
    fn test_exists_and_in_on_node_props_materialized() {
        let planner = DataFusionPlanner::with_catalog(person_config(), make_catalog());

        // EXISTS(a.name) AND a.age IN [20, 30] over a plain Person scan.
        let exists_name = BooleanExpression::Exists(prop("a", "name"));
        let age_in_list = BooleanExpression::In {
            expression: ValueExpression::Property(prop("a", "age")),
            list: vec![
                ValueExpression::Literal(PropertyValue::Integer(20)),
                ValueExpression::Literal(PropertyValue::Integer(30)),
            ],
        };
        let predicate = BooleanExpression::And(Box::new(exists_name), Box::new(age_in_list));
        let filter = filter_over(person_scan("a"), predicate);

        let s = planned_debug(&planner, &filter);
        assert!(s.contains("Filter"), "missing Filter: {}", s);
        assert!(
            s.contains("a__name") || s.contains("IsNotNull"),
            "missing EXISTS on a__name: {}",
            s
        );
        // Note: any plan text containing "age" satisfies this check, since
        // "a__age" itself contains "age".
        assert!(
            s.contains("a__age") || s.contains("age"),
            "missing IN on a.age: {}",
            s
        );
    }
}