lance_graph/
query_processor.rs1use crate::ast::CypherQuery;
10use crate::config::GraphConfig;
11use crate::datafusion_planner::{DataFusionPlanner, GraphPhysicalPlanner};
13use crate::error::{GraphError, Result};
14use crate::logical_plan::{LogicalOperator, LogicalPlanner};
15use crate::parser::parse_cypher_query;
16use crate::semantic::{SemanticAnalyzer, SemanticResult};
17use arrow::record_batch::RecordBatch;
18use datafusion::logical_expr::LogicalPlan;
19use std::collections::HashMap;
20
21pub struct QueryProcessor {
23 config: GraphConfig,
24}
25
26#[derive(Debug, Clone)]
28pub struct QueryPlan {
29 pub query_text: String,
31 pub ast: CypherQuery,
33 pub semantic_result: SemanticResult,
35 pub logical_plan: LogicalOperator,
37 pub datafusion_plan: LogicalPlan,
39}
40
41impl QueryProcessor {
42 pub fn new(config: GraphConfig) -> Self {
43 Self { config }
44 }
45
46 pub fn process_query_with_datasets(
48 &self,
49 query_text: &str,
50 datasets: &HashMap<String, RecordBatch>,
51 ) -> Result<QueryPlan> {
52 let ast = parse_cypher_query(query_text)?;
54
55 let mut semantic_analyzer = SemanticAnalyzer::new(self.config.clone());
57 let semantic_result = semantic_analyzer.analyze(&ast)?;
58
59 if !semantic_result.errors.is_empty() {
60 return Err(GraphError::PlanError {
61 message: format!("Semantic errors: {}", semantic_result.errors.join(", ")),
62 location: snafu::Location::new(file!(), line!(), column!()),
63 });
64 }
65
66 let mut logical_planner = LogicalPlanner::new();
68 let logical_plan = logical_planner.plan(&ast)?;
69
70 let df_planner = DataFusionPlanner::new(self.config.clone());
72 let datafusion_plan = df_planner.plan_with_context(&logical_plan, datasets)?;
73
74 Ok(QueryPlan {
75 query_text: query_text.to_string(),
76 ast,
77 semantic_result,
78 logical_plan,
79 datafusion_plan,
80 })
81 }
82
83 pub fn process_query(&self, query_text: &str) -> Result<QueryPlan> {
85 let ast = parse_cypher_query(query_text)?;
87
88 let mut semantic_analyzer = SemanticAnalyzer::new(self.config.clone());
90 let semantic_result = semantic_analyzer.analyze(&ast)?;
91
92 if !semantic_result.errors.is_empty() {
94 return Err(GraphError::PlanError {
95 message: format!("Semantic errors: {}", semantic_result.errors.join(", ")),
96 location: snafu::Location::new(file!(), line!(), column!()),
97 });
98 }
99
100 let mut logical_planner = LogicalPlanner::new();
102 let logical_plan = logical_planner.plan(&ast)?;
103
104 let df_planner = DataFusionPlanner::new(self.config.clone());
106 let datafusion_plan = df_planner.plan(&logical_plan)?;
107
108 Ok(QueryPlan {
109 query_text: query_text.to_string(),
110 ast,
111 semantic_result,
112 logical_plan,
113 datafusion_plan,
114 })
115 }
116
117 pub fn explain_query(&self, query_text: &str) -> Result<String> {
119 let plan = self.process_query(query_text)?;
120
121 let mut explanation = String::new();
122 explanation.push_str("=== Query Processing Pipeline ===\n\n");
123 explanation.push_str(&format!("Original Query:\n{}\n\n", plan.query_text));
124
125 explanation.push_str("=== Phase 1: Parsing ===\n");
126 explanation.push_str(&format!("AST: {:#?}\n\n", plan.ast));
127
128 explanation.push_str("=== Phase 2: Semantic Analysis ===\n");
129 explanation.push_str(&format!(
130 "Variables: {:#?}\n",
131 plan.semantic_result.variables
132 ));
133 if !plan.semantic_result.warnings.is_empty() {
134 explanation.push_str(&format!("Warnings: {:?}\n", plan.semantic_result.warnings));
135 }
136 explanation.push('\n');
137
138 explanation.push_str("=== Phase 3: Logical Planning ===\n");
139 explanation.push_str(&format!("Logical Plan: {:#?}\n\n", plan.logical_plan));
140
141 explanation.push_str("=== Phase 4: DataFusion Planning ===\n");
142 explanation.push_str(&format!("DataFusion Plan: {:#?}\n\n", plan.datafusion_plan));
143
144 Ok(explanation)
147 }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Graph configuration with a `Person` node label and a `KNOWS`
    /// relationship.
    fn create_test_config() -> GraphConfig {
        let builder = GraphConfig::builder()
            .with_node_label("Person", "person_id")
            .with_relationship("KNOWS", "src_person_id", "dst_person_id");
        builder.build().unwrap()
    }

    /// A single empty `Person` batch with `person_id`, `name` and `age`
    /// columns.
    fn make_datasets() -> HashMap<String, RecordBatch> {
        let person_fields = vec![
            Field::new("person_id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int64, true),
        ];
        let empty_people = RecordBatch::new_empty(Arc::new(Schema::new(person_fields)));
        HashMap::from([("Person".to_string(), empty_people)])
    }

    /// Processor wired to the shared test configuration.
    fn make_processor() -> QueryProcessor {
        QueryProcessor::new(create_test_config())
    }

    #[test]
    fn test_simple_query_pipeline() {
        let processor = make_processor();
        let tables = make_datasets();
        let cypher = "MATCH (n:Person) RETURN n.name";

        let plan = processor
            .process_query_with_datasets(cypher, &tables)
            .unwrap();

        // Reaching this point means a DataFusion plan was produced.
        let _ = plan.datafusion_plan;
        assert_eq!(plan.query_text, cypher);
        assert!(!plan.semantic_result.variables.is_empty());
    }

    #[test]
    fn test_query_explanation() {
        let processor = make_processor();
        let tables = make_datasets();
        let cypher = "MATCH (n:Person) WHERE n.age > 30 RETURN n.name";

        let plan = processor
            .process_query_with_datasets(cypher, &tables)
            .unwrap();
        let rendered_plan = format!("{:?}", plan.datafusion_plan);

        assert!(!rendered_plan.is_empty());
        assert!(!plan.semantic_result.variables.is_empty());
    }

    #[test]
    fn test_semantic_error_detection() {
        let processor = make_processor();

        // `m` is returned but never bound by the MATCH clause.
        let cypher = "MATCH (n:Person) RETURN m.name";
        let outcome = processor.process_query(cypher);

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Undefined variable"));
    }

    #[test]
    fn test_new_pipeline_vs_old() {
        let processor = make_processor();
        let tables = make_datasets();
        let cypher = "MATCH (n:Person) WHERE n.age > 30 RETURN n.name";

        let new_plan = processor
            .process_query_with_datasets(cypher, &tables)
            .unwrap();

        let _ = new_plan.datafusion_plan;

        // The root must be a Limit, Sort or Project whose direct child is the
        // Filter generated from the WHERE clause.
        let child = match &new_plan.logical_plan {
            LogicalOperator::Limit { input, .. }
            | LogicalOperator::Sort { input, .. }
            | LogicalOperator::Project { input, .. } => input,
            _ => panic!("Expected nested logical plan structure"),
        };
        assert!(matches!(**child, LogicalOperator::Filter { .. }));

        assert!(new_plan.semantic_result.variables.contains_key("n"));
    }
}