Skip to main content

grafeo_engine/query/
processor.rs

//! Query processor that orchestrates the query pipeline.
//!
//! The `QueryProcessor` is the central component that executes queries through
//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
//!
//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL and
//! SQL/PGQ) for LPG, and SPARQL and GraphQL for RDF (when the `rdf` feature is
//! enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TransactionId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
/// Supported query languages.
///
/// Every variant is gated behind a Cargo feature, so the set of languages
/// available depends on how the crate was compiled. Use
/// [`QueryLanguage::is_lpg`] to tell whether a language is executed against
/// the labeled property graph (LPG) store or against the RDF store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024) - default for LPG
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL for LPG
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 GRAPH_TABLE)
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 for RDF
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL for RDF (requires both the `graphql` and `rdf` features)
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
51
52impl QueryLanguage {
53    /// Returns whether this language targets LPG (vs RDF).
54    #[must_use]
55    pub const fn is_lpg(&self) -> bool {
56        match self {
57            #[cfg(feature = "gql")]
58            Self::Gql => true,
59            #[cfg(feature = "cypher")]
60            Self::Cypher => true,
61            #[cfg(feature = "gremlin")]
62            Self::Gremlin => true,
63            #[cfg(feature = "graphql")]
64            Self::GraphQL => true,
65            #[cfg(feature = "sql-pgq")]
66            Self::SqlPgq => true,
67            #[cfg(feature = "sparql")]
68            Self::Sparql => false,
69            #[cfg(all(feature = "graphql", feature = "rdf"))]
70            Self::GraphQLRdf => false,
71        }
72    }
73}
74
/// Query parameters for prepared statements, as a map from `String` keys to
/// [`Value`]s.
///
/// When supplied to `QueryProcessor::process`, the parameters are applied to
/// the logical plan by [`substitute_params`] before binding and optimization.
pub type QueryParams = HashMap<String, Value>;
77
/// Processes queries through the full pipeline.
///
/// The processor holds shared (`Arc`) handles to the stores, the transaction
/// manager and the catalog, and provides a unified interface for executing
/// queries in any supported language.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// # use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
///
/// # fn main() -> grafeo_common::utils::error::Result<()> {
/// let store = Arc::new(LpgStore::new()?);
/// let processor = QueryProcessor::for_lpg(store);
/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
/// # Ok(())
/// # }
/// ```
pub struct QueryProcessor {
    /// LPG store for property graph queries.
    ///
    /// NOTE: processors built with `for_graph_store_with_transaction` hold a
    /// freshly created `LpgStore::new()` here rather than the backend passed
    /// in; their queries are planned against `graph_store` instead.
    lpg_store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends.
    ///
    /// LPG queries are planned and executed against this store.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Transaction manager for MVCC operations.
    transaction_manager: Arc<TransactionManager>,
    /// Catalog for schema and index metadata.
    catalog: Arc<Catalog>,
    /// Query optimizer.
    optimizer: Optimizer,
    /// Current transaction context as `(viewing_epoch, transaction_id)`, if
    /// any. When set, LPG queries are planned inside that transaction and the
    /// read-only fast path is disabled.
    transaction_context: Option<(EpochId, TransactionId)>,
    /// RDF store for triple pattern queries (optional).
    #[cfg(feature = "rdf")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
114
115impl QueryProcessor {
116    /// Creates a new query processor for LPG queries.
117    #[must_use]
118    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119        let optimizer = Optimizer::from_store(&store);
120        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121        Self {
122            lpg_store: store,
123            graph_store,
124            transaction_manager: Arc::new(TransactionManager::new()),
125            catalog: Arc::new(Catalog::new()),
126            optimizer,
127            transaction_context: None,
128            #[cfg(feature = "rdf")]
129            rdf_store: None,
130        }
131    }
132
133    /// Creates a new query processor with a transaction manager.
134    #[must_use]
135    pub fn for_lpg_with_transaction(
136        store: Arc<LpgStore>,
137        transaction_manager: Arc<TransactionManager>,
138    ) -> Self {
139        let optimizer = Optimizer::from_store(&store);
140        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
141        Self {
142            lpg_store: store,
143            graph_store,
144            transaction_manager,
145            catalog: Arc::new(Catalog::new()),
146            optimizer,
147            transaction_context: None,
148            #[cfg(feature = "rdf")]
149            rdf_store: None,
150        }
151    }
152
153    /// Creates a query processor backed by any `GraphStoreMut` implementation.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if the internal arena allocation fails (out of memory).
158    pub fn for_graph_store_with_transaction(
159        store: Arc<dyn GraphStoreMut>,
160        transaction_manager: Arc<TransactionManager>,
161    ) -> Result<Self> {
162        let optimizer = Optimizer::from_graph_store(&*store);
163        Ok(Self {
164            lpg_store: Arc::new(LpgStore::new()?),
165            graph_store: store,
166            transaction_manager,
167            catalog: Arc::new(Catalog::new()),
168            optimizer,
169            transaction_context: None,
170            #[cfg(feature = "rdf")]
171            rdf_store: None,
172        })
173    }
174
175    /// Sets the transaction context for MVCC visibility.
176    ///
177    /// This should be called when the processor is used within a transaction.
178    #[must_use]
179    pub fn with_transaction_context(
180        mut self,
181        viewing_epoch: EpochId,
182        transaction_id: TransactionId,
183    ) -> Self {
184        self.transaction_context = Some((viewing_epoch, transaction_id));
185        self
186    }
187
188    /// Sets a custom catalog.
189    #[must_use]
190    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
191        self.catalog = catalog;
192        self
193    }
194
195    /// Sets a custom optimizer.
196    #[must_use]
197    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
198        self.optimizer = optimizer;
199        self
200    }
201
202    /// Processes a query string and returns results.
203    ///
204    /// Pipeline:
205    /// 1. Parse (language-specific parser → AST)
206    /// 2. Translate (AST → LogicalPlan)
207    /// 3. Bind (semantic validation)
208    /// 4. Optimize (filter pushdown, join reorder, etc.)
209    /// 5. Plan (logical → physical operators)
210    /// 6. Execute (run operators, collect results)
211    ///
212    /// # Arguments
213    ///
214    /// * `query` - The query string
215    /// * `language` - Which query language to use
216    /// * `params` - Optional query parameters for prepared statements
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if any stage of the pipeline fails.
221    pub fn process(
222        &self,
223        query: &str,
224        language: QueryLanguage,
225        params: Option<&QueryParams>,
226    ) -> Result<QueryResult> {
227        if language.is_lpg() {
228            self.process_lpg(query, language, params)
229        } else {
230            #[cfg(feature = "rdf")]
231            {
232                self.process_rdf(query, language, params)
233            }
234            #[cfg(not(feature = "rdf"))]
235            {
236                Err(Error::Internal(
237                    "RDF support not enabled. Compile with --features rdf".to_string(),
238                ))
239            }
240        }
241    }
242
243    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
244    fn process_lpg(
245        &self,
246        query: &str,
247        language: QueryLanguage,
248        params: Option<&QueryParams>,
249    ) -> Result<QueryResult> {
250        #[cfg(not(target_arch = "wasm32"))]
251        let start_time = std::time::Instant::now();
252
253        // 1. Parse and translate to logical plan
254        let mut logical_plan = self.translate_lpg(query, language)?;
255
256        // 2. Substitute parameters if provided
257        if let Some(params) = params {
258            substitute_params(&mut logical_plan, params)?;
259        }
260
261        // 3. Semantic validation
262        let mut binder = Binder::new();
263        let _binding_context = binder.bind(&logical_plan)?;
264
265        // 4. Optimize the plan
266        let optimized_plan = self.optimizer.optimize(logical_plan)?;
267
268        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
269        if optimized_plan.explain {
270            let mut plan = optimized_plan;
271            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
272            return Ok(explain_result(&plan));
273        }
274
275        // 5. Convert to physical plan with transaction context
276        // Read-only fast path: safe when no mutations AND no active transaction
277        // (an active transaction may have prior uncommitted writes from earlier statements)
278        let is_read_only =
279            !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
280        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
281            Planner::with_context(
282                Arc::clone(&self.graph_store),
283                Arc::clone(&self.transaction_manager),
284                Some(transaction_id),
285                epoch,
286            )
287        } else {
288            Planner::with_context(
289                Arc::clone(&self.graph_store),
290                Arc::clone(&self.transaction_manager),
291                None,
292                self.transaction_manager.current_epoch(),
293            )
294        }
295        .with_read_only(is_read_only);
296        let mut physical_plan = planner.plan(&optimized_plan)?;
297
298        // 6. Execute and collect results
299        let executor = Executor::with_columns(physical_plan.columns.clone());
300        let mut result = executor.execute(physical_plan.operator.as_mut())?;
301
302        // Add execution metrics
303        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
304        #[cfg(not(target_arch = "wasm32"))]
305        {
306            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
307            result.execution_time_ms = Some(elapsed_ms);
308        }
309        result.rows_scanned = Some(rows_scanned);
310
311        Ok(result)
312    }
313
314    /// Translates an LPG query to a logical plan.
315    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
316        let _span = tracing::debug_span!("grafeo::query::parse", ?language).entered();
317        match language {
318            #[cfg(feature = "gql")]
319            QueryLanguage::Gql => {
320                use crate::query::translators::gql;
321                gql::translate(query)
322            }
323            #[cfg(feature = "cypher")]
324            QueryLanguage::Cypher => {
325                use crate::query::translators::cypher;
326                cypher::translate(query)
327            }
328            #[cfg(feature = "gremlin")]
329            QueryLanguage::Gremlin => {
330                use crate::query::translators::gremlin;
331                gremlin::translate(query)
332            }
333            #[cfg(feature = "graphql")]
334            QueryLanguage::GraphQL => {
335                use crate::query::translators::graphql;
336                graphql::translate(query)
337            }
338            #[cfg(feature = "sql-pgq")]
339            QueryLanguage::SqlPgq => {
340                use crate::query::translators::sql_pgq;
341                sql_pgq::translate(query)
342            }
343            #[allow(unreachable_patterns)]
344            _ => Err(Error::Internal(format!(
345                "Language {:?} is not an LPG language",
346                language
347            ))),
348        }
349    }
350
351    /// Returns a reference to the LPG store.
352    #[must_use]
353    pub fn lpg_store(&self) -> &Arc<LpgStore> {
354        &self.lpg_store
355    }
356
357    /// Returns a reference to the catalog.
358    #[must_use]
359    pub fn catalog(&self) -> &Arc<Catalog> {
360        &self.catalog
361    }
362
363    /// Returns a reference to the optimizer.
364    #[must_use]
365    pub fn optimizer(&self) -> &Optimizer {
366        &self.optimizer
367    }
368}
369
370impl QueryProcessor {
371    /// Returns a reference to the transaction manager.
372    #[must_use]
373    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
374        &self.transaction_manager
375    }
376}
377
// =========================================================================
// RDF-specific methods (gated behind `rdf` feature)
// =========================================================================
381
#[cfg(feature = "rdf")]
impl QueryProcessor {
    /// Creates a new query processor with both LPG and RDF stores.
    ///
    /// The LPG store also serves as the `GraphStoreMut` backend; a new,
    /// private [`TransactionManager`] and an empty [`Catalog`] are created.
    #[must_use]
    pub fn with_rdf(
        lpg_store: Arc<LpgStore>,
        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
    ) -> Self {
        let optimizer = Optimizer::from_store(&lpg_store);
        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
        Self {
            lpg_store,
            graph_store,
            transaction_manager: Arc::new(TransactionManager::new()),
            catalog: Arc::new(Catalog::new()),
            optimizer,
            transaction_context: None,
            rdf_store: Some(rdf_store),
        }
    }

    /// Returns a reference to the RDF store, or `None` if this processor was
    /// not built with one.
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }

    /// Processes an RDF query (SPARQL, GraphQL-RDF).
    ///
    /// Follows the same stages as the LPG pipeline (translate, substitute
    /// parameters, bind, optimize, plan, execute), but plans with the RDF
    /// planner against the configured RDF store. EXPLAIN returns the optimized
    /// plan tree without pushdown annotations. Execution metrics are recorded
    /// the same way as for LPG queries.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` if no RDF store is configured, or any error
    /// raised by a pipeline stage.
    fn process_rdf(
        &self,
        query: &str,
        language: QueryLanguage,
        params: Option<&QueryParams>,
    ) -> Result<QueryResult> {
        use crate::query::planner::rdf::RdfPlanner;

        #[cfg(not(target_arch = "wasm32"))]
        let start_time = std::time::Instant::now();

        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
            Error::Internal("RDF store not configured for this processor".to_string())
        })?;

        // 1. Parse and translate to logical plan
        let mut logical_plan = self.translate_rdf(query, language)?;

        // 2. Substitute parameters if provided
        if let Some(params) = params {
            substitute_params(&mut logical_plan, params)?;
        }

        // 3. Semantic validation (only the success/failure matters here)
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        // 4. Optimize the plan
        let optimized_plan = self.optimizer.optimize(logical_plan)?;

        // 4a. EXPLAIN: return the optimized plan tree without executing
        if optimized_plan.explain {
            return Ok(explain_result(&optimized_plan));
        }

        // 5. Convert to physical plan (using RDF planner)
        let planner = RdfPlanner::new(Arc::clone(rdf_store));
        let mut physical_plan = planner.plan(&optimized_plan)?;

        // 6. Execute and collect results
        let executor = Executor::with_columns(physical_plan.columns.clone());
        let mut result = executor.execute(physical_plan.operator.as_mut())?;

        // Add execution metrics, matching the LPG pipeline
        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
        #[cfg(not(target_arch = "wasm32"))]
        {
            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
            result.execution_time_ms = Some(elapsed_ms);
        }
        result.rows_scanned = Some(rows_scanned);

        Ok(result)
    }

    /// Translates an RDF query to a logical plan.
    ///
    /// Returns `Error::Internal` if `language` is not an RDF language.
    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
        let _span = tracing::debug_span!("grafeo::query::parse", ?language).entered();
        match language {
            #[cfg(feature = "sparql")]
            QueryLanguage::Sparql => {
                use crate::query::translators::sparql;
                sparql::translate(query)
            }
            #[cfg(all(feature = "graphql", feature = "rdf"))]
            QueryLanguage::GraphQLRdf => {
                use crate::query::translators::graphql_rdf;
                // Default namespace for GraphQL-RDF queries
                graphql_rdf::translate(query, "http://example.org/")
            }
            // Reachable only for LPG languages; unreachable when only RDF
            // languages are enabled.
            #[allow(unreachable_patterns)]
            _ => Err(Error::Internal(format!(
                "Language {:?} is not an RDF language",
                language
            ))),
        }
    }
}
472
473/// Annotates filter operators in the plan with pushdown hints.
474///
475/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
476/// whether a property index exists for equality predicates.
477pub(crate) fn annotate_pushdown_hints(
478    op: &mut LogicalOperator,
479    store: &dyn grafeo_core::graph::GraphStore,
480) {
481    #[allow(clippy::wildcard_imports)]
482    use crate::query::plan::*;
483
484    match op {
485        LogicalOperator::Filter(filter) => {
486            // Recurse into children first
487            annotate_pushdown_hints(&mut filter.input, store);
488
489            // Annotate this filter if it sits on top of a NodeScan
490            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
491                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
492            }
493        }
494        LogicalOperator::NodeScan(op) => {
495            if let Some(input) = &mut op.input {
496                annotate_pushdown_hints(input, store);
497            }
498        }
499        LogicalOperator::EdgeScan(op) => {
500            if let Some(input) = &mut op.input {
501                annotate_pushdown_hints(input, store);
502            }
503        }
504        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
505        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
506        LogicalOperator::Join(op) => {
507            annotate_pushdown_hints(&mut op.left, store);
508            annotate_pushdown_hints(&mut op.right, store);
509        }
510        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
511        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
512        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
513        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
514        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
515        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
516        LogicalOperator::Union(op) => {
517            for input in &mut op.inputs {
518                annotate_pushdown_hints(input, store);
519            }
520        }
521        LogicalOperator::Apply(op) => {
522            annotate_pushdown_hints(&mut op.input, store);
523            annotate_pushdown_hints(&mut op.subplan, store);
524        }
525        LogicalOperator::Otherwise(op) => {
526            annotate_pushdown_hints(&mut op.left, store);
527            annotate_pushdown_hints(&mut op.right, store);
528        }
529        _ => {}
530    }
531}
532
533/// Infers the pushdown strategy for a filter predicate over a node scan.
534fn infer_pushdown(
535    predicate: &LogicalExpression,
536    scan: &crate::query::plan::NodeScanOp,
537    store: &dyn grafeo_core::graph::GraphStore,
538) -> Option<crate::query::plan::PushdownHint> {
539    #[allow(clippy::wildcard_imports)]
540    use crate::query::plan::*;
541
542    match predicate {
543        // Equality: n.prop = value
544        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
545            if let Some(prop) = extract_property_name(left, &scan.variable)
546                .or_else(|| extract_property_name(right, &scan.variable))
547            {
548                if store.has_property_index(&prop) {
549                    return Some(PushdownHint::IndexLookup { property: prop });
550                }
551                if scan.label.is_some() {
552                    return Some(PushdownHint::LabelFirst);
553                }
554            }
555            None
556        }
557        // Range: n.prop > value, n.prop < value, etc.
558        LogicalExpression::Binary {
559            left,
560            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
561            right,
562        } => {
563            if let Some(prop) = extract_property_name(left, &scan.variable)
564                .or_else(|| extract_property_name(right, &scan.variable))
565            {
566                if store.has_property_index(&prop) {
567                    return Some(PushdownHint::RangeScan { property: prop });
568                }
569                if scan.label.is_some() {
570                    return Some(PushdownHint::LabelFirst);
571                }
572            }
573            None
574        }
575        // AND: check the left side (first conjunct) for pushdown
576        LogicalExpression::Binary {
577            left,
578            op: BinaryOp::And,
579            ..
580        } => infer_pushdown(left, scan, store),
581        _ => {
582            // Any other predicate on a labeled scan gets label-first
583            if scan.label.is_some() {
584                Some(PushdownHint::LabelFirst)
585            } else {
586                None
587            }
588        }
589    }
590}
591
592/// Extracts the property name if the expression is `Property { variable, property }`
593/// and the variable matches the scan variable.
594fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
595    if let LogicalExpression::Property { variable, property } = expr
596        && variable == scan_var
597    {
598        Some(property.clone())
599    } else {
600        None
601    }
602}
603
604/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
605pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
606    let tree_text = plan.root.explain_tree();
607    QueryResult {
608        columns: vec!["plan".to_string()],
609        column_types: vec![grafeo_common::types::LogicalType::String],
610        rows: vec![vec![Value::String(tree_text.into())]],
611        execution_time_ms: None,
612        rows_scanned: None,
613        status_message: None,
614        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
615    }
616}
617
618/// Substitutes parameters in a logical plan with their values.
619pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
620    substitute_in_operator(&mut plan.root, params)
621}
622
/// Recursively substitutes parameters in an operator.
///
/// Rewrites, in place, the expressions owned by `op` (predicates, projections,
/// sort keys, property maps, join conditions, vector query expressions, ...)
/// via `substitute_in_expression`, and resolves `LIMIT`/`SKIP` counts via
/// `resolve_count_param`, then recurses into child operators.
///
/// Every step is propagated with `?`, so the walk stops at the first error.
/// The order of the statements in each arm therefore determines which error
/// is reported when several parameters are problematic.
fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
    #[allow(clippy::wildcard_imports)]
    use crate::query::plan::*;

    match op {
        LogicalOperator::Filter(filter) => {
            substitute_in_expression(&mut filter.predicate, params)?;
            substitute_in_operator(&mut filter.input, params)?;
        }
        LogicalOperator::Return(ret) => {
            for item in &mut ret.items {
                substitute_in_expression(&mut item.expression, params)?;
            }
            substitute_in_operator(&mut ret.input, params)?;
        }
        LogicalOperator::Project(proj) => {
            for p in &mut proj.projections {
                substitute_in_expression(&mut p.expression, params)?;
            }
            substitute_in_operator(&mut proj.input, params)?;
        }
        LogicalOperator::NodeScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::EdgeScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Expand(expand) => {
            substitute_in_operator(&mut expand.input, params)?;
        }
        LogicalOperator::Join(join) => {
            // Children first, then the equi-join conditions.
            substitute_in_operator(&mut join.left, params)?;
            substitute_in_operator(&mut join.right, params)?;
            for cond in &mut join.conditions {
                substitute_in_expression(&mut cond.left, params)?;
                substitute_in_expression(&mut cond.right, params)?;
            }
        }
        LogicalOperator::LeftJoin(join) => {
            substitute_in_operator(&mut join.left, params)?;
            substitute_in_operator(&mut join.right, params)?;
            if let Some(cond) = &mut join.condition {
                substitute_in_expression(cond, params)?;
            }
        }
        LogicalOperator::Aggregate(agg) => {
            for expr in &mut agg.group_by {
                substitute_in_expression(expr, params)?;
            }
            // Aggregates without an argument expression have nothing to rewrite.
            for agg_expr in &mut agg.aggregates {
                if let Some(expr) = &mut agg_expr.expression {
                    substitute_in_expression(expr, params)?;
                }
            }
            substitute_in_operator(&mut agg.input, params)?;
        }
        LogicalOperator::Sort(sort) => {
            for key in &mut sort.keys {
                substitute_in_expression(&mut key.expression, params)?;
            }
            substitute_in_operator(&mut sort.input, params)?;
        }
        LogicalOperator::Limit(limit) => {
            // The count may itself be a parameter (e.g. `LIMIT $n`).
            resolve_count_param(&mut limit.count, params)?;
            substitute_in_operator(&mut limit.input, params)?;
        }
        LogicalOperator::Skip(skip) => {
            resolve_count_param(&mut skip.count, params)?;
            substitute_in_operator(&mut skip.input, params)?;
        }
        LogicalOperator::Distinct(distinct) => {
            substitute_in_operator(&mut distinct.input, params)?;
        }
        LogicalOperator::CreateNode(create) => {
            for (_, expr) in &mut create.properties {
                substitute_in_expression(expr, params)?;
            }
            if let Some(input) = &mut create.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::CreateEdge(create) => {
            for (_, expr) in &mut create.properties {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut create.input, params)?;
        }
        LogicalOperator::DeleteNode(delete) => {
            substitute_in_operator(&mut delete.input, params)?;
        }
        LogicalOperator::DeleteEdge(delete) => {
            substitute_in_operator(&mut delete.input, params)?;
        }
        LogicalOperator::SetProperty(set) => {
            for (_, expr) in &mut set.properties {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut set.input, params)?;
        }
        LogicalOperator::Union(union) => {
            for input in &mut union.inputs {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::AntiJoin(anti) => {
            substitute_in_operator(&mut anti.left, params)?;
            substitute_in_operator(&mut anti.right, params)?;
        }
        LogicalOperator::Bind(bind) => {
            substitute_in_expression(&mut bind.expression, params)?;
            substitute_in_operator(&mut bind.input, params)?;
        }
        LogicalOperator::TripleScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Unwind(unwind) => {
            substitute_in_expression(&mut unwind.expression, params)?;
            substitute_in_operator(&mut unwind.input, params)?;
        }
        LogicalOperator::MapCollect(mc) => {
            substitute_in_operator(&mut mc.input, params)?;
        }
        LogicalOperator::Merge(merge) => {
            // Match properties, then ON CREATE, then ON MATCH assignments.
            for (_, expr) in &mut merge.match_properties {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge.on_create {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge.on_match {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut merge.input, params)?;
        }
        LogicalOperator::MergeRelationship(merge_rel) => {
            for (_, expr) in &mut merge_rel.match_properties {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge_rel.on_create {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge_rel.on_match {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut merge_rel.input, params)?;
        }
        LogicalOperator::AddLabel(add_label) => {
            substitute_in_operator(&mut add_label.input, params)?;
        }
        LogicalOperator::RemoveLabel(remove_label) => {
            substitute_in_operator(&mut remove_label.input, params)?;
        }
        LogicalOperator::ShortestPath(sp) => {
            substitute_in_operator(&mut sp.input, params)?;
        }
        // SPARQL Update operators
        LogicalOperator::InsertTriple(insert) => {
            if let Some(ref mut input) = insert.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::DeleteTriple(delete) => {
            if let Some(ref mut input) = delete.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Modify(modify) => {
            substitute_in_operator(&mut modify.where_clause, params)?;
        }
        // Graph management operators: nothing is substituted here.
        LogicalOperator::ClearGraph(_)
        | LogicalOperator::CreateGraph(_)
        | LogicalOperator::DropGraph(_)
        | LogicalOperator::LoadGraph(_)
        | LogicalOperator::CopyGraph(_)
        | LogicalOperator::MoveGraph(_)
        | LogicalOperator::AddGraph(_) => {}
        LogicalOperator::HorizontalAggregate(op) => {
            substitute_in_operator(&mut op.input, params)?;
        }
        LogicalOperator::Empty => {}
        LogicalOperator::VectorScan(scan) => {
            substitute_in_expression(&mut scan.query_vector, params)?;
            if let Some(ref mut input) = scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::VectorJoin(join) => {
            substitute_in_expression(&mut join.query_vector, params)?;
            substitute_in_operator(&mut join.input, params)?;
        }
        LogicalOperator::Except(except) => {
            substitute_in_operator(&mut except.left, params)?;
            substitute_in_operator(&mut except.right, params)?;
        }
        LogicalOperator::Intersect(intersect) => {
            substitute_in_operator(&mut intersect.left, params)?;
            substitute_in_operator(&mut intersect.right, params)?;
        }
        LogicalOperator::Otherwise(otherwise) => {
            substitute_in_operator(&mut otherwise.left, params)?;
            substitute_in_operator(&mut otherwise.right, params)?;
        }
        LogicalOperator::Apply(apply) => {
            substitute_in_operator(&mut apply.input, params)?;
            substitute_in_operator(&mut apply.subplan, params)?;
        }
        // ParameterScan has no expressions to substitute
        LogicalOperator::ParameterScan(_) => {}
        LogicalOperator::MultiWayJoin(mwj) => {
            for input in &mut mwj.inputs {
                substitute_in_operator(input, params)?;
            }
            for cond in &mut mwj.conditions {
                substitute_in_expression(&mut cond.left, params)?;
                substitute_in_expression(&mut cond.right, params)?;
            }
        }
        // DDL operators have no expressions to substitute
        LogicalOperator::CreatePropertyGraph(_) => {}
        // Procedure calls: arguments could contain parameters but we handle at execution time
        LogicalOperator::CallProcedure(_) => {}
        // LoadData: file path is a literal, no parameter substitution needed
        LogicalOperator::LoadData(_) => {}
    }
    Ok(())
}
856
857/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
858fn resolve_count_param(
859    count: &mut crate::query::plan::CountExpr,
860    params: &QueryParams,
861) -> Result<()> {
862    use crate::query::plan::CountExpr;
863    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
864
865    if let CountExpr::Parameter(name) = count {
866        let value = params.get(name.as_str()).ok_or_else(|| {
867            Error::Query(QueryError::new(
868                QueryErrorKind::Semantic,
869                format!("Missing parameter for SKIP/LIMIT: ${name}"),
870            ))
871        })?;
872        let n = match value {
873            Value::Int64(i) if *i >= 0 => *i as usize,
874            Value::Int64(i) => {
875                return Err(Error::Query(QueryError::new(
876                    QueryErrorKind::Semantic,
877                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
878                )));
879            }
880            other => {
881                return Err(Error::Query(QueryError::new(
882                    QueryErrorKind::Semantic,
883                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
884                )));
885            }
886        };
887        *count = CountExpr::Literal(n);
888    }
889    Ok(())
890}
891
892/// Substitutes parameters in an expression with their values.
893fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
894    use crate::query::plan::LogicalExpression;
895
896    match expr {
897        LogicalExpression::Parameter(name) => {
898            if let Some(value) = params.get(name) {
899                *expr = LogicalExpression::Literal(value.clone());
900            } else {
901                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
902            }
903        }
904        LogicalExpression::Binary { left, right, .. } => {
905            substitute_in_expression(left, params)?;
906            substitute_in_expression(right, params)?;
907        }
908        LogicalExpression::Unary { operand, .. } => {
909            substitute_in_expression(operand, params)?;
910        }
911        LogicalExpression::FunctionCall { args, .. } => {
912            for arg in args {
913                substitute_in_expression(arg, params)?;
914            }
915        }
916        LogicalExpression::List(items) => {
917            for item in items {
918                substitute_in_expression(item, params)?;
919            }
920        }
921        LogicalExpression::Map(pairs) => {
922            for (_, value) in pairs {
923                substitute_in_expression(value, params)?;
924            }
925        }
926        LogicalExpression::IndexAccess { base, index } => {
927            substitute_in_expression(base, params)?;
928            substitute_in_expression(index, params)?;
929        }
930        LogicalExpression::SliceAccess { base, start, end } => {
931            substitute_in_expression(base, params)?;
932            if let Some(s) = start {
933                substitute_in_expression(s, params)?;
934            }
935            if let Some(e) = end {
936                substitute_in_expression(e, params)?;
937            }
938        }
939        LogicalExpression::Case {
940            operand,
941            when_clauses,
942            else_clause,
943        } => {
944            if let Some(op) = operand {
945                substitute_in_expression(op, params)?;
946            }
947            for (cond, result) in when_clauses {
948                substitute_in_expression(cond, params)?;
949                substitute_in_expression(result, params)?;
950            }
951            if let Some(el) = else_clause {
952                substitute_in_expression(el, params)?;
953            }
954        }
955        LogicalExpression::Property { .. }
956        | LogicalExpression::Variable(_)
957        | LogicalExpression::Literal(_)
958        | LogicalExpression::Labels(_)
959        | LogicalExpression::Type(_)
960        | LogicalExpression::Id(_) => {}
961        LogicalExpression::ListComprehension {
962            list_expr,
963            filter_expr,
964            map_expr,
965            ..
966        } => {
967            substitute_in_expression(list_expr, params)?;
968            if let Some(filter) = filter_expr {
969                substitute_in_expression(filter, params)?;
970            }
971            substitute_in_expression(map_expr, params)?;
972        }
973        LogicalExpression::ListPredicate {
974            list_expr,
975            predicate,
976            ..
977        } => {
978            substitute_in_expression(list_expr, params)?;
979            substitute_in_expression(predicate, params)?;
980        }
981        LogicalExpression::ExistsSubquery(_)
982        | LogicalExpression::CountSubquery(_)
983        | LogicalExpression::ValueSubquery(_) => {
984            // Subqueries would need recursive parameter substitution
985        }
986        LogicalExpression::PatternComprehension { projection, .. } => {
987            substitute_in_expression(projection, params)?;
988        }
989        LogicalExpression::MapProjection { entries, .. } => {
990            for entry in entries {
991                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
992                    substitute_in_expression(expr, params)?;
993                }
994            }
995        }
996        LogicalExpression::Reduce {
997            initial,
998            list,
999            expression,
1000            ..
1001        } => {
1002            substitute_in_expression(initial, params)?;
1003            substitute_in_expression(list, params)?;
1004            substitute_in_expression(expression, params)?;
1005        }
1006    }
1007    Ok(())
1008}
1009
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        // assert_eq! reports the actual count on failure, unlike assert!(a == b).
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter
        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Should return 2 people (ages 35 and 45)
        assert_eq!(result.row_count(), 2);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter but empty params map (missing the required param)
        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        // Should fail with missing parameter error
        let err = result.expect_err("query referencing an unbound $min_age must fail");
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        // Tests parameter substitution in WHERE clause with property comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=20 matches > 15
        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        // Tests multiple parameters in WHERE clause with AND
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the person with age=35, score=90 matches both conditions
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_status_filter() {
        // Tests a string parameter compared with `=` against a status property;
        // exactly one of the three items has the requested status.
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        // Tests that same-type parameter comparisons work correctly
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        // Compare int property with int parameter
        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=100 matches > 75
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // Tests that empty results still have correct column names
        let store = Arc::new(LpgStore::new().unwrap());
        // Don't create any nodes

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        // Tests string parameter equality comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}