Skip to main content

grafeo_engine/query/
processor.rs

//! Query processor that orchestrates the query pipeline.
//!
//! The `QueryProcessor` is the central component that executes queries through
//! the full pipeline: Parse/Translate → Bind → Optimize → Plan → Execute.
//!
//! It supports GQL, Cypher, Gremlin, GraphQL and SQL/PGQ for LPG, and SPARQL and
//! GraphQL for RDF (RDF execution requires the `rdf` feature). Each language is
//! additionally gated behind its own Cargo feature.
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15use grafeo_core::graph::lpg::LpgStore;
16use grafeo_core::graph::{GraphStore, GraphStoreMut};
17
18use crate::catalog::Catalog;
19use crate::database::QueryResult;
20use crate::query::binder::Binder;
21use crate::query::executor::Executor;
22use crate::query::optimizer::Optimizer;
23use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
24use crate::query::planner::Planner;
25use crate::transaction::TransactionManager;
26
/// A query language understood by the query processor.
///
/// Each variant only exists when its Cargo feature is enabled, so the set of
/// available languages depends on the build configuration.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024), the default language for LPG.
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0 over the LPG.
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin traversals over the LPG.
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL over the LPG.
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 `GRAPH_TABLE`) over the LPG.
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 over RDF.
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL over RDF (requires both `graphql` and `rdf`).
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
52
53impl QueryLanguage {
54    /// Returns whether this language targets LPG (vs RDF).
55    #[must_use]
56    pub const fn is_lpg(&self) -> bool {
57        match self {
58            #[cfg(feature = "gql")]
59            Self::Gql => true,
60            #[cfg(feature = "cypher")]
61            Self::Cypher => true,
62            #[cfg(feature = "gremlin")]
63            Self::Gremlin => true,
64            #[cfg(feature = "graphql")]
65            Self::GraphQL => true,
66            #[cfg(feature = "sql-pgq")]
67            Self::SqlPgq => true,
68            #[cfg(feature = "sparql")]
69            Self::Sparql => false,
70            #[cfg(all(feature = "graphql", feature = "rdf"))]
71            Self::GraphQLRdf => false,
72        }
73    }
74}
75
/// Query parameters for prepared statements.
///
/// Maps each parameter name to the `Value` substituted for it. When a query is
/// processed, these entries are merged over any default parameters carried by
/// the translated plan, and the caller's values win on conflicting keys.
pub type QueryParams = HashMap<String, Value>;
78
/// Processes queries through the full pipeline.
///
/// The processor holds shared (`Arc`) handles to the stores, the transaction
/// manager and the catalog, and provides a unified interface for executing
/// queries in any supported language. LPG queries read through `graph_store`
/// and write through `write_store`; RDF queries (with the `rdf` feature) are
/// planned against `rdf_store`.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// # use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
///
/// # fn main() -> grafeo_common::utils::error::Result<()> {
/// let store = Arc::new(LpgStore::new().unwrap());
/// let processor = QueryProcessor::for_lpg(store);
/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
/// # Ok(())
/// # }
/// ```
pub struct QueryProcessor {
    /// LPG store for property graph queries.
    ///
    /// Note: processors built with `for_graph_store_with_transaction` or
    /// `for_stores_with_transaction` hold a freshly created, empty `LpgStore`
    /// here rather than the store they execute against.
    lpg_store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends (read path).
    graph_store: Arc<dyn GraphStore>,
    /// Writable graph store (None when read-only).
    write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager for MVCC operations.
    transaction_manager: Arc<TransactionManager>,
    /// Catalog for schema and index metadata.
    catalog: Arc<Catalog>,
    /// Query optimizer.
    optimizer: Optimizer,
    /// Current transaction context as `(viewing_epoch, transaction_id)`, set by
    /// `with_transaction_context`; `None` outside a transaction.
    transaction_context: Option<(EpochId, TransactionId)>,
    /// RDF store for triple pattern queries (optional).
    #[cfg(feature = "rdf")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
117
118impl QueryProcessor {
119    /// Creates a new query processor for LPG queries.
120    #[must_use]
121    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
122        let optimizer = Optimizer::from_store(&store);
123        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
124        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
125        Self {
126            lpg_store: store,
127            graph_store,
128            write_store,
129            transaction_manager: Arc::new(TransactionManager::new()),
130            catalog: Arc::new(Catalog::new()),
131            optimizer,
132            transaction_context: None,
133            #[cfg(feature = "rdf")]
134            rdf_store: None,
135        }
136    }
137
138    /// Creates a new query processor with a transaction manager.
139    #[must_use]
140    pub fn for_lpg_with_transaction(
141        store: Arc<LpgStore>,
142        transaction_manager: Arc<TransactionManager>,
143    ) -> Self {
144        let optimizer = Optimizer::from_store(&store);
145        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
146        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
147        Self {
148            lpg_store: store,
149            graph_store,
150            write_store,
151            transaction_manager,
152            catalog: Arc::new(Catalog::new()),
153            optimizer,
154            transaction_context: None,
155            #[cfg(feature = "rdf")]
156            rdf_store: None,
157        }
158    }
159
160    /// Creates a query processor backed by any `GraphStoreMut` implementation.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if the internal arena allocation fails (out of memory).
165    pub fn for_graph_store_with_transaction(
166        store: Arc<dyn GraphStoreMut>,
167        transaction_manager: Arc<TransactionManager>,
168    ) -> Result<Self> {
169        let optimizer = Optimizer::from_graph_store(&*store);
170        let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
171        Ok(Self {
172            lpg_store: Arc::new(LpgStore::new()?),
173            graph_store: read_store,
174            write_store: Some(store),
175            transaction_manager,
176            catalog: Arc::new(Catalog::new()),
177            optimizer,
178            transaction_context: None,
179            #[cfg(feature = "rdf")]
180            rdf_store: None,
181        })
182    }
183
184    /// Creates a query processor from split read/write stores.
185    ///
186    /// # Errors
187    ///
188    /// Returns an error if the internal arena allocation fails (out of memory).
189    pub fn for_stores_with_transaction(
190        read_store: Arc<dyn GraphStore>,
191        write_store: Option<Arc<dyn GraphStoreMut>>,
192        transaction_manager: Arc<TransactionManager>,
193    ) -> Result<Self> {
194        let optimizer = Optimizer::from_graph_store(&*read_store);
195        Ok(Self {
196            lpg_store: Arc::new(LpgStore::new()?),
197            graph_store: read_store,
198            write_store,
199            transaction_manager,
200            catalog: Arc::new(Catalog::new()),
201            optimizer,
202            transaction_context: None,
203            #[cfg(feature = "rdf")]
204            rdf_store: None,
205        })
206    }
207
208    /// Sets the transaction context for MVCC visibility.
209    ///
210    /// This should be called when the processor is used within a transaction.
211    #[must_use]
212    pub fn with_transaction_context(
213        mut self,
214        viewing_epoch: EpochId,
215        transaction_id: TransactionId,
216    ) -> Self {
217        self.transaction_context = Some((viewing_epoch, transaction_id));
218        self
219    }
220
221    /// Sets a custom catalog.
222    #[must_use]
223    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
224        self.catalog = catalog;
225        self
226    }
227
228    /// Sets a custom optimizer.
229    #[must_use]
230    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
231        self.optimizer = optimizer;
232        self
233    }
234
235    /// Processes a query string and returns results.
236    ///
237    /// Pipeline:
238    /// 1. Parse (language-specific parser → AST)
239    /// 2. Translate (AST → LogicalPlan)
240    /// 3. Bind (semantic validation)
241    /// 4. Optimize (filter pushdown, join reorder, etc.)
242    /// 5. Plan (logical → physical operators)
243    /// 6. Execute (run operators, collect results)
244    ///
245    /// # Arguments
246    ///
247    /// * `query` - The query string
248    /// * `language` - Which query language to use
249    /// * `params` - Optional query parameters for prepared statements
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if any stage of the pipeline fails.
254    pub fn process(
255        &self,
256        query: &str,
257        language: QueryLanguage,
258        params: Option<&QueryParams>,
259    ) -> Result<QueryResult> {
260        if language.is_lpg() {
261            self.process_lpg(query, language, params)
262        } else {
263            #[cfg(feature = "rdf")]
264            {
265                self.process_rdf(query, language, params)
266            }
267            #[cfg(not(feature = "rdf"))]
268            {
269                Err(Error::Internal(
270                    "RDF support not enabled. Compile with --features rdf".to_string(),
271                ))
272            }
273        }
274    }
275
276    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
277    fn process_lpg(
278        &self,
279        query: &str,
280        language: QueryLanguage,
281        params: Option<&QueryParams>,
282    ) -> Result<QueryResult> {
283        #[cfg(not(target_arch = "wasm32"))]
284        let start_time = std::time::Instant::now();
285
286        // 1. Parse and translate to logical plan
287        let mut logical_plan = self.translate_lpg(query, language)?;
288
289        // 2. Substitute parameters if provided (merge defaults from the plan first)
290        let has_defaults = !logical_plan.default_params.is_empty();
291        if params.is_some() || has_defaults {
292            let merged = if has_defaults {
293                let mut merged = logical_plan.default_params.clone();
294                if let Some(params) = params {
295                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
296                }
297                merged
298            } else {
299                params.cloned().unwrap_or_default()
300            };
301            substitute_params(&mut logical_plan, &merged)?;
302        }
303
304        // 3. Semantic validation
305        let mut binder = Binder::new();
306        let _binding_context = binder.bind(&logical_plan)?;
307
308        // 4. Optimize the plan
309        let optimized_plan = self.optimizer.optimize(logical_plan)?;
310
311        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
312        if optimized_plan.explain {
313            let mut plan = optimized_plan;
314            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
315            return Ok(explain_result(&plan));
316        }
317
318        // 5. Convert to physical plan with transaction context
319        // Read-only fast path: safe when no mutations AND no active transaction
320        // (an active transaction may have prior uncommitted writes from earlier statements)
321        let is_read_only =
322            !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
323        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
324            Planner::with_context(
325                Arc::clone(&self.graph_store),
326                self.write_store.as_ref().map(Arc::clone),
327                Arc::clone(&self.transaction_manager),
328                Some(transaction_id),
329                epoch,
330            )
331        } else {
332            Planner::with_context(
333                Arc::clone(&self.graph_store),
334                self.write_store.as_ref().map(Arc::clone),
335                Arc::clone(&self.transaction_manager),
336                None,
337                self.transaction_manager.current_epoch(),
338            )
339        }
340        .with_read_only(is_read_only);
341        let mut physical_plan = planner.plan(&optimized_plan)?;
342
343        // 6. Execute and collect results
344        let executor = Executor::with_columns(physical_plan.columns.clone());
345        let mut result = executor.execute(physical_plan.operator.as_mut())?;
346
347        // Add execution metrics
348        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
349        #[cfg(not(target_arch = "wasm32"))]
350        {
351            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
352            result.execution_time_ms = Some(elapsed_ms);
353        }
354        result.rows_scanned = Some(rows_scanned);
355
356        Ok(result)
357    }
358
359    /// Translates an LPG query to a logical plan.
360    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
361        let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
362        match language {
363            #[cfg(feature = "gql")]
364            QueryLanguage::Gql => {
365                use crate::query::translators::gql;
366                gql::translate(query)
367            }
368            #[cfg(feature = "cypher")]
369            QueryLanguage::Cypher => {
370                use crate::query::translators::cypher;
371                cypher::translate(query)
372            }
373            #[cfg(feature = "gremlin")]
374            QueryLanguage::Gremlin => {
375                use crate::query::translators::gremlin;
376                gremlin::translate(query)
377            }
378            #[cfg(feature = "graphql")]
379            QueryLanguage::GraphQL => {
380                use crate::query::translators::graphql;
381                graphql::translate(query)
382            }
383            #[cfg(feature = "sql-pgq")]
384            QueryLanguage::SqlPgq => {
385                use crate::query::translators::sql_pgq;
386                sql_pgq::translate(query)
387            }
388            #[allow(unreachable_patterns)]
389            _ => Err(Error::Internal(format!(
390                "Language {:?} is not an LPG language",
391                language
392            ))),
393        }
394    }
395
396    /// Returns a reference to the LPG store.
397    #[must_use]
398    pub fn lpg_store(&self) -> &Arc<LpgStore> {
399        &self.lpg_store
400    }
401
402    /// Returns a reference to the catalog.
403    #[must_use]
404    pub fn catalog(&self) -> &Arc<Catalog> {
405        &self.catalog
406    }
407
408    /// Returns a reference to the optimizer.
409    #[must_use]
410    pub fn optimizer(&self) -> &Optimizer {
411        &self.optimizer
412    }
413}
414
415impl QueryProcessor {
416    /// Returns a reference to the transaction manager.
417    #[must_use]
418    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
419        &self.transaction_manager
420    }
421}
422
423// =========================================================================
424// RDF-specific methods (gated behind `rdf` feature)
425// =========================================================================
426
#[cfg(feature = "rdf")]
impl QueryProcessor {
    /// Creates a new query processor with both LPG and RDF stores.
    ///
    /// Equivalent to [`QueryProcessor::for_lpg`] with `rdf_store` attached, so
    /// RDF languages (SPARQL, GraphQL-RDF) can be executed as well.
    #[must_use]
    pub fn with_rdf(
        lpg_store: Arc<LpgStore>,
        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
    ) -> Self {
        let mut processor = Self::for_lpg(lpg_store);
        processor.rdf_store = Some(rdf_store);
        processor
    }

    /// Returns a reference to the RDF store (if configured).
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }

    /// Processes an RDF query (SPARQL, GraphQL-RDF).
    ///
    /// Execution metrics are recorded the same way as on the LPG path. On
    /// non-wasm targets the result carries the wall-clock execution time, and
    /// `rows_scanned` is approximated by the number of returned rows.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` if no RDF store is configured. Also propagates
    /// errors from translation, binding, optimization, planning or execution.
    fn process_rdf(
        &self,
        query: &str,
        language: QueryLanguage,
        params: Option<&QueryParams>,
    ) -> Result<QueryResult> {
        use crate::query::planner::rdf::RdfPlanner;

        #[cfg(not(target_arch = "wasm32"))]
        let start_time = std::time::Instant::now();

        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
            Error::Internal("RDF store not configured for this processor".to_string())
        })?;

        // 1. Parse and translate to a logical plan.
        let mut logical_plan = self.translate_rdf(query, language)?;

        // 2. Substitute parameters: plan defaults first, caller values override them.
        let has_defaults = !logical_plan.default_params.is_empty();
        if params.is_some() || has_defaults {
            let merged = if has_defaults {
                let mut merged = logical_plan.default_params.clone();
                if let Some(params) = params {
                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
                }
                merged
            } else {
                params.cloned().unwrap_or_default()
            };
            substitute_params(&mut logical_plan, &merged)?;
        }

        // 3. Semantic validation.
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        // 4. Optimize the plan.
        let optimized_plan = self.optimizer.optimize(logical_plan)?;

        // 4a. EXPLAIN: return the optimized plan tree without executing.
        if optimized_plan.explain {
            return Ok(explain_result(&optimized_plan));
        }

        // 5. Convert to a physical plan (using the RDF planner).
        let planner = RdfPlanner::new(Arc::clone(rdf_store));
        let mut physical_plan = planner.plan(&optimized_plan)?;

        // 6. Execute and collect results.
        let executor = Executor::with_columns(physical_plan.columns.clone());
        let mut result = executor.execute(physical_plan.operator.as_mut())?;

        // Add execution metrics, consistent with the LPG path.
        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
        #[cfg(not(target_arch = "wasm32"))]
        {
            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            result.execution_time_ms = Some(elapsed_ms);
        }
        result.rows_scanned = Some(rows_scanned);

        Ok(result)
    }

    /// Translates an RDF query to a logical plan.
    ///
    /// GraphQL-RDF queries are translated with the fixed default namespace
    /// `http://example.org/`.
    ///
    /// # Errors
    ///
    /// Propagates translator errors. Returns `Error::Internal` if `language` is
    /// not an RDF language compiled into this build.
    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
        match language {
            #[cfg(feature = "sparql")]
            QueryLanguage::Sparql => {
                use crate::query::translators::sparql;
                sparql::translate(query)
            }
            #[cfg(all(feature = "graphql", feature = "rdf"))]
            QueryLanguage::GraphQLRdf => {
                use crate::query::translators::graphql_rdf;
                // Default namespace for GraphQL-RDF queries
                graphql_rdf::translate(query, "http://example.org/")
            }
            // Unreachable when every compiled-in language is an RDF language.
            #[allow(unreachable_patterns)]
            _ => Err(Error::Internal(format!(
                "Language {:?} is not an RDF language",
                language
            ))),
        }
    }
}
529
530/// Annotates filter operators in the plan with pushdown hints.
531///
532/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
533/// whether a property index exists for equality predicates.
534pub(crate) fn annotate_pushdown_hints(
535    op: &mut LogicalOperator,
536    store: &dyn grafeo_core::graph::GraphStore,
537) {
538    #[allow(clippy::wildcard_imports)]
539    use crate::query::plan::*;
540
541    match op {
542        LogicalOperator::Filter(filter) => {
543            // Recurse into children first
544            annotate_pushdown_hints(&mut filter.input, store);
545
546            // Annotate this filter if it sits on top of a NodeScan
547            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
548                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
549            }
550        }
551        LogicalOperator::NodeScan(op) => {
552            if let Some(input) = &mut op.input {
553                annotate_pushdown_hints(input, store);
554            }
555        }
556        LogicalOperator::EdgeScan(op) => {
557            if let Some(input) = &mut op.input {
558                annotate_pushdown_hints(input, store);
559            }
560        }
561        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
562        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
563        LogicalOperator::Join(op) => {
564            annotate_pushdown_hints(&mut op.left, store);
565            annotate_pushdown_hints(&mut op.right, store);
566        }
567        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
568        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
569        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
570        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
571        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
572        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
573        LogicalOperator::Union(op) => {
574            for input in &mut op.inputs {
575                annotate_pushdown_hints(input, store);
576            }
577        }
578        LogicalOperator::Apply(op) => {
579            annotate_pushdown_hints(&mut op.input, store);
580            annotate_pushdown_hints(&mut op.subplan, store);
581        }
582        LogicalOperator::Otherwise(op) => {
583            annotate_pushdown_hints(&mut op.left, store);
584            annotate_pushdown_hints(&mut op.right, store);
585        }
586        _ => {}
587    }
588}
589
590/// Infers the pushdown strategy for a filter predicate over a node scan.
591fn infer_pushdown(
592    predicate: &LogicalExpression,
593    scan: &crate::query::plan::NodeScanOp,
594    store: &dyn grafeo_core::graph::GraphStore,
595) -> Option<crate::query::plan::PushdownHint> {
596    #[allow(clippy::wildcard_imports)]
597    use crate::query::plan::*;
598
599    match predicate {
600        // Equality: n.prop = value
601        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
602            if let Some(prop) = extract_property_name(left, &scan.variable)
603                .or_else(|| extract_property_name(right, &scan.variable))
604            {
605                if store.has_property_index(&prop) {
606                    return Some(PushdownHint::IndexLookup { property: prop });
607                }
608                if scan.label.is_some() {
609                    return Some(PushdownHint::LabelFirst);
610                }
611            }
612            None
613        }
614        // Range: n.prop > value, n.prop < value, etc.
615        LogicalExpression::Binary {
616            left,
617            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
618            right,
619        } => {
620            if let Some(prop) = extract_property_name(left, &scan.variable)
621                .or_else(|| extract_property_name(right, &scan.variable))
622            {
623                if store.has_property_index(&prop) {
624                    return Some(PushdownHint::RangeScan { property: prop });
625                }
626                if scan.label.is_some() {
627                    return Some(PushdownHint::LabelFirst);
628                }
629            }
630            None
631        }
632        // AND: check the left side (first conjunct) for pushdown
633        LogicalExpression::Binary {
634            left,
635            op: BinaryOp::And,
636            ..
637        } => infer_pushdown(left, scan, store),
638        _ => {
639            // Any other predicate on a labeled scan gets label-first
640            if scan.label.is_some() {
641                Some(PushdownHint::LabelFirst)
642            } else {
643                None
644            }
645        }
646    }
647}
648
649/// Extracts the property name if the expression is `Property { variable, property }`
650/// and the variable matches the scan variable.
651fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
652    if let LogicalExpression::Property { variable, property } = expr
653        && variable == scan_var
654    {
655        Some(property.clone())
656    } else {
657        None
658    }
659}
660
661/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
662pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
663    let tree_text = plan.root.explain_tree();
664    QueryResult {
665        columns: vec!["plan".to_string()],
666        column_types: vec![grafeo_common::types::LogicalType::String],
667        rows: vec![vec![Value::String(tree_text.into())]],
668        execution_time_ms: None,
669        rows_scanned: None,
670        status_message: None,
671        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
672    }
673}
674
675/// Substitutes parameters in a logical plan with their values.
676pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
677    substitute_in_operator(&mut plan.root, params)
678}
679
680/// Recursively substitutes parameters in an operator.
681fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
682    #[allow(clippy::wildcard_imports)]
683    use crate::query::plan::*;
684
685    match op {
686        LogicalOperator::Filter(filter) => {
687            substitute_in_expression(&mut filter.predicate, params)?;
688            substitute_in_operator(&mut filter.input, params)?;
689        }
690        LogicalOperator::Return(ret) => {
691            for item in &mut ret.items {
692                substitute_in_expression(&mut item.expression, params)?;
693            }
694            substitute_in_operator(&mut ret.input, params)?;
695        }
696        LogicalOperator::Project(proj) => {
697            for p in &mut proj.projections {
698                substitute_in_expression(&mut p.expression, params)?;
699            }
700            substitute_in_operator(&mut proj.input, params)?;
701        }
702        LogicalOperator::NodeScan(scan) => {
703            if let Some(input) = &mut scan.input {
704                substitute_in_operator(input, params)?;
705            }
706        }
707        LogicalOperator::EdgeScan(scan) => {
708            if let Some(input) = &mut scan.input {
709                substitute_in_operator(input, params)?;
710            }
711        }
712        LogicalOperator::Expand(expand) => {
713            substitute_in_operator(&mut expand.input, params)?;
714        }
715        LogicalOperator::Join(join) => {
716            substitute_in_operator(&mut join.left, params)?;
717            substitute_in_operator(&mut join.right, params)?;
718            for cond in &mut join.conditions {
719                substitute_in_expression(&mut cond.left, params)?;
720                substitute_in_expression(&mut cond.right, params)?;
721            }
722        }
723        LogicalOperator::LeftJoin(join) => {
724            substitute_in_operator(&mut join.left, params)?;
725            substitute_in_operator(&mut join.right, params)?;
726            if let Some(cond) = &mut join.condition {
727                substitute_in_expression(cond, params)?;
728            }
729        }
730        LogicalOperator::Aggregate(agg) => {
731            for expr in &mut agg.group_by {
732                substitute_in_expression(expr, params)?;
733            }
734            for agg_expr in &mut agg.aggregates {
735                if let Some(expr) = &mut agg_expr.expression {
736                    substitute_in_expression(expr, params)?;
737                }
738            }
739            substitute_in_operator(&mut agg.input, params)?;
740        }
741        LogicalOperator::Sort(sort) => {
742            for key in &mut sort.keys {
743                substitute_in_expression(&mut key.expression, params)?;
744            }
745            substitute_in_operator(&mut sort.input, params)?;
746        }
747        LogicalOperator::Limit(limit) => {
748            resolve_count_param(&mut limit.count, params)?;
749            substitute_in_operator(&mut limit.input, params)?;
750        }
751        LogicalOperator::Skip(skip) => {
752            resolve_count_param(&mut skip.count, params)?;
753            substitute_in_operator(&mut skip.input, params)?;
754        }
755        LogicalOperator::Distinct(distinct) => {
756            substitute_in_operator(&mut distinct.input, params)?;
757        }
758        LogicalOperator::CreateNode(create) => {
759            for (_, expr) in &mut create.properties {
760                substitute_in_expression(expr, params)?;
761            }
762            if let Some(input) = &mut create.input {
763                substitute_in_operator(input, params)?;
764            }
765        }
766        LogicalOperator::CreateEdge(create) => {
767            for (_, expr) in &mut create.properties {
768                substitute_in_expression(expr, params)?;
769            }
770            substitute_in_operator(&mut create.input, params)?;
771        }
772        LogicalOperator::DeleteNode(delete) => {
773            substitute_in_operator(&mut delete.input, params)?;
774        }
775        LogicalOperator::DeleteEdge(delete) => {
776            substitute_in_operator(&mut delete.input, params)?;
777        }
778        LogicalOperator::SetProperty(set) => {
779            for (_, expr) in &mut set.properties {
780                substitute_in_expression(expr, params)?;
781            }
782            substitute_in_operator(&mut set.input, params)?;
783        }
784        LogicalOperator::Union(union) => {
785            for input in &mut union.inputs {
786                substitute_in_operator(input, params)?;
787            }
788        }
789        LogicalOperator::AntiJoin(anti) => {
790            substitute_in_operator(&mut anti.left, params)?;
791            substitute_in_operator(&mut anti.right, params)?;
792        }
793        LogicalOperator::Bind(bind) => {
794            substitute_in_expression(&mut bind.expression, params)?;
795            substitute_in_operator(&mut bind.input, params)?;
796        }
797        LogicalOperator::TripleScan(scan) => {
798            if let Some(input) = &mut scan.input {
799                substitute_in_operator(input, params)?;
800            }
801        }
802        LogicalOperator::Unwind(unwind) => {
803            substitute_in_expression(&mut unwind.expression, params)?;
804            substitute_in_operator(&mut unwind.input, params)?;
805        }
806        LogicalOperator::MapCollect(mc) => {
807            substitute_in_operator(&mut mc.input, params)?;
808        }
809        LogicalOperator::Merge(merge) => {
810            for (_, expr) in &mut merge.match_properties {
811                substitute_in_expression(expr, params)?;
812            }
813            for (_, expr) in &mut merge.on_create {
814                substitute_in_expression(expr, params)?;
815            }
816            for (_, expr) in &mut merge.on_match {
817                substitute_in_expression(expr, params)?;
818            }
819            substitute_in_operator(&mut merge.input, params)?;
820        }
821        LogicalOperator::MergeRelationship(merge_rel) => {
822            for (_, expr) in &mut merge_rel.match_properties {
823                substitute_in_expression(expr, params)?;
824            }
825            for (_, expr) in &mut merge_rel.on_create {
826                substitute_in_expression(expr, params)?;
827            }
828            for (_, expr) in &mut merge_rel.on_match {
829                substitute_in_expression(expr, params)?;
830            }
831            substitute_in_operator(&mut merge_rel.input, params)?;
832        }
833        LogicalOperator::AddLabel(add_label) => {
834            substitute_in_operator(&mut add_label.input, params)?;
835        }
836        LogicalOperator::RemoveLabel(remove_label) => {
837            substitute_in_operator(&mut remove_label.input, params)?;
838        }
839        LogicalOperator::ShortestPath(sp) => {
840            substitute_in_operator(&mut sp.input, params)?;
841        }
842        // SPARQL Update operators
843        LogicalOperator::InsertTriple(insert) => {
844            if let Some(ref mut input) = insert.input {
845                substitute_in_operator(input, params)?;
846            }
847        }
848        LogicalOperator::DeleteTriple(delete) => {
849            if let Some(ref mut input) = delete.input {
850                substitute_in_operator(input, params)?;
851            }
852        }
853        LogicalOperator::Modify(modify) => {
854            substitute_in_operator(&mut modify.where_clause, params)?;
855        }
856        LogicalOperator::ClearGraph(_)
857        | LogicalOperator::CreateGraph(_)
858        | LogicalOperator::DropGraph(_)
859        | LogicalOperator::LoadGraph(_)
860        | LogicalOperator::CopyGraph(_)
861        | LogicalOperator::MoveGraph(_)
862        | LogicalOperator::AddGraph(_) => {}
863        LogicalOperator::HorizontalAggregate(op) => {
864            substitute_in_operator(&mut op.input, params)?;
865        }
866        LogicalOperator::Empty => {}
867        LogicalOperator::VectorScan(scan) => {
868            substitute_in_expression(&mut scan.query_vector, params)?;
869            if let Some(ref mut input) = scan.input {
870                substitute_in_operator(input, params)?;
871            }
872        }
873        LogicalOperator::VectorJoin(join) => {
874            substitute_in_expression(&mut join.query_vector, params)?;
875            substitute_in_operator(&mut join.input, params)?;
876        }
877        LogicalOperator::Except(except) => {
878            substitute_in_operator(&mut except.left, params)?;
879            substitute_in_operator(&mut except.right, params)?;
880        }
881        LogicalOperator::Intersect(intersect) => {
882            substitute_in_operator(&mut intersect.left, params)?;
883            substitute_in_operator(&mut intersect.right, params)?;
884        }
885        LogicalOperator::Otherwise(otherwise) => {
886            substitute_in_operator(&mut otherwise.left, params)?;
887            substitute_in_operator(&mut otherwise.right, params)?;
888        }
889        LogicalOperator::Apply(apply) => {
890            substitute_in_operator(&mut apply.input, params)?;
891            substitute_in_operator(&mut apply.subplan, params)?;
892        }
893        // ParameterScan has no expressions to substitute
894        LogicalOperator::ParameterScan(_) => {}
895        LogicalOperator::MultiWayJoin(mwj) => {
896            for input in &mut mwj.inputs {
897                substitute_in_operator(input, params)?;
898            }
899            for cond in &mut mwj.conditions {
900                substitute_in_expression(&mut cond.left, params)?;
901                substitute_in_expression(&mut cond.right, params)?;
902            }
903        }
904        // DDL operators have no expressions to substitute
905        LogicalOperator::CreatePropertyGraph(_) => {}
906        // Procedure calls: arguments could contain parameters but we handle at execution time
907        LogicalOperator::CallProcedure(_) => {}
908        // LoadData: file path is a literal, no parameter substitution needed
909        LogicalOperator::LoadData(_) => {}
910    }
911    Ok(())
912}
913
914/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
915fn resolve_count_param(
916    count: &mut crate::query::plan::CountExpr,
917    params: &QueryParams,
918) -> Result<()> {
919    use crate::query::plan::CountExpr;
920    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
921
922    if let CountExpr::Parameter(name) = count {
923        let value = params.get(name.as_str()).ok_or_else(|| {
924            Error::Query(QueryError::new(
925                QueryErrorKind::Semantic,
926                format!("Missing parameter for SKIP/LIMIT: ${name}"),
927            ))
928        })?;
929        let n = match value {
930            Value::Int64(i) if *i >= 0 => *i as usize,
931            Value::Int64(i) => {
932                return Err(Error::Query(QueryError::new(
933                    QueryErrorKind::Semantic,
934                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
935                )));
936            }
937            other => {
938                return Err(Error::Query(QueryError::new(
939                    QueryErrorKind::Semantic,
940                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
941                )));
942            }
943        };
944        *count = CountExpr::Literal(n);
945    }
946    Ok(())
947}
948
949/// Substitutes parameters in an expression with their values.
950fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
951    use crate::query::plan::LogicalExpression;
952
953    match expr {
954        LogicalExpression::Parameter(name) => {
955            if let Some(value) = params.get(name) {
956                *expr = LogicalExpression::Literal(value.clone());
957            } else {
958                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
959            }
960        }
961        LogicalExpression::Binary { left, right, .. } => {
962            substitute_in_expression(left, params)?;
963            substitute_in_expression(right, params)?;
964        }
965        LogicalExpression::Unary { operand, .. } => {
966            substitute_in_expression(operand, params)?;
967        }
968        LogicalExpression::FunctionCall { args, .. } => {
969            for arg in args {
970                substitute_in_expression(arg, params)?;
971            }
972        }
973        LogicalExpression::List(items) => {
974            for item in items {
975                substitute_in_expression(item, params)?;
976            }
977        }
978        LogicalExpression::Map(pairs) => {
979            for (_, value) in pairs {
980                substitute_in_expression(value, params)?;
981            }
982        }
983        LogicalExpression::IndexAccess { base, index } => {
984            substitute_in_expression(base, params)?;
985            substitute_in_expression(index, params)?;
986        }
987        LogicalExpression::SliceAccess { base, start, end } => {
988            substitute_in_expression(base, params)?;
989            if let Some(s) = start {
990                substitute_in_expression(s, params)?;
991            }
992            if let Some(e) = end {
993                substitute_in_expression(e, params)?;
994            }
995        }
996        LogicalExpression::Case {
997            operand,
998            when_clauses,
999            else_clause,
1000        } => {
1001            if let Some(op) = operand {
1002                substitute_in_expression(op, params)?;
1003            }
1004            for (cond, result) in when_clauses {
1005                substitute_in_expression(cond, params)?;
1006                substitute_in_expression(result, params)?;
1007            }
1008            if let Some(el) = else_clause {
1009                substitute_in_expression(el, params)?;
1010            }
1011        }
1012        LogicalExpression::Property { .. }
1013        | LogicalExpression::Variable(_)
1014        | LogicalExpression::Literal(_)
1015        | LogicalExpression::Labels(_)
1016        | LogicalExpression::Type(_)
1017        | LogicalExpression::Id(_) => {}
1018        LogicalExpression::ListComprehension {
1019            list_expr,
1020            filter_expr,
1021            map_expr,
1022            ..
1023        } => {
1024            substitute_in_expression(list_expr, params)?;
1025            if let Some(filter) = filter_expr {
1026                substitute_in_expression(filter, params)?;
1027            }
1028            substitute_in_expression(map_expr, params)?;
1029        }
1030        LogicalExpression::ListPredicate {
1031            list_expr,
1032            predicate,
1033            ..
1034        } => {
1035            substitute_in_expression(list_expr, params)?;
1036            substitute_in_expression(predicate, params)?;
1037        }
1038        LogicalExpression::ExistsSubquery(_)
1039        | LogicalExpression::CountSubquery(_)
1040        | LogicalExpression::ValueSubquery(_) => {
1041            // Subqueries would need recursive parameter substitution
1042        }
1043        LogicalExpression::PatternComprehension { projection, .. } => {
1044            substitute_in_expression(projection, params)?;
1045        }
1046        LogicalExpression::MapProjection { entries, .. } => {
1047            for entry in entries {
1048                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
1049                    substitute_in_expression(expr, params)?;
1050                }
1051            }
1052        }
1053        LogicalExpression::Reduce {
1054            initial,
1055            list,
1056            expression,
1057            ..
1058        } => {
1059            substitute_in_expression(initial, params)?;
1060            substitute_in_expression(list, params)?;
1061            substitute_in_expression(expression, params)?;
1062        }
1063    }
1064    Ok(())
1065}
1066
#[cfg(test)]
mod tests {
    // End-to-end tests for `QueryProcessor` and parameter substitution.
    use super::*;

    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter
        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Should return 2 people (ages 35 and 45)
        assert_eq!(result.row_count(), 2);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter but empty params map (missing the required param)
        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        // Should fail with missing parameter error
        let err = result.expect_err("query referencing an unbound $min_age must fail");
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        // Tests parameter substitution in WHERE clause with property comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=20 matches > 15
        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        // Tests multiple parameters in WHERE clause with AND
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the person with age=35, score=90 matches both conditions
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality_single_match() {
        // Tests that a string parameter compared with `=` against a property
        // selects exactly the one matching node out of several candidates.
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        // Only the "active" item should match the bound parameter
        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        // Tests that same-type parameter comparisons work correctly
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        // Compare int property with int parameter
        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=100 matches > 75
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // Tests that empty results still have correct column names
        let store = Arc::new(LpgStore::new().unwrap());
        // Don't create any nodes

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        // Tests string parameter equality comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}