Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15use grafeo_core::graph::lpg::LpgStore;
16use grafeo_core::graph::{GraphStore, GraphStoreMut};
17
18use crate::catalog::Catalog;
19use crate::database::QueryResult;
20use crate::query::binder::Binder;
21use crate::query::executor::Executor;
22use crate::query::optimizer::Optimizer;
23use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
24use crate::query::planner::Planner;
25use crate::transaction::TransactionManager;
26
/// Supported query languages.
///
/// Every variant is gated behind a Cargo feature, so the set of variants that
/// exists at compile time depends on the enabled features. Code that matches
/// on this enum must gate its arms with the same `cfg` conditions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024) - default for LPG
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL for LPG
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 GRAPH_TABLE)
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 for RDF
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL for RDF (requires both the `graphql` and `rdf` features)
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
53
54impl QueryLanguage {
55    /// Returns whether this language targets LPG (vs RDF).
56    #[must_use]
57    pub const fn is_lpg(&self) -> bool {
58        match self {
59            #[cfg(feature = "gql")]
60            Self::Gql => true,
61            #[cfg(feature = "cypher")]
62            Self::Cypher => true,
63            #[cfg(feature = "gremlin")]
64            Self::Gremlin => true,
65            #[cfg(feature = "graphql")]
66            Self::GraphQL => true,
67            #[cfg(feature = "sql-pgq")]
68            Self::SqlPgq => true,
69            #[cfg(feature = "sparql")]
70            Self::Sparql => false,
71            #[cfg(all(feature = "graphql", feature = "rdf"))]
72            Self::GraphQLRdf => false,
73        }
74    }
75}
76
/// Query parameters for prepared statements.
///
/// Keys are `String`s and values are [`Value`]s; the map is handed to
/// [`substitute_params`] to fill in a logical plan.
pub type QueryParams = HashMap<String, Value>;
79
/// Processes queries through the full pipeline.
///
/// The processor holds references to the stores and provides a unified
/// interface for executing queries in any supported language.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// # use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
///
/// # fn main() -> grafeo_common::utils::error::Result<()> {
/// let store = Arc::new(LpgStore::new().unwrap());
/// let processor = QueryProcessor::for_lpg(store);
/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
/// # Ok(())
/// # }
/// ```
pub struct QueryProcessor {
    /// LPG store for property graph queries.
    ///
    /// The constructors that take trait-object stores put a freshly created
    /// `LpgStore::new()` here instead of the store that queries run against.
    lpg_store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends (read path).
    graph_store: Arc<dyn GraphStore>,
    /// Writable graph store (None when read-only).
    write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager for MVCC operations.
    transaction_manager: Arc<TransactionManager>,
    /// Catalog for schema and index metadata.
    catalog: Arc<Catalog>,
    /// Query optimizer.
    optimizer: Optimizer,
    /// Current transaction context (if any): the viewing epoch and the
    /// transaction id, in that order.
    transaction_context: Option<(EpochId, TransactionId)>,
    /// RDF store for triple pattern queries (optional).
    #[cfg(feature = "rdf")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
118
119impl QueryProcessor {
120    /// Creates a new query processor for LPG queries.
121    #[must_use]
122    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
123        let optimizer = Optimizer::from_store(&store);
124        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
125        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
126        Self {
127            lpg_store: store,
128            graph_store,
129            write_store,
130            transaction_manager: Arc::new(TransactionManager::new()),
131            catalog: Arc::new(Catalog::new()),
132            optimizer,
133            transaction_context: None,
134            #[cfg(feature = "rdf")]
135            rdf_store: None,
136        }
137    }
138
139    /// Creates a new query processor with a transaction manager.
140    #[must_use]
141    pub fn for_lpg_with_transaction(
142        store: Arc<LpgStore>,
143        transaction_manager: Arc<TransactionManager>,
144    ) -> Self {
145        let optimizer = Optimizer::from_store(&store);
146        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
147        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
148        Self {
149            lpg_store: store,
150            graph_store,
151            write_store,
152            transaction_manager,
153            catalog: Arc::new(Catalog::new()),
154            optimizer,
155            transaction_context: None,
156            #[cfg(feature = "rdf")]
157            rdf_store: None,
158        }
159    }
160
161    /// Creates a query processor backed by any `GraphStoreMut` implementation.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the internal arena allocation fails (out of memory).
166    pub fn for_graph_store_with_transaction(
167        store: Arc<dyn GraphStoreMut>,
168        transaction_manager: Arc<TransactionManager>,
169    ) -> Result<Self> {
170        let optimizer = Optimizer::from_graph_store(&*store);
171        let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
172        Ok(Self {
173            lpg_store: Arc::new(LpgStore::new()?),
174            graph_store: read_store,
175            write_store: Some(store),
176            transaction_manager,
177            catalog: Arc::new(Catalog::new()),
178            optimizer,
179            transaction_context: None,
180            #[cfg(feature = "rdf")]
181            rdf_store: None,
182        })
183    }
184
185    /// Creates a query processor from split read/write stores.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if the internal arena allocation fails (out of memory).
190    pub fn for_stores_with_transaction(
191        read_store: Arc<dyn GraphStore>,
192        write_store: Option<Arc<dyn GraphStoreMut>>,
193        transaction_manager: Arc<TransactionManager>,
194    ) -> Result<Self> {
195        let optimizer = Optimizer::from_graph_store(&*read_store);
196        Ok(Self {
197            lpg_store: Arc::new(LpgStore::new()?),
198            graph_store: read_store,
199            write_store,
200            transaction_manager,
201            catalog: Arc::new(Catalog::new()),
202            optimizer,
203            transaction_context: None,
204            #[cfg(feature = "rdf")]
205            rdf_store: None,
206        })
207    }
208
    /// Sets the transaction context for MVCC visibility.
    ///
    /// This should be called when the processor is used within a transaction.
    /// It consumes the processor and returns it with the context stored, so it
    /// can be chained after a constructor. While a context is set, LPG queries
    /// are planned with this epoch and transaction id and do not take the
    /// read-only fast path.
    #[must_use]
    pub fn with_transaction_context(
        mut self,
        viewing_epoch: EpochId,
        transaction_id: TransactionId,
    ) -> Self {
        // Stored as (epoch, transaction id); `process_lpg` destructures it in
        // the same order.
        self.transaction_context = Some((viewing_epoch, transaction_id));
        self
    }
221
    /// Sets a custom catalog.
    ///
    /// Consumes the processor and returns it with `catalog` replacing the one
    /// created by the constructor.
    #[must_use]
    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
        self.catalog = catalog;
        self
    }
228
    /// Sets a custom optimizer.
    ///
    /// Consumes the processor and returns it with `optimizer` replacing the
    /// one the constructor derived from the store.
    #[must_use]
    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
        self.optimizer = optimizer;
        self
    }
235
236    /// Processes a query string and returns results.
237    ///
238    /// Pipeline:
239    /// 1. Parse (language-specific parser → AST)
240    /// 2. Translate (AST → LogicalPlan)
241    /// 3. Bind (semantic validation)
242    /// 4. Optimize (filter pushdown, join reorder, etc.)
243    /// 5. Plan (logical → physical operators)
244    /// 6. Execute (run operators, collect results)
245    ///
246    /// # Arguments
247    ///
248    /// * `query` - The query string
249    /// * `language` - Which query language to use
250    /// * `params` - Optional query parameters for prepared statements
251    ///
252    /// # Errors
253    ///
254    /// Returns an error if any stage of the pipeline fails.
255    pub fn process(
256        &self,
257        query: &str,
258        language: QueryLanguage,
259        params: Option<&QueryParams>,
260    ) -> Result<QueryResult> {
261        if language.is_lpg() {
262            self.process_lpg(query, language, params)
263        } else {
264            #[cfg(feature = "rdf")]
265            {
266                self.process_rdf(query, language, params)
267            }
268            #[cfg(not(feature = "rdf"))]
269            {
270                Err(Error::Internal(
271                    "RDF support not enabled. Compile with --features rdf".to_string(),
272                ))
273            }
274        }
275    }
276
277    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
278    fn process_lpg(
279        &self,
280        query: &str,
281        language: QueryLanguage,
282        params: Option<&QueryParams>,
283    ) -> Result<QueryResult> {
284        #[cfg(not(target_arch = "wasm32"))]
285        let start_time = std::time::Instant::now();
286
287        // 1. Parse and translate to logical plan
288        let mut logical_plan = self.translate_lpg(query, language)?;
289
290        // 2. Substitute parameters if provided (merge defaults from the plan first)
291        let has_defaults = !logical_plan.default_params.is_empty();
292        if params.is_some() || has_defaults {
293            let merged = if has_defaults {
294                let mut merged = logical_plan.default_params.clone();
295                if let Some(params) = params {
296                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
297                }
298                merged
299            } else {
300                params.cloned().unwrap_or_default()
301            };
302            substitute_params(&mut logical_plan, &merged)?;
303        }
304
305        // 3. Semantic validation
306        let mut binder = Binder::new();
307        let _binding_context = binder.bind(&logical_plan)?;
308
309        // 4. Optimize the plan
310        let optimized_plan = self.optimizer.optimize(logical_plan)?;
311
312        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
313        if optimized_plan.explain {
314            let mut plan = optimized_plan;
315            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
316            return Ok(explain_result(&plan));
317        }
318
319        // 5. Convert to physical plan with transaction context
320        // Read-only fast path: safe when no mutations AND no active transaction
321        // (an active transaction may have prior uncommitted writes from earlier statements)
322        let is_read_only =
323            !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
324        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
325            Planner::with_context(
326                Arc::clone(&self.graph_store),
327                self.write_store.as_ref().map(Arc::clone),
328                Arc::clone(&self.transaction_manager),
329                Some(transaction_id),
330                epoch,
331            )
332        } else {
333            Planner::with_context(
334                Arc::clone(&self.graph_store),
335                self.write_store.as_ref().map(Arc::clone),
336                Arc::clone(&self.transaction_manager),
337                None,
338                self.transaction_manager.current_epoch(),
339            )
340        }
341        .with_read_only(is_read_only);
342        let mut physical_plan = planner.plan(&optimized_plan)?;
343
344        // 6. Execute and collect results
345        let executor = Executor::with_columns(physical_plan.columns.clone());
346        let mut result = executor.execute(physical_plan.operator.as_mut())?;
347
348        // Add execution metrics
349        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
350        #[cfg(not(target_arch = "wasm32"))]
351        {
352            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
353            result.execution_time_ms = Some(elapsed_ms);
354        }
355        result.rows_scanned = Some(rows_scanned);
356
357        Ok(result)
358    }
359
360    /// Translates an LPG query to a logical plan.
361    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
362        let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
363        match language {
364            #[cfg(feature = "gql")]
365            QueryLanguage::Gql => {
366                use crate::query::translators::gql;
367                gql::translate(query)
368            }
369            #[cfg(feature = "cypher")]
370            QueryLanguage::Cypher => {
371                use crate::query::translators::cypher;
372                cypher::translate(query)
373            }
374            #[cfg(feature = "gremlin")]
375            QueryLanguage::Gremlin => {
376                use crate::query::translators::gremlin;
377                gremlin::translate(query)
378            }
379            #[cfg(feature = "graphql")]
380            QueryLanguage::GraphQL => {
381                use crate::query::translators::graphql;
382                graphql::translate(query)
383            }
384            #[cfg(feature = "sql-pgq")]
385            QueryLanguage::SqlPgq => {
386                use crate::query::translators::sql_pgq;
387                sql_pgq::translate(query)
388            }
389            #[allow(unreachable_patterns)]
390            _ => Err(Error::Internal(format!(
391                "Language {:?} is not an LPG language",
392                language
393            ))),
394        }
395    }
396
    /// Returns a reference to the LPG store.
    ///
    /// For processors built from trait-object stores this is the `LpgStore`
    /// created by the constructor, not the store that queries run against.
    #[must_use]
    pub fn lpg_store(&self) -> &Arc<LpgStore> {
        &self.lpg_store
    }
402
    /// Returns a reference to the catalog.
    ///
    /// This is the catalog created by the constructor unless one was supplied
    /// through `with_catalog`.
    #[must_use]
    pub fn catalog(&self) -> &Arc<Catalog> {
        &self.catalog
    }
408
    /// Returns a reference to the optimizer.
    ///
    /// This is the optimizer derived from the store by the constructor unless
    /// one was supplied through `with_optimizer`.
    #[must_use]
    pub fn optimizer(&self) -> &Optimizer {
        &self.optimizer
    }
414}
415
impl QueryProcessor {
    /// Returns a reference to the transaction manager.
    ///
    /// This is either the manager passed to a `*_with_transaction`
    /// constructor or the one the other constructors create themselves.
    #[must_use]
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
}
423
424// =========================================================================
425// RDF-specific methods (gated behind `rdf` feature)
426// =========================================================================
427
428#[cfg(feature = "rdf")]
429impl QueryProcessor {
430    /// Creates a new query processor with both LPG and RDF stores.
431    #[must_use]
432    pub fn with_rdf(
433        lpg_store: Arc<LpgStore>,
434        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
435    ) -> Self {
436        let optimizer = Optimizer::from_store(&lpg_store);
437        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStore>;
438        let write_store = Some(Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>);
439        Self {
440            lpg_store,
441            graph_store,
442            write_store,
443            transaction_manager: Arc::new(TransactionManager::new()),
444            catalog: Arc::new(Catalog::new()),
445            optimizer,
446            transaction_context: None,
447            rdf_store: Some(rdf_store),
448        }
449    }
450
    /// Returns a reference to the RDF store (if configured).
    ///
    /// Within this section of the file only `with_rdf` attaches a store; the
    /// other constructors leave it as `None`.
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }
456
457    /// Processes an RDF query (SPARQL, GraphQL-RDF).
458    fn process_rdf(
459        &self,
460        query: &str,
461        language: QueryLanguage,
462        params: Option<&QueryParams>,
463    ) -> Result<QueryResult> {
464        use crate::query::planner::rdf::RdfPlanner;
465
466        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
467            Error::Internal("RDF store not configured for this processor".to_string())
468        })?;
469
470        // 1. Parse and translate to logical plan
471        let mut logical_plan = self.translate_rdf(query, language)?;
472
473        // 2. Substitute parameters if provided (merge defaults from the plan first)
474        let has_defaults = !logical_plan.default_params.is_empty();
475        if params.is_some() || has_defaults {
476            let merged = if has_defaults {
477                let mut merged = logical_plan.default_params.clone();
478                if let Some(params) = params {
479                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
480                }
481                merged
482            } else {
483                params.cloned().unwrap_or_default()
484            };
485            substitute_params(&mut logical_plan, &merged)?;
486        }
487
488        // 3. Semantic validation
489        let mut binder = Binder::new();
490        let _binding_context = binder.bind(&logical_plan)?;
491
492        // 3. Optimize the plan
493        let optimized_plan = self.optimizer.optimize(logical_plan)?;
494
495        // 3a. EXPLAIN: return the optimized plan tree without executing
496        if optimized_plan.explain {
497            return Ok(explain_result(&optimized_plan));
498        }
499
500        // 4. Convert to physical plan (using RDF planner)
501        let planner = RdfPlanner::new(Arc::clone(rdf_store));
502        let mut physical_plan = planner.plan(&optimized_plan)?;
503
504        // 5. Execute and collect results
505        let executor = Executor::with_columns(physical_plan.columns.clone());
506        executor.execute(physical_plan.operator.as_mut())
507    }
508
509    /// Translates an RDF query to a logical plan.
510    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
511        match language {
512            #[cfg(feature = "sparql")]
513            QueryLanguage::Sparql => {
514                use crate::query::translators::sparql;
515                sparql::translate(query)
516            }
517            #[cfg(all(feature = "graphql", feature = "rdf"))]
518            QueryLanguage::GraphQLRdf => {
519                use crate::query::translators::graphql_rdf;
520                // Default namespace for GraphQL-RDF queries
521                graphql_rdf::translate(query, "http://example.org/")
522            }
523            _ => Err(Error::Internal(format!(
524                "Language {:?} is not an RDF language",
525                language
526            ))),
527        }
528    }
529}
530
531/// Annotates filter operators in the plan with pushdown hints.
532///
533/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
534/// whether a property index exists for equality predicates.
535pub(crate) fn annotate_pushdown_hints(
536    op: &mut LogicalOperator,
537    store: &dyn grafeo_core::graph::GraphStore,
538) {
539    #[allow(clippy::wildcard_imports)]
540    use crate::query::plan::*;
541
542    match op {
543        LogicalOperator::Filter(filter) => {
544            // Recurse into children first
545            annotate_pushdown_hints(&mut filter.input, store);
546
547            // Annotate this filter if it sits on top of a NodeScan
548            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
549                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
550            }
551        }
552        LogicalOperator::NodeScan(op) => {
553            if let Some(input) = &mut op.input {
554                annotate_pushdown_hints(input, store);
555            }
556        }
557        LogicalOperator::EdgeScan(op) => {
558            if let Some(input) = &mut op.input {
559                annotate_pushdown_hints(input, store);
560            }
561        }
562        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
563        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
564        LogicalOperator::Join(op) => {
565            annotate_pushdown_hints(&mut op.left, store);
566            annotate_pushdown_hints(&mut op.right, store);
567        }
568        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
569        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
570        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
571        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
572        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
573        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
574        LogicalOperator::Union(op) => {
575            for input in &mut op.inputs {
576                annotate_pushdown_hints(input, store);
577            }
578        }
579        LogicalOperator::Apply(op) => {
580            annotate_pushdown_hints(&mut op.input, store);
581            annotate_pushdown_hints(&mut op.subplan, store);
582        }
583        LogicalOperator::Otherwise(op) => {
584            annotate_pushdown_hints(&mut op.left, store);
585            annotate_pushdown_hints(&mut op.right, store);
586        }
587        _ => {}
588    }
589}
590
591/// Infers the pushdown strategy for a filter predicate over a node scan.
592fn infer_pushdown(
593    predicate: &LogicalExpression,
594    scan: &crate::query::plan::NodeScanOp,
595    store: &dyn grafeo_core::graph::GraphStore,
596) -> Option<crate::query::plan::PushdownHint> {
597    #[allow(clippy::wildcard_imports)]
598    use crate::query::plan::*;
599
600    match predicate {
601        // Equality: n.prop = value
602        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
603            if let Some(prop) = extract_property_name(left, &scan.variable)
604                .or_else(|| extract_property_name(right, &scan.variable))
605            {
606                if store.has_property_index(&prop) {
607                    return Some(PushdownHint::IndexLookup { property: prop });
608                }
609                if scan.label.is_some() {
610                    return Some(PushdownHint::LabelFirst);
611                }
612            }
613            None
614        }
615        // Range: n.prop > value, n.prop < value, etc.
616        LogicalExpression::Binary {
617            left,
618            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
619            right,
620        } => {
621            if let Some(prop) = extract_property_name(left, &scan.variable)
622                .or_else(|| extract_property_name(right, &scan.variable))
623            {
624                if store.has_property_index(&prop) {
625                    return Some(PushdownHint::RangeScan { property: prop });
626                }
627                if scan.label.is_some() {
628                    return Some(PushdownHint::LabelFirst);
629                }
630            }
631            None
632        }
633        // AND: check the left side (first conjunct) for pushdown
634        LogicalExpression::Binary {
635            left,
636            op: BinaryOp::And,
637            ..
638        } => infer_pushdown(left, scan, store),
639        _ => {
640            // Any other predicate on a labeled scan gets label-first
641            if scan.label.is_some() {
642                Some(PushdownHint::LabelFirst)
643            } else {
644                None
645            }
646        }
647    }
648}
649
650/// Extracts the property name if the expression is `Property { variable, property }`
651/// and the variable matches the scan variable.
652fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
653    if let LogicalExpression::Property { variable, property } = expr
654        && variable == scan_var
655    {
656        Some(property.clone())
657    } else {
658        None
659    }
660}
661
662/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
663pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
664    let tree_text = plan.root.explain_tree();
665    QueryResult {
666        columns: vec!["plan".to_string()],
667        column_types: vec![grafeo_common::types::LogicalType::String],
668        rows: vec![vec![Value::String(tree_text.into())]],
669        execution_time_ms: None,
670        rows_scanned: None,
671        status_message: None,
672        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
673    }
674}
675
/// Substitutes parameters in a logical plan with their values.
///
/// The plan is rewritten in place, starting from its root operator and
/// recursing through the operator tree.
///
/// # Errors
///
/// Returns an error if a referenced parameter is not found in `params`.
pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
    substitute_in_operator(&mut plan.root, params)
}
684
685/// Recursively substitutes parameters in an operator.
686fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
687    #[allow(clippy::wildcard_imports)]
688    use crate::query::plan::*;
689
690    match op {
691        LogicalOperator::Filter(filter) => {
692            substitute_in_expression(&mut filter.predicate, params)?;
693            substitute_in_operator(&mut filter.input, params)?;
694        }
695        LogicalOperator::Return(ret) => {
696            for item in &mut ret.items {
697                substitute_in_expression(&mut item.expression, params)?;
698            }
699            substitute_in_operator(&mut ret.input, params)?;
700        }
701        LogicalOperator::Project(proj) => {
702            for p in &mut proj.projections {
703                substitute_in_expression(&mut p.expression, params)?;
704            }
705            substitute_in_operator(&mut proj.input, params)?;
706        }
707        LogicalOperator::NodeScan(scan) => {
708            if let Some(input) = &mut scan.input {
709                substitute_in_operator(input, params)?;
710            }
711        }
712        LogicalOperator::EdgeScan(scan) => {
713            if let Some(input) = &mut scan.input {
714                substitute_in_operator(input, params)?;
715            }
716        }
717        LogicalOperator::Expand(expand) => {
718            substitute_in_operator(&mut expand.input, params)?;
719        }
720        LogicalOperator::Join(join) => {
721            substitute_in_operator(&mut join.left, params)?;
722            substitute_in_operator(&mut join.right, params)?;
723            for cond in &mut join.conditions {
724                substitute_in_expression(&mut cond.left, params)?;
725                substitute_in_expression(&mut cond.right, params)?;
726            }
727        }
728        LogicalOperator::LeftJoin(join) => {
729            substitute_in_operator(&mut join.left, params)?;
730            substitute_in_operator(&mut join.right, params)?;
731            if let Some(cond) = &mut join.condition {
732                substitute_in_expression(cond, params)?;
733            }
734        }
735        LogicalOperator::Aggregate(agg) => {
736            for expr in &mut agg.group_by {
737                substitute_in_expression(expr, params)?;
738            }
739            for agg_expr in &mut agg.aggregates {
740                if let Some(expr) = &mut agg_expr.expression {
741                    substitute_in_expression(expr, params)?;
742                }
743            }
744            substitute_in_operator(&mut agg.input, params)?;
745        }
746        LogicalOperator::Sort(sort) => {
747            for key in &mut sort.keys {
748                substitute_in_expression(&mut key.expression, params)?;
749            }
750            substitute_in_operator(&mut sort.input, params)?;
751        }
752        LogicalOperator::Limit(limit) => {
753            resolve_count_param(&mut limit.count, params)?;
754            substitute_in_operator(&mut limit.input, params)?;
755        }
756        LogicalOperator::Skip(skip) => {
757            resolve_count_param(&mut skip.count, params)?;
758            substitute_in_operator(&mut skip.input, params)?;
759        }
760        LogicalOperator::Distinct(distinct) => {
761            substitute_in_operator(&mut distinct.input, params)?;
762        }
763        LogicalOperator::CreateNode(create) => {
764            for (_, expr) in &mut create.properties {
765                substitute_in_expression(expr, params)?;
766            }
767            if let Some(input) = &mut create.input {
768                substitute_in_operator(input, params)?;
769            }
770        }
771        LogicalOperator::CreateEdge(create) => {
772            for (_, expr) in &mut create.properties {
773                substitute_in_expression(expr, params)?;
774            }
775            substitute_in_operator(&mut create.input, params)?;
776        }
777        LogicalOperator::DeleteNode(delete) => {
778            substitute_in_operator(&mut delete.input, params)?;
779        }
780        LogicalOperator::DeleteEdge(delete) => {
781            substitute_in_operator(&mut delete.input, params)?;
782        }
783        LogicalOperator::SetProperty(set) => {
784            for (_, expr) in &mut set.properties {
785                substitute_in_expression(expr, params)?;
786            }
787            substitute_in_operator(&mut set.input, params)?;
788        }
789        LogicalOperator::Union(union) => {
790            for input in &mut union.inputs {
791                substitute_in_operator(input, params)?;
792            }
793        }
794        LogicalOperator::AntiJoin(anti) => {
795            substitute_in_operator(&mut anti.left, params)?;
796            substitute_in_operator(&mut anti.right, params)?;
797        }
798        LogicalOperator::Bind(bind) => {
799            substitute_in_expression(&mut bind.expression, params)?;
800            substitute_in_operator(&mut bind.input, params)?;
801        }
802        LogicalOperator::TripleScan(scan) => {
803            if let Some(input) = &mut scan.input {
804                substitute_in_operator(input, params)?;
805            }
806        }
807        LogicalOperator::Unwind(unwind) => {
808            substitute_in_expression(&mut unwind.expression, params)?;
809            substitute_in_operator(&mut unwind.input, params)?;
810        }
811        LogicalOperator::MapCollect(mc) => {
812            substitute_in_operator(&mut mc.input, params)?;
813        }
814        LogicalOperator::Merge(merge) => {
815            for (_, expr) in &mut merge.match_properties {
816                substitute_in_expression(expr, params)?;
817            }
818            for (_, expr) in &mut merge.on_create {
819                substitute_in_expression(expr, params)?;
820            }
821            for (_, expr) in &mut merge.on_match {
822                substitute_in_expression(expr, params)?;
823            }
824            substitute_in_operator(&mut merge.input, params)?;
825        }
826        LogicalOperator::MergeRelationship(merge_rel) => {
827            for (_, expr) in &mut merge_rel.match_properties {
828                substitute_in_expression(expr, params)?;
829            }
830            for (_, expr) in &mut merge_rel.on_create {
831                substitute_in_expression(expr, params)?;
832            }
833            for (_, expr) in &mut merge_rel.on_match {
834                substitute_in_expression(expr, params)?;
835            }
836            substitute_in_operator(&mut merge_rel.input, params)?;
837        }
838        LogicalOperator::AddLabel(add_label) => {
839            substitute_in_operator(&mut add_label.input, params)?;
840        }
841        LogicalOperator::RemoveLabel(remove_label) => {
842            substitute_in_operator(&mut remove_label.input, params)?;
843        }
844        LogicalOperator::ShortestPath(sp) => {
845            substitute_in_operator(&mut sp.input, params)?;
846        }
847        // SPARQL Update operators
848        LogicalOperator::InsertTriple(insert) => {
849            if let Some(ref mut input) = insert.input {
850                substitute_in_operator(input, params)?;
851            }
852        }
853        LogicalOperator::DeleteTriple(delete) => {
854            if let Some(ref mut input) = delete.input {
855                substitute_in_operator(input, params)?;
856            }
857        }
858        LogicalOperator::Modify(modify) => {
859            substitute_in_operator(&mut modify.where_clause, params)?;
860        }
861        LogicalOperator::ClearGraph(_)
862        | LogicalOperator::CreateGraph(_)
863        | LogicalOperator::DropGraph(_)
864        | LogicalOperator::LoadGraph(_)
865        | LogicalOperator::CopyGraph(_)
866        | LogicalOperator::MoveGraph(_)
867        | LogicalOperator::AddGraph(_) => {}
868        LogicalOperator::HorizontalAggregate(op) => {
869            substitute_in_operator(&mut op.input, params)?;
870        }
871        LogicalOperator::Empty => {}
872        LogicalOperator::VectorScan(scan) => {
873            substitute_in_expression(&mut scan.query_vector, params)?;
874            if let Some(ref mut input) = scan.input {
875                substitute_in_operator(input, params)?;
876            }
877        }
878        LogicalOperator::VectorJoin(join) => {
879            substitute_in_expression(&mut join.query_vector, params)?;
880            substitute_in_operator(&mut join.input, params)?;
881        }
882        LogicalOperator::Except(except) => {
883            substitute_in_operator(&mut except.left, params)?;
884            substitute_in_operator(&mut except.right, params)?;
885        }
886        LogicalOperator::Intersect(intersect) => {
887            substitute_in_operator(&mut intersect.left, params)?;
888            substitute_in_operator(&mut intersect.right, params)?;
889        }
890        LogicalOperator::Otherwise(otherwise) => {
891            substitute_in_operator(&mut otherwise.left, params)?;
892            substitute_in_operator(&mut otherwise.right, params)?;
893        }
894        LogicalOperator::Apply(apply) => {
895            substitute_in_operator(&mut apply.input, params)?;
896            substitute_in_operator(&mut apply.subplan, params)?;
897        }
898        // ParameterScan has no expressions to substitute
899        LogicalOperator::ParameterScan(_) => {}
900        LogicalOperator::MultiWayJoin(mwj) => {
901            for input in &mut mwj.inputs {
902                substitute_in_operator(input, params)?;
903            }
904            for cond in &mut mwj.conditions {
905                substitute_in_expression(&mut cond.left, params)?;
906                substitute_in_expression(&mut cond.right, params)?;
907            }
908        }
909        // DDL operators have no expressions to substitute
910        LogicalOperator::CreatePropertyGraph(_) => {}
911        // Procedure calls: arguments could contain parameters but we handle at execution time
912        LogicalOperator::CallProcedure(_) => {}
913        // LoadData: file path is a literal, no parameter substitution needed
914        LogicalOperator::LoadData(_) => {}
915    }
916    Ok(())
917}
918
919/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
920fn resolve_count_param(
921    count: &mut crate::query::plan::CountExpr,
922    params: &QueryParams,
923) -> Result<()> {
924    use crate::query::plan::CountExpr;
925    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
926
927    if let CountExpr::Parameter(name) = count {
928        let value = params.get(name.as_str()).ok_or_else(|| {
929            Error::Query(QueryError::new(
930                QueryErrorKind::Semantic,
931                format!("Missing parameter for SKIP/LIMIT: ${name}"),
932            ))
933        })?;
934        let n = match value {
935            Value::Int64(i) if *i >= 0 => *i as usize,
936            Value::Int64(i) => {
937                return Err(Error::Query(QueryError::new(
938                    QueryErrorKind::Semantic,
939                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
940                )));
941            }
942            other => {
943                return Err(Error::Query(QueryError::new(
944                    QueryErrorKind::Semantic,
945                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
946                )));
947            }
948        };
949        *count = CountExpr::Literal(n);
950    }
951    Ok(())
952}
953
954/// Substitutes parameters in an expression with their values.
955fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
956    use crate::query::plan::LogicalExpression;
957
958    match expr {
959        LogicalExpression::Parameter(name) => {
960            if let Some(value) = params.get(name) {
961                *expr = LogicalExpression::Literal(value.clone());
962            } else {
963                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
964            }
965        }
966        LogicalExpression::Binary { left, right, .. } => {
967            substitute_in_expression(left, params)?;
968            substitute_in_expression(right, params)?;
969        }
970        LogicalExpression::Unary { operand, .. } => {
971            substitute_in_expression(operand, params)?;
972        }
973        LogicalExpression::FunctionCall { args, .. } => {
974            for arg in args {
975                substitute_in_expression(arg, params)?;
976            }
977        }
978        LogicalExpression::List(items) => {
979            for item in items {
980                substitute_in_expression(item, params)?;
981            }
982        }
983        LogicalExpression::Map(pairs) => {
984            for (_, value) in pairs {
985                substitute_in_expression(value, params)?;
986            }
987        }
988        LogicalExpression::IndexAccess { base, index } => {
989            substitute_in_expression(base, params)?;
990            substitute_in_expression(index, params)?;
991        }
992        LogicalExpression::SliceAccess { base, start, end } => {
993            substitute_in_expression(base, params)?;
994            if let Some(s) = start {
995                substitute_in_expression(s, params)?;
996            }
997            if let Some(e) = end {
998                substitute_in_expression(e, params)?;
999            }
1000        }
1001        LogicalExpression::Case {
1002            operand,
1003            when_clauses,
1004            else_clause,
1005        } => {
1006            if let Some(op) = operand {
1007                substitute_in_expression(op, params)?;
1008            }
1009            for (cond, result) in when_clauses {
1010                substitute_in_expression(cond, params)?;
1011                substitute_in_expression(result, params)?;
1012            }
1013            if let Some(el) = else_clause {
1014                substitute_in_expression(el, params)?;
1015            }
1016        }
1017        LogicalExpression::Property { .. }
1018        | LogicalExpression::Variable(_)
1019        | LogicalExpression::Literal(_)
1020        | LogicalExpression::Labels(_)
1021        | LogicalExpression::Type(_)
1022        | LogicalExpression::Id(_) => {}
1023        LogicalExpression::ListComprehension {
1024            list_expr,
1025            filter_expr,
1026            map_expr,
1027            ..
1028        } => {
1029            substitute_in_expression(list_expr, params)?;
1030            if let Some(filter) = filter_expr {
1031                substitute_in_expression(filter, params)?;
1032            }
1033            substitute_in_expression(map_expr, params)?;
1034        }
1035        LogicalExpression::ListPredicate {
1036            list_expr,
1037            predicate,
1038            ..
1039        } => {
1040            substitute_in_expression(list_expr, params)?;
1041            substitute_in_expression(predicate, params)?;
1042        }
1043        LogicalExpression::ExistsSubquery(_)
1044        | LogicalExpression::CountSubquery(_)
1045        | LogicalExpression::ValueSubquery(_) => {
1046            // Subqueries would need recursive parameter substitution
1047        }
1048        LogicalExpression::PatternComprehension { projection, .. } => {
1049            substitute_in_expression(projection, params)?;
1050        }
1051        LogicalExpression::MapProjection { entries, .. } => {
1052            for entry in entries {
1053                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
1054                    substitute_in_expression(expr, params)?;
1055                }
1056            }
1057        }
1058        LogicalExpression::Reduce {
1059            initial,
1060            list,
1061            expression,
1062            ..
1063        } => {
1064            substitute_in_expression(initial, params)?;
1065            substitute_in_expression(list, params)?;
1066            substitute_in_expression(expression, params)?;
1067        }
1068    }
1069    Ok(())
1070}
1071
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh LPG store wrapped in an `Arc`, ready to be populated.
    fn new_store() -> Arc<LpgStore> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// GQL and Cypher report themselves as LPG languages; SPARQL does not.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built over a fresh store sees zero nodes.
    #[test]
    fn test_processor_creation() {
        let processor = QueryProcessor::for_lpg(new_store());
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A basic GQL `MATCH ... RETURN` yields one row per matching node and
    /// names the output column after the returned variable.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = new_store();
        for _ in 0..2 {
            store.create_node(&["Person"]);
        }

        let outcome = QueryProcessor::for_lpg(store)
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(outcome.row_count(), 2);
        assert_eq!(outcome.columns[0], "n");
    }

    /// The same basic query runs through the Cypher front end.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = new_store();
        store.create_node(&["Person"]);

        let outcome = QueryProcessor::for_lpg(store)
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(outcome.row_count(), 1);
    }

    /// A `$min_age` parameter in a WHERE clause filters rows by its value.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = new_store();
        for age in [25, 35, 45] {
            store.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let params = HashMap::from([("min_age".to_string(), Value::Int64(30))]);

        let outcome = QueryProcessor::for_lpg(store)
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Ages 35 and 45 are above the threshold of 30.
        assert_eq!(outcome.row_count(), 2);
    }

    /// Referencing a parameter that was never supplied is reported as an
    /// error mentioning the missing parameter.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = new_store();
        store.create_node(&["Person"]);

        // A parameter map is supplied, but it lacks `min_age`.
        let params: HashMap<String, Value> = HashMap::new();
        let outcome = QueryProcessor::for_lpg(store).process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    /// A parameter compared against a property in WHERE, with the property
    /// itself projected in RETURN.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        let store = new_store();
        for value in [10, 20] {
            store.create_node_with_props(&["Num"], [("value", Value::Int64(value))]);
        }

        let params = HashMap::from([("threshold".to_string(), Value::Int64(15))]);

        let outcome = QueryProcessor::for_lpg(store)
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the node with value 20 exceeds 15.
        assert_eq!(outcome.row_count(), 1);
        let first_row = &outcome.rows[0];
        assert_eq!(first_row[0], Value::Int64(20));
    }

    /// Two distinct parameters combined with AND in a single WHERE clause.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        let store = new_store();
        for (age, score) in [(25, 80), (35, 90), (45, 70)] {
            store.create_node_with_props(
                &["Person"],
                [("age", Value::Int64(age)), ("score", Value::Int64(score))],
            );
        }

        let params = HashMap::from([
            ("min_age".to_string(), Value::Int64(30)),
            ("min_score".to_string(), Value::Int64(75)),
        ]);

        let outcome = QueryProcessor::for_lpg(store)
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only age=35 / score=90 satisfies both conditions.
        assert_eq!(outcome.row_count(), 1);
    }

    /// Matches a string property against a string parameter.
    ///
    /// Despite its name, the query here uses plain equality
    /// (`n.status = $target`); no `IN` list appears in it.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        let store = new_store();
        for status in ["active", "pending", "deleted"] {
            store.create_node_with_props(&["Item"], [("status", Value::String(status.into()))]);
        }

        let params = HashMap::from([("target".to_string(), Value::String("active".into()))]);

        let outcome = QueryProcessor::for_lpg(store)
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(outcome.row_count(), 1);
    }

    /// An integer property compared with an integer parameter.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        let store = new_store();
        for value in [100, 50] {
            store.create_node_with_props(&["Data"], [("value", Value::Int64(value))]);
        }

        let params = HashMap::from([("threshold".to_string(), Value::Int64(75))]);

        let outcome = QueryProcessor::for_lpg(store)
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the node with value 100 exceeds 75.
        assert_eq!(outcome.row_count(), 1);
    }

    /// A query matching nothing still reports its projected column names.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // The store is deliberately left without any nodes.
        let outcome = QueryProcessor::for_lpg(new_store())
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(outcome.row_count(), 0);
        assert_eq!(outcome.columns.len(), 2);
        assert_eq!(outcome.columns[0], "name");
        assert_eq!(outcome.columns[1], "age");
    }

    /// A string parameter used in an equality filter, with the matched
    /// property returned.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        let store = new_store();
        for name in ["alpha", "beta", "gamma"] {
            store.create_node_with_props(&["Item"], [("name", Value::String(name.into()))]);
        }

        let params = HashMap::from([("target".to_string(), Value::String("beta".into()))]);

        let outcome = QueryProcessor::for_lpg(store)
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(outcome.row_count(), 1);
        assert_eq!(outcome.rows[0][0], Value::String("beta".into()));
    }
}