Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TxId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::lpg::LpgStore;
15
16use crate::catalog::Catalog;
17use crate::database::QueryResult;
18use crate::query::binder::Binder;
19use crate::query::executor::Executor;
20use crate::query::optimizer::Optimizer;
21use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
22use crate::query::planner::Planner;
23use crate::transaction::TransactionManager;
24
25/// Supported query languages.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27pub enum QueryLanguage {
28    /// GQL (ISO/IEC 39075:2024) - default for LPG
29    #[cfg(feature = "gql")]
30    Gql,
31    /// openCypher 9.0
32    #[cfg(feature = "cypher")]
33    Cypher,
34    /// Apache TinkerPop Gremlin
35    #[cfg(feature = "gremlin")]
36    Gremlin,
37    /// GraphQL for LPG
38    #[cfg(feature = "graphql")]
39    GraphQL,
40    /// SQL/PGQ (SQL:2023 GRAPH_TABLE)
41    #[cfg(feature = "sql-pgq")]
42    SqlPgq,
43    /// SPARQL 1.1 for RDF
44    #[cfg(feature = "sparql")]
45    Sparql,
46    /// GraphQL for RDF
47    #[cfg(all(feature = "graphql", feature = "rdf"))]
48    GraphQLRdf,
49}
50
51impl QueryLanguage {
52    /// Returns whether this language targets LPG (vs RDF).
53    #[must_use]
54    pub const fn is_lpg(&self) -> bool {
55        match self {
56            #[cfg(feature = "gql")]
57            Self::Gql => true,
58            #[cfg(feature = "cypher")]
59            Self::Cypher => true,
60            #[cfg(feature = "gremlin")]
61            Self::Gremlin => true,
62            #[cfg(feature = "graphql")]
63            Self::GraphQL => true,
64            #[cfg(feature = "sql-pgq")]
65            Self::SqlPgq => true,
66            #[cfg(feature = "sparql")]
67            Self::Sparql => false,
68            #[cfg(all(feature = "graphql", feature = "rdf"))]
69            Self::GraphQLRdf => false,
70        }
71    }
72}
73
74/// Query parameters for prepared statements.
75pub type QueryParams = HashMap<String, Value>;
76
77/// Processes queries through the full pipeline.
78///
79/// The processor holds references to the stores and provides a unified
80/// interface for executing queries in any supported language.
81///
82/// # Example
83///
84/// ```ignore
85/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
86///
87/// let processor = QueryProcessor::for_lpg(store);
88/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
89/// ```
90pub struct QueryProcessor {
91    /// LPG store for property graph queries.
92    lpg_store: Arc<LpgStore>,
93    /// Transaction manager for MVCC operations.
94    tx_manager: Arc<TransactionManager>,
95    /// Catalog for schema and index metadata.
96    catalog: Arc<Catalog>,
97    /// Query optimizer.
98    optimizer: Optimizer,
99    /// Current transaction context (if any).
100    tx_context: Option<(EpochId, TxId)>,
101    /// RDF store for triple pattern queries (optional).
102    #[cfg(feature = "rdf")]
103    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
104}
105
106impl QueryProcessor {
107    /// Creates a new query processor for LPG queries.
108    #[must_use]
109    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
110        let optimizer = Optimizer::from_store(&store);
111        Self {
112            lpg_store: store,
113            tx_manager: Arc::new(TransactionManager::new()),
114            catalog: Arc::new(Catalog::new()),
115            optimizer,
116            tx_context: None,
117            #[cfg(feature = "rdf")]
118            rdf_store: None,
119        }
120    }
121
122    /// Creates a new query processor with a transaction manager.
123    #[must_use]
124    pub fn for_lpg_with_tx(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
125        let optimizer = Optimizer::from_store(&store);
126        Self {
127            lpg_store: store,
128            tx_manager,
129            catalog: Arc::new(Catalog::new()),
130            optimizer,
131            tx_context: None,
132            #[cfg(feature = "rdf")]
133            rdf_store: None,
134        }
135    }
136
137    /// Creates a new query processor with both LPG and RDF stores.
138    #[cfg(feature = "rdf")]
139    #[must_use]
140    pub fn with_rdf(
141        lpg_store: Arc<LpgStore>,
142        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
143    ) -> Self {
144        let optimizer = Optimizer::from_store(&lpg_store);
145        Self {
146            lpg_store,
147            tx_manager: Arc::new(TransactionManager::new()),
148            catalog: Arc::new(Catalog::new()),
149            optimizer,
150            tx_context: None,
151            rdf_store: Some(rdf_store),
152        }
153    }
154
155    /// Sets the transaction context for MVCC visibility.
156    ///
157    /// This should be called when the processor is used within a transaction.
158    #[must_use]
159    pub fn with_tx_context(mut self, viewing_epoch: EpochId, tx_id: TxId) -> Self {
160        self.tx_context = Some((viewing_epoch, tx_id));
161        self
162    }
163
164    /// Sets a custom catalog.
165    #[must_use]
166    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
167        self.catalog = catalog;
168        self
169    }
170
171    /// Sets a custom optimizer.
172    #[must_use]
173    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
174        self.optimizer = optimizer;
175        self
176    }
177
178    /// Processes a query string and returns results.
179    ///
180    /// Pipeline:
181    /// 1. Parse (language-specific parser → AST)
182    /// 2. Translate (AST → LogicalPlan)
183    /// 3. Bind (semantic validation)
184    /// 4. Optimize (filter pushdown, join reorder, etc.)
185    /// 5. Plan (logical → physical operators)
186    /// 6. Execute (run operators, collect results)
187    ///
188    /// # Arguments
189    ///
190    /// * `query` - The query string
191    /// * `language` - Which query language to use
192    /// * `params` - Optional query parameters for prepared statements
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if any stage of the pipeline fails.
197    pub fn process(
198        &self,
199        query: &str,
200        language: QueryLanguage,
201        params: Option<&QueryParams>,
202    ) -> Result<QueryResult> {
203        if language.is_lpg() {
204            self.process_lpg(query, language, params)
205        } else {
206            #[cfg(feature = "rdf")]
207            {
208                self.process_rdf(query, language, params)
209            }
210            #[cfg(not(feature = "rdf"))]
211            {
212                Err(Error::Internal(
213                    "RDF support not enabled. Compile with --features rdf".to_string(),
214                ))
215            }
216        }
217    }
218
219    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
220    fn process_lpg(
221        &self,
222        query: &str,
223        language: QueryLanguage,
224        params: Option<&QueryParams>,
225    ) -> Result<QueryResult> {
226        let start_time = std::time::Instant::now();
227
228        // 1. Parse and translate to logical plan
229        let mut logical_plan = self.translate_lpg(query, language)?;
230
231        // 2. Substitute parameters if provided
232        if let Some(params) = params {
233            substitute_params(&mut logical_plan, params)?;
234        }
235
236        // 3. Semantic validation
237        let mut binder = Binder::new();
238        let _binding_context = binder.bind(&logical_plan)?;
239
240        // 4. Optimize the plan
241        let optimized_plan = self.optimizer.optimize(logical_plan)?;
242
243        // 5. Convert to physical plan with transaction context
244        let planner = if let Some((epoch, tx_id)) = self.tx_context {
245            Planner::with_context(
246                Arc::clone(&self.lpg_store),
247                Arc::clone(&self.tx_manager),
248                Some(tx_id),
249                epoch,
250            )
251        } else {
252            Planner::with_context(
253                Arc::clone(&self.lpg_store),
254                Arc::clone(&self.tx_manager),
255                None,
256                self.tx_manager.current_epoch(),
257            )
258        };
259        let mut physical_plan = planner.plan(&optimized_plan)?;
260
261        // 6. Execute and collect results
262        let executor = Executor::with_columns(physical_plan.columns.clone());
263        let mut result = executor.execute(physical_plan.operator.as_mut())?;
264
265        // Add execution metrics
266        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
267        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
268        result.execution_time_ms = Some(elapsed_ms);
269        result.rows_scanned = Some(rows_scanned);
270
271        Ok(result)
272    }
273
274    /// Translates an LPG query to a logical plan.
275    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
276        match language {
277            #[cfg(feature = "gql")]
278            QueryLanguage::Gql => {
279                use crate::query::gql_translator;
280                gql_translator::translate(query)
281            }
282            #[cfg(feature = "cypher")]
283            QueryLanguage::Cypher => {
284                use crate::query::cypher_translator;
285                cypher_translator::translate(query)
286            }
287            #[cfg(feature = "gremlin")]
288            QueryLanguage::Gremlin => {
289                use crate::query::gremlin_translator;
290                gremlin_translator::translate(query)
291            }
292            #[cfg(feature = "graphql")]
293            QueryLanguage::GraphQL => {
294                use crate::query::graphql_translator;
295                graphql_translator::translate(query)
296            }
297            #[cfg(feature = "sql-pgq")]
298            QueryLanguage::SqlPgq => {
299                use crate::query::sql_pgq_translator;
300                sql_pgq_translator::translate(query)
301            }
302            #[allow(unreachable_patterns)]
303            _ => Err(Error::Internal(format!(
304                "Language {:?} is not an LPG language",
305                language
306            ))),
307        }
308    }
309
310    /// Processes an RDF query (SPARQL, GraphQL-RDF).
311    #[cfg(feature = "rdf")]
312    fn process_rdf(
313        &self,
314        query: &str,
315        language: QueryLanguage,
316        _params: Option<&QueryParams>,
317    ) -> Result<QueryResult> {
318        use crate::query::planner_rdf::RdfPlanner;
319
320        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
321            Error::Internal("RDF store not configured for this processor".to_string())
322        })?;
323
324        // 1. Parse and translate to logical plan
325        let logical_plan = self.translate_rdf(query, language)?;
326
327        // 2. Semantic validation
328        let mut binder = Binder::new();
329        let _binding_context = binder.bind(&logical_plan)?;
330
331        // 3. Optimize the plan
332        let optimized_plan = self.optimizer.optimize(logical_plan)?;
333
334        // 4. Convert to physical plan (using RDF planner)
335        let planner = RdfPlanner::new(Arc::clone(rdf_store));
336        let mut physical_plan = planner.plan(&optimized_plan)?;
337
338        // 5. Execute and collect results
339        let executor = Executor::with_columns(physical_plan.columns.clone());
340        executor.execute(physical_plan.operator.as_mut())
341    }
342
343    /// Translates an RDF query to a logical plan.
344    #[cfg(feature = "rdf")]
345    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
346        match language {
347            #[cfg(feature = "sparql")]
348            QueryLanguage::Sparql => {
349                use crate::query::sparql_translator;
350                sparql_translator::translate(query)
351            }
352            #[cfg(all(feature = "graphql", feature = "rdf"))]
353            QueryLanguage::GraphQLRdf => {
354                use crate::query::graphql_rdf_translator;
355                // Default namespace for GraphQL-RDF queries
356                graphql_rdf_translator::translate(query, "http://example.org/")
357            }
358            _ => Err(Error::Internal(format!(
359                "Language {:?} is not an RDF language",
360                language
361            ))),
362        }
363    }
364
365    /// Returns a reference to the LPG store.
366    #[must_use]
367    pub fn lpg_store(&self) -> &Arc<LpgStore> {
368        &self.lpg_store
369    }
370
371    /// Returns a reference to the catalog.
372    #[must_use]
373    pub fn catalog(&self) -> &Arc<Catalog> {
374        &self.catalog
375    }
376
377    /// Returns a reference to the optimizer.
378    #[must_use]
379    pub fn optimizer(&self) -> &Optimizer {
380        &self.optimizer
381    }
382
383    /// Returns a reference to the RDF store (if configured).
384    #[cfg(feature = "rdf")]
385    #[must_use]
386    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
387        self.rdf_store.as_ref()
388    }
389}
390
391// Legacy compatibility: Keep the old API working
392impl QueryProcessor {
393    /// Creates a new query processor (legacy API).
394    ///
395    /// This creates a processor with an empty in-memory store.
396    /// Prefer using [`QueryProcessor::for_lpg`] with an explicit store.
397    #[must_use]
398    #[deprecated(since = "0.1.0", note = "Use QueryProcessor::for_lpg() instead")]
399    pub fn new() -> Self {
400        Self::for_lpg(Arc::new(LpgStore::new()))
401    }
402
403    /// Processes a query using default GQL language (legacy API).
404    ///
405    /// # Errors
406    ///
407    /// Returns an error if the query fails.
408    #[cfg(feature = "gql")]
409    #[deprecated(since = "0.1.0", note = "Use process() with explicit language")]
410    pub fn process_legacy(&self, query: &str) -> Result<QueryResult> {
411        self.process(query, QueryLanguage::Gql, None)
412    }
413
414    /// Returns a reference to the transaction manager.
415    #[must_use]
416    pub fn tx_manager(&self) -> &Arc<TransactionManager> {
417        &self.tx_manager
418    }
419}
420
421impl Default for QueryProcessor {
422    fn default() -> Self {
423        Self::for_lpg(Arc::new(LpgStore::new()))
424    }
425}
426
427/// Substitutes parameters in a logical plan with their values.
428fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
429    substitute_in_operator(&mut plan.root, params)
430}
431
432/// Recursively substitutes parameters in an operator.
433fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
434    use crate::query::plan::*;
435
436    match op {
437        LogicalOperator::Filter(filter) => {
438            substitute_in_expression(&mut filter.predicate, params)?;
439            substitute_in_operator(&mut filter.input, params)?;
440        }
441        LogicalOperator::Return(ret) => {
442            for item in &mut ret.items {
443                substitute_in_expression(&mut item.expression, params)?;
444            }
445            substitute_in_operator(&mut ret.input, params)?;
446        }
447        LogicalOperator::Project(proj) => {
448            for p in &mut proj.projections {
449                substitute_in_expression(&mut p.expression, params)?;
450            }
451            substitute_in_operator(&mut proj.input, params)?;
452        }
453        LogicalOperator::NodeScan(scan) => {
454            if let Some(input) = &mut scan.input {
455                substitute_in_operator(input, params)?;
456            }
457        }
458        LogicalOperator::EdgeScan(scan) => {
459            if let Some(input) = &mut scan.input {
460                substitute_in_operator(input, params)?;
461            }
462        }
463        LogicalOperator::Expand(expand) => {
464            substitute_in_operator(&mut expand.input, params)?;
465        }
466        LogicalOperator::Join(join) => {
467            substitute_in_operator(&mut join.left, params)?;
468            substitute_in_operator(&mut join.right, params)?;
469            for cond in &mut join.conditions {
470                substitute_in_expression(&mut cond.left, params)?;
471                substitute_in_expression(&mut cond.right, params)?;
472            }
473        }
474        LogicalOperator::LeftJoin(join) => {
475            substitute_in_operator(&mut join.left, params)?;
476            substitute_in_operator(&mut join.right, params)?;
477            if let Some(cond) = &mut join.condition {
478                substitute_in_expression(cond, params)?;
479            }
480        }
481        LogicalOperator::Aggregate(agg) => {
482            for expr in &mut agg.group_by {
483                substitute_in_expression(expr, params)?;
484            }
485            for agg_expr in &mut agg.aggregates {
486                if let Some(expr) = &mut agg_expr.expression {
487                    substitute_in_expression(expr, params)?;
488                }
489            }
490            substitute_in_operator(&mut agg.input, params)?;
491        }
492        LogicalOperator::Sort(sort) => {
493            for key in &mut sort.keys {
494                substitute_in_expression(&mut key.expression, params)?;
495            }
496            substitute_in_operator(&mut sort.input, params)?;
497        }
498        LogicalOperator::Limit(limit) => {
499            substitute_in_operator(&mut limit.input, params)?;
500        }
501        LogicalOperator::Skip(skip) => {
502            substitute_in_operator(&mut skip.input, params)?;
503        }
504        LogicalOperator::Distinct(distinct) => {
505            substitute_in_operator(&mut distinct.input, params)?;
506        }
507        LogicalOperator::CreateNode(create) => {
508            for (_, expr) in &mut create.properties {
509                substitute_in_expression(expr, params)?;
510            }
511            if let Some(input) = &mut create.input {
512                substitute_in_operator(input, params)?;
513            }
514        }
515        LogicalOperator::CreateEdge(create) => {
516            for (_, expr) in &mut create.properties {
517                substitute_in_expression(expr, params)?;
518            }
519            substitute_in_operator(&mut create.input, params)?;
520        }
521        LogicalOperator::DeleteNode(delete) => {
522            substitute_in_operator(&mut delete.input, params)?;
523        }
524        LogicalOperator::DeleteEdge(delete) => {
525            substitute_in_operator(&mut delete.input, params)?;
526        }
527        LogicalOperator::SetProperty(set) => {
528            for (_, expr) in &mut set.properties {
529                substitute_in_expression(expr, params)?;
530            }
531            substitute_in_operator(&mut set.input, params)?;
532        }
533        LogicalOperator::Union(union) => {
534            for input in &mut union.inputs {
535                substitute_in_operator(input, params)?;
536            }
537        }
538        LogicalOperator::AntiJoin(anti) => {
539            substitute_in_operator(&mut anti.left, params)?;
540            substitute_in_operator(&mut anti.right, params)?;
541        }
542        LogicalOperator::Bind(bind) => {
543            substitute_in_expression(&mut bind.expression, params)?;
544            substitute_in_operator(&mut bind.input, params)?;
545        }
546        LogicalOperator::TripleScan(scan) => {
547            if let Some(input) = &mut scan.input {
548                substitute_in_operator(input, params)?;
549            }
550        }
551        LogicalOperator::Unwind(unwind) => {
552            substitute_in_expression(&mut unwind.expression, params)?;
553            substitute_in_operator(&mut unwind.input, params)?;
554        }
555        LogicalOperator::Merge(merge) => {
556            for (_, expr) in &mut merge.match_properties {
557                substitute_in_expression(expr, params)?;
558            }
559            for (_, expr) in &mut merge.on_create {
560                substitute_in_expression(expr, params)?;
561            }
562            for (_, expr) in &mut merge.on_match {
563                substitute_in_expression(expr, params)?;
564            }
565            substitute_in_operator(&mut merge.input, params)?;
566        }
567        LogicalOperator::AddLabel(add_label) => {
568            substitute_in_operator(&mut add_label.input, params)?;
569        }
570        LogicalOperator::RemoveLabel(remove_label) => {
571            substitute_in_operator(&mut remove_label.input, params)?;
572        }
573        LogicalOperator::ShortestPath(sp) => {
574            substitute_in_operator(&mut sp.input, params)?;
575        }
576        // SPARQL Update operators
577        LogicalOperator::InsertTriple(insert) => {
578            if let Some(ref mut input) = insert.input {
579                substitute_in_operator(input, params)?;
580            }
581        }
582        LogicalOperator::DeleteTriple(delete) => {
583            if let Some(ref mut input) = delete.input {
584                substitute_in_operator(input, params)?;
585            }
586        }
587        LogicalOperator::Modify(modify) => {
588            substitute_in_operator(&mut modify.where_clause, params)?;
589        }
590        LogicalOperator::ClearGraph(_)
591        | LogicalOperator::CreateGraph(_)
592        | LogicalOperator::DropGraph(_)
593        | LogicalOperator::LoadGraph(_)
594        | LogicalOperator::CopyGraph(_)
595        | LogicalOperator::MoveGraph(_)
596        | LogicalOperator::AddGraph(_) => {}
597        LogicalOperator::Empty => {}
598        LogicalOperator::VectorScan(scan) => {
599            substitute_in_expression(&mut scan.query_vector, params)?;
600            if let Some(ref mut input) = scan.input {
601                substitute_in_operator(input, params)?;
602            }
603        }
604        LogicalOperator::VectorJoin(join) => {
605            substitute_in_expression(&mut join.query_vector, params)?;
606            substitute_in_operator(&mut join.input, params)?;
607        }
608        // DDL operators have no expressions to substitute
609        LogicalOperator::CreatePropertyGraph(_) => {}
610    }
611    Ok(())
612}
613
614/// Substitutes parameters in an expression with their values.
615fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
616    use crate::query::plan::LogicalExpression;
617
618    match expr {
619        LogicalExpression::Parameter(name) => {
620            if let Some(value) = params.get(name) {
621                *expr = LogicalExpression::Literal(value.clone());
622            } else {
623                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
624            }
625        }
626        LogicalExpression::Binary { left, right, .. } => {
627            substitute_in_expression(left, params)?;
628            substitute_in_expression(right, params)?;
629        }
630        LogicalExpression::Unary { operand, .. } => {
631            substitute_in_expression(operand, params)?;
632        }
633        LogicalExpression::FunctionCall { args, .. } => {
634            for arg in args {
635                substitute_in_expression(arg, params)?;
636            }
637        }
638        LogicalExpression::List(items) => {
639            for item in items {
640                substitute_in_expression(item, params)?;
641            }
642        }
643        LogicalExpression::Map(pairs) => {
644            for (_, value) in pairs {
645                substitute_in_expression(value, params)?;
646            }
647        }
648        LogicalExpression::IndexAccess { base, index } => {
649            substitute_in_expression(base, params)?;
650            substitute_in_expression(index, params)?;
651        }
652        LogicalExpression::SliceAccess { base, start, end } => {
653            substitute_in_expression(base, params)?;
654            if let Some(s) = start {
655                substitute_in_expression(s, params)?;
656            }
657            if let Some(e) = end {
658                substitute_in_expression(e, params)?;
659            }
660        }
661        LogicalExpression::Case {
662            operand,
663            when_clauses,
664            else_clause,
665        } => {
666            if let Some(op) = operand {
667                substitute_in_expression(op, params)?;
668            }
669            for (cond, result) in when_clauses {
670                substitute_in_expression(cond, params)?;
671                substitute_in_expression(result, params)?;
672            }
673            if let Some(el) = else_clause {
674                substitute_in_expression(el, params)?;
675            }
676        }
677        LogicalExpression::Property { .. }
678        | LogicalExpression::Variable(_)
679        | LogicalExpression::Literal(_)
680        | LogicalExpression::Labels(_)
681        | LogicalExpression::Type(_)
682        | LogicalExpression::Id(_) => {}
683        LogicalExpression::ListComprehension {
684            list_expr,
685            filter_expr,
686            map_expr,
687            ..
688        } => {
689            substitute_in_expression(list_expr, params)?;
690            if let Some(filter) = filter_expr {
691                substitute_in_expression(filter, params)?;
692            }
693            substitute_in_expression(map_expr, params)?;
694        }
695        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
696            // Subqueries would need recursive parameter substitution
697        }
698    }
699    Ok(())
700}
701
702#[cfg(test)]
703mod tests {
704    use super::*;
705
706    #[test]
707    fn test_query_language_is_lpg() {
708        #[cfg(feature = "gql")]
709        assert!(QueryLanguage::Gql.is_lpg());
710        #[cfg(feature = "cypher")]
711        assert!(QueryLanguage::Cypher.is_lpg());
712        #[cfg(feature = "sparql")]
713        assert!(!QueryLanguage::Sparql.is_lpg());
714    }
715
716    #[test]
717    fn test_processor_creation() {
718        let store = Arc::new(LpgStore::new());
719        let processor = QueryProcessor::for_lpg(store);
720        assert!(processor.lpg_store().node_count() == 0);
721    }
722
723    #[cfg(feature = "gql")]
724    #[test]
725    fn test_process_simple_gql() {
726        let store = Arc::new(LpgStore::new());
727        store.create_node(&["Person"]);
728        store.create_node(&["Person"]);
729
730        let processor = QueryProcessor::for_lpg(store);
731        let result = processor
732            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
733            .unwrap();
734
735        assert_eq!(result.row_count(), 2);
736        assert_eq!(result.columns[0], "n");
737    }
738
739    #[cfg(feature = "cypher")]
740    #[test]
741    fn test_process_simple_cypher() {
742        let store = Arc::new(LpgStore::new());
743        store.create_node(&["Person"]);
744
745        let processor = QueryProcessor::for_lpg(store);
746        let result = processor
747            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
748            .unwrap();
749
750        assert_eq!(result.row_count(), 1);
751    }
752
753    #[cfg(feature = "gql")]
754    #[test]
755    fn test_process_with_params() {
756        let store = Arc::new(LpgStore::new());
757        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
758        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
759        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
760
761        let processor = QueryProcessor::for_lpg(store);
762
763        // Query with parameter
764        let mut params = HashMap::new();
765        params.insert("min_age".to_string(), Value::Int64(30));
766
767        let result = processor
768            .process(
769                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
770                QueryLanguage::Gql,
771                Some(&params),
772            )
773            .unwrap();
774
775        // Should return 2 people (ages 35 and 45)
776        assert_eq!(result.row_count(), 2);
777    }
778
779    #[cfg(feature = "gql")]
780    #[test]
781    fn test_missing_param_error() {
782        let store = Arc::new(LpgStore::new());
783        store.create_node(&["Person"]);
784
785        let processor = QueryProcessor::for_lpg(store);
786
787        // Query with parameter but empty params map (missing the required param)
788        let params: HashMap<String, Value> = HashMap::new();
789        let result = processor.process(
790            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
791            QueryLanguage::Gql,
792            Some(&params),
793        );
794
795        // Should fail with missing parameter error
796        assert!(result.is_err());
797        let err = result.unwrap_err();
798        assert!(
799            err.to_string().contains("Missing parameter"),
800            "Expected 'Missing parameter' error, got: {}",
801            err
802        );
803    }
804
805    #[cfg(feature = "gql")]
806    #[test]
807    fn test_params_in_filter_with_property() {
808        // Tests parameter substitution in WHERE clause with property comparison
809        let store = Arc::new(LpgStore::new());
810        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
811        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
812
813        let processor = QueryProcessor::for_lpg(store);
814
815        let mut params = HashMap::new();
816        params.insert("threshold".to_string(), Value::Int64(15));
817
818        let result = processor
819            .process(
820                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
821                QueryLanguage::Gql,
822                Some(&params),
823            )
824            .unwrap();
825
826        // Only value=20 matches > 15
827        assert_eq!(result.row_count(), 1);
828        let row = &result.rows[0];
829        assert_eq!(row[0], Value::Int64(20));
830    }
831
832    #[cfg(feature = "gql")]
833    #[test]
834    fn test_params_in_multiple_where_conditions() {
835        // Tests multiple parameters in WHERE clause with AND
836        let store = Arc::new(LpgStore::new());
837        store.create_node_with_props(
838            &["Person"],
839            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
840        );
841        store.create_node_with_props(
842            &["Person"],
843            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
844        );
845        store.create_node_with_props(
846            &["Person"],
847            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
848        );
849
850        let processor = QueryProcessor::for_lpg(store);
851
852        let mut params = HashMap::new();
853        params.insert("min_age".to_string(), Value::Int64(30));
854        params.insert("min_score".to_string(), Value::Int64(75));
855
856        let result = processor
857            .process(
858                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
859                QueryLanguage::Gql,
860                Some(&params),
861            )
862            .unwrap();
863
864        // Only the person with age=35, score=90 matches both conditions
865        assert_eq!(result.row_count(), 1);
866    }
867
868    #[cfg(feature = "gql")]
869    #[test]
870    fn test_params_with_in_list() {
871        // Tests parameter as a value checked against IN list
872        let store = Arc::new(LpgStore::new());
873        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
874        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
875        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
876
877        let processor = QueryProcessor::for_lpg(store);
878
879        // Check if a parameter value matches any of the statuses
880        let mut params = HashMap::new();
881        params.insert("target".to_string(), Value::String("active".into()));
882
883        let result = processor
884            .process(
885                "MATCH (n:Item) WHERE n.status = $target RETURN n",
886                QueryLanguage::Gql,
887                Some(&params),
888            )
889            .unwrap();
890
891        assert_eq!(result.row_count(), 1);
892    }
893
894    #[cfg(feature = "gql")]
895    #[test]
896    fn test_params_same_type_comparison() {
897        // Tests that same-type parameter comparisons work correctly
898        let store = Arc::new(LpgStore::new());
899        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
900        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
901
902        let processor = QueryProcessor::for_lpg(store);
903
904        // Compare int property with int parameter
905        let mut params = HashMap::new();
906        params.insert("threshold".to_string(), Value::Int64(75));
907
908        let result = processor
909            .process(
910                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
911                QueryLanguage::Gql,
912                Some(&params),
913            )
914            .unwrap();
915
916        // Only value=100 matches > 75
917        assert_eq!(result.row_count(), 1);
918    }
919
920    #[cfg(feature = "gql")]
921    #[test]
922    fn test_process_empty_result_has_columns() {
923        // Tests that empty results still have correct column names
924        let store = Arc::new(LpgStore::new());
925        // Don't create any nodes
926
927        let processor = QueryProcessor::for_lpg(store);
928        let result = processor
929            .process(
930                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
931                QueryLanguage::Gql,
932                None,
933            )
934            .unwrap();
935
936        assert_eq!(result.row_count(), 0);
937        assert_eq!(result.columns.len(), 2);
938        assert_eq!(result.columns[0], "name");
939        assert_eq!(result.columns[1], "age");
940    }
941
942    #[cfg(feature = "gql")]
943    #[test]
944    fn test_params_string_equality() {
945        // Tests string parameter equality comparison
946        let store = Arc::new(LpgStore::new());
947        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
948        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
949        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
950
951        let processor = QueryProcessor::for_lpg(store);
952
953        let mut params = HashMap::new();
954        params.insert("target".to_string(), Value::String("beta".into()));
955
956        let result = processor
957            .process(
958                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
959                QueryLanguage::Gql,
960                Some(&params),
961            )
962            .unwrap();
963
964        assert_eq!(result.row_count(), 1);
965        assert_eq!(result.rows[0][0], Value::String("beta".into()));
966    }
967}