Skip to main content

grafeo_engine/query/
processor.rs

//! Query processor that orchestrates the query pipeline.
//!
//! The `QueryProcessor` is the central component that executes queries through
//! the full pipeline: Parse → (parameter substitution) → Bind → Optimize → Plan → Execute.
//!
//! It supports several LPG query languages (GQL, Cypher, Gremlin, GraphQL, SQL/PGQ)
//! and, when the `rdf` feature is enabled, SPARQL and GraphQL over RDF.
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15use grafeo_core::graph::GraphStoreMut;
16use grafeo_core::graph::lpg::LpgStore;
17
18use crate::catalog::Catalog;
19use crate::database::QueryResult;
20use crate::query::binder::Binder;
21use crate::query::executor::Executor;
22use crate::query::optimizer::Optimizer;
23use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
24use crate::query::planner::Planner;
25use crate::transaction::TransactionManager;
26
/// A query language the processor can parse.
///
/// Every variant is compiled in only when its Cargo feature is enabled; use
/// [`QueryLanguage::is_lpg`] to tell LPG languages apart from RDF ones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// ISO/IEC 39075:2024 GQL, the default language for property graphs.
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0.
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin traversals.
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL evaluated against the property graph.
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ, i.e. SQL:2023 `GRAPH_TABLE` queries.
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 over the RDF store.
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL evaluated against the RDF store.
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
52
53impl QueryLanguage {
54    /// Returns whether this language targets LPG (vs RDF).
55    #[must_use]
56    pub const fn is_lpg(&self) -> bool {
57        match self {
58            #[cfg(feature = "gql")]
59            Self::Gql => true,
60            #[cfg(feature = "cypher")]
61            Self::Cypher => true,
62            #[cfg(feature = "gremlin")]
63            Self::Gremlin => true,
64            #[cfg(feature = "graphql")]
65            Self::GraphQL => true,
66            #[cfg(feature = "sql-pgq")]
67            Self::SqlPgq => true,
68            #[cfg(feature = "sparql")]
69            Self::Sparql => false,
70            #[cfg(all(feature = "graphql", feature = "rdf"))]
71            Self::GraphQLRdf => false,
72        }
73    }
74}
75
76/// Query parameters for prepared statements.
77pub type QueryParams = HashMap<String, Value>;
78
79/// Processes queries through the full pipeline.
80///
81/// The processor holds references to the stores and provides a unified
82/// interface for executing queries in any supported language.
83///
84/// # Example
85///
86/// ```no_run
87/// # use std::sync::Arc;
88/// # use grafeo_core::graph::lpg::LpgStore;
89/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
90///
91/// # fn main() -> grafeo_common::utils::error::Result<()> {
92/// let store = Arc::new(LpgStore::new().unwrap());
93/// let processor = QueryProcessor::for_lpg(store);
94/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
95/// # Ok(())
96/// # }
97/// ```
98pub struct QueryProcessor {
99    /// LPG store for property graph queries.
100    lpg_store: Arc<LpgStore>,
101    /// Graph store trait object for pluggable storage backends.
102    graph_store: Arc<dyn GraphStoreMut>,
103    /// Transaction manager for MVCC operations.
104    transaction_manager: Arc<TransactionManager>,
105    /// Catalog for schema and index metadata.
106    catalog: Arc<Catalog>,
107    /// Query optimizer.
108    optimizer: Optimizer,
109    /// Current transaction context (if any).
110    transaction_context: Option<(EpochId, TransactionId)>,
111    /// RDF store for triple pattern queries (optional).
112    #[cfg(feature = "rdf")]
113    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
114}
115
116impl QueryProcessor {
117    /// Creates a new query processor for LPG queries.
118    #[must_use]
119    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
120        let optimizer = Optimizer::from_store(&store);
121        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
122        Self {
123            lpg_store: store,
124            graph_store,
125            transaction_manager: Arc::new(TransactionManager::new()),
126            catalog: Arc::new(Catalog::new()),
127            optimizer,
128            transaction_context: None,
129            #[cfg(feature = "rdf")]
130            rdf_store: None,
131        }
132    }
133
134    /// Creates a new query processor with a transaction manager.
135    #[must_use]
136    pub fn for_lpg_with_transaction(
137        store: Arc<LpgStore>,
138        transaction_manager: Arc<TransactionManager>,
139    ) -> Self {
140        let optimizer = Optimizer::from_store(&store);
141        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
142        Self {
143            lpg_store: store,
144            graph_store,
145            transaction_manager,
146            catalog: Arc::new(Catalog::new()),
147            optimizer,
148            transaction_context: None,
149            #[cfg(feature = "rdf")]
150            rdf_store: None,
151        }
152    }
153
154    /// Creates a query processor backed by any `GraphStoreMut` implementation.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if the internal arena allocation fails (out of memory).
159    pub fn for_graph_store_with_transaction(
160        store: Arc<dyn GraphStoreMut>,
161        transaction_manager: Arc<TransactionManager>,
162    ) -> Result<Self> {
163        let optimizer = Optimizer::from_graph_store(&*store);
164        Ok(Self {
165            lpg_store: Arc::new(LpgStore::new()?),
166            graph_store: store,
167            transaction_manager,
168            catalog: Arc::new(Catalog::new()),
169            optimizer,
170            transaction_context: None,
171            #[cfg(feature = "rdf")]
172            rdf_store: None,
173        })
174    }
175
176    /// Sets the transaction context for MVCC visibility.
177    ///
178    /// This should be called when the processor is used within a transaction.
179    #[must_use]
180    pub fn with_transaction_context(
181        mut self,
182        viewing_epoch: EpochId,
183        transaction_id: TransactionId,
184    ) -> Self {
185        self.transaction_context = Some((viewing_epoch, transaction_id));
186        self
187    }
188
189    /// Sets a custom catalog.
190    #[must_use]
191    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
192        self.catalog = catalog;
193        self
194    }
195
196    /// Sets a custom optimizer.
197    #[must_use]
198    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
199        self.optimizer = optimizer;
200        self
201    }
202
203    /// Processes a query string and returns results.
204    ///
205    /// Pipeline:
206    /// 1. Parse (language-specific parser → AST)
207    /// 2. Translate (AST → LogicalPlan)
208    /// 3. Bind (semantic validation)
209    /// 4. Optimize (filter pushdown, join reorder, etc.)
210    /// 5. Plan (logical → physical operators)
211    /// 6. Execute (run operators, collect results)
212    ///
213    /// # Arguments
214    ///
215    /// * `query` - The query string
216    /// * `language` - Which query language to use
217    /// * `params` - Optional query parameters for prepared statements
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if any stage of the pipeline fails.
222    pub fn process(
223        &self,
224        query: &str,
225        language: QueryLanguage,
226        params: Option<&QueryParams>,
227    ) -> Result<QueryResult> {
228        if language.is_lpg() {
229            self.process_lpg(query, language, params)
230        } else {
231            #[cfg(feature = "rdf")]
232            {
233                self.process_rdf(query, language, params)
234            }
235            #[cfg(not(feature = "rdf"))]
236            {
237                Err(Error::Internal(
238                    "RDF support not enabled. Compile with --features rdf".to_string(),
239                ))
240            }
241        }
242    }
243
244    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
245    fn process_lpg(
246        &self,
247        query: &str,
248        language: QueryLanguage,
249        params: Option<&QueryParams>,
250    ) -> Result<QueryResult> {
251        #[cfg(not(target_arch = "wasm32"))]
252        let start_time = std::time::Instant::now();
253
254        // 1. Parse and translate to logical plan
255        let mut logical_plan = self.translate_lpg(query, language)?;
256
257        // 2. Substitute parameters if provided
258        if let Some(params) = params {
259            substitute_params(&mut logical_plan, params)?;
260        }
261
262        // 3. Semantic validation
263        let mut binder = Binder::new();
264        let _binding_context = binder.bind(&logical_plan)?;
265
266        // 4. Optimize the plan
267        let optimized_plan = self.optimizer.optimize(logical_plan)?;
268
269        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
270        if optimized_plan.explain {
271            let mut plan = optimized_plan;
272            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
273            return Ok(explain_result(&plan));
274        }
275
276        // 5. Convert to physical plan with transaction context
277        // Read-only fast path: safe when no mutations AND no active transaction
278        // (an active transaction may have prior uncommitted writes from earlier statements)
279        let is_read_only =
280            !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
281        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
282            Planner::with_context(
283                Arc::clone(&self.graph_store),
284                Arc::clone(&self.transaction_manager),
285                Some(transaction_id),
286                epoch,
287            )
288        } else {
289            Planner::with_context(
290                Arc::clone(&self.graph_store),
291                Arc::clone(&self.transaction_manager),
292                None,
293                self.transaction_manager.current_epoch(),
294            )
295        }
296        .with_read_only(is_read_only);
297        let mut physical_plan = planner.plan(&optimized_plan)?;
298
299        // 6. Execute and collect results
300        let executor = Executor::with_columns(physical_plan.columns.clone());
301        let mut result = executor.execute(physical_plan.operator.as_mut())?;
302
303        // Add execution metrics
304        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
305        #[cfg(not(target_arch = "wasm32"))]
306        {
307            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
308            result.execution_time_ms = Some(elapsed_ms);
309        }
310        result.rows_scanned = Some(rows_scanned);
311
312        Ok(result)
313    }
314
315    /// Translates an LPG query to a logical plan.
316    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
317        let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
318        match language {
319            #[cfg(feature = "gql")]
320            QueryLanguage::Gql => {
321                use crate::query::translators::gql;
322                gql::translate(query)
323            }
324            #[cfg(feature = "cypher")]
325            QueryLanguage::Cypher => {
326                use crate::query::translators::cypher;
327                cypher::translate(query)
328            }
329            #[cfg(feature = "gremlin")]
330            QueryLanguage::Gremlin => {
331                use crate::query::translators::gremlin;
332                gremlin::translate(query)
333            }
334            #[cfg(feature = "graphql")]
335            QueryLanguage::GraphQL => {
336                use crate::query::translators::graphql;
337                graphql::translate(query)
338            }
339            #[cfg(feature = "sql-pgq")]
340            QueryLanguage::SqlPgq => {
341                use crate::query::translators::sql_pgq;
342                sql_pgq::translate(query)
343            }
344            #[allow(unreachable_patterns)]
345            _ => Err(Error::Internal(format!(
346                "Language {:?} is not an LPG language",
347                language
348            ))),
349        }
350    }
351
352    /// Returns a reference to the LPG store.
353    #[must_use]
354    pub fn lpg_store(&self) -> &Arc<LpgStore> {
355        &self.lpg_store
356    }
357
358    /// Returns a reference to the catalog.
359    #[must_use]
360    pub fn catalog(&self) -> &Arc<Catalog> {
361        &self.catalog
362    }
363
364    /// Returns a reference to the optimizer.
365    #[must_use]
366    pub fn optimizer(&self) -> &Optimizer {
367        &self.optimizer
368    }
369}
370
371impl QueryProcessor {
372    /// Returns a reference to the transaction manager.
373    #[must_use]
374    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
375        &self.transaction_manager
376    }
377}
378
379// =========================================================================
380// RDF-specific methods (gated behind `rdf` feature)
381// =========================================================================
382
#[cfg(feature = "rdf")]
impl QueryProcessor {
    /// Creates a query processor that serves LPG queries from `lpg_store` and
    /// RDF queries (SPARQL, GraphQL-RDF) from `rdf_store`.
    ///
    /// Apart from the attached RDF store, the processor is configured exactly
    /// like one returned by `for_lpg`.
    #[must_use]
    pub fn with_rdf(
        lpg_store: Arc<LpgStore>,
        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
    ) -> Self {
        let mut processor = Self::for_lpg(lpg_store);
        processor.rdf_store = Some(rdf_store);
        processor
    }

    /// Returns the RDF store, if one was attached.
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }

    /// Processes an RDF query (SPARQL, GraphQL-RDF).
    ///
    /// Mirrors the LPG pipeline, including the `execution_time_ms` and
    /// `rows_scanned` metrics, but plans with the RDF planner over the attached
    /// RDF store.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` if no RDF store is attached, and propagates
    /// failures from any pipeline stage.
    fn process_rdf(
        &self,
        query: &str,
        language: QueryLanguage,
        params: Option<&QueryParams>,
    ) -> Result<QueryResult> {
        use crate::query::planner::rdf::RdfPlanner;

        // `Instant` is not available on wasm32, so timing is compiled out there.
        #[cfg(not(target_arch = "wasm32"))]
        let start_time = std::time::Instant::now();

        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
            Error::Internal("RDF store not configured for this processor".to_string())
        })?;

        // 1. Parse and translate to logical plan
        let mut logical_plan = self.translate_rdf(query, language)?;

        // 2. Substitute parameters if provided
        if let Some(params) = params {
            substitute_params(&mut logical_plan, params)?;
        }

        // 3. Semantic validation
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        // 4. Optimize the plan
        let optimized_plan = self.optimizer.optimize(logical_plan)?;

        // 4a. EXPLAIN: return the optimized plan tree without executing. Pushdown
        // hints are not annotated: that logic targets `NodeScan` over a `GraphStore`.
        if optimized_plan.explain {
            return Ok(explain_result(&optimized_plan));
        }

        // 5. Convert to physical plan (using RDF planner)
        let planner = RdfPlanner::new(Arc::clone(rdf_store));
        let mut physical_plan = planner.plan(&optimized_plan)?;

        // 6. Execute and collect results
        let executor = Executor::with_columns(physical_plan.columns.clone());
        let mut result = executor.execute(physical_plan.operator.as_mut())?;

        // Execution metrics, recorded the same way as for LPG queries.
        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
        #[cfg(not(target_arch = "wasm32"))]
        {
            result.execution_time_ms = Some(start_time.elapsed().as_secs_f64() * 1000.0);
        }
        result.rows_scanned = Some(rows_scanned);

        Ok(result)
    }

    /// Translates an RDF query to a logical plan.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` when `language` is not an RDF language, and
    /// propagates translator errors.
    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
        let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
        match language {
            #[cfg(feature = "sparql")]
            QueryLanguage::Sparql => {
                use crate::query::translators::sparql;
                sparql::translate(query)
            }
            #[cfg(all(feature = "graphql", feature = "rdf"))]
            QueryLanguage::GraphQLRdf => {
                use crate::query::translators::graphql_rdf;
                // Default namespace for GraphQL-RDF queries
                graphql_rdf::translate(query, "http://example.org/")
            }
            // Unreachable when only RDF languages are compiled in; allowed so
            // that configuration still builds with `-D warnings`.
            #[allow(unreachable_patterns)]
            _ => Err(Error::Internal(format!(
                "Language {:?} is not an RDF language",
                language
            ))),
        }
    }
}
473
474/// Annotates filter operators in the plan with pushdown hints.
475///
476/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
477/// whether a property index exists for equality predicates.
478pub(crate) fn annotate_pushdown_hints(
479    op: &mut LogicalOperator,
480    store: &dyn grafeo_core::graph::GraphStore,
481) {
482    #[allow(clippy::wildcard_imports)]
483    use crate::query::plan::*;
484
485    match op {
486        LogicalOperator::Filter(filter) => {
487            // Recurse into children first
488            annotate_pushdown_hints(&mut filter.input, store);
489
490            // Annotate this filter if it sits on top of a NodeScan
491            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
492                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
493            }
494        }
495        LogicalOperator::NodeScan(op) => {
496            if let Some(input) = &mut op.input {
497                annotate_pushdown_hints(input, store);
498            }
499        }
500        LogicalOperator::EdgeScan(op) => {
501            if let Some(input) = &mut op.input {
502                annotate_pushdown_hints(input, store);
503            }
504        }
505        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
506        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
507        LogicalOperator::Join(op) => {
508            annotate_pushdown_hints(&mut op.left, store);
509            annotate_pushdown_hints(&mut op.right, store);
510        }
511        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
512        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
513        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
514        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
515        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
516        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
517        LogicalOperator::Union(op) => {
518            for input in &mut op.inputs {
519                annotate_pushdown_hints(input, store);
520            }
521        }
522        LogicalOperator::Apply(op) => {
523            annotate_pushdown_hints(&mut op.input, store);
524            annotate_pushdown_hints(&mut op.subplan, store);
525        }
526        LogicalOperator::Otherwise(op) => {
527            annotate_pushdown_hints(&mut op.left, store);
528            annotate_pushdown_hints(&mut op.right, store);
529        }
530        _ => {}
531    }
532}
533
534/// Infers the pushdown strategy for a filter predicate over a node scan.
535fn infer_pushdown(
536    predicate: &LogicalExpression,
537    scan: &crate::query::plan::NodeScanOp,
538    store: &dyn grafeo_core::graph::GraphStore,
539) -> Option<crate::query::plan::PushdownHint> {
540    #[allow(clippy::wildcard_imports)]
541    use crate::query::plan::*;
542
543    match predicate {
544        // Equality: n.prop = value
545        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
546            if let Some(prop) = extract_property_name(left, &scan.variable)
547                .or_else(|| extract_property_name(right, &scan.variable))
548            {
549                if store.has_property_index(&prop) {
550                    return Some(PushdownHint::IndexLookup { property: prop });
551                }
552                if scan.label.is_some() {
553                    return Some(PushdownHint::LabelFirst);
554                }
555            }
556            None
557        }
558        // Range: n.prop > value, n.prop < value, etc.
559        LogicalExpression::Binary {
560            left,
561            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
562            right,
563        } => {
564            if let Some(prop) = extract_property_name(left, &scan.variable)
565                .or_else(|| extract_property_name(right, &scan.variable))
566            {
567                if store.has_property_index(&prop) {
568                    return Some(PushdownHint::RangeScan { property: prop });
569                }
570                if scan.label.is_some() {
571                    return Some(PushdownHint::LabelFirst);
572                }
573            }
574            None
575        }
576        // AND: check the left side (first conjunct) for pushdown
577        LogicalExpression::Binary {
578            left,
579            op: BinaryOp::And,
580            ..
581        } => infer_pushdown(left, scan, store),
582        _ => {
583            // Any other predicate on a labeled scan gets label-first
584            if scan.label.is_some() {
585                Some(PushdownHint::LabelFirst)
586            } else {
587                None
588            }
589        }
590    }
591}
592
593/// Extracts the property name if the expression is `Property { variable, property }`
594/// and the variable matches the scan variable.
595fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
596    if let LogicalExpression::Property { variable, property } = expr
597        && variable == scan_var
598    {
599        Some(property.clone())
600    } else {
601        None
602    }
603}
604
605/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
606pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
607    let tree_text = plan.root.explain_tree();
608    QueryResult {
609        columns: vec!["plan".to_string()],
610        column_types: vec![grafeo_common::types::LogicalType::String],
611        rows: vec![vec![Value::String(tree_text.into())]],
612        execution_time_ms: None,
613        rows_scanned: None,
614        status_message: None,
615        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
616    }
617}
618
619/// Substitutes parameters in a logical plan with their values.
620pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
621    substitute_in_operator(&mut plan.root, params)
622}
623
624/// Recursively substitutes parameters in an operator.
625fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
626    #[allow(clippy::wildcard_imports)]
627    use crate::query::plan::*;
628
629    match op {
630        LogicalOperator::Filter(filter) => {
631            substitute_in_expression(&mut filter.predicate, params)?;
632            substitute_in_operator(&mut filter.input, params)?;
633        }
634        LogicalOperator::Return(ret) => {
635            for item in &mut ret.items {
636                substitute_in_expression(&mut item.expression, params)?;
637            }
638            substitute_in_operator(&mut ret.input, params)?;
639        }
640        LogicalOperator::Project(proj) => {
641            for p in &mut proj.projections {
642                substitute_in_expression(&mut p.expression, params)?;
643            }
644            substitute_in_operator(&mut proj.input, params)?;
645        }
646        LogicalOperator::NodeScan(scan) => {
647            if let Some(input) = &mut scan.input {
648                substitute_in_operator(input, params)?;
649            }
650        }
651        LogicalOperator::EdgeScan(scan) => {
652            if let Some(input) = &mut scan.input {
653                substitute_in_operator(input, params)?;
654            }
655        }
656        LogicalOperator::Expand(expand) => {
657            substitute_in_operator(&mut expand.input, params)?;
658        }
659        LogicalOperator::Join(join) => {
660            substitute_in_operator(&mut join.left, params)?;
661            substitute_in_operator(&mut join.right, params)?;
662            for cond in &mut join.conditions {
663                substitute_in_expression(&mut cond.left, params)?;
664                substitute_in_expression(&mut cond.right, params)?;
665            }
666        }
667        LogicalOperator::LeftJoin(join) => {
668            substitute_in_operator(&mut join.left, params)?;
669            substitute_in_operator(&mut join.right, params)?;
670            if let Some(cond) = &mut join.condition {
671                substitute_in_expression(cond, params)?;
672            }
673        }
674        LogicalOperator::Aggregate(agg) => {
675            for expr in &mut agg.group_by {
676                substitute_in_expression(expr, params)?;
677            }
678            for agg_expr in &mut agg.aggregates {
679                if let Some(expr) = &mut agg_expr.expression {
680                    substitute_in_expression(expr, params)?;
681                }
682            }
683            substitute_in_operator(&mut agg.input, params)?;
684        }
685        LogicalOperator::Sort(sort) => {
686            for key in &mut sort.keys {
687                substitute_in_expression(&mut key.expression, params)?;
688            }
689            substitute_in_operator(&mut sort.input, params)?;
690        }
691        LogicalOperator::Limit(limit) => {
692            resolve_count_param(&mut limit.count, params)?;
693            substitute_in_operator(&mut limit.input, params)?;
694        }
695        LogicalOperator::Skip(skip) => {
696            resolve_count_param(&mut skip.count, params)?;
697            substitute_in_operator(&mut skip.input, params)?;
698        }
699        LogicalOperator::Distinct(distinct) => {
700            substitute_in_operator(&mut distinct.input, params)?;
701        }
702        LogicalOperator::CreateNode(create) => {
703            for (_, expr) in &mut create.properties {
704                substitute_in_expression(expr, params)?;
705            }
706            if let Some(input) = &mut create.input {
707                substitute_in_operator(input, params)?;
708            }
709        }
710        LogicalOperator::CreateEdge(create) => {
711            for (_, expr) in &mut create.properties {
712                substitute_in_expression(expr, params)?;
713            }
714            substitute_in_operator(&mut create.input, params)?;
715        }
716        LogicalOperator::DeleteNode(delete) => {
717            substitute_in_operator(&mut delete.input, params)?;
718        }
719        LogicalOperator::DeleteEdge(delete) => {
720            substitute_in_operator(&mut delete.input, params)?;
721        }
722        LogicalOperator::SetProperty(set) => {
723            for (_, expr) in &mut set.properties {
724                substitute_in_expression(expr, params)?;
725            }
726            substitute_in_operator(&mut set.input, params)?;
727        }
728        LogicalOperator::Union(union) => {
729            for input in &mut union.inputs {
730                substitute_in_operator(input, params)?;
731            }
732        }
733        LogicalOperator::AntiJoin(anti) => {
734            substitute_in_operator(&mut anti.left, params)?;
735            substitute_in_operator(&mut anti.right, params)?;
736        }
737        LogicalOperator::Bind(bind) => {
738            substitute_in_expression(&mut bind.expression, params)?;
739            substitute_in_operator(&mut bind.input, params)?;
740        }
741        LogicalOperator::TripleScan(scan) => {
742            if let Some(input) = &mut scan.input {
743                substitute_in_operator(input, params)?;
744            }
745        }
746        LogicalOperator::Unwind(unwind) => {
747            substitute_in_expression(&mut unwind.expression, params)?;
748            substitute_in_operator(&mut unwind.input, params)?;
749        }
750        LogicalOperator::MapCollect(mc) => {
751            substitute_in_operator(&mut mc.input, params)?;
752        }
753        LogicalOperator::Merge(merge) => {
754            for (_, expr) in &mut merge.match_properties {
755                substitute_in_expression(expr, params)?;
756            }
757            for (_, expr) in &mut merge.on_create {
758                substitute_in_expression(expr, params)?;
759            }
760            for (_, expr) in &mut merge.on_match {
761                substitute_in_expression(expr, params)?;
762            }
763            substitute_in_operator(&mut merge.input, params)?;
764        }
765        LogicalOperator::MergeRelationship(merge_rel) => {
766            for (_, expr) in &mut merge_rel.match_properties {
767                substitute_in_expression(expr, params)?;
768            }
769            for (_, expr) in &mut merge_rel.on_create {
770                substitute_in_expression(expr, params)?;
771            }
772            for (_, expr) in &mut merge_rel.on_match {
773                substitute_in_expression(expr, params)?;
774            }
775            substitute_in_operator(&mut merge_rel.input, params)?;
776        }
777        LogicalOperator::AddLabel(add_label) => {
778            substitute_in_operator(&mut add_label.input, params)?;
779        }
780        LogicalOperator::RemoveLabel(remove_label) => {
781            substitute_in_operator(&mut remove_label.input, params)?;
782        }
783        LogicalOperator::ShortestPath(sp) => {
784            substitute_in_operator(&mut sp.input, params)?;
785        }
786        // SPARQL Update operators
787        LogicalOperator::InsertTriple(insert) => {
788            if let Some(ref mut input) = insert.input {
789                substitute_in_operator(input, params)?;
790            }
791        }
792        LogicalOperator::DeleteTriple(delete) => {
793            if let Some(ref mut input) = delete.input {
794                substitute_in_operator(input, params)?;
795            }
796        }
797        LogicalOperator::Modify(modify) => {
798            substitute_in_operator(&mut modify.where_clause, params)?;
799        }
800        LogicalOperator::ClearGraph(_)
801        | LogicalOperator::CreateGraph(_)
802        | LogicalOperator::DropGraph(_)
803        | LogicalOperator::LoadGraph(_)
804        | LogicalOperator::CopyGraph(_)
805        | LogicalOperator::MoveGraph(_)
806        | LogicalOperator::AddGraph(_) => {}
807        LogicalOperator::HorizontalAggregate(op) => {
808            substitute_in_operator(&mut op.input, params)?;
809        }
810        LogicalOperator::Empty => {}
811        LogicalOperator::VectorScan(scan) => {
812            substitute_in_expression(&mut scan.query_vector, params)?;
813            if let Some(ref mut input) = scan.input {
814                substitute_in_operator(input, params)?;
815            }
816        }
817        LogicalOperator::VectorJoin(join) => {
818            substitute_in_expression(&mut join.query_vector, params)?;
819            substitute_in_operator(&mut join.input, params)?;
820        }
821        LogicalOperator::Except(except) => {
822            substitute_in_operator(&mut except.left, params)?;
823            substitute_in_operator(&mut except.right, params)?;
824        }
825        LogicalOperator::Intersect(intersect) => {
826            substitute_in_operator(&mut intersect.left, params)?;
827            substitute_in_operator(&mut intersect.right, params)?;
828        }
829        LogicalOperator::Otherwise(otherwise) => {
830            substitute_in_operator(&mut otherwise.left, params)?;
831            substitute_in_operator(&mut otherwise.right, params)?;
832        }
833        LogicalOperator::Apply(apply) => {
834            substitute_in_operator(&mut apply.input, params)?;
835            substitute_in_operator(&mut apply.subplan, params)?;
836        }
837        // ParameterScan has no expressions to substitute
838        LogicalOperator::ParameterScan(_) => {}
839        LogicalOperator::MultiWayJoin(mwj) => {
840            for input in &mut mwj.inputs {
841                substitute_in_operator(input, params)?;
842            }
843            for cond in &mut mwj.conditions {
844                substitute_in_expression(&mut cond.left, params)?;
845                substitute_in_expression(&mut cond.right, params)?;
846            }
847        }
848        // DDL operators have no expressions to substitute
849        LogicalOperator::CreatePropertyGraph(_) => {}
850        // Procedure calls: arguments could contain parameters but we handle at execution time
851        LogicalOperator::CallProcedure(_) => {}
852        // LoadData: file path is a literal, no parameter substitution needed
853        LogicalOperator::LoadData(_) => {}
854    }
855    Ok(())
856}
857
858/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
859fn resolve_count_param(
860    count: &mut crate::query::plan::CountExpr,
861    params: &QueryParams,
862) -> Result<()> {
863    use crate::query::plan::CountExpr;
864    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
865
866    if let CountExpr::Parameter(name) = count {
867        let value = params.get(name.as_str()).ok_or_else(|| {
868            Error::Query(QueryError::new(
869                QueryErrorKind::Semantic,
870                format!("Missing parameter for SKIP/LIMIT: ${name}"),
871            ))
872        })?;
873        let n = match value {
874            Value::Int64(i) if *i >= 0 => *i as usize,
875            Value::Int64(i) => {
876                return Err(Error::Query(QueryError::new(
877                    QueryErrorKind::Semantic,
878                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
879                )));
880            }
881            other => {
882                return Err(Error::Query(QueryError::new(
883                    QueryErrorKind::Semantic,
884                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
885                )));
886            }
887        };
888        *count = CountExpr::Literal(n);
889    }
890    Ok(())
891}
892
893/// Substitutes parameters in an expression with their values.
894fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
895    use crate::query::plan::LogicalExpression;
896
897    match expr {
898        LogicalExpression::Parameter(name) => {
899            if let Some(value) = params.get(name) {
900                *expr = LogicalExpression::Literal(value.clone());
901            } else {
902                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
903            }
904        }
905        LogicalExpression::Binary { left, right, .. } => {
906            substitute_in_expression(left, params)?;
907            substitute_in_expression(right, params)?;
908        }
909        LogicalExpression::Unary { operand, .. } => {
910            substitute_in_expression(operand, params)?;
911        }
912        LogicalExpression::FunctionCall { args, .. } => {
913            for arg in args {
914                substitute_in_expression(arg, params)?;
915            }
916        }
917        LogicalExpression::List(items) => {
918            for item in items {
919                substitute_in_expression(item, params)?;
920            }
921        }
922        LogicalExpression::Map(pairs) => {
923            for (_, value) in pairs {
924                substitute_in_expression(value, params)?;
925            }
926        }
927        LogicalExpression::IndexAccess { base, index } => {
928            substitute_in_expression(base, params)?;
929            substitute_in_expression(index, params)?;
930        }
931        LogicalExpression::SliceAccess { base, start, end } => {
932            substitute_in_expression(base, params)?;
933            if let Some(s) = start {
934                substitute_in_expression(s, params)?;
935            }
936            if let Some(e) = end {
937                substitute_in_expression(e, params)?;
938            }
939        }
940        LogicalExpression::Case {
941            operand,
942            when_clauses,
943            else_clause,
944        } => {
945            if let Some(op) = operand {
946                substitute_in_expression(op, params)?;
947            }
948            for (cond, result) in when_clauses {
949                substitute_in_expression(cond, params)?;
950                substitute_in_expression(result, params)?;
951            }
952            if let Some(el) = else_clause {
953                substitute_in_expression(el, params)?;
954            }
955        }
956        LogicalExpression::Property { .. }
957        | LogicalExpression::Variable(_)
958        | LogicalExpression::Literal(_)
959        | LogicalExpression::Labels(_)
960        | LogicalExpression::Type(_)
961        | LogicalExpression::Id(_) => {}
962        LogicalExpression::ListComprehension {
963            list_expr,
964            filter_expr,
965            map_expr,
966            ..
967        } => {
968            substitute_in_expression(list_expr, params)?;
969            if let Some(filter) = filter_expr {
970                substitute_in_expression(filter, params)?;
971            }
972            substitute_in_expression(map_expr, params)?;
973        }
974        LogicalExpression::ListPredicate {
975            list_expr,
976            predicate,
977            ..
978        } => {
979            substitute_in_expression(list_expr, params)?;
980            substitute_in_expression(predicate, params)?;
981        }
982        LogicalExpression::ExistsSubquery(_)
983        | LogicalExpression::CountSubquery(_)
984        | LogicalExpression::ValueSubquery(_) => {
985            // Subqueries would need recursive parameter substitution
986        }
987        LogicalExpression::PatternComprehension { projection, .. } => {
988            substitute_in_expression(projection, params)?;
989        }
990        LogicalExpression::MapProjection { entries, .. } => {
991            for entry in entries {
992                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
993                    substitute_in_expression(expr, params)?;
994                }
995            }
996        }
997        LogicalExpression::Reduce {
998            initial,
999            list,
1000            expression,
1001            ..
1002        } => {
1003            substitute_in_expression(initial, params)?;
1004            substitute_in_expression(list, params)?;
1005            substitute_in_expression(expression, params)?;
1006        }
1007    }
1008    Ok(())
1009}
1010
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    #[test]
    fn test_resolve_count_param_substitutes_literal() {
        use crate::query::plan::CountExpr;

        let mut params: HashMap<String, Value> = HashMap::new();
        params.insert("limit".to_string(), Value::Int64(5));

        let mut count = CountExpr::Parameter("limit".to_string());
        resolve_count_param(&mut count, &params).unwrap();
        assert!(matches!(count, CountExpr::Literal(5)));
    }

    #[test]
    fn test_resolve_count_param_leaves_literal_untouched() {
        use crate::query::plan::CountExpr;

        let params: HashMap<String, Value> = HashMap::new();
        let mut count = CountExpr::Literal(3);
        resolve_count_param(&mut count, &params).unwrap();
        assert!(matches!(count, CountExpr::Literal(3)));
    }

    #[test]
    fn test_resolve_count_param_errors() {
        use crate::query::plan::CountExpr;

        let mut params: HashMap<String, Value> = HashMap::new();
        params.insert("neg".to_string(), Value::Int64(-1));
        params.insert("text".to_string(), Value::String("ten".into()));

        let mut missing = CountExpr::Parameter("absent".to_string());
        let err = resolve_count_param(&mut missing, &params).unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter for SKIP/LIMIT"),
            "unexpected error: {err}"
        );

        let mut negative = CountExpr::Parameter("neg".to_string());
        let err = resolve_count_param(&mut negative, &params).unwrap_err();
        assert!(
            err.to_string().contains("must be non-negative"),
            "unexpected error: {err}"
        );

        let mut not_int = CountExpr::Parameter("text".to_string());
        let err = resolve_count_param(&mut not_int, &params).unwrap_err();
        assert!(
            err.to_string().contains("must be an integer"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_substitute_in_expression_parameter() {
        let mut params: HashMap<String, Value> = HashMap::new();
        params.insert("x".to_string(), Value::Int64(7));

        let mut expr = LogicalExpression::Parameter("x".to_string());
        substitute_in_expression(&mut expr, &params).unwrap();
        assert!(matches!(expr, LogicalExpression::Literal(Value::Int64(7))));

        let mut missing = LogicalExpression::Parameter("y".to_string());
        let err = substitute_in_expression(&mut missing, &params).unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "unexpected error: {err}"
        );
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter
        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Should return 2 people (ages 35 and 45)
        assert_eq!(result.row_count(), 2);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter but empty params map (missing the required param)
        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        // Should fail with missing parameter error
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {err}"
        );
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        // Tests parameter substitution in WHERE clause with property comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=20 matches > 15
        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        // Tests multiple parameters in WHERE clause with AND
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the person with age=35, score=90 matches both conditions
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_status_equality() {
        // Tests a string parameter compared for equality against a property;
        // exactly one of the three statuses matches.
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        // Tests that same-type parameter comparisons work correctly
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        // Compare int property with int parameter
        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=100 matches > 75
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // Tests that empty results still have correct column names
        let store = Arc::new(LpgStore::new().unwrap());
        // Don't create any nodes

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        // Tests string parameter equality comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}