Skip to main content

grafeo_engine/query/
processor.rs

//! Query processor that orchestrates the query pipeline.
//!
//! The `QueryProcessor` is the central component that executes queries through
//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
//!
//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL, SQL/PGQ)
//! for LPG, and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TxId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
/// Supported query languages.
///
/// Every variant is gated behind a Cargo feature, so the set of languages
/// that exists at compile time depends on the enabled features.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024) - default for LPG
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL for LPG
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 GRAPH_TABLE)
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 for RDF
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL for RDF
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
51
52impl QueryLanguage {
53    /// Returns whether this language targets LPG (vs RDF).
54    #[must_use]
55    pub const fn is_lpg(&self) -> bool {
56        match self {
57            #[cfg(feature = "gql")]
58            Self::Gql => true,
59            #[cfg(feature = "cypher")]
60            Self::Cypher => true,
61            #[cfg(feature = "gremlin")]
62            Self::Gremlin => true,
63            #[cfg(feature = "graphql")]
64            Self::GraphQL => true,
65            #[cfg(feature = "sql-pgq")]
66            Self::SqlPgq => true,
67            #[cfg(feature = "sparql")]
68            Self::Sparql => false,
69            #[cfg(all(feature = "graphql", feature = "rdf"))]
70            Self::GraphQLRdf => false,
71        }
72    }
73}
74
75/// Query parameters for prepared statements.
76pub type QueryParams = HashMap<String, Value>;
77
78/// Processes queries through the full pipeline.
79///
80/// The processor holds references to the stores and provides a unified
81/// interface for executing queries in any supported language.
82///
83/// # Example
84///
85/// ```no_run
86/// # use std::sync::Arc;
87/// # use grafeo_core::graph::lpg::LpgStore;
88/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
89///
90/// # fn main() -> grafeo_common::utils::error::Result<()> {
91/// let store = Arc::new(LpgStore::new().unwrap());
92/// let processor = QueryProcessor::for_lpg(store);
93/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
94/// # Ok(())
95/// # }
96/// ```
97pub struct QueryProcessor {
98    /// LPG store for property graph queries.
99    lpg_store: Arc<LpgStore>,
100    /// Graph store trait object for pluggable storage backends.
101    graph_store: Arc<dyn GraphStoreMut>,
102    /// Transaction manager for MVCC operations.
103    tx_manager: Arc<TransactionManager>,
104    /// Catalog for schema and index metadata.
105    catalog: Arc<Catalog>,
106    /// Query optimizer.
107    optimizer: Optimizer,
108    /// Current transaction context (if any).
109    tx_context: Option<(EpochId, TxId)>,
110    /// RDF store for triple pattern queries (optional).
111    #[cfg(feature = "rdf")]
112    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
113}
114
115impl QueryProcessor {
116    /// Creates a new query processor for LPG queries.
117    #[must_use]
118    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119        let optimizer = Optimizer::from_store(&store);
120        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121        Self {
122            lpg_store: store,
123            graph_store,
124            tx_manager: Arc::new(TransactionManager::new()),
125            catalog: Arc::new(Catalog::new()),
126            optimizer,
127            tx_context: None,
128            #[cfg(feature = "rdf")]
129            rdf_store: None,
130        }
131    }
132
133    /// Creates a new query processor with a transaction manager.
134    #[must_use]
135    pub fn for_lpg_with_tx(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
136        let optimizer = Optimizer::from_store(&store);
137        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
138        Self {
139            lpg_store: store,
140            graph_store,
141            tx_manager,
142            catalog: Arc::new(Catalog::new()),
143            optimizer,
144            tx_context: None,
145            #[cfg(feature = "rdf")]
146            rdf_store: None,
147        }
148    }
149
150    /// Creates a query processor backed by any GraphStoreMut implementation.
151    #[must_use]
152    pub fn for_graph_store_with_tx(
153        store: Arc<dyn GraphStoreMut>,
154        tx_manager: Arc<TransactionManager>,
155    ) -> Self {
156        let optimizer = Optimizer::from_graph_store(&*store);
157        Self {
158            lpg_store: Arc::new(LpgStore::new().expect("arena allocation for dummy LpgStore")), // dummy, not used
159            graph_store: store,
160            tx_manager,
161            catalog: Arc::new(Catalog::new()),
162            optimizer,
163            tx_context: None,
164            #[cfg(feature = "rdf")]
165            rdf_store: None,
166        }
167    }
168
169    /// Creates a new query processor with both LPG and RDF stores.
170    #[cfg(feature = "rdf")]
171    #[must_use]
172    pub fn with_rdf(
173        lpg_store: Arc<LpgStore>,
174        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
175    ) -> Self {
176        let optimizer = Optimizer::from_store(&lpg_store);
177        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
178        Self {
179            lpg_store,
180            graph_store,
181            tx_manager: Arc::new(TransactionManager::new()),
182            catalog: Arc::new(Catalog::new()),
183            optimizer,
184            tx_context: None,
185            rdf_store: Some(rdf_store),
186        }
187    }
188
189    /// Sets the transaction context for MVCC visibility.
190    ///
191    /// This should be called when the processor is used within a transaction.
192    #[must_use]
193    pub fn with_tx_context(mut self, viewing_epoch: EpochId, tx_id: TxId) -> Self {
194        self.tx_context = Some((viewing_epoch, tx_id));
195        self
196    }
197
198    /// Sets a custom catalog.
199    #[must_use]
200    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
201        self.catalog = catalog;
202        self
203    }
204
205    /// Sets a custom optimizer.
206    #[must_use]
207    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
208        self.optimizer = optimizer;
209        self
210    }
211
212    /// Processes a query string and returns results.
213    ///
214    /// Pipeline:
215    /// 1. Parse (language-specific parser → AST)
216    /// 2. Translate (AST → LogicalPlan)
217    /// 3. Bind (semantic validation)
218    /// 4. Optimize (filter pushdown, join reorder, etc.)
219    /// 5. Plan (logical → physical operators)
220    /// 6. Execute (run operators, collect results)
221    ///
222    /// # Arguments
223    ///
224    /// * `query` - The query string
225    /// * `language` - Which query language to use
226    /// * `params` - Optional query parameters for prepared statements
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if any stage of the pipeline fails.
231    pub fn process(
232        &self,
233        query: &str,
234        language: QueryLanguage,
235        params: Option<&QueryParams>,
236    ) -> Result<QueryResult> {
237        if language.is_lpg() {
238            self.process_lpg(query, language, params)
239        } else {
240            #[cfg(feature = "rdf")]
241            {
242                self.process_rdf(query, language, params)
243            }
244            #[cfg(not(feature = "rdf"))]
245            {
246                Err(Error::Internal(
247                    "RDF support not enabled. Compile with --features rdf".to_string(),
248                ))
249            }
250        }
251    }
252
253    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
254    fn process_lpg(
255        &self,
256        query: &str,
257        language: QueryLanguage,
258        params: Option<&QueryParams>,
259    ) -> Result<QueryResult> {
260        #[cfg(not(target_arch = "wasm32"))]
261        let start_time = std::time::Instant::now();
262
263        // 1. Parse and translate to logical plan
264        let mut logical_plan = self.translate_lpg(query, language)?;
265
266        // 2. Substitute parameters if provided
267        if let Some(params) = params {
268            substitute_params(&mut logical_plan, params)?;
269        }
270
271        // 3. Semantic validation
272        let mut binder = Binder::new();
273        let _binding_context = binder.bind(&logical_plan)?;
274
275        // 4. Optimize the plan
276        let optimized_plan = self.optimizer.optimize(logical_plan)?;
277
278        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
279        if optimized_plan.explain {
280            let mut plan = optimized_plan;
281            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
282            return Ok(explain_result(&plan));
283        }
284
285        // 5. Convert to physical plan with transaction context
286        let planner = if let Some((epoch, tx_id)) = self.tx_context {
287            Planner::with_context(
288                Arc::clone(&self.graph_store),
289                Arc::clone(&self.tx_manager),
290                Some(tx_id),
291                epoch,
292            )
293        } else {
294            Planner::with_context(
295                Arc::clone(&self.graph_store),
296                Arc::clone(&self.tx_manager),
297                None,
298                self.tx_manager.current_epoch(),
299            )
300        };
301        let mut physical_plan = planner.plan(&optimized_plan)?;
302
303        // 6. Execute and collect results
304        let executor = Executor::with_columns(physical_plan.columns.clone());
305        let mut result = executor.execute(physical_plan.operator.as_mut())?;
306
307        // Add execution metrics
308        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
309        #[cfg(not(target_arch = "wasm32"))]
310        {
311            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
312            result.execution_time_ms = Some(elapsed_ms);
313        }
314        result.rows_scanned = Some(rows_scanned);
315
316        Ok(result)
317    }
318
319    /// Translates an LPG query to a logical plan.
320    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
321        match language {
322            #[cfg(feature = "gql")]
323            QueryLanguage::Gql => {
324                use crate::query::translators::gql;
325                gql::translate(query)
326            }
327            #[cfg(feature = "cypher")]
328            QueryLanguage::Cypher => {
329                use crate::query::translators::cypher;
330                cypher::translate(query)
331            }
332            #[cfg(feature = "gremlin")]
333            QueryLanguage::Gremlin => {
334                use crate::query::translators::gremlin;
335                gremlin::translate(query)
336            }
337            #[cfg(feature = "graphql")]
338            QueryLanguage::GraphQL => {
339                use crate::query::translators::graphql;
340                graphql::translate(query)
341            }
342            #[cfg(feature = "sql-pgq")]
343            QueryLanguage::SqlPgq => {
344                use crate::query::translators::sql_pgq;
345                sql_pgq::translate(query)
346            }
347            #[allow(unreachable_patterns)]
348            _ => Err(Error::Internal(format!(
349                "Language {:?} is not an LPG language",
350                language
351            ))),
352        }
353    }
354
355    /// Processes an RDF query (SPARQL, GraphQL-RDF).
356    #[cfg(feature = "rdf")]
357    fn process_rdf(
358        &self,
359        query: &str,
360        language: QueryLanguage,
361        _params: Option<&QueryParams>,
362    ) -> Result<QueryResult> {
363        use crate::query::planner::rdf::RdfPlanner;
364
365        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
366            Error::Internal("RDF store not configured for this processor".to_string())
367        })?;
368
369        // 1. Parse and translate to logical plan
370        let logical_plan = self.translate_rdf(query, language)?;
371
372        // 2. Semantic validation
373        let mut binder = Binder::new();
374        let _binding_context = binder.bind(&logical_plan)?;
375
376        // 3. Optimize the plan
377        let optimized_plan = self.optimizer.optimize(logical_plan)?;
378
379        // 3a. EXPLAIN: return the optimized plan tree without executing
380        if optimized_plan.explain {
381            return Ok(explain_result(&optimized_plan));
382        }
383
384        // 4. Convert to physical plan (using RDF planner)
385        let planner = RdfPlanner::new(Arc::clone(rdf_store));
386        let mut physical_plan = planner.plan(&optimized_plan)?;
387
388        // 5. Execute and collect results
389        let executor = Executor::with_columns(physical_plan.columns.clone());
390        executor.execute(physical_plan.operator.as_mut())
391    }
392
393    /// Translates an RDF query to a logical plan.
394    #[cfg(feature = "rdf")]
395    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
396        match language {
397            #[cfg(feature = "sparql")]
398            QueryLanguage::Sparql => {
399                use crate::query::translators::sparql;
400                sparql::translate(query)
401            }
402            #[cfg(all(feature = "graphql", feature = "rdf"))]
403            QueryLanguage::GraphQLRdf => {
404                use crate::query::translators::graphql_rdf;
405                // Default namespace for GraphQL-RDF queries
406                graphql_rdf::translate(query, "http://example.org/")
407            }
408            _ => Err(Error::Internal(format!(
409                "Language {:?} is not an RDF language",
410                language
411            ))),
412        }
413    }
414
415    /// Returns a reference to the LPG store.
416    #[must_use]
417    pub fn lpg_store(&self) -> &Arc<LpgStore> {
418        &self.lpg_store
419    }
420
421    /// Returns a reference to the catalog.
422    #[must_use]
423    pub fn catalog(&self) -> &Arc<Catalog> {
424        &self.catalog
425    }
426
427    /// Returns a reference to the optimizer.
428    #[must_use]
429    pub fn optimizer(&self) -> &Optimizer {
430        &self.optimizer
431    }
432
433    /// Returns a reference to the RDF store (if configured).
434    #[cfg(feature = "rdf")]
435    #[must_use]
436    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
437        self.rdf_store.as_ref()
438    }
439}
440
441impl QueryProcessor {
442    /// Returns a reference to the transaction manager.
443    #[must_use]
444    pub fn tx_manager(&self) -> &Arc<TransactionManager> {
445        &self.tx_manager
446    }
447}
448
449/// Annotates filter operators in the plan with pushdown hints.
450///
451/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
452/// whether a property index exists for equality predicates.
453pub(crate) fn annotate_pushdown_hints(
454    op: &mut LogicalOperator,
455    store: &dyn grafeo_core::graph::GraphStore,
456) {
457    use crate::query::plan::*;
458
459    match op {
460        LogicalOperator::Filter(filter) => {
461            // Recurse into children first
462            annotate_pushdown_hints(&mut filter.input, store);
463
464            // Annotate this filter if it sits on top of a NodeScan
465            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
466                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
467            }
468        }
469        LogicalOperator::NodeScan(op) => {
470            if let Some(input) = &mut op.input {
471                annotate_pushdown_hints(input, store);
472            }
473        }
474        LogicalOperator::EdgeScan(op) => {
475            if let Some(input) = &mut op.input {
476                annotate_pushdown_hints(input, store);
477            }
478        }
479        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
480        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
481        LogicalOperator::Join(op) => {
482            annotate_pushdown_hints(&mut op.left, store);
483            annotate_pushdown_hints(&mut op.right, store);
484        }
485        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
486        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
487        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
488        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
489        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
490        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
491        LogicalOperator::Union(op) => {
492            for input in &mut op.inputs {
493                annotate_pushdown_hints(input, store);
494            }
495        }
496        LogicalOperator::Apply(op) => {
497            annotate_pushdown_hints(&mut op.input, store);
498            annotate_pushdown_hints(&mut op.subplan, store);
499        }
500        LogicalOperator::Otherwise(op) => {
501            annotate_pushdown_hints(&mut op.left, store);
502            annotate_pushdown_hints(&mut op.right, store);
503        }
504        _ => {}
505    }
506}
507
508/// Infers the pushdown strategy for a filter predicate over a node scan.
509fn infer_pushdown(
510    predicate: &LogicalExpression,
511    scan: &crate::query::plan::NodeScanOp,
512    store: &dyn grafeo_core::graph::GraphStore,
513) -> Option<crate::query::plan::PushdownHint> {
514    use crate::query::plan::*;
515
516    match predicate {
517        // Equality: n.prop = value
518        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
519            if let Some(prop) = extract_property_name(left, &scan.variable)
520                .or_else(|| extract_property_name(right, &scan.variable))
521            {
522                if store.has_property_index(&prop) {
523                    return Some(PushdownHint::IndexLookup { property: prop });
524                }
525                if scan.label.is_some() {
526                    return Some(PushdownHint::LabelFirst);
527                }
528            }
529            None
530        }
531        // Range: n.prop > value, n.prop < value, etc.
532        LogicalExpression::Binary {
533            left,
534            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
535            right,
536        } => {
537            if let Some(prop) = extract_property_name(left, &scan.variable)
538                .or_else(|| extract_property_name(right, &scan.variable))
539            {
540                if store.has_property_index(&prop) {
541                    return Some(PushdownHint::RangeScan { property: prop });
542                }
543                if scan.label.is_some() {
544                    return Some(PushdownHint::LabelFirst);
545                }
546            }
547            None
548        }
549        // AND: check the left side (first conjunct) for pushdown
550        LogicalExpression::Binary {
551            left,
552            op: BinaryOp::And,
553            ..
554        } => infer_pushdown(left, scan, store),
555        _ => {
556            // Any other predicate on a labeled scan gets label-first
557            if scan.label.is_some() {
558                Some(PushdownHint::LabelFirst)
559            } else {
560                None
561            }
562        }
563    }
564}
565
566/// Extracts the property name if the expression is `Property { variable, property }`
567/// and the variable matches the scan variable.
568fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
569    if let LogicalExpression::Property { variable, property } = expr
570        && variable == scan_var
571    {
572        Some(property.clone())
573    } else {
574        None
575    }
576}
577
578/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
579pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
580    let tree_text = plan.root.explain_tree();
581    QueryResult {
582        columns: vec!["plan".to_string()],
583        column_types: vec![grafeo_common::types::LogicalType::String],
584        rows: vec![vec![Value::String(tree_text.into())]],
585        execution_time_ms: None,
586        rows_scanned: None,
587        status_message: None,
588    }
589}
590
591/// Substitutes parameters in a logical plan with their values.
592fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
593    substitute_in_operator(&mut plan.root, params)
594}
595
596/// Recursively substitutes parameters in an operator.
597fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
598    use crate::query::plan::*;
599
600    match op {
601        LogicalOperator::Filter(filter) => {
602            substitute_in_expression(&mut filter.predicate, params)?;
603            substitute_in_operator(&mut filter.input, params)?;
604        }
605        LogicalOperator::Return(ret) => {
606            for item in &mut ret.items {
607                substitute_in_expression(&mut item.expression, params)?;
608            }
609            substitute_in_operator(&mut ret.input, params)?;
610        }
611        LogicalOperator::Project(proj) => {
612            for p in &mut proj.projections {
613                substitute_in_expression(&mut p.expression, params)?;
614            }
615            substitute_in_operator(&mut proj.input, params)?;
616        }
617        LogicalOperator::NodeScan(scan) => {
618            if let Some(input) = &mut scan.input {
619                substitute_in_operator(input, params)?;
620            }
621        }
622        LogicalOperator::EdgeScan(scan) => {
623            if let Some(input) = &mut scan.input {
624                substitute_in_operator(input, params)?;
625            }
626        }
627        LogicalOperator::Expand(expand) => {
628            substitute_in_operator(&mut expand.input, params)?;
629        }
630        LogicalOperator::Join(join) => {
631            substitute_in_operator(&mut join.left, params)?;
632            substitute_in_operator(&mut join.right, params)?;
633            for cond in &mut join.conditions {
634                substitute_in_expression(&mut cond.left, params)?;
635                substitute_in_expression(&mut cond.right, params)?;
636            }
637        }
638        LogicalOperator::LeftJoin(join) => {
639            substitute_in_operator(&mut join.left, params)?;
640            substitute_in_operator(&mut join.right, params)?;
641            if let Some(cond) = &mut join.condition {
642                substitute_in_expression(cond, params)?;
643            }
644        }
645        LogicalOperator::Aggregate(agg) => {
646            for expr in &mut agg.group_by {
647                substitute_in_expression(expr, params)?;
648            }
649            for agg_expr in &mut agg.aggregates {
650                if let Some(expr) = &mut agg_expr.expression {
651                    substitute_in_expression(expr, params)?;
652                }
653            }
654            substitute_in_operator(&mut agg.input, params)?;
655        }
656        LogicalOperator::Sort(sort) => {
657            for key in &mut sort.keys {
658                substitute_in_expression(&mut key.expression, params)?;
659            }
660            substitute_in_operator(&mut sort.input, params)?;
661        }
662        LogicalOperator::Limit(limit) => {
663            substitute_in_operator(&mut limit.input, params)?;
664        }
665        LogicalOperator::Skip(skip) => {
666            substitute_in_operator(&mut skip.input, params)?;
667        }
668        LogicalOperator::Distinct(distinct) => {
669            substitute_in_operator(&mut distinct.input, params)?;
670        }
671        LogicalOperator::CreateNode(create) => {
672            for (_, expr) in &mut create.properties {
673                substitute_in_expression(expr, params)?;
674            }
675            if let Some(input) = &mut create.input {
676                substitute_in_operator(input, params)?;
677            }
678        }
679        LogicalOperator::CreateEdge(create) => {
680            for (_, expr) in &mut create.properties {
681                substitute_in_expression(expr, params)?;
682            }
683            substitute_in_operator(&mut create.input, params)?;
684        }
685        LogicalOperator::DeleteNode(delete) => {
686            substitute_in_operator(&mut delete.input, params)?;
687        }
688        LogicalOperator::DeleteEdge(delete) => {
689            substitute_in_operator(&mut delete.input, params)?;
690        }
691        LogicalOperator::SetProperty(set) => {
692            for (_, expr) in &mut set.properties {
693                substitute_in_expression(expr, params)?;
694            }
695            substitute_in_operator(&mut set.input, params)?;
696        }
697        LogicalOperator::Union(union) => {
698            for input in &mut union.inputs {
699                substitute_in_operator(input, params)?;
700            }
701        }
702        LogicalOperator::AntiJoin(anti) => {
703            substitute_in_operator(&mut anti.left, params)?;
704            substitute_in_operator(&mut anti.right, params)?;
705        }
706        LogicalOperator::Bind(bind) => {
707            substitute_in_expression(&mut bind.expression, params)?;
708            substitute_in_operator(&mut bind.input, params)?;
709        }
710        LogicalOperator::TripleScan(scan) => {
711            if let Some(input) = &mut scan.input {
712                substitute_in_operator(input, params)?;
713            }
714        }
715        LogicalOperator::Unwind(unwind) => {
716            substitute_in_expression(&mut unwind.expression, params)?;
717            substitute_in_operator(&mut unwind.input, params)?;
718        }
719        LogicalOperator::MapCollect(mc) => {
720            substitute_in_operator(&mut mc.input, params)?;
721        }
722        LogicalOperator::Merge(merge) => {
723            for (_, expr) in &mut merge.match_properties {
724                substitute_in_expression(expr, params)?;
725            }
726            for (_, expr) in &mut merge.on_create {
727                substitute_in_expression(expr, params)?;
728            }
729            for (_, expr) in &mut merge.on_match {
730                substitute_in_expression(expr, params)?;
731            }
732            substitute_in_operator(&mut merge.input, params)?;
733        }
734        LogicalOperator::MergeRelationship(merge_rel) => {
735            for (_, expr) in &mut merge_rel.match_properties {
736                substitute_in_expression(expr, params)?;
737            }
738            for (_, expr) in &mut merge_rel.on_create {
739                substitute_in_expression(expr, params)?;
740            }
741            for (_, expr) in &mut merge_rel.on_match {
742                substitute_in_expression(expr, params)?;
743            }
744            substitute_in_operator(&mut merge_rel.input, params)?;
745        }
746        LogicalOperator::AddLabel(add_label) => {
747            substitute_in_operator(&mut add_label.input, params)?;
748        }
749        LogicalOperator::RemoveLabel(remove_label) => {
750            substitute_in_operator(&mut remove_label.input, params)?;
751        }
752        LogicalOperator::ShortestPath(sp) => {
753            substitute_in_operator(&mut sp.input, params)?;
754        }
755        // SPARQL Update operators
756        LogicalOperator::InsertTriple(insert) => {
757            if let Some(ref mut input) = insert.input {
758                substitute_in_operator(input, params)?;
759            }
760        }
761        LogicalOperator::DeleteTriple(delete) => {
762            if let Some(ref mut input) = delete.input {
763                substitute_in_operator(input, params)?;
764            }
765        }
766        LogicalOperator::Modify(modify) => {
767            substitute_in_operator(&mut modify.where_clause, params)?;
768        }
769        LogicalOperator::ClearGraph(_)
770        | LogicalOperator::CreateGraph(_)
771        | LogicalOperator::DropGraph(_)
772        | LogicalOperator::LoadGraph(_)
773        | LogicalOperator::CopyGraph(_)
774        | LogicalOperator::MoveGraph(_)
775        | LogicalOperator::AddGraph(_) => {}
776        LogicalOperator::HorizontalAggregate(op) => {
777            substitute_in_operator(&mut op.input, params)?;
778        }
779        LogicalOperator::Empty => {}
780        LogicalOperator::VectorScan(scan) => {
781            substitute_in_expression(&mut scan.query_vector, params)?;
782            if let Some(ref mut input) = scan.input {
783                substitute_in_operator(input, params)?;
784            }
785        }
786        LogicalOperator::VectorJoin(join) => {
787            substitute_in_expression(&mut join.query_vector, params)?;
788            substitute_in_operator(&mut join.input, params)?;
789        }
790        LogicalOperator::Except(except) => {
791            substitute_in_operator(&mut except.left, params)?;
792            substitute_in_operator(&mut except.right, params)?;
793        }
794        LogicalOperator::Intersect(intersect) => {
795            substitute_in_operator(&mut intersect.left, params)?;
796            substitute_in_operator(&mut intersect.right, params)?;
797        }
798        LogicalOperator::Otherwise(otherwise) => {
799            substitute_in_operator(&mut otherwise.left, params)?;
800            substitute_in_operator(&mut otherwise.right, params)?;
801        }
802        LogicalOperator::Apply(apply) => {
803            substitute_in_operator(&mut apply.input, params)?;
804            substitute_in_operator(&mut apply.subplan, params)?;
805        }
806        // ParameterScan has no expressions to substitute
807        LogicalOperator::ParameterScan(_) => {}
808        LogicalOperator::MultiWayJoin(mwj) => {
809            for input in &mut mwj.inputs {
810                substitute_in_operator(input, params)?;
811            }
812            for cond in &mut mwj.conditions {
813                substitute_in_expression(&mut cond.left, params)?;
814                substitute_in_expression(&mut cond.right, params)?;
815            }
816        }
817        // DDL operators have no expressions to substitute
818        LogicalOperator::CreatePropertyGraph(_) => {}
819        // Procedure calls: arguments could contain parameters but we handle at execution time
820        LogicalOperator::CallProcedure(_) => {}
821    }
822    Ok(())
823}
824
825/// Substitutes parameters in an expression with their values.
826fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
827    use crate::query::plan::LogicalExpression;
828
829    match expr {
830        LogicalExpression::Parameter(name) => {
831            if let Some(value) = params.get(name) {
832                *expr = LogicalExpression::Literal(value.clone());
833            } else {
834                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
835            }
836        }
837        LogicalExpression::Binary { left, right, .. } => {
838            substitute_in_expression(left, params)?;
839            substitute_in_expression(right, params)?;
840        }
841        LogicalExpression::Unary { operand, .. } => {
842            substitute_in_expression(operand, params)?;
843        }
844        LogicalExpression::FunctionCall { args, .. } => {
845            for arg in args {
846                substitute_in_expression(arg, params)?;
847            }
848        }
849        LogicalExpression::List(items) => {
850            for item in items {
851                substitute_in_expression(item, params)?;
852            }
853        }
854        LogicalExpression::Map(pairs) => {
855            for (_, value) in pairs {
856                substitute_in_expression(value, params)?;
857            }
858        }
859        LogicalExpression::IndexAccess { base, index } => {
860            substitute_in_expression(base, params)?;
861            substitute_in_expression(index, params)?;
862        }
863        LogicalExpression::SliceAccess { base, start, end } => {
864            substitute_in_expression(base, params)?;
865            if let Some(s) = start {
866                substitute_in_expression(s, params)?;
867            }
868            if let Some(e) = end {
869                substitute_in_expression(e, params)?;
870            }
871        }
872        LogicalExpression::Case {
873            operand,
874            when_clauses,
875            else_clause,
876        } => {
877            if let Some(op) = operand {
878                substitute_in_expression(op, params)?;
879            }
880            for (cond, result) in when_clauses {
881                substitute_in_expression(cond, params)?;
882                substitute_in_expression(result, params)?;
883            }
884            if let Some(el) = else_clause {
885                substitute_in_expression(el, params)?;
886            }
887        }
888        LogicalExpression::Property { .. }
889        | LogicalExpression::Variable(_)
890        | LogicalExpression::Literal(_)
891        | LogicalExpression::Labels(_)
892        | LogicalExpression::Type(_)
893        | LogicalExpression::Id(_) => {}
894        LogicalExpression::ListComprehension {
895            list_expr,
896            filter_expr,
897            map_expr,
898            ..
899        } => {
900            substitute_in_expression(list_expr, params)?;
901            if let Some(filter) = filter_expr {
902                substitute_in_expression(filter, params)?;
903            }
904            substitute_in_expression(map_expr, params)?;
905        }
906        LogicalExpression::ListPredicate {
907            list_expr,
908            predicate,
909            ..
910        } => {
911            substitute_in_expression(list_expr, params)?;
912            substitute_in_expression(predicate, params)?;
913        }
914        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
915            // Subqueries would need recursive parameter substitution
916        }
917        LogicalExpression::PatternComprehension { projection, .. } => {
918            substitute_in_expression(projection, params)?;
919        }
920        LogicalExpression::MapProjection { entries, .. } => {
921            for entry in entries {
922                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
923                    substitute_in_expression(expr, params)?;
924                }
925            }
926        }
927        LogicalExpression::Reduce {
928            initial,
929            list,
930            expression,
931            ..
932        } => {
933            substitute_in_expression(initial, params)?;
934            substitute_in_expression(list, params)?;
935            substitute_in_expression(expression, params)?;
936        }
937    }
938    Ok(())
939}
940
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fresh `LpgStore` behind an `Arc`, ready to be seeded with nodes and
    /// handed to `QueryProcessor::for_lpg`.
    fn new_store() -> Arc<LpgStore> {
        Arc::new(LpgStore::new().unwrap())
    }

    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    #[test]
    fn test_processor_creation() {
        let processor = QueryProcessor::for_lpg(new_store());
        // `assert_eq!` prints the actual node count if this ever fails.
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = new_store();
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let result = QueryProcessor::for_lpg(store)
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = new_store();
        store.create_node(&["Person"]);

        let result = QueryProcessor::for_lpg(store)
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = new_store();
        for age in [25, 35, 45] {
            store.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let processor = QueryProcessor::for_lpg(store);
        let params: HashMap<String, Value> =
            HashMap::from([("min_age".to_string(), Value::Int64(30))]);

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Ages 35 and 45 are above the threshold; 25 is not.
        assert_eq!(result.row_count(), 2);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = new_store();
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // The query references `$min_age`, but the parameter map is empty.
        let params: HashMap<String, Value> = HashMap::new();
        let err = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .expect_err("query with an unbound parameter must fail");

        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        // Parameter compared against a property in WHERE, with the property projected.
        let store = new_store();
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);
        let params: HashMap<String, Value> =
            HashMap::from([("threshold".to_string(), Value::Int64(15))]);

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=20 is greater than 15.
        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::Int64(20));
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        // Two parameters combined with AND in a single WHERE clause.
        let store = new_store();
        for (age, score) in [(25, 80), (35, 90), (45, 70)] {
            store.create_node_with_props(
                &["Person"],
                [("age", Value::Int64(age)), ("score", Value::Int64(score))],
            );
        }

        let processor = QueryProcessor::for_lpg(store);
        let params: HashMap<String, Value> = HashMap::from([
            ("min_age".to_string(), Value::Int64(30)),
            ("min_score".to_string(), Value::Int64(75)),
        ]);

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only age=35 / score=90 satisfies both conditions.
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        // Despite its name, this test does not use an `IN` list: it compares a string
        // property to a string parameter with `=`, where one of three nodes matches.
        let store = new_store();
        for status in ["active", "pending", "deleted"] {
            store.create_node_with_props(&["Item"], [("status", Value::String(status.into()))]);
        }

        let processor = QueryProcessor::for_lpg(store);
        let params: HashMap<String, Value> =
            HashMap::from([("target".to_string(), Value::String("active".into()))]);

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        // Integer property compared with an integer parameter.
        let store = new_store();
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);
        let params: HashMap<String, Value> =
            HashMap::from([("threshold".to_string(), Value::Int64(75))]);

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=100 is greater than 75.
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // An empty result must still report the projected column names.
        // No nodes are created, so the MATCH yields nothing.
        let processor = QueryProcessor::for_lpg(new_store());
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        // String parameter equality, checking the projected value as well as the count.
        let store = new_store();
        for name in ["alpha", "beta", "gamma"] {
            store.create_node_with_props(&["Item"], [("name", Value::String(name.into()))]);
        }

        let processor = QueryProcessor::for_lpg(store);
        let params: HashMap<String, Value> =
            HashMap::from([("target".to_string(), Value::String("beta".into()))]);

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}