Skip to main content

grafeo_engine/query/
processor.rs

//! Query processor that orchestrates the query pipeline.
//!
//! The `QueryProcessor` is the central component that executes queries through
//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
//!
//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL, SQL/PGQ)
//! for LPG, and SPARQL and GraphQL-over-RDF for RDF (when the `rdf` feature is
//! enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TransactionId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
/// Query languages the processor can accept.
///
/// Each variant is compiled in only when the matching Cargo feature is
/// enabled, so the set of available variants depends on the build.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024), the default language for LPG.
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0.
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin.
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL evaluated against the LPG store.
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 `GRAPH_TABLE`).
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1, evaluated against the RDF store.
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL evaluated against the RDF store.
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
51
52impl QueryLanguage {
53    /// Returns whether this language targets LPG (vs RDF).
54    #[must_use]
55    pub const fn is_lpg(&self) -> bool {
56        match self {
57            #[cfg(feature = "gql")]
58            Self::Gql => true,
59            #[cfg(feature = "cypher")]
60            Self::Cypher => true,
61            #[cfg(feature = "gremlin")]
62            Self::Gremlin => true,
63            #[cfg(feature = "graphql")]
64            Self::GraphQL => true,
65            #[cfg(feature = "sql-pgq")]
66            Self::SqlPgq => true,
67            #[cfg(feature = "sparql")]
68            Self::Sparql => false,
69            #[cfg(all(feature = "graphql", feature = "rdf"))]
70            Self::GraphQLRdf => false,
71        }
72    }
73}
74
75/// Query parameters for prepared statements.
76pub type QueryParams = HashMap<String, Value>;
77
78/// Processes queries through the full pipeline.
79///
80/// The processor holds references to the stores and provides a unified
81/// interface for executing queries in any supported language.
82///
83/// # Example
84///
85/// ```no_run
86/// # use std::sync::Arc;
87/// # use grafeo_core::graph::lpg::LpgStore;
88/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
89///
90/// # fn main() -> grafeo_common::utils::error::Result<()> {
91/// let store = Arc::new(LpgStore::new().unwrap());
92/// let processor = QueryProcessor::for_lpg(store);
93/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
94/// # Ok(())
95/// # }
96/// ```
97pub struct QueryProcessor {
98    /// LPG store for property graph queries.
99    lpg_store: Arc<LpgStore>,
100    /// Graph store trait object for pluggable storage backends.
101    graph_store: Arc<dyn GraphStoreMut>,
102    /// Transaction manager for MVCC operations.
103    transaction_manager: Arc<TransactionManager>,
104    /// Catalog for schema and index metadata.
105    catalog: Arc<Catalog>,
106    /// Query optimizer.
107    optimizer: Optimizer,
108    /// Current transaction context (if any).
109    transaction_context: Option<(EpochId, TransactionId)>,
110    /// RDF store for triple pattern queries (optional).
111    #[cfg(feature = "rdf")]
112    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
113}
114
115impl QueryProcessor {
116    /// Creates a new query processor for LPG queries.
117    #[must_use]
118    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119        let optimizer = Optimizer::from_store(&store);
120        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121        Self {
122            lpg_store: store,
123            graph_store,
124            transaction_manager: Arc::new(TransactionManager::new()),
125            catalog: Arc::new(Catalog::new()),
126            optimizer,
127            transaction_context: None,
128            #[cfg(feature = "rdf")]
129            rdf_store: None,
130        }
131    }
132
133    /// Creates a new query processor with a transaction manager.
134    #[must_use]
135    pub fn for_lpg_with_transaction(
136        store: Arc<LpgStore>,
137        transaction_manager: Arc<TransactionManager>,
138    ) -> Self {
139        let optimizer = Optimizer::from_store(&store);
140        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
141        Self {
142            lpg_store: store,
143            graph_store,
144            transaction_manager,
145            catalog: Arc::new(Catalog::new()),
146            optimizer,
147            transaction_context: None,
148            #[cfg(feature = "rdf")]
149            rdf_store: None,
150        }
151    }
152
153    /// Creates a query processor backed by any GraphStoreMut implementation.
154    #[must_use]
155    pub fn for_graph_store_with_transaction(
156        store: Arc<dyn GraphStoreMut>,
157        transaction_manager: Arc<TransactionManager>,
158    ) -> Self {
159        let optimizer = Optimizer::from_graph_store(&*store);
160        Self {
161            lpg_store: Arc::new(LpgStore::new().expect("arena allocation for dummy LpgStore")), // dummy, not used
162            graph_store: store,
163            transaction_manager,
164            catalog: Arc::new(Catalog::new()),
165            optimizer,
166            transaction_context: None,
167            #[cfg(feature = "rdf")]
168            rdf_store: None,
169        }
170    }
171
172    /// Creates a new query processor with both LPG and RDF stores.
173    #[cfg(feature = "rdf")]
174    #[must_use]
175    pub fn with_rdf(
176        lpg_store: Arc<LpgStore>,
177        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
178    ) -> Self {
179        let optimizer = Optimizer::from_store(&lpg_store);
180        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
181        Self {
182            lpg_store,
183            graph_store,
184            transaction_manager: Arc::new(TransactionManager::new()),
185            catalog: Arc::new(Catalog::new()),
186            optimizer,
187            transaction_context: None,
188            rdf_store: Some(rdf_store),
189        }
190    }
191
192    /// Sets the transaction context for MVCC visibility.
193    ///
194    /// This should be called when the processor is used within a transaction.
195    #[must_use]
196    pub fn with_transaction_context(
197        mut self,
198        viewing_epoch: EpochId,
199        transaction_id: TransactionId,
200    ) -> Self {
201        self.transaction_context = Some((viewing_epoch, transaction_id));
202        self
203    }
204
205    /// Sets a custom catalog.
206    #[must_use]
207    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
208        self.catalog = catalog;
209        self
210    }
211
212    /// Sets a custom optimizer.
213    #[must_use]
214    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
215        self.optimizer = optimizer;
216        self
217    }
218
219    /// Processes a query string and returns results.
220    ///
221    /// Pipeline:
222    /// 1. Parse (language-specific parser → AST)
223    /// 2. Translate (AST → LogicalPlan)
224    /// 3. Bind (semantic validation)
225    /// 4. Optimize (filter pushdown, join reorder, etc.)
226    /// 5. Plan (logical → physical operators)
227    /// 6. Execute (run operators, collect results)
228    ///
229    /// # Arguments
230    ///
231    /// * `query` - The query string
232    /// * `language` - Which query language to use
233    /// * `params` - Optional query parameters for prepared statements
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if any stage of the pipeline fails.
238    pub fn process(
239        &self,
240        query: &str,
241        language: QueryLanguage,
242        params: Option<&QueryParams>,
243    ) -> Result<QueryResult> {
244        if language.is_lpg() {
245            self.process_lpg(query, language, params)
246        } else {
247            #[cfg(feature = "rdf")]
248            {
249                self.process_rdf(query, language, params)
250            }
251            #[cfg(not(feature = "rdf"))]
252            {
253                Err(Error::Internal(
254                    "RDF support not enabled. Compile with --features rdf".to_string(),
255                ))
256            }
257        }
258    }
259
260    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
261    fn process_lpg(
262        &self,
263        query: &str,
264        language: QueryLanguage,
265        params: Option<&QueryParams>,
266    ) -> Result<QueryResult> {
267        #[cfg(not(target_arch = "wasm32"))]
268        let start_time = std::time::Instant::now();
269
270        // 1. Parse and translate to logical plan
271        let mut logical_plan = self.translate_lpg(query, language)?;
272
273        // 2. Substitute parameters if provided
274        if let Some(params) = params {
275            substitute_params(&mut logical_plan, params)?;
276        }
277
278        // 3. Semantic validation
279        let mut binder = Binder::new();
280        let _binding_context = binder.bind(&logical_plan)?;
281
282        // 4. Optimize the plan
283        let optimized_plan = self.optimizer.optimize(logical_plan)?;
284
285        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
286        if optimized_plan.explain {
287            let mut plan = optimized_plan;
288            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
289            return Ok(explain_result(&plan));
290        }
291
292        // 5. Convert to physical plan with transaction context
293        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
294            Planner::with_context(
295                Arc::clone(&self.graph_store),
296                Arc::clone(&self.transaction_manager),
297                Some(transaction_id),
298                epoch,
299            )
300        } else {
301            Planner::with_context(
302                Arc::clone(&self.graph_store),
303                Arc::clone(&self.transaction_manager),
304                None,
305                self.transaction_manager.current_epoch(),
306            )
307        };
308        let mut physical_plan = planner.plan(&optimized_plan)?;
309
310        // 6. Execute and collect results
311        let executor = Executor::with_columns(physical_plan.columns.clone());
312        let mut result = executor.execute(physical_plan.operator.as_mut())?;
313
314        // Add execution metrics
315        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
316        #[cfg(not(target_arch = "wasm32"))]
317        {
318            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
319            result.execution_time_ms = Some(elapsed_ms);
320        }
321        result.rows_scanned = Some(rows_scanned);
322
323        Ok(result)
324    }
325
326    /// Translates an LPG query to a logical plan.
327    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
328        match language {
329            #[cfg(feature = "gql")]
330            QueryLanguage::Gql => {
331                use crate::query::translators::gql;
332                gql::translate(query)
333            }
334            #[cfg(feature = "cypher")]
335            QueryLanguage::Cypher => {
336                use crate::query::translators::cypher;
337                cypher::translate(query)
338            }
339            #[cfg(feature = "gremlin")]
340            QueryLanguage::Gremlin => {
341                use crate::query::translators::gremlin;
342                gremlin::translate(query)
343            }
344            #[cfg(feature = "graphql")]
345            QueryLanguage::GraphQL => {
346                use crate::query::translators::graphql;
347                graphql::translate(query)
348            }
349            #[cfg(feature = "sql-pgq")]
350            QueryLanguage::SqlPgq => {
351                use crate::query::translators::sql_pgq;
352                sql_pgq::translate(query)
353            }
354            #[allow(unreachable_patterns)]
355            _ => Err(Error::Internal(format!(
356                "Language {:?} is not an LPG language",
357                language
358            ))),
359        }
360    }
361
362    /// Processes an RDF query (SPARQL, GraphQL-RDF).
363    #[cfg(feature = "rdf")]
364    fn process_rdf(
365        &self,
366        query: &str,
367        language: QueryLanguage,
368        params: Option<&QueryParams>,
369    ) -> Result<QueryResult> {
370        use crate::query::planner::rdf::RdfPlanner;
371
372        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
373            Error::Internal("RDF store not configured for this processor".to_string())
374        })?;
375
376        // 1. Parse and translate to logical plan
377        let mut logical_plan = self.translate_rdf(query, language)?;
378
379        // 2. Substitute parameters if provided
380        if let Some(params) = params {
381            substitute_params(&mut logical_plan, params)?;
382        }
383
384        // 3. Semantic validation
385        let mut binder = Binder::new();
386        let _binding_context = binder.bind(&logical_plan)?;
387
388        // 3. Optimize the plan
389        let optimized_plan = self.optimizer.optimize(logical_plan)?;
390
391        // 3a. EXPLAIN: return the optimized plan tree without executing
392        if optimized_plan.explain {
393            return Ok(explain_result(&optimized_plan));
394        }
395
396        // 4. Convert to physical plan (using RDF planner)
397        let planner = RdfPlanner::new(Arc::clone(rdf_store));
398        let mut physical_plan = planner.plan(&optimized_plan)?;
399
400        // 5. Execute and collect results
401        let executor = Executor::with_columns(physical_plan.columns.clone());
402        executor.execute(physical_plan.operator.as_mut())
403    }
404
405    /// Translates an RDF query to a logical plan.
406    #[cfg(feature = "rdf")]
407    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
408        match language {
409            #[cfg(feature = "sparql")]
410            QueryLanguage::Sparql => {
411                use crate::query::translators::sparql;
412                sparql::translate(query)
413            }
414            #[cfg(all(feature = "graphql", feature = "rdf"))]
415            QueryLanguage::GraphQLRdf => {
416                use crate::query::translators::graphql_rdf;
417                // Default namespace for GraphQL-RDF queries
418                graphql_rdf::translate(query, "http://example.org/")
419            }
420            _ => Err(Error::Internal(format!(
421                "Language {:?} is not an RDF language",
422                language
423            ))),
424        }
425    }
426
427    /// Returns a reference to the LPG store.
428    #[must_use]
429    pub fn lpg_store(&self) -> &Arc<LpgStore> {
430        &self.lpg_store
431    }
432
433    /// Returns a reference to the catalog.
434    #[must_use]
435    pub fn catalog(&self) -> &Arc<Catalog> {
436        &self.catalog
437    }
438
439    /// Returns a reference to the optimizer.
440    #[must_use]
441    pub fn optimizer(&self) -> &Optimizer {
442        &self.optimizer
443    }
444
445    /// Returns a reference to the RDF store (if configured).
446    #[cfg(feature = "rdf")]
447    #[must_use]
448    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
449        self.rdf_store.as_ref()
450    }
451}
452
453impl QueryProcessor {
454    /// Returns a reference to the transaction manager.
455    #[must_use]
456    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
457        &self.transaction_manager
458    }
459}
460
461/// Annotates filter operators in the plan with pushdown hints.
462///
463/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
464/// whether a property index exists for equality predicates.
465pub(crate) fn annotate_pushdown_hints(
466    op: &mut LogicalOperator,
467    store: &dyn grafeo_core::graph::GraphStore,
468) {
469    use crate::query::plan::*;
470
471    match op {
472        LogicalOperator::Filter(filter) => {
473            // Recurse into children first
474            annotate_pushdown_hints(&mut filter.input, store);
475
476            // Annotate this filter if it sits on top of a NodeScan
477            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
478                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
479            }
480        }
481        LogicalOperator::NodeScan(op) => {
482            if let Some(input) = &mut op.input {
483                annotate_pushdown_hints(input, store);
484            }
485        }
486        LogicalOperator::EdgeScan(op) => {
487            if let Some(input) = &mut op.input {
488                annotate_pushdown_hints(input, store);
489            }
490        }
491        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
492        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
493        LogicalOperator::Join(op) => {
494            annotate_pushdown_hints(&mut op.left, store);
495            annotate_pushdown_hints(&mut op.right, store);
496        }
497        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
498        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
499        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
500        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
501        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
502        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
503        LogicalOperator::Union(op) => {
504            for input in &mut op.inputs {
505                annotate_pushdown_hints(input, store);
506            }
507        }
508        LogicalOperator::Apply(op) => {
509            annotate_pushdown_hints(&mut op.input, store);
510            annotate_pushdown_hints(&mut op.subplan, store);
511        }
512        LogicalOperator::Otherwise(op) => {
513            annotate_pushdown_hints(&mut op.left, store);
514            annotate_pushdown_hints(&mut op.right, store);
515        }
516        _ => {}
517    }
518}
519
520/// Infers the pushdown strategy for a filter predicate over a node scan.
521fn infer_pushdown(
522    predicate: &LogicalExpression,
523    scan: &crate::query::plan::NodeScanOp,
524    store: &dyn grafeo_core::graph::GraphStore,
525) -> Option<crate::query::plan::PushdownHint> {
526    use crate::query::plan::*;
527
528    match predicate {
529        // Equality: n.prop = value
530        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
531            if let Some(prop) = extract_property_name(left, &scan.variable)
532                .or_else(|| extract_property_name(right, &scan.variable))
533            {
534                if store.has_property_index(&prop) {
535                    return Some(PushdownHint::IndexLookup { property: prop });
536                }
537                if scan.label.is_some() {
538                    return Some(PushdownHint::LabelFirst);
539                }
540            }
541            None
542        }
543        // Range: n.prop > value, n.prop < value, etc.
544        LogicalExpression::Binary {
545            left,
546            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
547            right,
548        } => {
549            if let Some(prop) = extract_property_name(left, &scan.variable)
550                .or_else(|| extract_property_name(right, &scan.variable))
551            {
552                if store.has_property_index(&prop) {
553                    return Some(PushdownHint::RangeScan { property: prop });
554                }
555                if scan.label.is_some() {
556                    return Some(PushdownHint::LabelFirst);
557                }
558            }
559            None
560        }
561        // AND: check the left side (first conjunct) for pushdown
562        LogicalExpression::Binary {
563            left,
564            op: BinaryOp::And,
565            ..
566        } => infer_pushdown(left, scan, store),
567        _ => {
568            // Any other predicate on a labeled scan gets label-first
569            if scan.label.is_some() {
570                Some(PushdownHint::LabelFirst)
571            } else {
572                None
573            }
574        }
575    }
576}
577
578/// Extracts the property name if the expression is `Property { variable, property }`
579/// and the variable matches the scan variable.
580fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
581    if let LogicalExpression::Property { variable, property } = expr
582        && variable == scan_var
583    {
584        Some(property.clone())
585    } else {
586        None
587    }
588}
589
590/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
591pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
592    let tree_text = plan.root.explain_tree();
593    QueryResult {
594        columns: vec!["plan".to_string()],
595        column_types: vec![grafeo_common::types::LogicalType::String],
596        rows: vec![vec![Value::String(tree_text.into())]],
597        execution_time_ms: None,
598        rows_scanned: None,
599        status_message: None,
600        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
601    }
602}
603
604/// Substitutes parameters in a logical plan with their values.
605pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
606    substitute_in_operator(&mut plan.root, params)
607}
608
609/// Recursively substitutes parameters in an operator.
610fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
611    use crate::query::plan::*;
612
613    match op {
614        LogicalOperator::Filter(filter) => {
615            substitute_in_expression(&mut filter.predicate, params)?;
616            substitute_in_operator(&mut filter.input, params)?;
617        }
618        LogicalOperator::Return(ret) => {
619            for item in &mut ret.items {
620                substitute_in_expression(&mut item.expression, params)?;
621            }
622            substitute_in_operator(&mut ret.input, params)?;
623        }
624        LogicalOperator::Project(proj) => {
625            for p in &mut proj.projections {
626                substitute_in_expression(&mut p.expression, params)?;
627            }
628            substitute_in_operator(&mut proj.input, params)?;
629        }
630        LogicalOperator::NodeScan(scan) => {
631            if let Some(input) = &mut scan.input {
632                substitute_in_operator(input, params)?;
633            }
634        }
635        LogicalOperator::EdgeScan(scan) => {
636            if let Some(input) = &mut scan.input {
637                substitute_in_operator(input, params)?;
638            }
639        }
640        LogicalOperator::Expand(expand) => {
641            substitute_in_operator(&mut expand.input, params)?;
642        }
643        LogicalOperator::Join(join) => {
644            substitute_in_operator(&mut join.left, params)?;
645            substitute_in_operator(&mut join.right, params)?;
646            for cond in &mut join.conditions {
647                substitute_in_expression(&mut cond.left, params)?;
648                substitute_in_expression(&mut cond.right, params)?;
649            }
650        }
651        LogicalOperator::LeftJoin(join) => {
652            substitute_in_operator(&mut join.left, params)?;
653            substitute_in_operator(&mut join.right, params)?;
654            if let Some(cond) = &mut join.condition {
655                substitute_in_expression(cond, params)?;
656            }
657        }
658        LogicalOperator::Aggregate(agg) => {
659            for expr in &mut agg.group_by {
660                substitute_in_expression(expr, params)?;
661            }
662            for agg_expr in &mut agg.aggregates {
663                if let Some(expr) = &mut agg_expr.expression {
664                    substitute_in_expression(expr, params)?;
665                }
666            }
667            substitute_in_operator(&mut agg.input, params)?;
668        }
669        LogicalOperator::Sort(sort) => {
670            for key in &mut sort.keys {
671                substitute_in_expression(&mut key.expression, params)?;
672            }
673            substitute_in_operator(&mut sort.input, params)?;
674        }
675        LogicalOperator::Limit(limit) => {
676            resolve_count_param(&mut limit.count, params)?;
677            substitute_in_operator(&mut limit.input, params)?;
678        }
679        LogicalOperator::Skip(skip) => {
680            resolve_count_param(&mut skip.count, params)?;
681            substitute_in_operator(&mut skip.input, params)?;
682        }
683        LogicalOperator::Distinct(distinct) => {
684            substitute_in_operator(&mut distinct.input, params)?;
685        }
686        LogicalOperator::CreateNode(create) => {
687            for (_, expr) in &mut create.properties {
688                substitute_in_expression(expr, params)?;
689            }
690            if let Some(input) = &mut create.input {
691                substitute_in_operator(input, params)?;
692            }
693        }
694        LogicalOperator::CreateEdge(create) => {
695            for (_, expr) in &mut create.properties {
696                substitute_in_expression(expr, params)?;
697            }
698            substitute_in_operator(&mut create.input, params)?;
699        }
700        LogicalOperator::DeleteNode(delete) => {
701            substitute_in_operator(&mut delete.input, params)?;
702        }
703        LogicalOperator::DeleteEdge(delete) => {
704            substitute_in_operator(&mut delete.input, params)?;
705        }
706        LogicalOperator::SetProperty(set) => {
707            for (_, expr) in &mut set.properties {
708                substitute_in_expression(expr, params)?;
709            }
710            substitute_in_operator(&mut set.input, params)?;
711        }
712        LogicalOperator::Union(union) => {
713            for input in &mut union.inputs {
714                substitute_in_operator(input, params)?;
715            }
716        }
717        LogicalOperator::AntiJoin(anti) => {
718            substitute_in_operator(&mut anti.left, params)?;
719            substitute_in_operator(&mut anti.right, params)?;
720        }
721        LogicalOperator::Bind(bind) => {
722            substitute_in_expression(&mut bind.expression, params)?;
723            substitute_in_operator(&mut bind.input, params)?;
724        }
725        LogicalOperator::TripleScan(scan) => {
726            if let Some(input) = &mut scan.input {
727                substitute_in_operator(input, params)?;
728            }
729        }
730        LogicalOperator::Unwind(unwind) => {
731            substitute_in_expression(&mut unwind.expression, params)?;
732            substitute_in_operator(&mut unwind.input, params)?;
733        }
734        LogicalOperator::MapCollect(mc) => {
735            substitute_in_operator(&mut mc.input, params)?;
736        }
737        LogicalOperator::Merge(merge) => {
738            for (_, expr) in &mut merge.match_properties {
739                substitute_in_expression(expr, params)?;
740            }
741            for (_, expr) in &mut merge.on_create {
742                substitute_in_expression(expr, params)?;
743            }
744            for (_, expr) in &mut merge.on_match {
745                substitute_in_expression(expr, params)?;
746            }
747            substitute_in_operator(&mut merge.input, params)?;
748        }
749        LogicalOperator::MergeRelationship(merge_rel) => {
750            for (_, expr) in &mut merge_rel.match_properties {
751                substitute_in_expression(expr, params)?;
752            }
753            for (_, expr) in &mut merge_rel.on_create {
754                substitute_in_expression(expr, params)?;
755            }
756            for (_, expr) in &mut merge_rel.on_match {
757                substitute_in_expression(expr, params)?;
758            }
759            substitute_in_operator(&mut merge_rel.input, params)?;
760        }
761        LogicalOperator::AddLabel(add_label) => {
762            substitute_in_operator(&mut add_label.input, params)?;
763        }
764        LogicalOperator::RemoveLabel(remove_label) => {
765            substitute_in_operator(&mut remove_label.input, params)?;
766        }
767        LogicalOperator::ShortestPath(sp) => {
768            substitute_in_operator(&mut sp.input, params)?;
769        }
770        // SPARQL Update operators
771        LogicalOperator::InsertTriple(insert) => {
772            if let Some(ref mut input) = insert.input {
773                substitute_in_operator(input, params)?;
774            }
775        }
776        LogicalOperator::DeleteTriple(delete) => {
777            if let Some(ref mut input) = delete.input {
778                substitute_in_operator(input, params)?;
779            }
780        }
781        LogicalOperator::Modify(modify) => {
782            substitute_in_operator(&mut modify.where_clause, params)?;
783        }
784        LogicalOperator::ClearGraph(_)
785        | LogicalOperator::CreateGraph(_)
786        | LogicalOperator::DropGraph(_)
787        | LogicalOperator::LoadGraph(_)
788        | LogicalOperator::CopyGraph(_)
789        | LogicalOperator::MoveGraph(_)
790        | LogicalOperator::AddGraph(_) => {}
791        LogicalOperator::HorizontalAggregate(op) => {
792            substitute_in_operator(&mut op.input, params)?;
793        }
794        LogicalOperator::Empty => {}
795        LogicalOperator::VectorScan(scan) => {
796            substitute_in_expression(&mut scan.query_vector, params)?;
797            if let Some(ref mut input) = scan.input {
798                substitute_in_operator(input, params)?;
799            }
800        }
801        LogicalOperator::VectorJoin(join) => {
802            substitute_in_expression(&mut join.query_vector, params)?;
803            substitute_in_operator(&mut join.input, params)?;
804        }
805        LogicalOperator::Except(except) => {
806            substitute_in_operator(&mut except.left, params)?;
807            substitute_in_operator(&mut except.right, params)?;
808        }
809        LogicalOperator::Intersect(intersect) => {
810            substitute_in_operator(&mut intersect.left, params)?;
811            substitute_in_operator(&mut intersect.right, params)?;
812        }
813        LogicalOperator::Otherwise(otherwise) => {
814            substitute_in_operator(&mut otherwise.left, params)?;
815            substitute_in_operator(&mut otherwise.right, params)?;
816        }
817        LogicalOperator::Apply(apply) => {
818            substitute_in_operator(&mut apply.input, params)?;
819            substitute_in_operator(&mut apply.subplan, params)?;
820        }
821        // ParameterScan has no expressions to substitute
822        LogicalOperator::ParameterScan(_) => {}
823        LogicalOperator::MultiWayJoin(mwj) => {
824            for input in &mut mwj.inputs {
825                substitute_in_operator(input, params)?;
826            }
827            for cond in &mut mwj.conditions {
828                substitute_in_expression(&mut cond.left, params)?;
829                substitute_in_expression(&mut cond.right, params)?;
830            }
831        }
832        // DDL operators have no expressions to substitute
833        LogicalOperator::CreatePropertyGraph(_) => {}
834        // Procedure calls: arguments could contain parameters but we handle at execution time
835        LogicalOperator::CallProcedure(_) => {}
836        // LoadCsv: file path is a literal, no parameter substitution needed
837        LogicalOperator::LoadCsv(_) => {}
838    }
839    Ok(())
840}
841
842/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
843fn resolve_count_param(
844    count: &mut crate::query::plan::CountExpr,
845    params: &QueryParams,
846) -> Result<()> {
847    use crate::query::plan::CountExpr;
848    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
849
850    if let CountExpr::Parameter(name) = count {
851        let value = params.get(name.as_str()).ok_or_else(|| {
852            Error::Query(QueryError::new(
853                QueryErrorKind::Semantic,
854                format!("Missing parameter for SKIP/LIMIT: ${name}"),
855            ))
856        })?;
857        let n = match value {
858            Value::Int64(i) if *i >= 0 => *i as usize,
859            Value::Int64(i) => {
860                return Err(Error::Query(QueryError::new(
861                    QueryErrorKind::Semantic,
862                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
863                )));
864            }
865            other => {
866                return Err(Error::Query(QueryError::new(
867                    QueryErrorKind::Semantic,
868                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
869                )));
870            }
871        };
872        *count = CountExpr::Literal(n);
873    }
874    Ok(())
875}
876
877/// Substitutes parameters in an expression with their values.
878fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
879    use crate::query::plan::LogicalExpression;
880
881    match expr {
882        LogicalExpression::Parameter(name) => {
883            if let Some(value) = params.get(name) {
884                *expr = LogicalExpression::Literal(value.clone());
885            } else {
886                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
887            }
888        }
889        LogicalExpression::Binary { left, right, .. } => {
890            substitute_in_expression(left, params)?;
891            substitute_in_expression(right, params)?;
892        }
893        LogicalExpression::Unary { operand, .. } => {
894            substitute_in_expression(operand, params)?;
895        }
896        LogicalExpression::FunctionCall { args, .. } => {
897            for arg in args {
898                substitute_in_expression(arg, params)?;
899            }
900        }
901        LogicalExpression::List(items) => {
902            for item in items {
903                substitute_in_expression(item, params)?;
904            }
905        }
906        LogicalExpression::Map(pairs) => {
907            for (_, value) in pairs {
908                substitute_in_expression(value, params)?;
909            }
910        }
911        LogicalExpression::IndexAccess { base, index } => {
912            substitute_in_expression(base, params)?;
913            substitute_in_expression(index, params)?;
914        }
915        LogicalExpression::SliceAccess { base, start, end } => {
916            substitute_in_expression(base, params)?;
917            if let Some(s) = start {
918                substitute_in_expression(s, params)?;
919            }
920            if let Some(e) = end {
921                substitute_in_expression(e, params)?;
922            }
923        }
924        LogicalExpression::Case {
925            operand,
926            when_clauses,
927            else_clause,
928        } => {
929            if let Some(op) = operand {
930                substitute_in_expression(op, params)?;
931            }
932            for (cond, result) in when_clauses {
933                substitute_in_expression(cond, params)?;
934                substitute_in_expression(result, params)?;
935            }
936            if let Some(el) = else_clause {
937                substitute_in_expression(el, params)?;
938            }
939        }
940        LogicalExpression::Property { .. }
941        | LogicalExpression::Variable(_)
942        | LogicalExpression::Literal(_)
943        | LogicalExpression::Labels(_)
944        | LogicalExpression::Type(_)
945        | LogicalExpression::Id(_) => {}
946        LogicalExpression::ListComprehension {
947            list_expr,
948            filter_expr,
949            map_expr,
950            ..
951        } => {
952            substitute_in_expression(list_expr, params)?;
953            if let Some(filter) = filter_expr {
954                substitute_in_expression(filter, params)?;
955            }
956            substitute_in_expression(map_expr, params)?;
957        }
958        LogicalExpression::ListPredicate {
959            list_expr,
960            predicate,
961            ..
962        } => {
963            substitute_in_expression(list_expr, params)?;
964            substitute_in_expression(predicate, params)?;
965        }
966        LogicalExpression::ExistsSubquery(_)
967        | LogicalExpression::CountSubquery(_)
968        | LogicalExpression::ValueSubquery(_) => {
969            // Subqueries would need recursive parameter substitution
970        }
971        LogicalExpression::PatternComprehension { projection, .. } => {
972            substitute_in_expression(projection, params)?;
973        }
974        LogicalExpression::MapProjection { entries, .. } => {
975            for entry in entries {
976                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
977                    substitute_in_expression(expr, params)?;
978                }
979            }
980        }
981        LogicalExpression::Reduce {
982            initial,
983            list,
984            expression,
985            ..
986        } => {
987            substitute_in_expression(initial, params)?;
988            substitute_in_expression(list, params)?;
989            substitute_in_expression(expression, params)?;
990        }
991    }
992    Ok(())
993}
994
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh `LpgStore` wrapped in an `Arc`, as expected by
    /// `QueryProcessor::for_lpg`.
    fn new_store() -> Arc<LpgStore> {
        Arc::new(LpgStore::new().expect("creating an LpgStore for a test should succeed"))
    }

    /// `is_lpg` is true for the LPG languages and false for SPARQL.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built over a fresh store exposes that (empty) store.
    #[test]
    fn test_processor_creation() {
        let processor = QueryProcessor::for_lpg(new_store());
        // `assert_eq!` reports the actual count if this ever fails.
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A plain GQL `MATCH ... RETURN` returns one row per matching node.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = new_store();
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    /// The same simple query also runs through the Cypher front end.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = new_store();
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// A parameter used in a WHERE comparison is substituted before execution.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = new_store();
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        let params = HashMap::from([("min_age".to_string(), Value::Int64(30))]);

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Should return 2 people (ages 35 and 45)
        assert_eq!(result.row_count(), 2);
    }

    /// Referencing a parameter that is absent from the params map is an error.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = new_store();
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // Params map is supplied but empty, so `$min_age` is unbound.
        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        let err = result.expect_err("a query with an unbound parameter should fail");
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    /// Parameter substitution in a WHERE clause with a property comparison,
    /// checking the returned property value as well as the row count.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        let store = new_store();
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let params = HashMap::from([("threshold".to_string(), Value::Int64(15))]);

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=20 matches > 15
        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::Int64(20));
    }

    /// Several parameters combined with AND in one WHERE clause.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        let store = new_store();
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let params = HashMap::from([
            ("min_age".to_string(), Value::Int64(30)),
            ("min_score".to_string(), Value::Int64(75)),
        ]);

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the person with age=35, score=90 matches both conditions
        assert_eq!(result.row_count(), 1);
    }

    /// Matches one of several status values via a string parameter.
    ///
    /// Despite its name, this test compares with `=`; it does not exercise
    /// the `IN` operator.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        let store = new_store();
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let params = HashMap::from([("target".to_string(), Value::String("active".into()))]);

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// Comparing an integer property with an integer parameter.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        let store = new_store();
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        let params = HashMap::from([("threshold".to_string(), Value::Int64(75))]);

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=100 matches > 75
        assert_eq!(result.row_count(), 1);
    }

    /// An empty result still reports the projected column names.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // No nodes are created, so the MATCH yields no rows.
        let processor = QueryProcessor::for_lpg(new_store());
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    /// String parameter equality, checking the returned property value.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        let store = new_store();
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let params = HashMap::from([("target".to_string(), Value::String("beta".into()))]);

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}