Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TxId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
/// Supported query languages.
///
/// Every variant is compiled in only when its Cargo feature is enabled, so
/// the set of languages available at runtime depends on the build
/// configuration. Use [`QueryLanguage::is_lpg`] to tell labeled-property-graph
/// languages apart from RDF ones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024) - default for LPG
    ///
    /// Requires the `gql` feature.
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0
    ///
    /// Requires the `cypher` feature.
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin
    ///
    /// Requires the `gremlin` feature.
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL for LPG
    ///
    /// Requires the `graphql` feature.
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 GRAPH_TABLE)
    ///
    /// Requires the `sql-pgq` feature.
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 for RDF
    ///
    /// Requires the `sparql` feature.
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL for RDF
    ///
    /// Requires both the `graphql` and `rdf` features.
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
51
52impl QueryLanguage {
53    /// Returns whether this language targets LPG (vs RDF).
54    #[must_use]
55    pub const fn is_lpg(&self) -> bool {
56        match self {
57            #[cfg(feature = "gql")]
58            Self::Gql => true,
59            #[cfg(feature = "cypher")]
60            Self::Cypher => true,
61            #[cfg(feature = "gremlin")]
62            Self::Gremlin => true,
63            #[cfg(feature = "graphql")]
64            Self::GraphQL => true,
65            #[cfg(feature = "sql-pgq")]
66            Self::SqlPgq => true,
67            #[cfg(feature = "sparql")]
68            Self::Sparql => false,
69            #[cfg(all(feature = "graphql", feature = "rdf"))]
70            Self::GraphQLRdf => false,
71        }
72    }
73}
74
/// Query parameters for prepared statements.
///
/// Maps a parameter name to the [`Value`] that replaces it in the logical
/// plan before binding. Keys are looked up exactly as the parameter is named
/// in the plan; in the GQL tests of this module the key `min_age` supplies the
/// value for `$min_age`.
pub type QueryParams = HashMap<String, Value>;
77
/// Processes queries through the full pipeline.
///
/// The processor holds references to the stores and provides a unified
/// interface for executing queries in any supported language.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// # use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
///
/// # fn main() -> grafeo_common::utils::error::Result<()> {
/// let store = Arc::new(LpgStore::new());
/// let processor = QueryProcessor::for_lpg(store);
/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
/// # Ok(())
/// # }
/// ```
pub struct QueryProcessor {
    /// LPG store for property graph queries.
    ///
    /// Handed out by `lpg_store()`. When the processor is built with
    /// `for_graph_store_with_tx` this is an empty placeholder; LPG planning
    /// always goes through `graph_store`.
    lpg_store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends.
    ///
    /// This is the store passed to the LPG planner.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Transaction manager for MVCC operations.
    ///
    /// Also supplies the current epoch when no transaction context is set.
    tx_manager: Arc<TransactionManager>,
    /// Catalog for schema and index metadata.
    catalog: Arc<Catalog>,
    /// Query optimizer.
    optimizer: Optimizer,
    /// Current transaction context (if any).
    ///
    /// Stored as `(viewing epoch, transaction id)`; see `with_tx_context`.
    tx_context: Option<(EpochId, TxId)>,
    /// RDF store for triple pattern queries (optional).
    ///
    /// `None` unless the processor was created with `with_rdf`.
    #[cfg(feature = "rdf")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
114
115impl QueryProcessor {
116    /// Creates a new query processor for LPG queries.
117    #[must_use]
118    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119        let optimizer = Optimizer::from_store(&store);
120        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121        Self {
122            lpg_store: store,
123            graph_store,
124            tx_manager: Arc::new(TransactionManager::new()),
125            catalog: Arc::new(Catalog::new()),
126            optimizer,
127            tx_context: None,
128            #[cfg(feature = "rdf")]
129            rdf_store: None,
130        }
131    }
132
133    /// Creates a new query processor with a transaction manager.
134    #[must_use]
135    pub fn for_lpg_with_tx(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
136        let optimizer = Optimizer::from_store(&store);
137        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
138        Self {
139            lpg_store: store,
140            graph_store,
141            tx_manager,
142            catalog: Arc::new(Catalog::new()),
143            optimizer,
144            tx_context: None,
145            #[cfg(feature = "rdf")]
146            rdf_store: None,
147        }
148    }
149
150    /// Creates a query processor backed by any GraphStoreMut implementation.
151    #[must_use]
152    pub fn for_graph_store_with_tx(
153        store: Arc<dyn GraphStoreMut>,
154        tx_manager: Arc<TransactionManager>,
155    ) -> Self {
156        let optimizer = Optimizer::from_graph_store(&*store);
157        Self {
158            lpg_store: Arc::new(LpgStore::new()), // dummy, not used
159            graph_store: store,
160            tx_manager,
161            catalog: Arc::new(Catalog::new()),
162            optimizer,
163            tx_context: None,
164            #[cfg(feature = "rdf")]
165            rdf_store: None,
166        }
167    }
168
169    /// Creates a new query processor with both LPG and RDF stores.
170    #[cfg(feature = "rdf")]
171    #[must_use]
172    pub fn with_rdf(
173        lpg_store: Arc<LpgStore>,
174        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
175    ) -> Self {
176        let optimizer = Optimizer::from_store(&lpg_store);
177        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
178        Self {
179            lpg_store,
180            graph_store,
181            tx_manager: Arc::new(TransactionManager::new()),
182            catalog: Arc::new(Catalog::new()),
183            optimizer,
184            tx_context: None,
185            rdf_store: Some(rdf_store),
186        }
187    }
188
189    /// Sets the transaction context for MVCC visibility.
190    ///
191    /// This should be called when the processor is used within a transaction.
192    #[must_use]
193    pub fn with_tx_context(mut self, viewing_epoch: EpochId, tx_id: TxId) -> Self {
194        self.tx_context = Some((viewing_epoch, tx_id));
195        self
196    }
197
198    /// Sets a custom catalog.
199    #[must_use]
200    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
201        self.catalog = catalog;
202        self
203    }
204
205    /// Sets a custom optimizer.
206    #[must_use]
207    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
208        self.optimizer = optimizer;
209        self
210    }
211
212    /// Processes a query string and returns results.
213    ///
214    /// Pipeline:
215    /// 1. Parse (language-specific parser → AST)
216    /// 2. Translate (AST → LogicalPlan)
217    /// 3. Bind (semantic validation)
218    /// 4. Optimize (filter pushdown, join reorder, etc.)
219    /// 5. Plan (logical → physical operators)
220    /// 6. Execute (run operators, collect results)
221    ///
222    /// # Arguments
223    ///
224    /// * `query` - The query string
225    /// * `language` - Which query language to use
226    /// * `params` - Optional query parameters for prepared statements
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if any stage of the pipeline fails.
231    pub fn process(
232        &self,
233        query: &str,
234        language: QueryLanguage,
235        params: Option<&QueryParams>,
236    ) -> Result<QueryResult> {
237        if language.is_lpg() {
238            self.process_lpg(query, language, params)
239        } else {
240            #[cfg(feature = "rdf")]
241            {
242                self.process_rdf(query, language, params)
243            }
244            #[cfg(not(feature = "rdf"))]
245            {
246                Err(Error::Internal(
247                    "RDF support not enabled. Compile with --features rdf".to_string(),
248                ))
249            }
250        }
251    }
252
253    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
254    fn process_lpg(
255        &self,
256        query: &str,
257        language: QueryLanguage,
258        params: Option<&QueryParams>,
259    ) -> Result<QueryResult> {
260        let start_time = std::time::Instant::now();
261
262        // 1. Parse and translate to logical plan
263        let mut logical_plan = self.translate_lpg(query, language)?;
264
265        // 2. Substitute parameters if provided
266        if let Some(params) = params {
267            substitute_params(&mut logical_plan, params)?;
268        }
269
270        // 3. Semantic validation
271        let mut binder = Binder::new();
272        let _binding_context = binder.bind(&logical_plan)?;
273
274        // 4. Optimize the plan
275        let optimized_plan = self.optimizer.optimize(logical_plan)?;
276
277        // 5. Convert to physical plan with transaction context
278        let planner = if let Some((epoch, tx_id)) = self.tx_context {
279            Planner::with_context(
280                Arc::clone(&self.graph_store),
281                Arc::clone(&self.tx_manager),
282                Some(tx_id),
283                epoch,
284            )
285        } else {
286            Planner::with_context(
287                Arc::clone(&self.graph_store),
288                Arc::clone(&self.tx_manager),
289                None,
290                self.tx_manager.current_epoch(),
291            )
292        };
293        let mut physical_plan = planner.plan(&optimized_plan)?;
294
295        // 6. Execute and collect results
296        let executor = Executor::with_columns(physical_plan.columns.clone());
297        let mut result = executor.execute(physical_plan.operator.as_mut())?;
298
299        // Add execution metrics
300        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
301        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
302        result.execution_time_ms = Some(elapsed_ms);
303        result.rows_scanned = Some(rows_scanned);
304
305        Ok(result)
306    }
307
308    /// Translates an LPG query to a logical plan.
309    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
310        match language {
311            #[cfg(feature = "gql")]
312            QueryLanguage::Gql => {
313                use crate::query::gql_translator;
314                gql_translator::translate(query)
315            }
316            #[cfg(feature = "cypher")]
317            QueryLanguage::Cypher => {
318                use crate::query::cypher_translator;
319                cypher_translator::translate(query)
320            }
321            #[cfg(feature = "gremlin")]
322            QueryLanguage::Gremlin => {
323                use crate::query::gremlin_translator;
324                gremlin_translator::translate(query)
325            }
326            #[cfg(feature = "graphql")]
327            QueryLanguage::GraphQL => {
328                use crate::query::graphql_translator;
329                graphql_translator::translate(query)
330            }
331            #[cfg(feature = "sql-pgq")]
332            QueryLanguage::SqlPgq => {
333                use crate::query::sql_pgq_translator;
334                sql_pgq_translator::translate(query)
335            }
336            #[allow(unreachable_patterns)]
337            _ => Err(Error::Internal(format!(
338                "Language {:?} is not an LPG language",
339                language
340            ))),
341        }
342    }
343
344    /// Processes an RDF query (SPARQL, GraphQL-RDF).
345    #[cfg(feature = "rdf")]
346    fn process_rdf(
347        &self,
348        query: &str,
349        language: QueryLanguage,
350        _params: Option<&QueryParams>,
351    ) -> Result<QueryResult> {
352        use crate::query::planner_rdf::RdfPlanner;
353
354        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
355            Error::Internal("RDF store not configured for this processor".to_string())
356        })?;
357
358        // 1. Parse and translate to logical plan
359        let logical_plan = self.translate_rdf(query, language)?;
360
361        // 2. Semantic validation
362        let mut binder = Binder::new();
363        let _binding_context = binder.bind(&logical_plan)?;
364
365        // 3. Optimize the plan
366        let optimized_plan = self.optimizer.optimize(logical_plan)?;
367
368        // 4. Convert to physical plan (using RDF planner)
369        let planner = RdfPlanner::new(Arc::clone(rdf_store));
370        let mut physical_plan = planner.plan(&optimized_plan)?;
371
372        // 5. Execute and collect results
373        let executor = Executor::with_columns(physical_plan.columns.clone());
374        executor.execute(physical_plan.operator.as_mut())
375    }
376
377    /// Translates an RDF query to a logical plan.
378    #[cfg(feature = "rdf")]
379    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
380        match language {
381            #[cfg(feature = "sparql")]
382            QueryLanguage::Sparql => {
383                use crate::query::sparql_translator;
384                sparql_translator::translate(query)
385            }
386            #[cfg(all(feature = "graphql", feature = "rdf"))]
387            QueryLanguage::GraphQLRdf => {
388                use crate::query::graphql_rdf_translator;
389                // Default namespace for GraphQL-RDF queries
390                graphql_rdf_translator::translate(query, "http://example.org/")
391            }
392            _ => Err(Error::Internal(format!(
393                "Language {:?} is not an RDF language",
394                language
395            ))),
396        }
397    }
398
    /// Returns a reference to the LPG store.
    ///
    /// For a processor created with `for_graph_store_with_tx` this is the
    /// empty placeholder store set up by that constructor, not the backend
    /// that queries are planned against.
    #[must_use]
    pub fn lpg_store(&self) -> &Arc<LpgStore> {
        &self.lpg_store
    }
404
    /// Returns a reference to the catalog.
    ///
    /// This is the catalog created by the constructor unless it was replaced
    /// through `with_catalog`.
    #[must_use]
    pub fn catalog(&self) -> &Arc<Catalog> {
        &self.catalog
    }
410
    /// Returns a reference to the optimizer.
    ///
    /// This is the optimizer used by both the LPG and RDF pipelines of this
    /// processor.
    #[must_use]
    pub fn optimizer(&self) -> &Optimizer {
        &self.optimizer
    }
416
    /// Returns a reference to the RDF store (if configured).
    ///
    /// `None` for processors built by any constructor other than `with_rdf`.
    #[cfg(feature = "rdf")]
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }
423}
424
impl QueryProcessor {
    /// Returns a reference to the transaction manager.
    ///
    /// This is either the manager passed to a `*_with_tx` constructor or the
    /// one created internally by `for_lpg` / `with_rdf`.
    #[must_use]
    pub fn tx_manager(&self) -> &Arc<TransactionManager> {
        &self.tx_manager
    }
}
432
/// Substitutes parameters in a logical plan with their values.
///
/// Walks the operator tree starting at `plan.root` and rewrites the plan in
/// place.
///
/// # Errors
///
/// Propagates the error raised when the plan references a parameter that is
/// not present in `params`.
fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
    substitute_in_operator(&mut plan.root, params)
}
437
438/// Recursively substitutes parameters in an operator.
439fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
440    use crate::query::plan::*;
441
442    match op {
443        LogicalOperator::Filter(filter) => {
444            substitute_in_expression(&mut filter.predicate, params)?;
445            substitute_in_operator(&mut filter.input, params)?;
446        }
447        LogicalOperator::Return(ret) => {
448            for item in &mut ret.items {
449                substitute_in_expression(&mut item.expression, params)?;
450            }
451            substitute_in_operator(&mut ret.input, params)?;
452        }
453        LogicalOperator::Project(proj) => {
454            for p in &mut proj.projections {
455                substitute_in_expression(&mut p.expression, params)?;
456            }
457            substitute_in_operator(&mut proj.input, params)?;
458        }
459        LogicalOperator::NodeScan(scan) => {
460            if let Some(input) = &mut scan.input {
461                substitute_in_operator(input, params)?;
462            }
463        }
464        LogicalOperator::EdgeScan(scan) => {
465            if let Some(input) = &mut scan.input {
466                substitute_in_operator(input, params)?;
467            }
468        }
469        LogicalOperator::Expand(expand) => {
470            substitute_in_operator(&mut expand.input, params)?;
471        }
472        LogicalOperator::Join(join) => {
473            substitute_in_operator(&mut join.left, params)?;
474            substitute_in_operator(&mut join.right, params)?;
475            for cond in &mut join.conditions {
476                substitute_in_expression(&mut cond.left, params)?;
477                substitute_in_expression(&mut cond.right, params)?;
478            }
479        }
480        LogicalOperator::LeftJoin(join) => {
481            substitute_in_operator(&mut join.left, params)?;
482            substitute_in_operator(&mut join.right, params)?;
483            if let Some(cond) = &mut join.condition {
484                substitute_in_expression(cond, params)?;
485            }
486        }
487        LogicalOperator::Aggregate(agg) => {
488            for expr in &mut agg.group_by {
489                substitute_in_expression(expr, params)?;
490            }
491            for agg_expr in &mut agg.aggregates {
492                if let Some(expr) = &mut agg_expr.expression {
493                    substitute_in_expression(expr, params)?;
494                }
495            }
496            substitute_in_operator(&mut agg.input, params)?;
497        }
498        LogicalOperator::Sort(sort) => {
499            for key in &mut sort.keys {
500                substitute_in_expression(&mut key.expression, params)?;
501            }
502            substitute_in_operator(&mut sort.input, params)?;
503        }
504        LogicalOperator::Limit(limit) => {
505            substitute_in_operator(&mut limit.input, params)?;
506        }
507        LogicalOperator::Skip(skip) => {
508            substitute_in_operator(&mut skip.input, params)?;
509        }
510        LogicalOperator::Distinct(distinct) => {
511            substitute_in_operator(&mut distinct.input, params)?;
512        }
513        LogicalOperator::CreateNode(create) => {
514            for (_, expr) in &mut create.properties {
515                substitute_in_expression(expr, params)?;
516            }
517            if let Some(input) = &mut create.input {
518                substitute_in_operator(input, params)?;
519            }
520        }
521        LogicalOperator::CreateEdge(create) => {
522            for (_, expr) in &mut create.properties {
523                substitute_in_expression(expr, params)?;
524            }
525            substitute_in_operator(&mut create.input, params)?;
526        }
527        LogicalOperator::DeleteNode(delete) => {
528            substitute_in_operator(&mut delete.input, params)?;
529        }
530        LogicalOperator::DeleteEdge(delete) => {
531            substitute_in_operator(&mut delete.input, params)?;
532        }
533        LogicalOperator::SetProperty(set) => {
534            for (_, expr) in &mut set.properties {
535                substitute_in_expression(expr, params)?;
536            }
537            substitute_in_operator(&mut set.input, params)?;
538        }
539        LogicalOperator::Union(union) => {
540            for input in &mut union.inputs {
541                substitute_in_operator(input, params)?;
542            }
543        }
544        LogicalOperator::AntiJoin(anti) => {
545            substitute_in_operator(&mut anti.left, params)?;
546            substitute_in_operator(&mut anti.right, params)?;
547        }
548        LogicalOperator::Bind(bind) => {
549            substitute_in_expression(&mut bind.expression, params)?;
550            substitute_in_operator(&mut bind.input, params)?;
551        }
552        LogicalOperator::TripleScan(scan) => {
553            if let Some(input) = &mut scan.input {
554                substitute_in_operator(input, params)?;
555            }
556        }
557        LogicalOperator::Unwind(unwind) => {
558            substitute_in_expression(&mut unwind.expression, params)?;
559            substitute_in_operator(&mut unwind.input, params)?;
560        }
561        LogicalOperator::MapCollect(mc) => {
562            substitute_in_operator(&mut mc.input, params)?;
563        }
564        LogicalOperator::Merge(merge) => {
565            for (_, expr) in &mut merge.match_properties {
566                substitute_in_expression(expr, params)?;
567            }
568            for (_, expr) in &mut merge.on_create {
569                substitute_in_expression(expr, params)?;
570            }
571            for (_, expr) in &mut merge.on_match {
572                substitute_in_expression(expr, params)?;
573            }
574            substitute_in_operator(&mut merge.input, params)?;
575        }
576        LogicalOperator::MergeRelationship(merge_rel) => {
577            for (_, expr) in &mut merge_rel.match_properties {
578                substitute_in_expression(expr, params)?;
579            }
580            for (_, expr) in &mut merge_rel.on_create {
581                substitute_in_expression(expr, params)?;
582            }
583            for (_, expr) in &mut merge_rel.on_match {
584                substitute_in_expression(expr, params)?;
585            }
586            substitute_in_operator(&mut merge_rel.input, params)?;
587        }
588        LogicalOperator::AddLabel(add_label) => {
589            substitute_in_operator(&mut add_label.input, params)?;
590        }
591        LogicalOperator::RemoveLabel(remove_label) => {
592            substitute_in_operator(&mut remove_label.input, params)?;
593        }
594        LogicalOperator::ShortestPath(sp) => {
595            substitute_in_operator(&mut sp.input, params)?;
596        }
597        // SPARQL Update operators
598        LogicalOperator::InsertTriple(insert) => {
599            if let Some(ref mut input) = insert.input {
600                substitute_in_operator(input, params)?;
601            }
602        }
603        LogicalOperator::DeleteTriple(delete) => {
604            if let Some(ref mut input) = delete.input {
605                substitute_in_operator(input, params)?;
606            }
607        }
608        LogicalOperator::Modify(modify) => {
609            substitute_in_operator(&mut modify.where_clause, params)?;
610        }
611        LogicalOperator::ClearGraph(_)
612        | LogicalOperator::CreateGraph(_)
613        | LogicalOperator::DropGraph(_)
614        | LogicalOperator::LoadGraph(_)
615        | LogicalOperator::CopyGraph(_)
616        | LogicalOperator::MoveGraph(_)
617        | LogicalOperator::AddGraph(_) => {}
618        LogicalOperator::Empty => {}
619        LogicalOperator::VectorScan(scan) => {
620            substitute_in_expression(&mut scan.query_vector, params)?;
621            if let Some(ref mut input) = scan.input {
622                substitute_in_operator(input, params)?;
623            }
624        }
625        LogicalOperator::VectorJoin(join) => {
626            substitute_in_expression(&mut join.query_vector, params)?;
627            substitute_in_operator(&mut join.input, params)?;
628        }
629        // DDL operators have no expressions to substitute
630        LogicalOperator::CreatePropertyGraph(_) => {}
631        // Procedure calls: arguments could contain parameters but we handle at execution time
632        LogicalOperator::CallProcedure(_) => {}
633    }
634    Ok(())
635}
636
637/// Substitutes parameters in an expression with their values.
638fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
639    use crate::query::plan::LogicalExpression;
640
641    match expr {
642        LogicalExpression::Parameter(name) => {
643            if let Some(value) = params.get(name) {
644                *expr = LogicalExpression::Literal(value.clone());
645            } else {
646                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
647            }
648        }
649        LogicalExpression::Binary { left, right, .. } => {
650            substitute_in_expression(left, params)?;
651            substitute_in_expression(right, params)?;
652        }
653        LogicalExpression::Unary { operand, .. } => {
654            substitute_in_expression(operand, params)?;
655        }
656        LogicalExpression::FunctionCall { args, .. } => {
657            for arg in args {
658                substitute_in_expression(arg, params)?;
659            }
660        }
661        LogicalExpression::List(items) => {
662            for item in items {
663                substitute_in_expression(item, params)?;
664            }
665        }
666        LogicalExpression::Map(pairs) => {
667            for (_, value) in pairs {
668                substitute_in_expression(value, params)?;
669            }
670        }
671        LogicalExpression::IndexAccess { base, index } => {
672            substitute_in_expression(base, params)?;
673            substitute_in_expression(index, params)?;
674        }
675        LogicalExpression::SliceAccess { base, start, end } => {
676            substitute_in_expression(base, params)?;
677            if let Some(s) = start {
678                substitute_in_expression(s, params)?;
679            }
680            if let Some(e) = end {
681                substitute_in_expression(e, params)?;
682            }
683        }
684        LogicalExpression::Case {
685            operand,
686            when_clauses,
687            else_clause,
688        } => {
689            if let Some(op) = operand {
690                substitute_in_expression(op, params)?;
691            }
692            for (cond, result) in when_clauses {
693                substitute_in_expression(cond, params)?;
694                substitute_in_expression(result, params)?;
695            }
696            if let Some(el) = else_clause {
697                substitute_in_expression(el, params)?;
698            }
699        }
700        LogicalExpression::Property { .. }
701        | LogicalExpression::Variable(_)
702        | LogicalExpression::Literal(_)
703        | LogicalExpression::Labels(_)
704        | LogicalExpression::Type(_)
705        | LogicalExpression::Id(_) => {}
706        LogicalExpression::ListComprehension {
707            list_expr,
708            filter_expr,
709            map_expr,
710            ..
711        } => {
712            substitute_in_expression(list_expr, params)?;
713            if let Some(filter) = filter_expr {
714                substitute_in_expression(filter, params)?;
715            }
716            substitute_in_expression(map_expr, params)?;
717        }
718        LogicalExpression::ListPredicate {
719            list_expr,
720            predicate,
721            ..
722        } => {
723            substitute_in_expression(list_expr, params)?;
724            substitute_in_expression(predicate, params)?;
725        }
726        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
727            // Subqueries would need recursive parameter substitution
728        }
729    }
730    Ok(())
731}
732
733#[cfg(test)]
734mod tests {
735    use super::*;
736
737    #[test]
738    fn test_query_language_is_lpg() {
739        #[cfg(feature = "gql")]
740        assert!(QueryLanguage::Gql.is_lpg());
741        #[cfg(feature = "cypher")]
742        assert!(QueryLanguage::Cypher.is_lpg());
743        #[cfg(feature = "sparql")]
744        assert!(!QueryLanguage::Sparql.is_lpg());
745    }
746
747    #[test]
748    fn test_processor_creation() {
749        let store = Arc::new(LpgStore::new());
750        let processor = QueryProcessor::for_lpg(store);
751        assert!(processor.lpg_store().node_count() == 0);
752    }
753
754    #[cfg(feature = "gql")]
755    #[test]
756    fn test_process_simple_gql() {
757        let store = Arc::new(LpgStore::new());
758        store.create_node(&["Person"]);
759        store.create_node(&["Person"]);
760
761        let processor = QueryProcessor::for_lpg(store);
762        let result = processor
763            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
764            .unwrap();
765
766        assert_eq!(result.row_count(), 2);
767        assert_eq!(result.columns[0], "n");
768    }
769
770    #[cfg(feature = "cypher")]
771    #[test]
772    fn test_process_simple_cypher() {
773        let store = Arc::new(LpgStore::new());
774        store.create_node(&["Person"]);
775
776        let processor = QueryProcessor::for_lpg(store);
777        let result = processor
778            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
779            .unwrap();
780
781        assert_eq!(result.row_count(), 1);
782    }
783
784    #[cfg(feature = "gql")]
785    #[test]
786    fn test_process_with_params() {
787        let store = Arc::new(LpgStore::new());
788        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
789        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
790        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
791
792        let processor = QueryProcessor::for_lpg(store);
793
794        // Query with parameter
795        let mut params = HashMap::new();
796        params.insert("min_age".to_string(), Value::Int64(30));
797
798        let result = processor
799            .process(
800                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
801                QueryLanguage::Gql,
802                Some(&params),
803            )
804            .unwrap();
805
806        // Should return 2 people (ages 35 and 45)
807        assert_eq!(result.row_count(), 2);
808    }
809
810    #[cfg(feature = "gql")]
811    #[test]
812    fn test_missing_param_error() {
813        let store = Arc::new(LpgStore::new());
814        store.create_node(&["Person"]);
815
816        let processor = QueryProcessor::for_lpg(store);
817
818        // Query with parameter but empty params map (missing the required param)
819        let params: HashMap<String, Value> = HashMap::new();
820        let result = processor.process(
821            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
822            QueryLanguage::Gql,
823            Some(&params),
824        );
825
826        // Should fail with missing parameter error
827        assert!(result.is_err());
828        let err = result.unwrap_err();
829        assert!(
830            err.to_string().contains("Missing parameter"),
831            "Expected 'Missing parameter' error, got: {}",
832            err
833        );
834    }
835
836    #[cfg(feature = "gql")]
837    #[test]
838    fn test_params_in_filter_with_property() {
839        // Tests parameter substitution in WHERE clause with property comparison
840        let store = Arc::new(LpgStore::new());
841        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
842        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
843
844        let processor = QueryProcessor::for_lpg(store);
845
846        let mut params = HashMap::new();
847        params.insert("threshold".to_string(), Value::Int64(15));
848
849        let result = processor
850            .process(
851                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
852                QueryLanguage::Gql,
853                Some(&params),
854            )
855            .unwrap();
856
857        // Only value=20 matches > 15
858        assert_eq!(result.row_count(), 1);
859        let row = &result.rows[0];
860        assert_eq!(row[0], Value::Int64(20));
861    }
862
863    #[cfg(feature = "gql")]
864    #[test]
865    fn test_params_in_multiple_where_conditions() {
866        // Tests multiple parameters in WHERE clause with AND
867        let store = Arc::new(LpgStore::new());
868        store.create_node_with_props(
869            &["Person"],
870            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
871        );
872        store.create_node_with_props(
873            &["Person"],
874            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
875        );
876        store.create_node_with_props(
877            &["Person"],
878            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
879        );
880
881        let processor = QueryProcessor::for_lpg(store);
882
883        let mut params = HashMap::new();
884        params.insert("min_age".to_string(), Value::Int64(30));
885        params.insert("min_score".to_string(), Value::Int64(75));
886
887        let result = processor
888            .process(
889                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
890                QueryLanguage::Gql,
891                Some(&params),
892            )
893            .unwrap();
894
895        // Only the person with age=35, score=90 matches both conditions
896        assert_eq!(result.row_count(), 1);
897    }
898
899    #[cfg(feature = "gql")]
900    #[test]
901    fn test_params_with_in_list() {
902        // Tests parameter as a value checked against IN list
903        let store = Arc::new(LpgStore::new());
904        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
905        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
906        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
907
908        let processor = QueryProcessor::for_lpg(store);
909
910        // Check if a parameter value matches any of the statuses
911        let mut params = HashMap::new();
912        params.insert("target".to_string(), Value::String("active".into()));
913
914        let result = processor
915            .process(
916                "MATCH (n:Item) WHERE n.status = $target RETURN n",
917                QueryLanguage::Gql,
918                Some(&params),
919            )
920            .unwrap();
921
922        assert_eq!(result.row_count(), 1);
923    }
924
925    #[cfg(feature = "gql")]
926    #[test]
927    fn test_params_same_type_comparison() {
928        // Tests that same-type parameter comparisons work correctly
929        let store = Arc::new(LpgStore::new());
930        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
931        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
932
933        let processor = QueryProcessor::for_lpg(store);
934
935        // Compare int property with int parameter
936        let mut params = HashMap::new();
937        params.insert("threshold".to_string(), Value::Int64(75));
938
939        let result = processor
940            .process(
941                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
942                QueryLanguage::Gql,
943                Some(&params),
944            )
945            .unwrap();
946
947        // Only value=100 matches > 75
948        assert_eq!(result.row_count(), 1);
949    }
950
951    #[cfg(feature = "gql")]
952    #[test]
953    fn test_process_empty_result_has_columns() {
954        // Tests that empty results still have correct column names
955        let store = Arc::new(LpgStore::new());
956        // Don't create any nodes
957
958        let processor = QueryProcessor::for_lpg(store);
959        let result = processor
960            .process(
961                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
962                QueryLanguage::Gql,
963                None,
964            )
965            .unwrap();
966
967        assert_eq!(result.row_count(), 0);
968        assert_eq!(result.columns.len(), 2);
969        assert_eq!(result.columns[0], "name");
970        assert_eq!(result.columns[1], "age");
971    }
972
973    #[cfg(feature = "gql")]
974    #[test]
975    fn test_params_string_equality() {
976        // Tests string parameter equality comparison
977        let store = Arc::new(LpgStore::new());
978        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
979        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
980        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
981
982        let processor = QueryProcessor::for_lpg(store);
983
984        let mut params = HashMap::new();
985        params.insert("target".to_string(), Value::String("beta".into()));
986
987        let result = processor
988            .process(
989                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
990                QueryLanguage::Gql,
991                Some(&params),
992            )
993            .unwrap();
994
995        assert_eq!(result.row_count(), 1);
996        assert_eq!(result.rows[0][0], Value::String("beta".into()));
997    }
998}