Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15#[cfg(feature = "lpg")]
16use grafeo_core::graph::lpg::LpgStore;
17use grafeo_core::graph::{GraphStore, GraphStoreMut};
18
19use crate::catalog::Catalog;
20use crate::database::QueryResult;
21use crate::query::binder::Binder;
22use crate::query::executor::Executor;
23use crate::query::optimizer::Optimizer;
24use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
25use crate::query::planner::Planner;
26use crate::transaction::TransactionManager;
27
/// Supported query languages.
///
/// Every variant is compiled in only when its Cargo feature is enabled, so
/// the set of available variants depends on the build configuration. The enum
/// is `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024) - default for LPG (feature `gql`)
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0 (feature `cypher`)
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin (feature `gremlin`)
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL for LPG (feature `graphql`)
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 GRAPH_TABLE) (feature `sql-pgq`)
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 for RDF (feature `sparql`)
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL for RDF (requires both `graphql` and `triple-store`)
    #[cfg(all(feature = "graphql", feature = "triple-store"))]
    GraphQLRdf,
}
54
55impl QueryLanguage {
56    /// Returns whether this language targets LPG (vs RDF).
57    #[must_use]
58    pub const fn is_lpg(&self) -> bool {
59        match self {
60            #[cfg(feature = "gql")]
61            Self::Gql => true,
62            #[cfg(feature = "cypher")]
63            Self::Cypher => true,
64            #[cfg(feature = "gremlin")]
65            Self::Gremlin => true,
66            #[cfg(feature = "graphql")]
67            Self::GraphQL => true,
68            #[cfg(feature = "sql-pgq")]
69            Self::SqlPgq => true,
70            #[cfg(feature = "sparql")]
71            Self::Sparql => false,
72            #[cfg(all(feature = "graphql", feature = "triple-store"))]
73            Self::GraphQLRdf => false,
74        }
75    }
76}
77
/// Query parameters for prepared statements.
///
/// Maps a `String` key to the [`Value`] that [`substitute_params`] writes
/// into the logical plan in place of the corresponding parameter reference.
pub type QueryParams = HashMap<String, Value>;
80
/// Processes queries through the full pipeline.
///
/// The processor holds references to the stores and provides a unified
/// interface for executing queries in any supported language.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// # use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
///
/// # fn main() -> grafeo_common::utils::error::Result<()> {
/// let store = Arc::new(LpgStore::new()?);
/// let processor = QueryProcessor::for_lpg(store);
/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
/// # Ok(())
/// # }
/// ```
pub struct QueryProcessor {
    /// LPG store for property graph queries.
    ///
    /// Constructors that take a trait-object store fill this with a freshly
    /// created `LpgStore` rather than the store that serves queries.
    #[cfg(feature = "lpg")]
    lpg_store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends (read path).
    graph_store: Arc<dyn GraphStore>,
    /// Writable graph store (None when read-only).
    write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager for MVCC operations.
    transaction_manager: Arc<TransactionManager>,
    /// Catalog for schema and index metadata.
    catalog: Arc<Catalog>,
    /// Query optimizer.
    optimizer: Optimizer,
    /// Current transaction context (if any): the viewing epoch and the
    /// transaction id, as set by `with_transaction_context`.
    transaction_context: Option<(EpochId, TransactionId)>,
    /// RDF store for triple pattern queries (optional).
    #[cfg(feature = "triple-store")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
120
121impl QueryProcessor {
122    /// Creates a new query processor for LPG queries.
123    #[cfg(feature = "lpg")]
124    #[must_use]
125    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
126        let optimizer = Optimizer::from_store(&store);
127        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
128        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
129        Self {
130            lpg_store: store,
131            graph_store,
132            write_store,
133            transaction_manager: Arc::new(TransactionManager::new()),
134            catalog: Arc::new(Catalog::new()),
135            optimizer,
136            transaction_context: None,
137            #[cfg(feature = "triple-store")]
138            rdf_store: None,
139        }
140    }
141
142    /// Creates a new query processor with a transaction manager.
143    #[cfg(feature = "lpg")]
144    #[must_use]
145    pub fn for_lpg_with_transaction(
146        store: Arc<LpgStore>,
147        transaction_manager: Arc<TransactionManager>,
148    ) -> Self {
149        let optimizer = Optimizer::from_store(&store);
150        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
151        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
152        Self {
153            lpg_store: store,
154            graph_store,
155            write_store,
156            transaction_manager,
157            catalog: Arc::new(Catalog::new()),
158            optimizer,
159            transaction_context: None,
160            #[cfg(feature = "triple-store")]
161            rdf_store: None,
162        }
163    }
164
165    /// Creates a query processor backed by any `GraphStoreMut` implementation.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if the internal arena allocation fails (out of memory).
170    pub fn for_graph_store_with_transaction(
171        store: Arc<dyn GraphStoreMut>,
172        transaction_manager: Arc<TransactionManager>,
173    ) -> Result<Self> {
174        let optimizer = Optimizer::from_graph_store(&*store);
175        let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
176        Ok(Self {
177            #[cfg(feature = "lpg")]
178            lpg_store: Arc::new(LpgStore::new()?),
179            graph_store: read_store,
180            write_store: Some(store),
181            transaction_manager,
182            catalog: Arc::new(Catalog::new()),
183            optimizer,
184            transaction_context: None,
185            #[cfg(feature = "triple-store")]
186            rdf_store: None,
187        })
188    }
189
190    /// Creates a query processor from split read/write stores.
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the internal arena allocation fails (out of memory).
195    pub fn for_stores_with_transaction(
196        read_store: Arc<dyn GraphStore>,
197        write_store: Option<Arc<dyn GraphStoreMut>>,
198        transaction_manager: Arc<TransactionManager>,
199    ) -> Result<Self> {
200        let optimizer = Optimizer::from_graph_store(&*read_store);
201        Ok(Self {
202            #[cfg(feature = "lpg")]
203            lpg_store: Arc::new(LpgStore::new()?),
204            graph_store: read_store,
205            write_store,
206            transaction_manager,
207            catalog: Arc::new(Catalog::new()),
208            optimizer,
209            transaction_context: None,
210            #[cfg(feature = "triple-store")]
211            rdf_store: None,
212        })
213    }
214
215    /// Sets the transaction context for MVCC visibility.
216    ///
217    /// This should be called when the processor is used within a transaction.
218    #[must_use]
219    pub fn with_transaction_context(
220        mut self,
221        viewing_epoch: EpochId,
222        transaction_id: TransactionId,
223    ) -> Self {
224        self.transaction_context = Some((viewing_epoch, transaction_id));
225        self
226    }
227
    /// Sets a custom catalog, replacing the one installed by the constructor.
    ///
    /// Consumes and returns the processor so calls can be chained.
    #[must_use]
    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
        self.catalog = catalog;
        self
    }
234
    /// Sets a custom optimizer, replacing the one derived from the store by
    /// the constructor.
    ///
    /// Consumes and returns the processor so calls can be chained.
    #[must_use]
    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
        self.optimizer = optimizer;
        self
    }
241
242    /// Processes a query string and returns results.
243    ///
244    /// Pipeline:
245    /// 1. Parse (language-specific parser → AST)
246    /// 2. Translate (AST → LogicalPlan)
247    /// 3. Bind (semantic validation)
248    /// 4. Optimize (filter pushdown, join reorder, etc.)
249    /// 5. Plan (logical → physical operators)
250    /// 6. Execute (run operators, collect results)
251    ///
252    /// # Arguments
253    ///
254    /// * `query` - The query string
255    /// * `language` - Which query language to use
256    /// * `params` - Optional query parameters for prepared statements
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if any stage of the pipeline fails.
261    pub fn process(
262        &self,
263        query: &str,
264        language: QueryLanguage,
265        params: Option<&QueryParams>,
266    ) -> Result<QueryResult> {
267        if language.is_lpg() {
268            self.process_lpg(query, language, params)
269        } else {
270            #[cfg(feature = "triple-store")]
271            {
272                self.process_rdf(query, language, params)
273            }
274            #[cfg(not(feature = "triple-store"))]
275            {
276                Err(Error::Internal(
277                    "RDF support not enabled. Compile with --features rdf".to_string(),
278                ))
279            }
280        }
281    }
282
283    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
284    fn process_lpg(
285        &self,
286        query: &str,
287        language: QueryLanguage,
288        params: Option<&QueryParams>,
289    ) -> Result<QueryResult> {
290        #[cfg(not(target_arch = "wasm32"))]
291        let start_time = std::time::Instant::now();
292
293        // 1. Parse and translate to logical plan
294        let mut logical_plan = self.translate_lpg(query, language)?;
295
296        // 2. Substitute parameters if provided (merge defaults from the plan first)
297        let has_defaults = !logical_plan.default_params.is_empty();
298        if params.is_some() || has_defaults {
299            let merged = if has_defaults {
300                let mut merged = logical_plan.default_params.clone();
301                if let Some(params) = params {
302                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
303                }
304                merged
305            } else {
306                params.cloned().unwrap_or_default()
307            };
308            substitute_params(&mut logical_plan, &merged)?;
309        }
310
311        // 3. Semantic validation
312        let mut binder = Binder::new();
313        let _binding_context = binder.bind(&logical_plan)?;
314
315        // 4. Optimize the plan
316        let optimized_plan = self.optimizer.optimize(logical_plan)?;
317
318        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
319        if optimized_plan.explain {
320            let mut plan = optimized_plan;
321            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
322            return Ok(explain_result(&plan));
323        }
324
325        // 5. Convert to physical plan with transaction context
326        // Read-only fast path: safe when no mutations AND no active transaction
327        // (an active transaction may have prior uncommitted writes from earlier statements)
328        let is_read_only =
329            !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
330        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
331            Planner::with_context(
332                Arc::clone(&self.graph_store),
333                self.write_store.as_ref().map(Arc::clone),
334                Arc::clone(&self.transaction_manager),
335                Some(transaction_id),
336                epoch,
337            )
338        } else {
339            Planner::with_context(
340                Arc::clone(&self.graph_store),
341                self.write_store.as_ref().map(Arc::clone),
342                Arc::clone(&self.transaction_manager),
343                None,
344                self.transaction_manager.current_epoch(),
345            )
346        }
347        .with_read_only(is_read_only);
348        let mut physical_plan = planner.plan(&optimized_plan)?;
349
350        // 6. Execute and collect results
351        let executor = Executor::with_columns(physical_plan.columns.clone());
352        let mut result = executor.execute(physical_plan.operator.as_mut())?;
353
354        // Add execution metrics
355        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
356        #[cfg(not(target_arch = "wasm32"))]
357        {
358            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
359            result.execution_time_ms = Some(elapsed_ms);
360        }
361        result.rows_scanned = Some(rows_scanned);
362
363        Ok(result)
364    }
365
366    /// Translates an LPG query to a logical plan.
367    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
368        let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
369        match language {
370            #[cfg(feature = "gql")]
371            QueryLanguage::Gql => {
372                use crate::query::translators::gql;
373                gql::translate(query)
374            }
375            #[cfg(feature = "cypher")]
376            QueryLanguage::Cypher => {
377                use crate::query::translators::cypher;
378                cypher::translate(query)
379            }
380            #[cfg(feature = "gremlin")]
381            QueryLanguage::Gremlin => {
382                use crate::query::translators::gremlin;
383                gremlin::translate(query)
384            }
385            #[cfg(feature = "graphql")]
386            QueryLanguage::GraphQL => {
387                use crate::query::translators::graphql;
388                graphql::translate(query)
389            }
390            #[cfg(feature = "sql-pgq")]
391            QueryLanguage::SqlPgq => {
392                use crate::query::translators::sql_pgq;
393                sql_pgq::translate(query)
394            }
395            #[allow(unreachable_patterns)]
396            _ => Err(Error::Internal(format!(
397                "Language {:?} is not an LPG language",
398                language
399            ))),
400        }
401    }
402
    /// Returns a reference to the LPG store.
    ///
    /// For processors built with `for_graph_store_with_transaction` or
    /// `for_stores_with_transaction` this is the internally created
    /// `LpgStore`, not the store passed to the constructor.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub fn lpg_store(&self) -> &Arc<LpgStore> {
        &self.lpg_store
    }
409
    /// Returns a reference to the catalog currently installed on this
    /// processor (the constructor's default or the one set via `with_catalog`).
    #[must_use]
    pub fn catalog(&self) -> &Arc<Catalog> {
        &self.catalog
    }
415
    /// Returns a reference to the optimizer currently installed on this
    /// processor (derived from the store or set via `with_optimizer`).
    #[must_use]
    pub fn optimizer(&self) -> &Optimizer {
        &self.optimizer
    }
421}
422
impl QueryProcessor {
    /// Returns a reference to the transaction manager.
    ///
    /// This is the manager handed to the planner for every LPG query and the
    /// source of the current epoch when no transaction context is set.
    #[must_use]
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
}
430
431// =========================================================================
// RDF-specific methods (gated behind the `triple-store` feature)
433// =========================================================================
434
435#[cfg(feature = "triple-store")]
436impl QueryProcessor {
437    /// Creates a new query processor with both LPG and RDF stores.
438    #[cfg(feature = "lpg")]
439    #[must_use]
440    pub fn with_rdf(
441        lpg_store: Arc<LpgStore>,
442        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
443    ) -> Self {
444        let optimizer = Optimizer::from_store(&lpg_store);
445        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStore>;
446        let write_store = Some(Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>);
447        Self {
448            lpg_store,
449            graph_store,
450            write_store,
451            transaction_manager: Arc::new(TransactionManager::new()),
452            catalog: Arc::new(Catalog::new()),
453            optimizer,
454            transaction_context: None,
455            rdf_store: Some(rdf_store),
456        }
457    }
458
    /// Returns a reference to the RDF store (if configured).
    ///
    /// `None` for processors created by any constructor other than `with_rdf`.
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }
464
465    /// Processes an RDF query (SPARQL, GraphQL-RDF).
466    fn process_rdf(
467        &self,
468        query: &str,
469        language: QueryLanguage,
470        params: Option<&QueryParams>,
471    ) -> Result<QueryResult> {
472        use crate::query::planner::rdf::RdfPlanner;
473
474        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
475            Error::Internal("RDF store not configured for this processor".to_string())
476        })?;
477
478        // 1. Parse and translate to logical plan
479        let mut logical_plan = self.translate_rdf(query, language)?;
480
481        // 2. Substitute parameters if provided (merge defaults from the plan first)
482        let has_defaults = !logical_plan.default_params.is_empty();
483        if params.is_some() || has_defaults {
484            let merged = if has_defaults {
485                let mut merged = logical_plan.default_params.clone();
486                if let Some(params) = params {
487                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
488                }
489                merged
490            } else {
491                params.cloned().unwrap_or_default()
492            };
493            substitute_params(&mut logical_plan, &merged)?;
494        }
495
496        // 3. Semantic validation
497        let mut binder = Binder::new();
498        let _binding_context = binder.bind(&logical_plan)?;
499
500        // 3. Optimize the plan
501        let optimized_plan = self.optimizer.optimize(logical_plan)?;
502
503        // 3a. EXPLAIN: return the optimized plan tree without executing
504        if optimized_plan.explain {
505            return Ok(explain_result(&optimized_plan));
506        }
507
508        // 4. Convert to physical plan (using RDF planner)
509        let planner = RdfPlanner::new(Arc::clone(rdf_store));
510        let mut physical_plan = planner.plan(&optimized_plan)?;
511
512        // 5. Execute and collect results
513        let executor = Executor::with_columns(physical_plan.columns.clone());
514        executor.execute(physical_plan.operator.as_mut())
515    }
516
517    /// Translates an RDF query to a logical plan.
518    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
519        match language {
520            #[cfg(feature = "sparql")]
521            QueryLanguage::Sparql => {
522                use crate::query::translators::sparql;
523                sparql::translate(query)
524            }
525            #[cfg(all(feature = "graphql", feature = "triple-store"))]
526            QueryLanguage::GraphQLRdf => {
527                use crate::query::translators::graphql_rdf;
528                // Default namespace for GraphQL-RDF queries
529                graphql_rdf::translate(query, "http://example.org/")
530            }
531            _ => Err(Error::Internal(format!(
532                "Language {:?} is not an RDF language",
533                language
534            ))),
535        }
536    }
537}
538
539/// Annotates filter operators in the plan with pushdown hints.
540///
541/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
542/// whether a property index exists for equality predicates.
543pub(crate) fn annotate_pushdown_hints(
544    op: &mut LogicalOperator,
545    store: &dyn grafeo_core::graph::GraphStore,
546) {
547    #[allow(clippy::wildcard_imports)]
548    use crate::query::plan::*;
549
550    match op {
551        LogicalOperator::Filter(filter) => {
552            // Recurse into children first
553            annotate_pushdown_hints(&mut filter.input, store);
554
555            // Annotate this filter if it sits on top of a NodeScan
556            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
557                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
558            }
559        }
560        LogicalOperator::NodeScan(op) => {
561            if let Some(input) = &mut op.input {
562                annotate_pushdown_hints(input, store);
563            }
564        }
565        LogicalOperator::EdgeScan(op) => {
566            if let Some(input) = &mut op.input {
567                annotate_pushdown_hints(input, store);
568            }
569        }
570        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
571        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
572        LogicalOperator::Join(op) => {
573            annotate_pushdown_hints(&mut op.left, store);
574            annotate_pushdown_hints(&mut op.right, store);
575        }
576        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
577        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
578        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
579        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
580        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
581        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
582        LogicalOperator::Union(op) => {
583            for input in &mut op.inputs {
584                annotate_pushdown_hints(input, store);
585            }
586        }
587        LogicalOperator::Apply(op) => {
588            annotate_pushdown_hints(&mut op.input, store);
589            annotate_pushdown_hints(&mut op.subplan, store);
590        }
591        LogicalOperator::Otherwise(op) => {
592            annotate_pushdown_hints(&mut op.left, store);
593            annotate_pushdown_hints(&mut op.right, store);
594        }
595        _ => {}
596    }
597}
598
599/// Infers the pushdown strategy for a filter predicate over a node scan.
600fn infer_pushdown(
601    predicate: &LogicalExpression,
602    scan: &crate::query::plan::NodeScanOp,
603    store: &dyn grafeo_core::graph::GraphStore,
604) -> Option<crate::query::plan::PushdownHint> {
605    #[allow(clippy::wildcard_imports)]
606    use crate::query::plan::*;
607
608    match predicate {
609        // Equality: n.prop = value
610        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
611            if let Some(prop) = extract_property_name(left, &scan.variable)
612                .or_else(|| extract_property_name(right, &scan.variable))
613            {
614                if store.has_property_index(&prop) {
615                    return Some(PushdownHint::IndexLookup { property: prop });
616                }
617                if scan.label.is_some() {
618                    return Some(PushdownHint::LabelFirst);
619                }
620            }
621            None
622        }
623        // Range: n.prop > value, n.prop < value, etc.
624        LogicalExpression::Binary {
625            left,
626            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
627            right,
628        } => {
629            if let Some(prop) = extract_property_name(left, &scan.variable)
630                .or_else(|| extract_property_name(right, &scan.variable))
631            {
632                if store.has_property_index(&prop) {
633                    return Some(PushdownHint::RangeScan { property: prop });
634                }
635                if scan.label.is_some() {
636                    return Some(PushdownHint::LabelFirst);
637                }
638            }
639            None
640        }
641        // AND: check the left side (first conjunct) for pushdown
642        LogicalExpression::Binary {
643            left,
644            op: BinaryOp::And,
645            ..
646        } => infer_pushdown(left, scan, store),
647        _ => {
648            // Any other predicate on a labeled scan gets label-first
649            if scan.label.is_some() {
650                Some(PushdownHint::LabelFirst)
651            } else {
652                None
653            }
654        }
655    }
656}
657
658/// Extracts the property name if the expression is `Property { variable, property }`
659/// and the variable matches the scan variable.
660fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
661    if let LogicalExpression::Property { variable, property } = expr
662        && variable == scan_var
663    {
664        Some(property.clone())
665    } else {
666        None
667    }
668}
669
670/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
671pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
672    let tree_text = plan.root.explain_tree();
673    QueryResult {
674        columns: vec!["plan".to_string()],
675        column_types: vec![grafeo_common::types::LogicalType::String],
676        rows: vec![vec![Value::String(tree_text.into())]],
677        execution_time_ms: None,
678        rows_scanned: None,
679        status_message: None,
680        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
681    }
682}
683
/// Substitutes parameters in a logical plan with their values.
///
/// The plan is modified in place, starting at `plan.root`; the work is
/// delegated to `substitute_in_operator`.
///
/// # Errors
///
/// Returns an error if a referenced parameter is not found in `params`.
pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
    substitute_in_operator(&mut plan.root, params)
}
692
693/// Recursively substitutes parameters in an operator.
694fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
695    #[allow(clippy::wildcard_imports)]
696    use crate::query::plan::*;
697
698    match op {
699        LogicalOperator::Filter(filter) => {
700            substitute_in_expression(&mut filter.predicate, params)?;
701            substitute_in_operator(&mut filter.input, params)?;
702        }
703        LogicalOperator::Return(ret) => {
704            for item in &mut ret.items {
705                substitute_in_expression(&mut item.expression, params)?;
706            }
707            substitute_in_operator(&mut ret.input, params)?;
708        }
709        LogicalOperator::Project(proj) => {
710            for p in &mut proj.projections {
711                substitute_in_expression(&mut p.expression, params)?;
712            }
713            substitute_in_operator(&mut proj.input, params)?;
714        }
715        LogicalOperator::NodeScan(scan) => {
716            if let Some(input) = &mut scan.input {
717                substitute_in_operator(input, params)?;
718            }
719        }
720        LogicalOperator::EdgeScan(scan) => {
721            if let Some(input) = &mut scan.input {
722                substitute_in_operator(input, params)?;
723            }
724        }
725        LogicalOperator::Expand(expand) => {
726            substitute_in_operator(&mut expand.input, params)?;
727        }
728        LogicalOperator::Join(join) => {
729            substitute_in_operator(&mut join.left, params)?;
730            substitute_in_operator(&mut join.right, params)?;
731            for cond in &mut join.conditions {
732                substitute_in_expression(&mut cond.left, params)?;
733                substitute_in_expression(&mut cond.right, params)?;
734            }
735        }
736        LogicalOperator::LeftJoin(join) => {
737            substitute_in_operator(&mut join.left, params)?;
738            substitute_in_operator(&mut join.right, params)?;
739            if let Some(cond) = &mut join.condition {
740                substitute_in_expression(cond, params)?;
741            }
742        }
743        LogicalOperator::Aggregate(agg) => {
744            for expr in &mut agg.group_by {
745                substitute_in_expression(expr, params)?;
746            }
747            for agg_expr in &mut agg.aggregates {
748                if let Some(expr) = &mut agg_expr.expression {
749                    substitute_in_expression(expr, params)?;
750                }
751            }
752            substitute_in_operator(&mut agg.input, params)?;
753        }
754        LogicalOperator::Sort(sort) => {
755            for key in &mut sort.keys {
756                substitute_in_expression(&mut key.expression, params)?;
757            }
758            substitute_in_operator(&mut sort.input, params)?;
759        }
760        LogicalOperator::Limit(limit) => {
761            resolve_count_param(&mut limit.count, params)?;
762            substitute_in_operator(&mut limit.input, params)?;
763        }
764        LogicalOperator::Skip(skip) => {
765            resolve_count_param(&mut skip.count, params)?;
766            substitute_in_operator(&mut skip.input, params)?;
767        }
768        LogicalOperator::Distinct(distinct) => {
769            substitute_in_operator(&mut distinct.input, params)?;
770        }
771        LogicalOperator::CreateNode(create) => {
772            for (_, expr) in &mut create.properties {
773                substitute_in_expression(expr, params)?;
774            }
775            if let Some(input) = &mut create.input {
776                substitute_in_operator(input, params)?;
777            }
778        }
779        LogicalOperator::CreateEdge(create) => {
780            for (_, expr) in &mut create.properties {
781                substitute_in_expression(expr, params)?;
782            }
783            substitute_in_operator(&mut create.input, params)?;
784        }
785        LogicalOperator::DeleteNode(delete) => {
786            substitute_in_operator(&mut delete.input, params)?;
787        }
788        LogicalOperator::DeleteEdge(delete) => {
789            substitute_in_operator(&mut delete.input, params)?;
790        }
791        LogicalOperator::SetProperty(set) => {
792            for (_, expr) in &mut set.properties {
793                substitute_in_expression(expr, params)?;
794            }
795            substitute_in_operator(&mut set.input, params)?;
796        }
797        LogicalOperator::Union(union) => {
798            for input in &mut union.inputs {
799                substitute_in_operator(input, params)?;
800            }
801        }
802        LogicalOperator::AntiJoin(anti) => {
803            substitute_in_operator(&mut anti.left, params)?;
804            substitute_in_operator(&mut anti.right, params)?;
805        }
806        LogicalOperator::Bind(bind) => {
807            substitute_in_expression(&mut bind.expression, params)?;
808            substitute_in_operator(&mut bind.input, params)?;
809        }
810        LogicalOperator::TripleScan(scan) => {
811            if let Some(input) = &mut scan.input {
812                substitute_in_operator(input, params)?;
813            }
814        }
815        LogicalOperator::Unwind(unwind) => {
816            substitute_in_expression(&mut unwind.expression, params)?;
817            substitute_in_operator(&mut unwind.input, params)?;
818        }
819        LogicalOperator::MapCollect(mc) => {
820            substitute_in_operator(&mut mc.input, params)?;
821        }
822        LogicalOperator::Merge(merge) => {
823            for (_, expr) in &mut merge.match_properties {
824                substitute_in_expression(expr, params)?;
825            }
826            for (_, expr) in &mut merge.on_create {
827                substitute_in_expression(expr, params)?;
828            }
829            for (_, expr) in &mut merge.on_match {
830                substitute_in_expression(expr, params)?;
831            }
832            substitute_in_operator(&mut merge.input, params)?;
833        }
834        LogicalOperator::MergeRelationship(merge_rel) => {
835            for (_, expr) in &mut merge_rel.match_properties {
836                substitute_in_expression(expr, params)?;
837            }
838            for (_, expr) in &mut merge_rel.on_create {
839                substitute_in_expression(expr, params)?;
840            }
841            for (_, expr) in &mut merge_rel.on_match {
842                substitute_in_expression(expr, params)?;
843            }
844            substitute_in_operator(&mut merge_rel.input, params)?;
845        }
846        LogicalOperator::AddLabel(add_label) => {
847            substitute_in_operator(&mut add_label.input, params)?;
848        }
849        LogicalOperator::RemoveLabel(remove_label) => {
850            substitute_in_operator(&mut remove_label.input, params)?;
851        }
852        LogicalOperator::ShortestPath(sp) => {
853            substitute_in_operator(&mut sp.input, params)?;
854        }
855        // SPARQL Update operators
856        LogicalOperator::InsertTriple(insert) => {
857            if let Some(ref mut input) = insert.input {
858                substitute_in_operator(input, params)?;
859            }
860        }
861        LogicalOperator::DeleteTriple(delete) => {
862            if let Some(ref mut input) = delete.input {
863                substitute_in_operator(input, params)?;
864            }
865        }
866        LogicalOperator::Modify(modify) => {
867            substitute_in_operator(&mut modify.where_clause, params)?;
868        }
869        LogicalOperator::ClearGraph(_)
870        | LogicalOperator::CreateGraph(_)
871        | LogicalOperator::DropGraph(_)
872        | LogicalOperator::LoadGraph(_)
873        | LogicalOperator::CopyGraph(_)
874        | LogicalOperator::MoveGraph(_)
875        | LogicalOperator::AddGraph(_) => {}
876        LogicalOperator::HorizontalAggregate(op) => {
877            substitute_in_operator(&mut op.input, params)?;
878        }
879        LogicalOperator::Empty => {}
880        LogicalOperator::VectorScan(scan) => {
881            substitute_in_expression(&mut scan.query_vector, params)?;
882            if let Some(ref mut input) = scan.input {
883                substitute_in_operator(input, params)?;
884            }
885        }
886        LogicalOperator::VectorJoin(join) => {
887            substitute_in_expression(&mut join.query_vector, params)?;
888            substitute_in_operator(&mut join.input, params)?;
889        }
890        LogicalOperator::Except(except) => {
891            substitute_in_operator(&mut except.left, params)?;
892            substitute_in_operator(&mut except.right, params)?;
893        }
894        LogicalOperator::Intersect(intersect) => {
895            substitute_in_operator(&mut intersect.left, params)?;
896            substitute_in_operator(&mut intersect.right, params)?;
897        }
898        LogicalOperator::Otherwise(otherwise) => {
899            substitute_in_operator(&mut otherwise.left, params)?;
900            substitute_in_operator(&mut otherwise.right, params)?;
901        }
902        LogicalOperator::Apply(apply) => {
903            substitute_in_operator(&mut apply.input, params)?;
904            substitute_in_operator(&mut apply.subplan, params)?;
905        }
906        // ParameterScan has no expressions to substitute
907        LogicalOperator::ParameterScan(_) => {}
908        LogicalOperator::MultiWayJoin(mwj) => {
909            for input in &mut mwj.inputs {
910                substitute_in_operator(input, params)?;
911            }
912            for cond in &mut mwj.conditions {
913                substitute_in_expression(&mut cond.left, params)?;
914                substitute_in_expression(&mut cond.right, params)?;
915            }
916        }
917        // DDL operators have no expressions to substitute
918        LogicalOperator::CreatePropertyGraph(_) => {}
919        // Procedure calls: arguments could contain parameters but we handle at execution time
920        LogicalOperator::CallProcedure(_) => {}
921        // LoadData: file path is a literal, no parameter substitution needed
922        LogicalOperator::LoadData(_) => {}
923    }
924    Ok(())
925}
926
927/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
928fn resolve_count_param(
929    count: &mut crate::query::plan::CountExpr,
930    params: &QueryParams,
931) -> Result<()> {
932    use crate::query::plan::CountExpr;
933    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
934
935    if let CountExpr::Parameter(name) = count {
936        let value = params.get(name.as_str()).ok_or_else(|| {
937            Error::Query(QueryError::new(
938                QueryErrorKind::Semantic,
939                format!("Missing parameter for SKIP/LIMIT: ${name}"),
940            ))
941        })?;
942        let n = match value {
943            Value::Int64(i) if *i >= 0 => *i as usize,
944            Value::Int64(i) => {
945                return Err(Error::Query(QueryError::new(
946                    QueryErrorKind::Semantic,
947                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
948                )));
949            }
950            other => {
951                return Err(Error::Query(QueryError::new(
952                    QueryErrorKind::Semantic,
953                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
954                )));
955            }
956        };
957        *count = CountExpr::Literal(n);
958    }
959    Ok(())
960}
961
962/// Substitutes parameters in an expression with their values.
963fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
964    use crate::query::plan::LogicalExpression;
965
966    match expr {
967        LogicalExpression::Parameter(name) => {
968            if let Some(value) = params.get(name) {
969                *expr = LogicalExpression::Literal(value.clone());
970            } else {
971                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
972            }
973        }
974        LogicalExpression::Binary { left, right, .. } => {
975            substitute_in_expression(left, params)?;
976            substitute_in_expression(right, params)?;
977        }
978        LogicalExpression::Unary { operand, .. } => {
979            substitute_in_expression(operand, params)?;
980        }
981        LogicalExpression::FunctionCall { args, .. } => {
982            for arg in args {
983                substitute_in_expression(arg, params)?;
984            }
985        }
986        LogicalExpression::List(items) => {
987            for item in items {
988                substitute_in_expression(item, params)?;
989            }
990        }
991        LogicalExpression::Map(pairs) => {
992            for (_, value) in pairs {
993                substitute_in_expression(value, params)?;
994            }
995        }
996        LogicalExpression::IndexAccess { base, index } => {
997            substitute_in_expression(base, params)?;
998            substitute_in_expression(index, params)?;
999        }
1000        LogicalExpression::SliceAccess { base, start, end } => {
1001            substitute_in_expression(base, params)?;
1002            if let Some(s) = start {
1003                substitute_in_expression(s, params)?;
1004            }
1005            if let Some(e) = end {
1006                substitute_in_expression(e, params)?;
1007            }
1008        }
1009        LogicalExpression::Case {
1010            operand,
1011            when_clauses,
1012            else_clause,
1013        } => {
1014            if let Some(op) = operand {
1015                substitute_in_expression(op, params)?;
1016            }
1017            for (cond, result) in when_clauses {
1018                substitute_in_expression(cond, params)?;
1019                substitute_in_expression(result, params)?;
1020            }
1021            if let Some(el) = else_clause {
1022                substitute_in_expression(el, params)?;
1023            }
1024        }
1025        LogicalExpression::Property { .. }
1026        | LogicalExpression::Variable(_)
1027        | LogicalExpression::Literal(_)
1028        | LogicalExpression::Labels(_)
1029        | LogicalExpression::Type(_)
1030        | LogicalExpression::Id(_) => {}
1031        LogicalExpression::ListComprehension {
1032            list_expr,
1033            filter_expr,
1034            map_expr,
1035            ..
1036        } => {
1037            substitute_in_expression(list_expr, params)?;
1038            if let Some(filter) = filter_expr {
1039                substitute_in_expression(filter, params)?;
1040            }
1041            substitute_in_expression(map_expr, params)?;
1042        }
1043        LogicalExpression::ListPredicate {
1044            list_expr,
1045            predicate,
1046            ..
1047        } => {
1048            substitute_in_expression(list_expr, params)?;
1049            substitute_in_expression(predicate, params)?;
1050        }
1051        LogicalExpression::ExistsSubquery(_)
1052        | LogicalExpression::CountSubquery(_)
1053        | LogicalExpression::ValueSubquery(_) => {
1054            // Subqueries would need recursive parameter substitution
1055        }
1056        LogicalExpression::PatternComprehension { projection, .. } => {
1057            substitute_in_expression(projection, params)?;
1058        }
1059        LogicalExpression::MapProjection { entries, .. } => {
1060            for entry in entries {
1061                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
1062                    substitute_in_expression(expr, params)?;
1063                }
1064            }
1065        }
1066        LogicalExpression::Reduce {
1067            initial,
1068            list,
1069            expression,
1070            ..
1071        } => {
1072            substitute_in_expression(initial, params)?;
1073            substitute_in_expression(list, params)?;
1074            substitute_in_expression(expression, params)?;
1075        }
1076    }
1077    Ok(())
1078}
1079
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh `LpgStore` wrapped in an `Arc`, ready to hand to a processor.
    fn empty_store() -> Arc<LpgStore> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// GQL and Cypher are classified as LPG languages, SPARQL is not.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built over an empty store reports zero nodes.
    #[test]
    fn test_processor_creation() {
        let processor = QueryProcessor::for_lpg(empty_store());
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A label scan in GQL returns one row per matching node and names the column.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = empty_store();
        for _ in 0..2 {
            store.create_node(&["Person"]);
        }

        let result = QueryProcessor::for_lpg(store)
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    /// The same label scan works through the Cypher front end.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = empty_store();
        store.create_node(&["Person"]);

        let result = QueryProcessor::for_lpg(store)
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// An integer parameter is substituted into a WHERE comparison.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = empty_store();
        for age in [25, 35, 45] {
            store.create_node_with_props(&["Person"], [("age", Value::Int64(age))]);
        }

        let processor = QueryProcessor::for_lpg(store);
        let params = HashMap::from([("min_age".to_string(), Value::Int64(30))]);

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Ages 35 and 45 are above the threshold.
        assert_eq!(result.row_count(), 2);
    }

    /// Referencing a parameter that was not supplied is reported as an error.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = empty_store();
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // The params map is present but does not contain `$min_age`.
        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    /// A parameter compared against a property filters rows and the projected
    /// property value comes back unchanged.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        let store = empty_store();
        for value in [10, 20] {
            store.create_node_with_props(&["Num"], [("value", Value::Int64(value))]);
        }

        let processor = QueryProcessor::for_lpg(store);
        let params = HashMap::from([("threshold".to_string(), Value::Int64(15))]);

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=20 is greater than 15.
        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::Int64(20));
    }

    /// Two parameters combined with AND are both substituted.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        let store = empty_store();
        for (age, score) in [(25, 80), (35, 90), (45, 70)] {
            store.create_node_with_props(
                &["Person"],
                [("age", Value::Int64(age)), ("score", Value::Int64(score))],
            );
        }

        let processor = QueryProcessor::for_lpg(store);
        let params = HashMap::from([
            ("min_age".to_string(), Value::Int64(30)),
            ("min_score".to_string(), Value::Int64(75)),
        ]);

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only age=35, score=90 satisfies both conditions.
        assert_eq!(result.row_count(), 1);
    }

    /// Despite its name, this test compares a string property for equality
    /// with a string parameter; the query contains no IN list.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        let store = empty_store();
        for status in ["active", "pending", "deleted"] {
            store.create_node_with_props(&["Item"], [("status", Value::String(status.into()))]);
        }

        let processor = QueryProcessor::for_lpg(store);
        let params = HashMap::from([("target".to_string(), Value::String("active".into()))]);

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// An integer property compared with an integer parameter behaves as expected.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        let store = empty_store();
        for value in [100, 50] {
            store.create_node_with_props(&["Data"], [("value", Value::Int64(value))]);
        }

        let processor = QueryProcessor::for_lpg(store);
        let params = HashMap::from([("threshold".to_string(), Value::Int64(75))]);

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=100 is greater than 75.
        assert_eq!(result.row_count(), 1);
    }

    /// A query that matches nothing still reports its output column names.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // The store is deliberately left without any nodes.
        let processor = QueryProcessor::for_lpg(empty_store());

        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    /// A string parameter matches exactly one node by equality and the
    /// projected name is returned.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        let store = empty_store();
        for name in ["alpha", "beta", "gamma"] {
            store.create_node_with_props(&["Item"], [("name", Value::String(name.into()))]);
        }

        let processor = QueryProcessor::for_lpg(store);
        let params = HashMap::from([("target".to_string(), Value::String("beta".into()))]);

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}