lance_graph/
query_processor.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Unified query processor implementing the full pipeline
5//!
6//! This module implements the complete query processing pipeline:
7//! Parse → Semantic Analysis → Logical Plan → Physical Plan → Execution
8
9use crate::ast::CypherQuery;
10use crate::config::GraphConfig;
11// use crate::datafusion_planner::DataFusionPlanner;
12use crate::datafusion_planner::{DataFusionPlanner, GraphPhysicalPlanner};
13use crate::error::{GraphError, Result};
14use crate::logical_plan::{LogicalOperator, LogicalPlanner};
15use crate::parser::parse_cypher_query;
16use crate::semantic::{SemanticAnalyzer, SemanticResult};
17use arrow::record_batch::RecordBatch;
18use datafusion::logical_expr::LogicalPlan;
19use std::collections::HashMap;
20
/// Complete query processing pipeline.
///
/// Drives a query through Parse → Semantic Analysis → Logical Planning →
/// DataFusion Physical Planning, producing a [`QueryPlan`].
pub struct QueryProcessor {
    // Graph schema/configuration; cloned into the semantic analyzer and the
    // DataFusion planner for each processed query.
    config: GraphConfig,
}
25
/// Query execution plan with all intermediate representations.
///
/// Every pipeline stage's output is retained (not just the final plan) so
/// callers can inspect or explain each phase, e.g. via `explain_query`.
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// Original query string
    pub query_text: String,
    /// Parsed AST
    pub ast: CypherQuery,
    /// Semantic analysis result
    pub semantic_result: SemanticResult,
    /// Logical plan
    pub logical_plan: LogicalOperator,
    /// DataFusion physical plan
    pub datafusion_plan: LogicalPlan,
}
40
41impl QueryProcessor {
42    pub fn new(config: GraphConfig) -> Self {
43        Self { config }
44    }
45
46    /// Process a Cypher query with in-memory datasets registered for DataFusion planning
47    pub fn process_query_with_datasets(
48        &self,
49        query_text: &str,
50        datasets: &HashMap<String, RecordBatch>,
51    ) -> Result<QueryPlan> {
52        // Phase 1: Parse - Convert text to AST
53        let ast = parse_cypher_query(query_text)?;
54
55        // Phase 2: Semantic Analysis - Validate and enrich AST
56        let mut semantic_analyzer = SemanticAnalyzer::new(self.config.clone());
57        let semantic_result = semantic_analyzer.analyze(&ast)?;
58
59        if !semantic_result.errors.is_empty() {
60            return Err(GraphError::PlanError {
61                message: format!("Semantic errors: {}", semantic_result.errors.join(", ")),
62                location: snafu::Location::new(file!(), line!(), column!()),
63            });
64        }
65
66        // Phase 3: Logical Planning - Convert AST to logical operators
67        let mut logical_planner = LogicalPlanner::new();
68        let logical_plan = logical_planner.plan(&ast)?;
69
70        // Phase 4: Physical Planning with datasets registered in a DF context
71        let df_planner = DataFusionPlanner::new(self.config.clone());
72        let datafusion_plan = df_planner.plan_with_context(&logical_plan, datasets)?;
73
74        Ok(QueryPlan {
75            query_text: query_text.to_string(),
76            ast,
77            semantic_result,
78            logical_plan,
79            datafusion_plan,
80        })
81    }
82
83    /// Process a Cypher query through the complete pipeline
84    pub fn process_query(&self, query_text: &str) -> Result<QueryPlan> {
85        // Phase 1: Parse - Convert text to AST
86        let ast = parse_cypher_query(query_text)?;
87
88        // Phase 2: Semantic Analysis - Validate and enrich AST
89        let mut semantic_analyzer = SemanticAnalyzer::new(self.config.clone());
90        let semantic_result = semantic_analyzer.analyze(&ast)?;
91
92        // Check for semantic errors
93        if !semantic_result.errors.is_empty() {
94            return Err(GraphError::PlanError {
95                message: format!("Semantic errors: {}", semantic_result.errors.join(", ")),
96                location: snafu::Location::new(file!(), line!(), column!()),
97            });
98        }
99
100        // Phase 3: Logical Planning - Convert AST to logical operators
101        let mut logical_planner = LogicalPlanner::new();
102        let logical_plan = logical_planner.plan(&ast)?;
103
104        // Phase 4: Physical Planning - Convert logical plan to DataFusion plan
105        let df_planner = DataFusionPlanner::new(self.config.clone());
106        let datafusion_plan = df_planner.plan(&logical_plan)?;
107
108        Ok(QueryPlan {
109            query_text: query_text.to_string(),
110            ast,
111            semantic_result,
112            logical_plan,
113            datafusion_plan,
114        })
115    }
116
117    /// Process and explain a query (for debugging)
118    pub fn explain_query(&self, query_text: &str) -> Result<String> {
119        let plan = self.process_query(query_text)?;
120
121        let mut explanation = String::new();
122        explanation.push_str("=== Query Processing Pipeline ===\n\n");
123        explanation.push_str(&format!("Original Query:\n{}\n\n", plan.query_text));
124
125        explanation.push_str("=== Phase 1: Parsing ===\n");
126        explanation.push_str(&format!("AST: {:#?}\n\n", plan.ast));
127
128        explanation.push_str("=== Phase 2: Semantic Analysis ===\n");
129        explanation.push_str(&format!(
130            "Variables: {:#?}\n",
131            plan.semantic_result.variables
132        ));
133        if !plan.semantic_result.warnings.is_empty() {
134            explanation.push_str(&format!("Warnings: {:?}\n", plan.semantic_result.warnings));
135        }
136        explanation.push('\n');
137
138        explanation.push_str("=== Phase 3: Logical Planning ===\n");
139        explanation.push_str(&format!("Logical Plan: {:#?}\n\n", plan.logical_plan));
140
141        explanation.push_str("=== Phase 4: DataFusion Planning ===\n");
142        explanation.push_str(&format!("DataFusion Plan: {:#?}\n\n", plan.datafusion_plan));
143
144        // No legacy SQL generation layer; DataFusion plan is the physical output.
145
146        Ok(explanation)
147    }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Graph configuration with one node label and one relationship,
    /// shared by every test in this module.
    fn create_test_config() -> GraphConfig {
        GraphConfig::builder()
            .with_node_label("Person", "person_id")
            .with_relationship("KNOWS", "src_person_id", "dst_person_id")
            .build()
            .unwrap()
    }

    /// An empty `Person` table whose schema covers the columns referenced
    /// by the test queries (`person_id`, `name`, `age`).
    fn make_datasets() -> HashMap<String, RecordBatch> {
        let fields = vec![
            Field::new("person_id", DataType::Int64, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("age", DataType::Int64, true),
        ];
        let empty_batch = RecordBatch::new_empty(Arc::new(Schema::new(fields)));
        HashMap::from([("Person".to_string(), empty_batch)])
    }

    #[test]
    fn test_simple_query_pipeline() {
        let processor = QueryProcessor::new(create_test_config());

        let query = "MATCH (n:Person) RETURN n.name";
        let result = processor.process_query_with_datasets(query, &make_datasets());
        let plan = result.unwrap();

        // All phases produced output: the DataFusion plan exists
        // (placeholder or concrete), the query text round-trips, and
        // semantic analysis recorded at least one variable.
        let _ = plan.datafusion_plan;
        assert_eq!(plan.query_text, query);
        assert!(!plan.semantic_result.variables.is_empty());
    }

    #[test]
    fn test_query_explanation() {
        let processor = QueryProcessor::new(create_test_config());

        let query = "MATCH (n:Person) WHERE n.age > 30 RETURN n.name";
        let plan = processor
            .process_query_with_datasets(query, &make_datasets())
            .unwrap();

        // The DataFusion plan should render to a non-empty debug string.
        let rendered = format!("{:?}", plan.datafusion_plan);
        assert!(!rendered.is_empty());
        assert!(!plan.semantic_result.variables.is_empty());
    }

    #[test]
    fn test_semantic_error_detection() {
        let processor = QueryProcessor::new(create_test_config());

        // Query with undefined variable
        let query = "MATCH (n:Person) RETURN m.name"; // 'm' is not defined
        let outcome = processor.process_query(query);

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Undefined variable"));
    }

    #[test]
    fn test_new_pipeline_vs_old() {
        let processor = QueryProcessor::new(create_test_config());

        // Run the full pipeline on a filtered query.
        let query = "MATCH (n:Person) WHERE n.age > 30 RETURN n.name";
        let new_plan = processor
            .process_query_with_datasets(query, &make_datasets())
            .unwrap();

        // The pipeline should produce a DataFusion plan.
        let _ = new_plan.datafusion_plan;

        // The logical plan must be nested: the top operator (limit/sort/
        // projection) should wrap a Filter for the WHERE clause.
        match &new_plan.logical_plan {
            LogicalOperator::Limit { input, .. }
            | LogicalOperator::Sort { input, .. }
            | LogicalOperator::Project { input, .. } => {
                assert!(matches!(**input, LogicalOperator::Filter { .. }));
            }
            _ => panic!("Expected nested logical plan structure"),
        }

        // Semantic analysis must have registered the bound variable.
        assert!(new_plan.semantic_result.variables.contains_key("n"));
    }
}