Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
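//! It supports multiple query languages for LPG (GQL, Cypher, Gremlin, GraphQL,
//! SQL/PGQ) and for RDF (SPARQL, GraphQL). Each language sits behind its own Cargo
//! feature, and executing the RDF languages additionally requires the `rdf` feature.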
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TransactionId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
/// Supported query languages.
///
/// Every variant is gated behind a Cargo feature, so the set of variants that
/// exists depends on how the crate is compiled. Use [`QueryLanguage::is_lpg`]
/// to tell whether a language targets the labeled property graph (LPG) store
/// or the RDF store.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024) - default for LPG
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL for LPG
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 GRAPH_TABLE)
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 for RDF
    ///
    /// This variant exists with the `sparql` feature alone, but executing it
    /// also requires the `rdf` feature.
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL for RDF (requires both the `graphql` and `rdf` features)
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
51
52impl QueryLanguage {
53    /// Returns whether this language targets LPG (vs RDF).
54    #[must_use]
55    pub const fn is_lpg(&self) -> bool {
56        match self {
57            #[cfg(feature = "gql")]
58            Self::Gql => true,
59            #[cfg(feature = "cypher")]
60            Self::Cypher => true,
61            #[cfg(feature = "gremlin")]
62            Self::Gremlin => true,
63            #[cfg(feature = "graphql")]
64            Self::GraphQL => true,
65            #[cfg(feature = "sql-pgq")]
66            Self::SqlPgq => true,
67            #[cfg(feature = "sparql")]
68            Self::Sparql => false,
69            #[cfg(all(feature = "graphql", feature = "rdf"))]
70            Self::GraphQLRdf => false,
71        }
72    }
73}
74
/// Query parameters for prepared statements.
///
/// Maps a parameter name to the value that replaces it in the logical plan
/// before binding and optimization.
pub type QueryParams = HashMap<String, Value>;
77
/// Processes queries through the full pipeline.
///
/// The processor holds shared (`Arc`) handles to the stores and provides a
/// unified interface for executing queries in any supported language.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// # use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
///
/// # fn main() -> grafeo_common::utils::error::Result<()> {
/// let store = Arc::new(LpgStore::new().unwrap());
/// let processor = QueryProcessor::for_lpg(store);
/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
/// # Ok(())
/// # }
/// ```
pub struct QueryProcessor {
    /// LPG store for property graph queries.
    ///
    /// For processors built with `for_graph_store_with_transaction`, this is an
    /// empty placeholder store; queries go through `graph_store` instead.
    lpg_store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends.
    ///
    /// This is the store the LPG planner executes against.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Transaction manager for MVCC operations.
    transaction_manager: Arc<TransactionManager>,
    /// Catalog for schema and index metadata.
    catalog: Arc<Catalog>,
    /// Query optimizer.
    optimizer: Optimizer,
    /// Current transaction context (if any), stored as
    /// `(viewing_epoch, transaction_id)`.
    transaction_context: Option<(EpochId, TransactionId)>,
    /// RDF store for triple pattern queries (optional).
    #[cfg(feature = "rdf")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
114
115impl QueryProcessor {
116    /// Creates a new query processor for LPG queries.
117    #[must_use]
118    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119        let optimizer = Optimizer::from_store(&store);
120        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121        Self {
122            lpg_store: store,
123            graph_store,
124            transaction_manager: Arc::new(TransactionManager::new()),
125            catalog: Arc::new(Catalog::new()),
126            optimizer,
127            transaction_context: None,
128            #[cfg(feature = "rdf")]
129            rdf_store: None,
130        }
131    }
132
133    /// Creates a new query processor with a transaction manager.
134    #[must_use]
135    pub fn for_lpg_with_transaction(
136        store: Arc<LpgStore>,
137        transaction_manager: Arc<TransactionManager>,
138    ) -> Self {
139        let optimizer = Optimizer::from_store(&store);
140        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
141        Self {
142            lpg_store: store,
143            graph_store,
144            transaction_manager,
145            catalog: Arc::new(Catalog::new()),
146            optimizer,
147            transaction_context: None,
148            #[cfg(feature = "rdf")]
149            rdf_store: None,
150        }
151    }
152
153    /// Creates a query processor backed by any GraphStoreMut implementation.
154    #[must_use]
155    pub fn for_graph_store_with_transaction(
156        store: Arc<dyn GraphStoreMut>,
157        transaction_manager: Arc<TransactionManager>,
158    ) -> Self {
159        let optimizer = Optimizer::from_graph_store(&*store);
160        Self {
161            lpg_store: Arc::new(LpgStore::new().expect("arena allocation for dummy LpgStore")), // dummy, not used
162            graph_store: store,
163            transaction_manager,
164            catalog: Arc::new(Catalog::new()),
165            optimizer,
166            transaction_context: None,
167            #[cfg(feature = "rdf")]
168            rdf_store: None,
169        }
170    }
171
172    /// Creates a new query processor with both LPG and RDF stores.
173    #[cfg(feature = "rdf")]
174    #[must_use]
175    pub fn with_rdf(
176        lpg_store: Arc<LpgStore>,
177        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
178    ) -> Self {
179        let optimizer = Optimizer::from_store(&lpg_store);
180        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
181        Self {
182            lpg_store,
183            graph_store,
184            transaction_manager: Arc::new(TransactionManager::new()),
185            catalog: Arc::new(Catalog::new()),
186            optimizer,
187            transaction_context: None,
188            rdf_store: Some(rdf_store),
189        }
190    }
191
192    /// Sets the transaction context for MVCC visibility.
193    ///
194    /// This should be called when the processor is used within a transaction.
195    #[must_use]
196    pub fn with_transaction_context(
197        mut self,
198        viewing_epoch: EpochId,
199        transaction_id: TransactionId,
200    ) -> Self {
201        self.transaction_context = Some((viewing_epoch, transaction_id));
202        self
203    }
204
205    /// Sets a custom catalog.
206    #[must_use]
207    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
208        self.catalog = catalog;
209        self
210    }
211
212    /// Sets a custom optimizer.
213    #[must_use]
214    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
215        self.optimizer = optimizer;
216        self
217    }
218
219    /// Processes a query string and returns results.
220    ///
221    /// Pipeline:
222    /// 1. Parse (language-specific parser → AST)
223    /// 2. Translate (AST → LogicalPlan)
224    /// 3. Bind (semantic validation)
225    /// 4. Optimize (filter pushdown, join reorder, etc.)
226    /// 5. Plan (logical → physical operators)
227    /// 6. Execute (run operators, collect results)
228    ///
229    /// # Arguments
230    ///
231    /// * `query` - The query string
232    /// * `language` - Which query language to use
233    /// * `params` - Optional query parameters for prepared statements
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if any stage of the pipeline fails.
238    pub fn process(
239        &self,
240        query: &str,
241        language: QueryLanguage,
242        params: Option<&QueryParams>,
243    ) -> Result<QueryResult> {
244        if language.is_lpg() {
245            self.process_lpg(query, language, params)
246        } else {
247            #[cfg(feature = "rdf")]
248            {
249                self.process_rdf(query, language, params)
250            }
251            #[cfg(not(feature = "rdf"))]
252            {
253                Err(Error::Internal(
254                    "RDF support not enabled. Compile with --features rdf".to_string(),
255                ))
256            }
257        }
258    }
259
260    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
261    fn process_lpg(
262        &self,
263        query: &str,
264        language: QueryLanguage,
265        params: Option<&QueryParams>,
266    ) -> Result<QueryResult> {
267        #[cfg(not(target_arch = "wasm32"))]
268        let start_time = std::time::Instant::now();
269
270        // 1. Parse and translate to logical plan
271        let mut logical_plan = self.translate_lpg(query, language)?;
272
273        // 2. Substitute parameters if provided
274        if let Some(params) = params {
275            substitute_params(&mut logical_plan, params)?;
276        }
277
278        // 3. Semantic validation
279        let mut binder = Binder::new();
280        let _binding_context = binder.bind(&logical_plan)?;
281
282        // 4. Optimize the plan
283        let optimized_plan = self.optimizer.optimize(logical_plan)?;
284
285        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
286        if optimized_plan.explain {
287            let mut plan = optimized_plan;
288            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
289            return Ok(explain_result(&plan));
290        }
291
292        // 5. Convert to physical plan with transaction context
293        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
294            Planner::with_context(
295                Arc::clone(&self.graph_store),
296                Arc::clone(&self.transaction_manager),
297                Some(transaction_id),
298                epoch,
299            )
300        } else {
301            Planner::with_context(
302                Arc::clone(&self.graph_store),
303                Arc::clone(&self.transaction_manager),
304                None,
305                self.transaction_manager.current_epoch(),
306            )
307        };
308        let mut physical_plan = planner.plan(&optimized_plan)?;
309
310        // 6. Execute and collect results
311        let executor = Executor::with_columns(physical_plan.columns.clone());
312        let mut result = executor.execute(physical_plan.operator.as_mut())?;
313
314        // Add execution metrics
315        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
316        #[cfg(not(target_arch = "wasm32"))]
317        {
318            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
319            result.execution_time_ms = Some(elapsed_ms);
320        }
321        result.rows_scanned = Some(rows_scanned);
322
323        Ok(result)
324    }
325
326    /// Translates an LPG query to a logical plan.
327    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
328        match language {
329            #[cfg(feature = "gql")]
330            QueryLanguage::Gql => {
331                use crate::query::translators::gql;
332                gql::translate(query)
333            }
334            #[cfg(feature = "cypher")]
335            QueryLanguage::Cypher => {
336                use crate::query::translators::cypher;
337                cypher::translate(query)
338            }
339            #[cfg(feature = "gremlin")]
340            QueryLanguage::Gremlin => {
341                use crate::query::translators::gremlin;
342                gremlin::translate(query)
343            }
344            #[cfg(feature = "graphql")]
345            QueryLanguage::GraphQL => {
346                use crate::query::translators::graphql;
347                graphql::translate(query)
348            }
349            #[cfg(feature = "sql-pgq")]
350            QueryLanguage::SqlPgq => {
351                use crate::query::translators::sql_pgq;
352                sql_pgq::translate(query)
353            }
354            #[allow(unreachable_patterns)]
355            _ => Err(Error::Internal(format!(
356                "Language {:?} is not an LPG language",
357                language
358            ))),
359        }
360    }
361
362    /// Processes an RDF query (SPARQL, GraphQL-RDF).
363    #[cfg(feature = "rdf")]
364    fn process_rdf(
365        &self,
366        query: &str,
367        language: QueryLanguage,
368        _params: Option<&QueryParams>,
369    ) -> Result<QueryResult> {
370        use crate::query::planner::rdf::RdfPlanner;
371
372        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
373            Error::Internal("RDF store not configured for this processor".to_string())
374        })?;
375
376        // 1. Parse and translate to logical plan
377        let logical_plan = self.translate_rdf(query, language)?;
378
379        // 2. Semantic validation
380        let mut binder = Binder::new();
381        let _binding_context = binder.bind(&logical_plan)?;
382
383        // 3. Optimize the plan
384        let optimized_plan = self.optimizer.optimize(logical_plan)?;
385
386        // 3a. EXPLAIN: return the optimized plan tree without executing
387        if optimized_plan.explain {
388            return Ok(explain_result(&optimized_plan));
389        }
390
391        // 4. Convert to physical plan (using RDF planner)
392        let planner = RdfPlanner::new(Arc::clone(rdf_store));
393        let mut physical_plan = planner.plan(&optimized_plan)?;
394
395        // 5. Execute and collect results
396        let executor = Executor::with_columns(physical_plan.columns.clone());
397        executor.execute(physical_plan.operator.as_mut())
398    }
399
400    /// Translates an RDF query to a logical plan.
401    #[cfg(feature = "rdf")]
402    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
403        match language {
404            #[cfg(feature = "sparql")]
405            QueryLanguage::Sparql => {
406                use crate::query::translators::sparql;
407                sparql::translate(query)
408            }
409            #[cfg(all(feature = "graphql", feature = "rdf"))]
410            QueryLanguage::GraphQLRdf => {
411                use crate::query::translators::graphql_rdf;
412                // Default namespace for GraphQL-RDF queries
413                graphql_rdf::translate(query, "http://example.org/")
414            }
415            _ => Err(Error::Internal(format!(
416                "Language {:?} is not an RDF language",
417                language
418            ))),
419        }
420    }
421
422    /// Returns a reference to the LPG store.
423    #[must_use]
424    pub fn lpg_store(&self) -> &Arc<LpgStore> {
425        &self.lpg_store
426    }
427
428    /// Returns a reference to the catalog.
429    #[must_use]
430    pub fn catalog(&self) -> &Arc<Catalog> {
431        &self.catalog
432    }
433
434    /// Returns a reference to the optimizer.
435    #[must_use]
436    pub fn optimizer(&self) -> &Optimizer {
437        &self.optimizer
438    }
439
440    /// Returns a reference to the RDF store (if configured).
441    #[cfg(feature = "rdf")]
442    #[must_use]
443    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
444        self.rdf_store.as_ref()
445    }
446}
447
448impl QueryProcessor {
449    /// Returns a reference to the transaction manager.
450    #[must_use]
451    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
452        &self.transaction_manager
453    }
454}
455
456/// Annotates filter operators in the plan with pushdown hints.
457///
458/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
459/// whether a property index exists for equality predicates.
460pub(crate) fn annotate_pushdown_hints(
461    op: &mut LogicalOperator,
462    store: &dyn grafeo_core::graph::GraphStore,
463) {
464    use crate::query::plan::*;
465
466    match op {
467        LogicalOperator::Filter(filter) => {
468            // Recurse into children first
469            annotate_pushdown_hints(&mut filter.input, store);
470
471            // Annotate this filter if it sits on top of a NodeScan
472            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
473                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
474            }
475        }
476        LogicalOperator::NodeScan(op) => {
477            if let Some(input) = &mut op.input {
478                annotate_pushdown_hints(input, store);
479            }
480        }
481        LogicalOperator::EdgeScan(op) => {
482            if let Some(input) = &mut op.input {
483                annotate_pushdown_hints(input, store);
484            }
485        }
486        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
487        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
488        LogicalOperator::Join(op) => {
489            annotate_pushdown_hints(&mut op.left, store);
490            annotate_pushdown_hints(&mut op.right, store);
491        }
492        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
493        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
494        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
495        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
496        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
497        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
498        LogicalOperator::Union(op) => {
499            for input in &mut op.inputs {
500                annotate_pushdown_hints(input, store);
501            }
502        }
503        LogicalOperator::Apply(op) => {
504            annotate_pushdown_hints(&mut op.input, store);
505            annotate_pushdown_hints(&mut op.subplan, store);
506        }
507        LogicalOperator::Otherwise(op) => {
508            annotate_pushdown_hints(&mut op.left, store);
509            annotate_pushdown_hints(&mut op.right, store);
510        }
511        _ => {}
512    }
513}
514
515/// Infers the pushdown strategy for a filter predicate over a node scan.
516fn infer_pushdown(
517    predicate: &LogicalExpression,
518    scan: &crate::query::plan::NodeScanOp,
519    store: &dyn grafeo_core::graph::GraphStore,
520) -> Option<crate::query::plan::PushdownHint> {
521    use crate::query::plan::*;
522
523    match predicate {
524        // Equality: n.prop = value
525        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
526            if let Some(prop) = extract_property_name(left, &scan.variable)
527                .or_else(|| extract_property_name(right, &scan.variable))
528            {
529                if store.has_property_index(&prop) {
530                    return Some(PushdownHint::IndexLookup { property: prop });
531                }
532                if scan.label.is_some() {
533                    return Some(PushdownHint::LabelFirst);
534                }
535            }
536            None
537        }
538        // Range: n.prop > value, n.prop < value, etc.
539        LogicalExpression::Binary {
540            left,
541            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
542            right,
543        } => {
544            if let Some(prop) = extract_property_name(left, &scan.variable)
545                .or_else(|| extract_property_name(right, &scan.variable))
546            {
547                if store.has_property_index(&prop) {
548                    return Some(PushdownHint::RangeScan { property: prop });
549                }
550                if scan.label.is_some() {
551                    return Some(PushdownHint::LabelFirst);
552                }
553            }
554            None
555        }
556        // AND: check the left side (first conjunct) for pushdown
557        LogicalExpression::Binary {
558            left,
559            op: BinaryOp::And,
560            ..
561        } => infer_pushdown(left, scan, store),
562        _ => {
563            // Any other predicate on a labeled scan gets label-first
564            if scan.label.is_some() {
565                Some(PushdownHint::LabelFirst)
566            } else {
567                None
568            }
569        }
570    }
571}
572
573/// Extracts the property name if the expression is `Property { variable, property }`
574/// and the variable matches the scan variable.
575fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
576    if let LogicalExpression::Property { variable, property } = expr
577        && variable == scan_var
578    {
579        Some(property.clone())
580    } else {
581        None
582    }
583}
584
585/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
586pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
587    let tree_text = plan.root.explain_tree();
588    QueryResult {
589        columns: vec!["plan".to_string()],
590        column_types: vec![grafeo_common::types::LogicalType::String],
591        rows: vec![vec![Value::String(tree_text.into())]],
592        execution_time_ms: None,
593        rows_scanned: None,
594        status_message: None,
595        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
596    }
597}
598
599/// Substitutes parameters in a logical plan with their values.
600fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
601    substitute_in_operator(&mut plan.root, params)
602}
603
/// Recursively substitutes parameters in an operator.
///
/// Visits `op` and every child operator. Expressions each operator carries are
/// passed to `substitute_in_expression`. Parameterized `SKIP`/`LIMIT` counts
/// are resolved through `resolve_count_param`.
///
/// Within each operator, expressions and children are visited in a fixed
/// order, and the walk stops at the first error. When several parameters are
/// problematic, the error reported is the first one met in that order.
///
/// # Errors
///
/// Propagates any error returned by `substitute_in_expression` or
/// `resolve_count_param`, for example a missing, negative, or non-integer
/// `SKIP`/`LIMIT` parameter.
fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
    use crate::query::plan::*;

    match op {
        LogicalOperator::Filter(filter) => {
            substitute_in_expression(&mut filter.predicate, params)?;
            substitute_in_operator(&mut filter.input, params)?;
        }
        LogicalOperator::Return(ret) => {
            for item in &mut ret.items {
                substitute_in_expression(&mut item.expression, params)?;
            }
            substitute_in_operator(&mut ret.input, params)?;
        }
        LogicalOperator::Project(proj) => {
            for p in &mut proj.projections {
                substitute_in_expression(&mut p.expression, params)?;
            }
            substitute_in_operator(&mut proj.input, params)?;
        }
        LogicalOperator::NodeScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::EdgeScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Expand(expand) => {
            substitute_in_operator(&mut expand.input, params)?;
        }
        LogicalOperator::Join(join) => {
            // Children first, then the join conditions.
            substitute_in_operator(&mut join.left, params)?;
            substitute_in_operator(&mut join.right, params)?;
            for cond in &mut join.conditions {
                substitute_in_expression(&mut cond.left, params)?;
                substitute_in_expression(&mut cond.right, params)?;
            }
        }
        LogicalOperator::LeftJoin(join) => {
            substitute_in_operator(&mut join.left, params)?;
            substitute_in_operator(&mut join.right, params)?;
            if let Some(cond) = &mut join.condition {
                substitute_in_expression(cond, params)?;
            }
        }
        LogicalOperator::Aggregate(agg) => {
            for expr in &mut agg.group_by {
                substitute_in_expression(expr, params)?;
            }
            // Aggregates without an argument expression are skipped.
            for agg_expr in &mut agg.aggregates {
                if let Some(expr) = &mut agg_expr.expression {
                    substitute_in_expression(expr, params)?;
                }
            }
            substitute_in_operator(&mut agg.input, params)?;
        }
        LogicalOperator::Sort(sort) => {
            for key in &mut sort.keys {
                substitute_in_expression(&mut key.expression, params)?;
            }
            substitute_in_operator(&mut sort.input, params)?;
        }
        LogicalOperator::Limit(limit) => {
            // The count may be a `$param` rather than a literal.
            resolve_count_param(&mut limit.count, params)?;
            substitute_in_operator(&mut limit.input, params)?;
        }
        LogicalOperator::Skip(skip) => {
            // The count may be a `$param` rather than a literal.
            resolve_count_param(&mut skip.count, params)?;
            substitute_in_operator(&mut skip.input, params)?;
        }
        LogicalOperator::Distinct(distinct) => {
            substitute_in_operator(&mut distinct.input, params)?;
        }
        LogicalOperator::CreateNode(create) => {
            for (_, expr) in &mut create.properties {
                substitute_in_expression(expr, params)?;
            }
            if let Some(input) = &mut create.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::CreateEdge(create) => {
            for (_, expr) in &mut create.properties {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut create.input, params)?;
        }
        LogicalOperator::DeleteNode(delete) => {
            substitute_in_operator(&mut delete.input, params)?;
        }
        LogicalOperator::DeleteEdge(delete) => {
            substitute_in_operator(&mut delete.input, params)?;
        }
        LogicalOperator::SetProperty(set) => {
            for (_, expr) in &mut set.properties {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut set.input, params)?;
        }
        LogicalOperator::Union(union) => {
            for input in &mut union.inputs {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::AntiJoin(anti) => {
            substitute_in_operator(&mut anti.left, params)?;
            substitute_in_operator(&mut anti.right, params)?;
        }
        LogicalOperator::Bind(bind) => {
            substitute_in_expression(&mut bind.expression, params)?;
            substitute_in_operator(&mut bind.input, params)?;
        }
        LogicalOperator::TripleScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Unwind(unwind) => {
            substitute_in_expression(&mut unwind.expression, params)?;
            substitute_in_operator(&mut unwind.input, params)?;
        }
        LogicalOperator::MapCollect(mc) => {
            substitute_in_operator(&mut mc.input, params)?;
        }
        LogicalOperator::Merge(merge) => {
            // Match properties, then ON CREATE, then ON MATCH assignments.
            for (_, expr) in &mut merge.match_properties {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge.on_create {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge.on_match {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut merge.input, params)?;
        }
        LogicalOperator::MergeRelationship(merge_rel) => {
            for (_, expr) in &mut merge_rel.match_properties {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge_rel.on_create {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge_rel.on_match {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut merge_rel.input, params)?;
        }
        LogicalOperator::AddLabel(add_label) => {
            substitute_in_operator(&mut add_label.input, params)?;
        }
        LogicalOperator::RemoveLabel(remove_label) => {
            substitute_in_operator(&mut remove_label.input, params)?;
        }
        LogicalOperator::ShortestPath(sp) => {
            substitute_in_operator(&mut sp.input, params)?;
        }
        // SPARQL Update operators
        LogicalOperator::InsertTriple(insert) => {
            if let Some(ref mut input) = insert.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::DeleteTriple(delete) => {
            if let Some(ref mut input) = delete.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Modify(modify) => {
            substitute_in_operator(&mut modify.where_clause, params)?;
        }
        // Graph management operators: nothing is substituted here.
        LogicalOperator::ClearGraph(_)
        | LogicalOperator::CreateGraph(_)
        | LogicalOperator::DropGraph(_)
        | LogicalOperator::LoadGraph(_)
        | LogicalOperator::CopyGraph(_)
        | LogicalOperator::MoveGraph(_)
        | LogicalOperator::AddGraph(_) => {}
        LogicalOperator::HorizontalAggregate(op) => {
            substitute_in_operator(&mut op.input, params)?;
        }
        LogicalOperator::Empty => {}
        LogicalOperator::VectorScan(scan) => {
            // The query vector itself may be a `$param`.
            substitute_in_expression(&mut scan.query_vector, params)?;
            if let Some(ref mut input) = scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::VectorJoin(join) => {
            substitute_in_expression(&mut join.query_vector, params)?;
            substitute_in_operator(&mut join.input, params)?;
        }
        LogicalOperator::Except(except) => {
            substitute_in_operator(&mut except.left, params)?;
            substitute_in_operator(&mut except.right, params)?;
        }
        LogicalOperator::Intersect(intersect) => {
            substitute_in_operator(&mut intersect.left, params)?;
            substitute_in_operator(&mut intersect.right, params)?;
        }
        LogicalOperator::Otherwise(otherwise) => {
            substitute_in_operator(&mut otherwise.left, params)?;
            substitute_in_operator(&mut otherwise.right, params)?;
        }
        LogicalOperator::Apply(apply) => {
            substitute_in_operator(&mut apply.input, params)?;
            substitute_in_operator(&mut apply.subplan, params)?;
        }
        // ParameterScan has no expressions to substitute
        LogicalOperator::ParameterScan(_) => {}
        LogicalOperator::MultiWayJoin(mwj) => {
            for input in &mut mwj.inputs {
                substitute_in_operator(input, params)?;
            }
            for cond in &mut mwj.conditions {
                substitute_in_expression(&mut cond.left, params)?;
                substitute_in_expression(&mut cond.right, params)?;
            }
        }
        // DDL operators have no expressions to substitute
        LogicalOperator::CreatePropertyGraph(_) => {}
        // Procedure calls: arguments could contain parameters but we handle at execution time
        LogicalOperator::CallProcedure(_) => {}
    }
    Ok(())
}
834
835/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
836fn resolve_count_param(
837    count: &mut crate::query::plan::CountExpr,
838    params: &QueryParams,
839) -> Result<()> {
840    use crate::query::plan::CountExpr;
841    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
842
843    if let CountExpr::Parameter(name) = count {
844        let value = params.get(name.as_str()).ok_or_else(|| {
845            Error::Query(QueryError::new(
846                QueryErrorKind::Semantic,
847                format!("Missing parameter for SKIP/LIMIT: ${name}"),
848            ))
849        })?;
850        let n = match value {
851            Value::Int64(i) if *i >= 0 => *i as usize,
852            Value::Int64(i) => {
853                return Err(Error::Query(QueryError::new(
854                    QueryErrorKind::Semantic,
855                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
856                )));
857            }
858            other => {
859                return Err(Error::Query(QueryError::new(
860                    QueryErrorKind::Semantic,
861                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
862                )));
863            }
864        };
865        *count = CountExpr::Literal(n);
866    }
867    Ok(())
868}
869
870/// Substitutes parameters in an expression with their values.
871fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
872    use crate::query::plan::LogicalExpression;
873
874    match expr {
875        LogicalExpression::Parameter(name) => {
876            if let Some(value) = params.get(name) {
877                *expr = LogicalExpression::Literal(value.clone());
878            } else {
879                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
880            }
881        }
882        LogicalExpression::Binary { left, right, .. } => {
883            substitute_in_expression(left, params)?;
884            substitute_in_expression(right, params)?;
885        }
886        LogicalExpression::Unary { operand, .. } => {
887            substitute_in_expression(operand, params)?;
888        }
889        LogicalExpression::FunctionCall { args, .. } => {
890            for arg in args {
891                substitute_in_expression(arg, params)?;
892            }
893        }
894        LogicalExpression::List(items) => {
895            for item in items {
896                substitute_in_expression(item, params)?;
897            }
898        }
899        LogicalExpression::Map(pairs) => {
900            for (_, value) in pairs {
901                substitute_in_expression(value, params)?;
902            }
903        }
904        LogicalExpression::IndexAccess { base, index } => {
905            substitute_in_expression(base, params)?;
906            substitute_in_expression(index, params)?;
907        }
908        LogicalExpression::SliceAccess { base, start, end } => {
909            substitute_in_expression(base, params)?;
910            if let Some(s) = start {
911                substitute_in_expression(s, params)?;
912            }
913            if let Some(e) = end {
914                substitute_in_expression(e, params)?;
915            }
916        }
917        LogicalExpression::Case {
918            operand,
919            when_clauses,
920            else_clause,
921        } => {
922            if let Some(op) = operand {
923                substitute_in_expression(op, params)?;
924            }
925            for (cond, result) in when_clauses {
926                substitute_in_expression(cond, params)?;
927                substitute_in_expression(result, params)?;
928            }
929            if let Some(el) = else_clause {
930                substitute_in_expression(el, params)?;
931            }
932        }
933        LogicalExpression::Property { .. }
934        | LogicalExpression::Variable(_)
935        | LogicalExpression::Literal(_)
936        | LogicalExpression::Labels(_)
937        | LogicalExpression::Type(_)
938        | LogicalExpression::Id(_) => {}
939        LogicalExpression::ListComprehension {
940            list_expr,
941            filter_expr,
942            map_expr,
943            ..
944        } => {
945            substitute_in_expression(list_expr, params)?;
946            if let Some(filter) = filter_expr {
947                substitute_in_expression(filter, params)?;
948            }
949            substitute_in_expression(map_expr, params)?;
950        }
951        LogicalExpression::ListPredicate {
952            list_expr,
953            predicate,
954            ..
955        } => {
956            substitute_in_expression(list_expr, params)?;
957            substitute_in_expression(predicate, params)?;
958        }
959        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
960            // Subqueries would need recursive parameter substitution
961        }
962        LogicalExpression::PatternComprehension { projection, .. } => {
963            substitute_in_expression(projection, params)?;
964        }
965        LogicalExpression::MapProjection { entries, .. } => {
966            for entry in entries {
967                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
968                    substitute_in_expression(expr, params)?;
969                }
970            }
971        }
972        LogicalExpression::Reduce {
973            initial,
974            list,
975            expression,
976            ..
977        } => {
978            substitute_in_expression(initial, params)?;
979            substitute_in_expression(list, params)?;
980            substitute_in_expression(expression, params)?;
981        }
982    }
983    Ok(())
984}
985
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter
        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Should return 2 people (ages 35 and 45)
        assert_eq!(result.row_count(), 2);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter but empty params map (missing the required param)
        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        // Should fail with missing parameter error
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        // Tests parameter substitution in WHERE clause with property comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=20 matches > 15
        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        // Tests multiple parameters in WHERE clause with AND
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the person with age=35, score=90 matches both conditions
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_filter_returns_node() {
        // Tests a string parameter used in an equality filter that selects one of
        // several nodes (the query uses `=`, not an IN list).
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        // Tests that same-type parameter comparisons work correctly
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        // Compare int property with int parameter
        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=100 matches > 75
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // Tests that empty results still have correct column names
        let store = Arc::new(LpgStore::new().unwrap());
        // Don't create any nodes

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        // Tests string parameter equality comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }

    #[test]
    fn test_resolve_count_param_substitutes_literal() {
        use crate::query::plan::CountExpr;

        let mut params = QueryParams::default();
        params.insert("limit".to_string(), Value::Int64(5));

        let mut count = CountExpr::Parameter("limit".into());
        resolve_count_param(&mut count, &params).unwrap();
        assert!(matches!(count, CountExpr::Literal(5)));
    }

    #[test]
    fn test_resolve_count_param_leaves_literal_untouched() {
        use crate::query::plan::CountExpr;

        let mut count = CountExpr::Literal(7);
        resolve_count_param(&mut count, &QueryParams::default()).unwrap();
        assert!(matches!(count, CountExpr::Literal(7)));
    }

    #[test]
    fn test_resolve_count_param_rejects_invalid_values() {
        use crate::query::plan::CountExpr;

        let mut params = QueryParams::default();
        params.insert("neg".to_string(), Value::Int64(-1));
        params.insert("text".to_string(), Value::String("ten".into()));

        // Negative, non-integer, and absent parameters are all semantic errors.
        for name in ["neg", "text", "absent"] {
            let mut count = CountExpr::Parameter(name.into());
            let result = resolve_count_param(&mut count, &params);
            assert!(
                matches!(result, Err(Error::Query(_))),
                "expected a semantic query error for ${name}"
            );
            // A failed resolution must leave the count unresolved.
            assert!(matches!(count, CountExpr::Parameter(_)));
        }
    }
}