Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TransactionId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
/// Query languages the processor can dispatch to.
///
/// Each variant exists only when its Cargo feature is enabled; use
/// [`QueryLanguage::is_lpg`] to tell property-graph languages from RDF ones.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024), the default language for LPG.
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0.
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin.
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL over the labeled property graph.
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 `GRAPH_TABLE`).
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 over RDF.
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL over RDF (requires both the `graphql` and `rdf` features).
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
51
52impl QueryLanguage {
53    /// Returns whether this language targets LPG (vs RDF).
54    #[must_use]
55    pub const fn is_lpg(&self) -> bool {
56        match self {
57            #[cfg(feature = "gql")]
58            Self::Gql => true,
59            #[cfg(feature = "cypher")]
60            Self::Cypher => true,
61            #[cfg(feature = "gremlin")]
62            Self::Gremlin => true,
63            #[cfg(feature = "graphql")]
64            Self::GraphQL => true,
65            #[cfg(feature = "sql-pgq")]
66            Self::SqlPgq => true,
67            #[cfg(feature = "sparql")]
68            Self::Sparql => false,
69            #[cfg(all(feature = "graphql", feature = "rdf"))]
70            Self::GraphQLRdf => false,
71        }
72    }
73}
74
75/// Query parameters for prepared statements.
76pub type QueryParams = HashMap<String, Value>;
77
78/// Processes queries through the full pipeline.
79///
80/// The processor holds references to the stores and provides a unified
81/// interface for executing queries in any supported language.
82///
83/// # Example
84///
85/// ```no_run
86/// # use std::sync::Arc;
87/// # use grafeo_core::graph::lpg::LpgStore;
88/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
89///
90/// # fn main() -> grafeo_common::utils::error::Result<()> {
91/// let store = Arc::new(LpgStore::new().unwrap());
92/// let processor = QueryProcessor::for_lpg(store);
93/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
94/// # Ok(())
95/// # }
96/// ```
97pub struct QueryProcessor {
98    /// LPG store for property graph queries.
99    lpg_store: Arc<LpgStore>,
100    /// Graph store trait object for pluggable storage backends.
101    graph_store: Arc<dyn GraphStoreMut>,
102    /// Transaction manager for MVCC operations.
103    transaction_manager: Arc<TransactionManager>,
104    /// Catalog for schema and index metadata.
105    catalog: Arc<Catalog>,
106    /// Query optimizer.
107    optimizer: Optimizer,
108    /// Current transaction context (if any).
109    transaction_context: Option<(EpochId, TransactionId)>,
110    /// RDF store for triple pattern queries (optional).
111    #[cfg(feature = "rdf")]
112    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
113}
114
115impl QueryProcessor {
116    /// Creates a new query processor for LPG queries.
117    #[must_use]
118    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119        let optimizer = Optimizer::from_store(&store);
120        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121        Self {
122            lpg_store: store,
123            graph_store,
124            transaction_manager: Arc::new(TransactionManager::new()),
125            catalog: Arc::new(Catalog::new()),
126            optimizer,
127            transaction_context: None,
128            #[cfg(feature = "rdf")]
129            rdf_store: None,
130        }
131    }
132
133    /// Creates a new query processor with a transaction manager.
134    #[must_use]
135    pub fn for_lpg_with_transaction(
136        store: Arc<LpgStore>,
137        transaction_manager: Arc<TransactionManager>,
138    ) -> Self {
139        let optimizer = Optimizer::from_store(&store);
140        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
141        Self {
142            lpg_store: store,
143            graph_store,
144            transaction_manager,
145            catalog: Arc::new(Catalog::new()),
146            optimizer,
147            transaction_context: None,
148            #[cfg(feature = "rdf")]
149            rdf_store: None,
150        }
151    }
152
153    /// Creates a query processor backed by any `GraphStoreMut` implementation.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if the internal arena allocation fails (out of memory).
158    pub fn for_graph_store_with_transaction(
159        store: Arc<dyn GraphStoreMut>,
160        transaction_manager: Arc<TransactionManager>,
161    ) -> Result<Self> {
162        let optimizer = Optimizer::from_graph_store(&*store);
163        Ok(Self {
164            lpg_store: Arc::new(LpgStore::new()?),
165            graph_store: store,
166            transaction_manager,
167            catalog: Arc::new(Catalog::new()),
168            optimizer,
169            transaction_context: None,
170            #[cfg(feature = "rdf")]
171            rdf_store: None,
172        })
173    }
174
175    /// Sets the transaction context for MVCC visibility.
176    ///
177    /// This should be called when the processor is used within a transaction.
178    #[must_use]
179    pub fn with_transaction_context(
180        mut self,
181        viewing_epoch: EpochId,
182        transaction_id: TransactionId,
183    ) -> Self {
184        self.transaction_context = Some((viewing_epoch, transaction_id));
185        self
186    }
187
188    /// Sets a custom catalog.
189    #[must_use]
190    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
191        self.catalog = catalog;
192        self
193    }
194
195    /// Sets a custom optimizer.
196    #[must_use]
197    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
198        self.optimizer = optimizer;
199        self
200    }
201
202    /// Processes a query string and returns results.
203    ///
204    /// Pipeline:
205    /// 1. Parse (language-specific parser → AST)
206    /// 2. Translate (AST → LogicalPlan)
207    /// 3. Bind (semantic validation)
208    /// 4. Optimize (filter pushdown, join reorder, etc.)
209    /// 5. Plan (logical → physical operators)
210    /// 6. Execute (run operators, collect results)
211    ///
212    /// # Arguments
213    ///
214    /// * `query` - The query string
215    /// * `language` - Which query language to use
216    /// * `params` - Optional query parameters for prepared statements
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if any stage of the pipeline fails.
221    pub fn process(
222        &self,
223        query: &str,
224        language: QueryLanguage,
225        params: Option<&QueryParams>,
226    ) -> Result<QueryResult> {
227        if language.is_lpg() {
228            self.process_lpg(query, language, params)
229        } else {
230            #[cfg(feature = "rdf")]
231            {
232                self.process_rdf(query, language, params)
233            }
234            #[cfg(not(feature = "rdf"))]
235            {
236                Err(Error::Internal(
237                    "RDF support not enabled. Compile with --features rdf".to_string(),
238                ))
239            }
240        }
241    }
242
243    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
244    fn process_lpg(
245        &self,
246        query: &str,
247        language: QueryLanguage,
248        params: Option<&QueryParams>,
249    ) -> Result<QueryResult> {
250        #[cfg(not(target_arch = "wasm32"))]
251        let start_time = std::time::Instant::now();
252
253        // 1. Parse and translate to logical plan
254        let mut logical_plan = self.translate_lpg(query, language)?;
255
256        // 2. Substitute parameters if provided
257        if let Some(params) = params {
258            substitute_params(&mut logical_plan, params)?;
259        }
260
261        // 3. Semantic validation
262        let mut binder = Binder::new();
263        let _binding_context = binder.bind(&logical_plan)?;
264
265        // 4. Optimize the plan
266        let optimized_plan = self.optimizer.optimize(logical_plan)?;
267
268        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
269        if optimized_plan.explain {
270            let mut plan = optimized_plan;
271            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
272            return Ok(explain_result(&plan));
273        }
274
275        // 5. Convert to physical plan with transaction context
276        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
277            Planner::with_context(
278                Arc::clone(&self.graph_store),
279                Arc::clone(&self.transaction_manager),
280                Some(transaction_id),
281                epoch,
282            )
283        } else {
284            Planner::with_context(
285                Arc::clone(&self.graph_store),
286                Arc::clone(&self.transaction_manager),
287                None,
288                self.transaction_manager.current_epoch(),
289            )
290        };
291        let mut physical_plan = planner.plan(&optimized_plan)?;
292
293        // 6. Execute and collect results
294        let executor = Executor::with_columns(physical_plan.columns.clone());
295        let mut result = executor.execute(physical_plan.operator.as_mut())?;
296
297        // Add execution metrics
298        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
299        #[cfg(not(target_arch = "wasm32"))]
300        {
301            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
302            result.execution_time_ms = Some(elapsed_ms);
303        }
304        result.rows_scanned = Some(rows_scanned);
305
306        Ok(result)
307    }
308
309    /// Translates an LPG query to a logical plan.
310    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
311        match language {
312            #[cfg(feature = "gql")]
313            QueryLanguage::Gql => {
314                use crate::query::translators::gql;
315                gql::translate(query)
316            }
317            #[cfg(feature = "cypher")]
318            QueryLanguage::Cypher => {
319                use crate::query::translators::cypher;
320                cypher::translate(query)
321            }
322            #[cfg(feature = "gremlin")]
323            QueryLanguage::Gremlin => {
324                use crate::query::translators::gremlin;
325                gremlin::translate(query)
326            }
327            #[cfg(feature = "graphql")]
328            QueryLanguage::GraphQL => {
329                use crate::query::translators::graphql;
330                graphql::translate(query)
331            }
332            #[cfg(feature = "sql-pgq")]
333            QueryLanguage::SqlPgq => {
334                use crate::query::translators::sql_pgq;
335                sql_pgq::translate(query)
336            }
337            #[allow(unreachable_patterns)]
338            _ => Err(Error::Internal(format!(
339                "Language {:?} is not an LPG language",
340                language
341            ))),
342        }
343    }
344
345    /// Returns a reference to the LPG store.
346    #[must_use]
347    pub fn lpg_store(&self) -> &Arc<LpgStore> {
348        &self.lpg_store
349    }
350
351    /// Returns a reference to the catalog.
352    #[must_use]
353    pub fn catalog(&self) -> &Arc<Catalog> {
354        &self.catalog
355    }
356
357    /// Returns a reference to the optimizer.
358    #[must_use]
359    pub fn optimizer(&self) -> &Optimizer {
360        &self.optimizer
361    }
362}
363
364impl QueryProcessor {
365    /// Returns a reference to the transaction manager.
366    #[must_use]
367    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
368        &self.transaction_manager
369    }
370}
371
372// =========================================================================
373// RDF-specific methods (gated behind `rdf` feature)
374// =========================================================================
375
#[cfg(feature = "rdf")]
impl QueryProcessor {
    /// Creates a new query processor with both LPG and RDF stores.
    ///
    /// LPG queries run against `lpg_store`; RDF queries (SPARQL, GraphQL-RDF)
    /// are planned against `rdf_store`. A fresh transaction manager and an
    /// empty catalog are created.
    #[must_use]
    pub fn with_rdf(
        lpg_store: Arc<LpgStore>,
        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
    ) -> Self {
        let optimizer = Optimizer::from_store(&lpg_store);
        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
        Self {
            lpg_store,
            graph_store,
            transaction_manager: Arc::new(TransactionManager::new()),
            catalog: Arc::new(Catalog::new()),
            optimizer,
            transaction_context: None,
            rdf_store: Some(rdf_store),
        }
    }

    /// Returns a reference to the RDF store (if configured).
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }

    /// Processes an RDF query (SPARQL, GraphQL-RDF).
    ///
    /// Pipeline: translate, substitute parameters, bind, optimize, then either
    /// return the EXPLAIN plan text or plan with `RdfPlanner` and execute.
    ///
    /// # Errors
    ///
    /// Returns an error if no RDF store is configured or if any stage fails.
    fn process_rdf(
        &self,
        query: &str,
        language: QueryLanguage,
        params: Option<&QueryParams>,
    ) -> Result<QueryResult> {
        use crate::query::planner::rdf::RdfPlanner;

        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
            Error::Internal("RDF store not configured for this processor".to_string())
        })?;

        // 1. Parse and translate to logical plan
        let mut logical_plan = self.translate_rdf(query, language)?;

        // 2. Substitute parameters if provided
        if let Some(params) = params {
            substitute_params(&mut logical_plan, params)?;
        }

        // 3. Semantic validation
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        // 4. Optimize the plan
        let optimized_plan = self.optimizer.optimize(logical_plan)?;

        // 4a. EXPLAIN: return the optimized plan tree without executing
        if optimized_plan.explain {
            return Ok(explain_result(&optimized_plan));
        }

        // 5. Convert to physical plan (using RDF planner)
        let planner = RdfPlanner::new(Arc::clone(rdf_store));
        let mut physical_plan = planner.plan(&optimized_plan)?;

        // 6. Execute and collect results
        let executor = Executor::with_columns(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    }

    /// Translates an RDF query to a logical plan.
    ///
    /// Non-RDF languages are rejected with `Error::Internal`.
    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
        match language {
            #[cfg(feature = "sparql")]
            QueryLanguage::Sparql => {
                use crate::query::translators::sparql;
                sparql::translate(query)
            }
            #[cfg(all(feature = "graphql", feature = "rdf"))]
            QueryLanguage::GraphQLRdf => {
                use crate::query::translators::graphql_rdf;
                // Default namespace for GraphQL-RDF queries
                graphql_rdf::translate(query, "http://example.org/")
            }
            // Unreachable when every compiled-in language is an RDF language
            // (e.g. only `rdf` + `sparql` enabled); mirrors `translate_lpg`.
            #[allow(unreachable_patterns)]
            _ => Err(Error::Internal(format!(
                "Language {:?} is not an RDF language",
                language
            ))),
        }
    }
}
466
467/// Annotates filter operators in the plan with pushdown hints.
468///
469/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
470/// whether a property index exists for equality predicates.
471pub(crate) fn annotate_pushdown_hints(
472    op: &mut LogicalOperator,
473    store: &dyn grafeo_core::graph::GraphStore,
474) {
475    #[allow(clippy::wildcard_imports)]
476    use crate::query::plan::*;
477
478    match op {
479        LogicalOperator::Filter(filter) => {
480            // Recurse into children first
481            annotate_pushdown_hints(&mut filter.input, store);
482
483            // Annotate this filter if it sits on top of a NodeScan
484            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
485                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
486            }
487        }
488        LogicalOperator::NodeScan(op) => {
489            if let Some(input) = &mut op.input {
490                annotate_pushdown_hints(input, store);
491            }
492        }
493        LogicalOperator::EdgeScan(op) => {
494            if let Some(input) = &mut op.input {
495                annotate_pushdown_hints(input, store);
496            }
497        }
498        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
499        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
500        LogicalOperator::Join(op) => {
501            annotate_pushdown_hints(&mut op.left, store);
502            annotate_pushdown_hints(&mut op.right, store);
503        }
504        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
505        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
506        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
507        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
508        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
509        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
510        LogicalOperator::Union(op) => {
511            for input in &mut op.inputs {
512                annotate_pushdown_hints(input, store);
513            }
514        }
515        LogicalOperator::Apply(op) => {
516            annotate_pushdown_hints(&mut op.input, store);
517            annotate_pushdown_hints(&mut op.subplan, store);
518        }
519        LogicalOperator::Otherwise(op) => {
520            annotate_pushdown_hints(&mut op.left, store);
521            annotate_pushdown_hints(&mut op.right, store);
522        }
523        _ => {}
524    }
525}
526
527/// Infers the pushdown strategy for a filter predicate over a node scan.
528fn infer_pushdown(
529    predicate: &LogicalExpression,
530    scan: &crate::query::plan::NodeScanOp,
531    store: &dyn grafeo_core::graph::GraphStore,
532) -> Option<crate::query::plan::PushdownHint> {
533    #[allow(clippy::wildcard_imports)]
534    use crate::query::plan::*;
535
536    match predicate {
537        // Equality: n.prop = value
538        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
539            if let Some(prop) = extract_property_name(left, &scan.variable)
540                .or_else(|| extract_property_name(right, &scan.variable))
541            {
542                if store.has_property_index(&prop) {
543                    return Some(PushdownHint::IndexLookup { property: prop });
544                }
545                if scan.label.is_some() {
546                    return Some(PushdownHint::LabelFirst);
547                }
548            }
549            None
550        }
551        // Range: n.prop > value, n.prop < value, etc.
552        LogicalExpression::Binary {
553            left,
554            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
555            right,
556        } => {
557            if let Some(prop) = extract_property_name(left, &scan.variable)
558                .or_else(|| extract_property_name(right, &scan.variable))
559            {
560                if store.has_property_index(&prop) {
561                    return Some(PushdownHint::RangeScan { property: prop });
562                }
563                if scan.label.is_some() {
564                    return Some(PushdownHint::LabelFirst);
565                }
566            }
567            None
568        }
569        // AND: check the left side (first conjunct) for pushdown
570        LogicalExpression::Binary {
571            left,
572            op: BinaryOp::And,
573            ..
574        } => infer_pushdown(left, scan, store),
575        _ => {
576            // Any other predicate on a labeled scan gets label-first
577            if scan.label.is_some() {
578                Some(PushdownHint::LabelFirst)
579            } else {
580                None
581            }
582        }
583    }
584}
585
586/// Extracts the property name if the expression is `Property { variable, property }`
587/// and the variable matches the scan variable.
588fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
589    if let LogicalExpression::Property { variable, property } = expr
590        && variable == scan_var
591    {
592        Some(property.clone())
593    } else {
594        None
595    }
596}
597
598/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
599pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
600    let tree_text = plan.root.explain_tree();
601    QueryResult {
602        columns: vec!["plan".to_string()],
603        column_types: vec![grafeo_common::types::LogicalType::String],
604        rows: vec![vec![Value::String(tree_text.into())]],
605        execution_time_ms: None,
606        rows_scanned: None,
607        status_message: None,
608        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
609    }
610}
611
612/// Substitutes parameters in a logical plan with their values.
613pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
614    substitute_in_operator(&mut plan.root, params)
615}
616
617/// Recursively substitutes parameters in an operator.
618fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
619    #[allow(clippy::wildcard_imports)]
620    use crate::query::plan::*;
621
622    match op {
623        LogicalOperator::Filter(filter) => {
624            substitute_in_expression(&mut filter.predicate, params)?;
625            substitute_in_operator(&mut filter.input, params)?;
626        }
627        LogicalOperator::Return(ret) => {
628            for item in &mut ret.items {
629                substitute_in_expression(&mut item.expression, params)?;
630            }
631            substitute_in_operator(&mut ret.input, params)?;
632        }
633        LogicalOperator::Project(proj) => {
634            for p in &mut proj.projections {
635                substitute_in_expression(&mut p.expression, params)?;
636            }
637            substitute_in_operator(&mut proj.input, params)?;
638        }
639        LogicalOperator::NodeScan(scan) => {
640            if let Some(input) = &mut scan.input {
641                substitute_in_operator(input, params)?;
642            }
643        }
644        LogicalOperator::EdgeScan(scan) => {
645            if let Some(input) = &mut scan.input {
646                substitute_in_operator(input, params)?;
647            }
648        }
649        LogicalOperator::Expand(expand) => {
650            substitute_in_operator(&mut expand.input, params)?;
651        }
652        LogicalOperator::Join(join) => {
653            substitute_in_operator(&mut join.left, params)?;
654            substitute_in_operator(&mut join.right, params)?;
655            for cond in &mut join.conditions {
656                substitute_in_expression(&mut cond.left, params)?;
657                substitute_in_expression(&mut cond.right, params)?;
658            }
659        }
660        LogicalOperator::LeftJoin(join) => {
661            substitute_in_operator(&mut join.left, params)?;
662            substitute_in_operator(&mut join.right, params)?;
663            if let Some(cond) = &mut join.condition {
664                substitute_in_expression(cond, params)?;
665            }
666        }
667        LogicalOperator::Aggregate(agg) => {
668            for expr in &mut agg.group_by {
669                substitute_in_expression(expr, params)?;
670            }
671            for agg_expr in &mut agg.aggregates {
672                if let Some(expr) = &mut agg_expr.expression {
673                    substitute_in_expression(expr, params)?;
674                }
675            }
676            substitute_in_operator(&mut agg.input, params)?;
677        }
678        LogicalOperator::Sort(sort) => {
679            for key in &mut sort.keys {
680                substitute_in_expression(&mut key.expression, params)?;
681            }
682            substitute_in_operator(&mut sort.input, params)?;
683        }
684        LogicalOperator::Limit(limit) => {
685            resolve_count_param(&mut limit.count, params)?;
686            substitute_in_operator(&mut limit.input, params)?;
687        }
688        LogicalOperator::Skip(skip) => {
689            resolve_count_param(&mut skip.count, params)?;
690            substitute_in_operator(&mut skip.input, params)?;
691        }
692        LogicalOperator::Distinct(distinct) => {
693            substitute_in_operator(&mut distinct.input, params)?;
694        }
695        LogicalOperator::CreateNode(create) => {
696            for (_, expr) in &mut create.properties {
697                substitute_in_expression(expr, params)?;
698            }
699            if let Some(input) = &mut create.input {
700                substitute_in_operator(input, params)?;
701            }
702        }
703        LogicalOperator::CreateEdge(create) => {
704            for (_, expr) in &mut create.properties {
705                substitute_in_expression(expr, params)?;
706            }
707            substitute_in_operator(&mut create.input, params)?;
708        }
709        LogicalOperator::DeleteNode(delete) => {
710            substitute_in_operator(&mut delete.input, params)?;
711        }
712        LogicalOperator::DeleteEdge(delete) => {
713            substitute_in_operator(&mut delete.input, params)?;
714        }
715        LogicalOperator::SetProperty(set) => {
716            for (_, expr) in &mut set.properties {
717                substitute_in_expression(expr, params)?;
718            }
719            substitute_in_operator(&mut set.input, params)?;
720        }
721        LogicalOperator::Union(union) => {
722            for input in &mut union.inputs {
723                substitute_in_operator(input, params)?;
724            }
725        }
726        LogicalOperator::AntiJoin(anti) => {
727            substitute_in_operator(&mut anti.left, params)?;
728            substitute_in_operator(&mut anti.right, params)?;
729        }
730        LogicalOperator::Bind(bind) => {
731            substitute_in_expression(&mut bind.expression, params)?;
732            substitute_in_operator(&mut bind.input, params)?;
733        }
734        LogicalOperator::TripleScan(scan) => {
735            if let Some(input) = &mut scan.input {
736                substitute_in_operator(input, params)?;
737            }
738        }
739        LogicalOperator::Unwind(unwind) => {
740            substitute_in_expression(&mut unwind.expression, params)?;
741            substitute_in_operator(&mut unwind.input, params)?;
742        }
743        LogicalOperator::MapCollect(mc) => {
744            substitute_in_operator(&mut mc.input, params)?;
745        }
746        LogicalOperator::Merge(merge) => {
747            for (_, expr) in &mut merge.match_properties {
748                substitute_in_expression(expr, params)?;
749            }
750            for (_, expr) in &mut merge.on_create {
751                substitute_in_expression(expr, params)?;
752            }
753            for (_, expr) in &mut merge.on_match {
754                substitute_in_expression(expr, params)?;
755            }
756            substitute_in_operator(&mut merge.input, params)?;
757        }
758        LogicalOperator::MergeRelationship(merge_rel) => {
759            for (_, expr) in &mut merge_rel.match_properties {
760                substitute_in_expression(expr, params)?;
761            }
762            for (_, expr) in &mut merge_rel.on_create {
763                substitute_in_expression(expr, params)?;
764            }
765            for (_, expr) in &mut merge_rel.on_match {
766                substitute_in_expression(expr, params)?;
767            }
768            substitute_in_operator(&mut merge_rel.input, params)?;
769        }
770        LogicalOperator::AddLabel(add_label) => {
771            substitute_in_operator(&mut add_label.input, params)?;
772        }
773        LogicalOperator::RemoveLabel(remove_label) => {
774            substitute_in_operator(&mut remove_label.input, params)?;
775        }
776        LogicalOperator::ShortestPath(sp) => {
777            substitute_in_operator(&mut sp.input, params)?;
778        }
779        // SPARQL Update operators
780        LogicalOperator::InsertTriple(insert) => {
781            if let Some(ref mut input) = insert.input {
782                substitute_in_operator(input, params)?;
783            }
784        }
785        LogicalOperator::DeleteTriple(delete) => {
786            if let Some(ref mut input) = delete.input {
787                substitute_in_operator(input, params)?;
788            }
789        }
790        LogicalOperator::Modify(modify) => {
791            substitute_in_operator(&mut modify.where_clause, params)?;
792        }
793        LogicalOperator::ClearGraph(_)
794        | LogicalOperator::CreateGraph(_)
795        | LogicalOperator::DropGraph(_)
796        | LogicalOperator::LoadGraph(_)
797        | LogicalOperator::CopyGraph(_)
798        | LogicalOperator::MoveGraph(_)
799        | LogicalOperator::AddGraph(_) => {}
800        LogicalOperator::HorizontalAggregate(op) => {
801            substitute_in_operator(&mut op.input, params)?;
802        }
803        LogicalOperator::Empty => {}
804        LogicalOperator::VectorScan(scan) => {
805            substitute_in_expression(&mut scan.query_vector, params)?;
806            if let Some(ref mut input) = scan.input {
807                substitute_in_operator(input, params)?;
808            }
809        }
810        LogicalOperator::VectorJoin(join) => {
811            substitute_in_expression(&mut join.query_vector, params)?;
812            substitute_in_operator(&mut join.input, params)?;
813        }
814        LogicalOperator::Except(except) => {
815            substitute_in_operator(&mut except.left, params)?;
816            substitute_in_operator(&mut except.right, params)?;
817        }
818        LogicalOperator::Intersect(intersect) => {
819            substitute_in_operator(&mut intersect.left, params)?;
820            substitute_in_operator(&mut intersect.right, params)?;
821        }
822        LogicalOperator::Otherwise(otherwise) => {
823            substitute_in_operator(&mut otherwise.left, params)?;
824            substitute_in_operator(&mut otherwise.right, params)?;
825        }
826        LogicalOperator::Apply(apply) => {
827            substitute_in_operator(&mut apply.input, params)?;
828            substitute_in_operator(&mut apply.subplan, params)?;
829        }
830        // ParameterScan has no expressions to substitute
831        LogicalOperator::ParameterScan(_) => {}
832        LogicalOperator::MultiWayJoin(mwj) => {
833            for input in &mut mwj.inputs {
834                substitute_in_operator(input, params)?;
835            }
836            for cond in &mut mwj.conditions {
837                substitute_in_expression(&mut cond.left, params)?;
838                substitute_in_expression(&mut cond.right, params)?;
839            }
840        }
841        // DDL operators have no expressions to substitute
842        LogicalOperator::CreatePropertyGraph(_) => {}
843        // Procedure calls: arguments could contain parameters but we handle at execution time
844        LogicalOperator::CallProcedure(_) => {}
845        // LoadData: file path is a literal, no parameter substitution needed
846        LogicalOperator::LoadData(_) => {}
847    }
848    Ok(())
849}
850
851/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
852fn resolve_count_param(
853    count: &mut crate::query::plan::CountExpr,
854    params: &QueryParams,
855) -> Result<()> {
856    use crate::query::plan::CountExpr;
857    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
858
859    if let CountExpr::Parameter(name) = count {
860        let value = params.get(name.as_str()).ok_or_else(|| {
861            Error::Query(QueryError::new(
862                QueryErrorKind::Semantic,
863                format!("Missing parameter for SKIP/LIMIT: ${name}"),
864            ))
865        })?;
866        let n = match value {
867            Value::Int64(i) if *i >= 0 => *i as usize,
868            Value::Int64(i) => {
869                return Err(Error::Query(QueryError::new(
870                    QueryErrorKind::Semantic,
871                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
872                )));
873            }
874            other => {
875                return Err(Error::Query(QueryError::new(
876                    QueryErrorKind::Semantic,
877                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
878                )));
879            }
880        };
881        *count = CountExpr::Literal(n);
882    }
883    Ok(())
884}
885
886/// Substitutes parameters in an expression with their values.
887fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
888    use crate::query::plan::LogicalExpression;
889
890    match expr {
891        LogicalExpression::Parameter(name) => {
892            if let Some(value) = params.get(name) {
893                *expr = LogicalExpression::Literal(value.clone());
894            } else {
895                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
896            }
897        }
898        LogicalExpression::Binary { left, right, .. } => {
899            substitute_in_expression(left, params)?;
900            substitute_in_expression(right, params)?;
901        }
902        LogicalExpression::Unary { operand, .. } => {
903            substitute_in_expression(operand, params)?;
904        }
905        LogicalExpression::FunctionCall { args, .. } => {
906            for arg in args {
907                substitute_in_expression(arg, params)?;
908            }
909        }
910        LogicalExpression::List(items) => {
911            for item in items {
912                substitute_in_expression(item, params)?;
913            }
914        }
915        LogicalExpression::Map(pairs) => {
916            for (_, value) in pairs {
917                substitute_in_expression(value, params)?;
918            }
919        }
920        LogicalExpression::IndexAccess { base, index } => {
921            substitute_in_expression(base, params)?;
922            substitute_in_expression(index, params)?;
923        }
924        LogicalExpression::SliceAccess { base, start, end } => {
925            substitute_in_expression(base, params)?;
926            if let Some(s) = start {
927                substitute_in_expression(s, params)?;
928            }
929            if let Some(e) = end {
930                substitute_in_expression(e, params)?;
931            }
932        }
933        LogicalExpression::Case {
934            operand,
935            when_clauses,
936            else_clause,
937        } => {
938            if let Some(op) = operand {
939                substitute_in_expression(op, params)?;
940            }
941            for (cond, result) in when_clauses {
942                substitute_in_expression(cond, params)?;
943                substitute_in_expression(result, params)?;
944            }
945            if let Some(el) = else_clause {
946                substitute_in_expression(el, params)?;
947            }
948        }
949        LogicalExpression::Property { .. }
950        | LogicalExpression::Variable(_)
951        | LogicalExpression::Literal(_)
952        | LogicalExpression::Labels(_)
953        | LogicalExpression::Type(_)
954        | LogicalExpression::Id(_) => {}
955        LogicalExpression::ListComprehension {
956            list_expr,
957            filter_expr,
958            map_expr,
959            ..
960        } => {
961            substitute_in_expression(list_expr, params)?;
962            if let Some(filter) = filter_expr {
963                substitute_in_expression(filter, params)?;
964            }
965            substitute_in_expression(map_expr, params)?;
966        }
967        LogicalExpression::ListPredicate {
968            list_expr,
969            predicate,
970            ..
971        } => {
972            substitute_in_expression(list_expr, params)?;
973            substitute_in_expression(predicate, params)?;
974        }
975        LogicalExpression::ExistsSubquery(_)
976        | LogicalExpression::CountSubquery(_)
977        | LogicalExpression::ValueSubquery(_) => {
978            // Subqueries would need recursive parameter substitution
979        }
980        LogicalExpression::PatternComprehension { projection, .. } => {
981            substitute_in_expression(projection, params)?;
982        }
983        LogicalExpression::MapProjection { entries, .. } => {
984            for entry in entries {
985                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
986                    substitute_in_expression(expr, params)?;
987                }
988            }
989        }
990        LogicalExpression::Reduce {
991            initial,
992            list,
993            expression,
994            ..
995        } => {
996            substitute_in_expression(initial, params)?;
997            substitute_in_expression(list, params)?;
998            substitute_in_expression(expression, params)?;
999        }
1000    }
1001    Ok(())
1002}
1003
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        // `assert_eq!` reports the actual count on failure.
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter
        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Should return 2 people (ages 35 and 45)
        assert_eq!(result.row_count(), 2);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter but empty params map (missing the required param)
        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        // Should fail with missing parameter error
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        // Tests parameter substitution in WHERE clause with property comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=20 matches > 15
        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        // Tests multiple parameters in WHERE clause with AND
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the person with age=35, score=90 matches both conditions
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_matches_single_status() {
        // Tests a string parameter in an equality filter that selects exactly
        // one of several distinct status values. (No IN list is involved.)
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        // Bind the parameter to exactly one of the three stored statuses
        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        // Tests that same-type parameter comparisons work correctly
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        // Compare int property with int parameter
        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=100 matches > 75
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // Tests that empty results still have correct column names
        let store = Arc::new(LpgStore::new().unwrap());
        // Don't create any nodes

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        // Tests string parameter equality comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}