Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TransactionId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
/// Query languages the processor can be asked to run.
///
/// Each variant only exists when the Cargo feature named in its `cfg`
/// attribute is enabled, so the set of variants depends on the build.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024), the default language for LPG.
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0.
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin.
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL over the labeled property graph.
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 `GRAPH_TABLE`).
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 over RDF.
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL over RDF; needs both the `graphql` and `rdf` features.
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
51
52impl QueryLanguage {
53    /// Returns whether this language targets LPG (vs RDF).
54    #[must_use]
55    pub const fn is_lpg(&self) -> bool {
56        match self {
57            #[cfg(feature = "gql")]
58            Self::Gql => true,
59            #[cfg(feature = "cypher")]
60            Self::Cypher => true,
61            #[cfg(feature = "gremlin")]
62            Self::Gremlin => true,
63            #[cfg(feature = "graphql")]
64            Self::GraphQL => true,
65            #[cfg(feature = "sql-pgq")]
66            Self::SqlPgq => true,
67            #[cfg(feature = "sparql")]
68            Self::Sparql => false,
69            #[cfg(all(feature = "graphql", feature = "rdf"))]
70            Self::GraphQLRdf => false,
71        }
72    }
73}
74
75/// Query parameters for prepared statements.
76pub type QueryParams = HashMap<String, Value>;
77
78/// Processes queries through the full pipeline.
79///
80/// The processor holds references to the stores and provides a unified
81/// interface for executing queries in any supported language.
82///
83/// # Example
84///
85/// ```no_run
86/// # use std::sync::Arc;
87/// # use grafeo_core::graph::lpg::LpgStore;
88/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
89///
90/// # fn main() -> grafeo_common::utils::error::Result<()> {
91/// let store = Arc::new(LpgStore::new().unwrap());
92/// let processor = QueryProcessor::for_lpg(store);
93/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
94/// # Ok(())
95/// # }
96/// ```
97pub struct QueryProcessor {
98    /// LPG store for property graph queries.
99    lpg_store: Arc<LpgStore>,
100    /// Graph store trait object for pluggable storage backends.
101    graph_store: Arc<dyn GraphStoreMut>,
102    /// Transaction manager for MVCC operations.
103    transaction_manager: Arc<TransactionManager>,
104    /// Catalog for schema and index metadata.
105    catalog: Arc<Catalog>,
106    /// Query optimizer.
107    optimizer: Optimizer,
108    /// Current transaction context (if any).
109    transaction_context: Option<(EpochId, TransactionId)>,
110    /// RDF store for triple pattern queries (optional).
111    #[cfg(feature = "rdf")]
112    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
113}
114
115impl QueryProcessor {
116    /// Creates a new query processor for LPG queries.
117    #[must_use]
118    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119        let optimizer = Optimizer::from_store(&store);
120        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121        Self {
122            lpg_store: store,
123            graph_store,
124            transaction_manager: Arc::new(TransactionManager::new()),
125            catalog: Arc::new(Catalog::new()),
126            optimizer,
127            transaction_context: None,
128            #[cfg(feature = "rdf")]
129            rdf_store: None,
130        }
131    }
132
133    /// Creates a new query processor with a transaction manager.
134    #[must_use]
135    pub fn for_lpg_with_transaction(
136        store: Arc<LpgStore>,
137        transaction_manager: Arc<TransactionManager>,
138    ) -> Self {
139        let optimizer = Optimizer::from_store(&store);
140        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
141        Self {
142            lpg_store: store,
143            graph_store,
144            transaction_manager,
145            catalog: Arc::new(Catalog::new()),
146            optimizer,
147            transaction_context: None,
148            #[cfg(feature = "rdf")]
149            rdf_store: None,
150        }
151    }
152
153    /// Creates a query processor backed by any `GraphStoreMut` implementation.
154    ///
155    /// # Errors
156    ///
157    /// Returns an error if the internal arena allocation fails (out of memory).
158    pub fn for_graph_store_with_transaction(
159        store: Arc<dyn GraphStoreMut>,
160        transaction_manager: Arc<TransactionManager>,
161    ) -> Result<Self> {
162        let optimizer = Optimizer::from_graph_store(&*store);
163        Ok(Self {
164            lpg_store: Arc::new(LpgStore::new()?),
165            graph_store: store,
166            transaction_manager,
167            catalog: Arc::new(Catalog::new()),
168            optimizer,
169            transaction_context: None,
170            #[cfg(feature = "rdf")]
171            rdf_store: None,
172        })
173    }
174
175    /// Creates a new query processor with both LPG and RDF stores.
176    #[cfg(feature = "rdf")]
177    #[must_use]
178    pub fn with_rdf(
179        lpg_store: Arc<LpgStore>,
180        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
181    ) -> Self {
182        let optimizer = Optimizer::from_store(&lpg_store);
183        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
184        Self {
185            lpg_store,
186            graph_store,
187            transaction_manager: Arc::new(TransactionManager::new()),
188            catalog: Arc::new(Catalog::new()),
189            optimizer,
190            transaction_context: None,
191            rdf_store: Some(rdf_store),
192        }
193    }
194
195    /// Sets the transaction context for MVCC visibility.
196    ///
197    /// This should be called when the processor is used within a transaction.
198    #[must_use]
199    pub fn with_transaction_context(
200        mut self,
201        viewing_epoch: EpochId,
202        transaction_id: TransactionId,
203    ) -> Self {
204        self.transaction_context = Some((viewing_epoch, transaction_id));
205        self
206    }
207
208    /// Sets a custom catalog.
209    #[must_use]
210    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
211        self.catalog = catalog;
212        self
213    }
214
215    /// Sets a custom optimizer.
216    #[must_use]
217    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
218        self.optimizer = optimizer;
219        self
220    }
221
222    /// Processes a query string and returns results.
223    ///
224    /// Pipeline:
225    /// 1. Parse (language-specific parser → AST)
226    /// 2. Translate (AST → LogicalPlan)
227    /// 3. Bind (semantic validation)
228    /// 4. Optimize (filter pushdown, join reorder, etc.)
229    /// 5. Plan (logical → physical operators)
230    /// 6. Execute (run operators, collect results)
231    ///
232    /// # Arguments
233    ///
234    /// * `query` - The query string
235    /// * `language` - Which query language to use
236    /// * `params` - Optional query parameters for prepared statements
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if any stage of the pipeline fails.
241    pub fn process(
242        &self,
243        query: &str,
244        language: QueryLanguage,
245        params: Option<&QueryParams>,
246    ) -> Result<QueryResult> {
247        if language.is_lpg() {
248            self.process_lpg(query, language, params)
249        } else {
250            #[cfg(feature = "rdf")]
251            {
252                self.process_rdf(query, language, params)
253            }
254            #[cfg(not(feature = "rdf"))]
255            {
256                Err(Error::Internal(
257                    "RDF support not enabled. Compile with --features rdf".to_string(),
258                ))
259            }
260        }
261    }
262
263    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
264    fn process_lpg(
265        &self,
266        query: &str,
267        language: QueryLanguage,
268        params: Option<&QueryParams>,
269    ) -> Result<QueryResult> {
270        #[cfg(not(target_arch = "wasm32"))]
271        let start_time = std::time::Instant::now();
272
273        // 1. Parse and translate to logical plan
274        let mut logical_plan = self.translate_lpg(query, language)?;
275
276        // 2. Substitute parameters if provided
277        if let Some(params) = params {
278            substitute_params(&mut logical_plan, params)?;
279        }
280
281        // 3. Semantic validation
282        let mut binder = Binder::new();
283        let _binding_context = binder.bind(&logical_plan)?;
284
285        // 4. Optimize the plan
286        let optimized_plan = self.optimizer.optimize(logical_plan)?;
287
288        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
289        if optimized_plan.explain {
290            let mut plan = optimized_plan;
291            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
292            return Ok(explain_result(&plan));
293        }
294
295        // 5. Convert to physical plan with transaction context
296        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
297            Planner::with_context(
298                Arc::clone(&self.graph_store),
299                Arc::clone(&self.transaction_manager),
300                Some(transaction_id),
301                epoch,
302            )
303        } else {
304            Planner::with_context(
305                Arc::clone(&self.graph_store),
306                Arc::clone(&self.transaction_manager),
307                None,
308                self.transaction_manager.current_epoch(),
309            )
310        };
311        let mut physical_plan = planner.plan(&optimized_plan)?;
312
313        // 6. Execute and collect results
314        let executor = Executor::with_columns(physical_plan.columns.clone());
315        let mut result = executor.execute(physical_plan.operator.as_mut())?;
316
317        // Add execution metrics
318        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
319        #[cfg(not(target_arch = "wasm32"))]
320        {
321            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
322            result.execution_time_ms = Some(elapsed_ms);
323        }
324        result.rows_scanned = Some(rows_scanned);
325
326        Ok(result)
327    }
328
329    /// Translates an LPG query to a logical plan.
330    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
331        match language {
332            #[cfg(feature = "gql")]
333            QueryLanguage::Gql => {
334                use crate::query::translators::gql;
335                gql::translate(query)
336            }
337            #[cfg(feature = "cypher")]
338            QueryLanguage::Cypher => {
339                use crate::query::translators::cypher;
340                cypher::translate(query)
341            }
342            #[cfg(feature = "gremlin")]
343            QueryLanguage::Gremlin => {
344                use crate::query::translators::gremlin;
345                gremlin::translate(query)
346            }
347            #[cfg(feature = "graphql")]
348            QueryLanguage::GraphQL => {
349                use crate::query::translators::graphql;
350                graphql::translate(query)
351            }
352            #[cfg(feature = "sql-pgq")]
353            QueryLanguage::SqlPgq => {
354                use crate::query::translators::sql_pgq;
355                sql_pgq::translate(query)
356            }
357            #[allow(unreachable_patterns)]
358            _ => Err(Error::Internal(format!(
359                "Language {:?} is not an LPG language",
360                language
361            ))),
362        }
363    }
364
365    /// Processes an RDF query (SPARQL, GraphQL-RDF).
366    #[cfg(feature = "rdf")]
367    fn process_rdf(
368        &self,
369        query: &str,
370        language: QueryLanguage,
371        params: Option<&QueryParams>,
372    ) -> Result<QueryResult> {
373        use crate::query::planner::rdf::RdfPlanner;
374
375        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
376            Error::Internal("RDF store not configured for this processor".to_string())
377        })?;
378
379        // 1. Parse and translate to logical plan
380        let mut logical_plan = self.translate_rdf(query, language)?;
381
382        // 2. Substitute parameters if provided
383        if let Some(params) = params {
384            substitute_params(&mut logical_plan, params)?;
385        }
386
387        // 3. Semantic validation
388        let mut binder = Binder::new();
389        let _binding_context = binder.bind(&logical_plan)?;
390
391        // 3. Optimize the plan
392        let optimized_plan = self.optimizer.optimize(logical_plan)?;
393
394        // 3a. EXPLAIN: return the optimized plan tree without executing
395        if optimized_plan.explain {
396            return Ok(explain_result(&optimized_plan));
397        }
398
399        // 4. Convert to physical plan (using RDF planner)
400        let planner = RdfPlanner::new(Arc::clone(rdf_store));
401        let mut physical_plan = planner.plan(&optimized_plan)?;
402
403        // 5. Execute and collect results
404        let executor = Executor::with_columns(physical_plan.columns.clone());
405        executor.execute(physical_plan.operator.as_mut())
406    }
407
408    /// Translates an RDF query to a logical plan.
409    #[cfg(feature = "rdf")]
410    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
411        match language {
412            #[cfg(feature = "sparql")]
413            QueryLanguage::Sparql => {
414                use crate::query::translators::sparql;
415                sparql::translate(query)
416            }
417            #[cfg(all(feature = "graphql", feature = "rdf"))]
418            QueryLanguage::GraphQLRdf => {
419                use crate::query::translators::graphql_rdf;
420                // Default namespace for GraphQL-RDF queries
421                graphql_rdf::translate(query, "http://example.org/")
422            }
423            _ => Err(Error::Internal(format!(
424                "Language {:?} is not an RDF language",
425                language
426            ))),
427        }
428    }
429
430    /// Returns a reference to the LPG store.
431    #[must_use]
432    pub fn lpg_store(&self) -> &Arc<LpgStore> {
433        &self.lpg_store
434    }
435
436    /// Returns a reference to the catalog.
437    #[must_use]
438    pub fn catalog(&self) -> &Arc<Catalog> {
439        &self.catalog
440    }
441
442    /// Returns a reference to the optimizer.
443    #[must_use]
444    pub fn optimizer(&self) -> &Optimizer {
445        &self.optimizer
446    }
447
448    /// Returns a reference to the RDF store (if configured).
449    #[cfg(feature = "rdf")]
450    #[must_use]
451    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
452        self.rdf_store.as_ref()
453    }
454}
455
456impl QueryProcessor {
457    /// Returns a reference to the transaction manager.
458    #[must_use]
459    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
460        &self.transaction_manager
461    }
462}
463
464/// Annotates filter operators in the plan with pushdown hints.
465///
466/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
467/// whether a property index exists for equality predicates.
468pub(crate) fn annotate_pushdown_hints(
469    op: &mut LogicalOperator,
470    store: &dyn grafeo_core::graph::GraphStore,
471) {
472    #[allow(clippy::wildcard_imports)]
473    use crate::query::plan::*;
474
475    match op {
476        LogicalOperator::Filter(filter) => {
477            // Recurse into children first
478            annotate_pushdown_hints(&mut filter.input, store);
479
480            // Annotate this filter if it sits on top of a NodeScan
481            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
482                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
483            }
484        }
485        LogicalOperator::NodeScan(op) => {
486            if let Some(input) = &mut op.input {
487                annotate_pushdown_hints(input, store);
488            }
489        }
490        LogicalOperator::EdgeScan(op) => {
491            if let Some(input) = &mut op.input {
492                annotate_pushdown_hints(input, store);
493            }
494        }
495        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
496        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
497        LogicalOperator::Join(op) => {
498            annotate_pushdown_hints(&mut op.left, store);
499            annotate_pushdown_hints(&mut op.right, store);
500        }
501        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
502        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
503        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
504        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
505        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
506        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
507        LogicalOperator::Union(op) => {
508            for input in &mut op.inputs {
509                annotate_pushdown_hints(input, store);
510            }
511        }
512        LogicalOperator::Apply(op) => {
513            annotate_pushdown_hints(&mut op.input, store);
514            annotate_pushdown_hints(&mut op.subplan, store);
515        }
516        LogicalOperator::Otherwise(op) => {
517            annotate_pushdown_hints(&mut op.left, store);
518            annotate_pushdown_hints(&mut op.right, store);
519        }
520        _ => {}
521    }
522}
523
524/// Infers the pushdown strategy for a filter predicate over a node scan.
525fn infer_pushdown(
526    predicate: &LogicalExpression,
527    scan: &crate::query::plan::NodeScanOp,
528    store: &dyn grafeo_core::graph::GraphStore,
529) -> Option<crate::query::plan::PushdownHint> {
530    #[allow(clippy::wildcard_imports)]
531    use crate::query::plan::*;
532
533    match predicate {
534        // Equality: n.prop = value
535        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
536            if let Some(prop) = extract_property_name(left, &scan.variable)
537                .or_else(|| extract_property_name(right, &scan.variable))
538            {
539                if store.has_property_index(&prop) {
540                    return Some(PushdownHint::IndexLookup { property: prop });
541                }
542                if scan.label.is_some() {
543                    return Some(PushdownHint::LabelFirst);
544                }
545            }
546            None
547        }
548        // Range: n.prop > value, n.prop < value, etc.
549        LogicalExpression::Binary {
550            left,
551            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
552            right,
553        } => {
554            if let Some(prop) = extract_property_name(left, &scan.variable)
555                .or_else(|| extract_property_name(right, &scan.variable))
556            {
557                if store.has_property_index(&prop) {
558                    return Some(PushdownHint::RangeScan { property: prop });
559                }
560                if scan.label.is_some() {
561                    return Some(PushdownHint::LabelFirst);
562                }
563            }
564            None
565        }
566        // AND: check the left side (first conjunct) for pushdown
567        LogicalExpression::Binary {
568            left,
569            op: BinaryOp::And,
570            ..
571        } => infer_pushdown(left, scan, store),
572        _ => {
573            // Any other predicate on a labeled scan gets label-first
574            if scan.label.is_some() {
575                Some(PushdownHint::LabelFirst)
576            } else {
577                None
578            }
579        }
580    }
581}
582
583/// Extracts the property name if the expression is `Property { variable, property }`
584/// and the variable matches the scan variable.
585fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
586    if let LogicalExpression::Property { variable, property } = expr
587        && variable == scan_var
588    {
589        Some(property.clone())
590    } else {
591        None
592    }
593}
594
595/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
596pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
597    let tree_text = plan.root.explain_tree();
598    QueryResult {
599        columns: vec!["plan".to_string()],
600        column_types: vec![grafeo_common::types::LogicalType::String],
601        rows: vec![vec![Value::String(tree_text.into())]],
602        execution_time_ms: None,
603        rows_scanned: None,
604        status_message: None,
605        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
606    }
607}
608
609/// Substitutes parameters in a logical plan with their values.
610pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
611    substitute_in_operator(&mut plan.root, params)
612}
613
614/// Recursively substitutes parameters in an operator.
615fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
616    #[allow(clippy::wildcard_imports)]
617    use crate::query::plan::*;
618
619    match op {
620        LogicalOperator::Filter(filter) => {
621            substitute_in_expression(&mut filter.predicate, params)?;
622            substitute_in_operator(&mut filter.input, params)?;
623        }
624        LogicalOperator::Return(ret) => {
625            for item in &mut ret.items {
626                substitute_in_expression(&mut item.expression, params)?;
627            }
628            substitute_in_operator(&mut ret.input, params)?;
629        }
630        LogicalOperator::Project(proj) => {
631            for p in &mut proj.projections {
632                substitute_in_expression(&mut p.expression, params)?;
633            }
634            substitute_in_operator(&mut proj.input, params)?;
635        }
636        LogicalOperator::NodeScan(scan) => {
637            if let Some(input) = &mut scan.input {
638                substitute_in_operator(input, params)?;
639            }
640        }
641        LogicalOperator::EdgeScan(scan) => {
642            if let Some(input) = &mut scan.input {
643                substitute_in_operator(input, params)?;
644            }
645        }
646        LogicalOperator::Expand(expand) => {
647            substitute_in_operator(&mut expand.input, params)?;
648        }
649        LogicalOperator::Join(join) => {
650            substitute_in_operator(&mut join.left, params)?;
651            substitute_in_operator(&mut join.right, params)?;
652            for cond in &mut join.conditions {
653                substitute_in_expression(&mut cond.left, params)?;
654                substitute_in_expression(&mut cond.right, params)?;
655            }
656        }
657        LogicalOperator::LeftJoin(join) => {
658            substitute_in_operator(&mut join.left, params)?;
659            substitute_in_operator(&mut join.right, params)?;
660            if let Some(cond) = &mut join.condition {
661                substitute_in_expression(cond, params)?;
662            }
663        }
664        LogicalOperator::Aggregate(agg) => {
665            for expr in &mut agg.group_by {
666                substitute_in_expression(expr, params)?;
667            }
668            for agg_expr in &mut agg.aggregates {
669                if let Some(expr) = &mut agg_expr.expression {
670                    substitute_in_expression(expr, params)?;
671                }
672            }
673            substitute_in_operator(&mut agg.input, params)?;
674        }
675        LogicalOperator::Sort(sort) => {
676            for key in &mut sort.keys {
677                substitute_in_expression(&mut key.expression, params)?;
678            }
679            substitute_in_operator(&mut sort.input, params)?;
680        }
681        LogicalOperator::Limit(limit) => {
682            resolve_count_param(&mut limit.count, params)?;
683            substitute_in_operator(&mut limit.input, params)?;
684        }
685        LogicalOperator::Skip(skip) => {
686            resolve_count_param(&mut skip.count, params)?;
687            substitute_in_operator(&mut skip.input, params)?;
688        }
689        LogicalOperator::Distinct(distinct) => {
690            substitute_in_operator(&mut distinct.input, params)?;
691        }
692        LogicalOperator::CreateNode(create) => {
693            for (_, expr) in &mut create.properties {
694                substitute_in_expression(expr, params)?;
695            }
696            if let Some(input) = &mut create.input {
697                substitute_in_operator(input, params)?;
698            }
699        }
700        LogicalOperator::CreateEdge(create) => {
701            for (_, expr) in &mut create.properties {
702                substitute_in_expression(expr, params)?;
703            }
704            substitute_in_operator(&mut create.input, params)?;
705        }
706        LogicalOperator::DeleteNode(delete) => {
707            substitute_in_operator(&mut delete.input, params)?;
708        }
709        LogicalOperator::DeleteEdge(delete) => {
710            substitute_in_operator(&mut delete.input, params)?;
711        }
712        LogicalOperator::SetProperty(set) => {
713            for (_, expr) in &mut set.properties {
714                substitute_in_expression(expr, params)?;
715            }
716            substitute_in_operator(&mut set.input, params)?;
717        }
718        LogicalOperator::Union(union) => {
719            for input in &mut union.inputs {
720                substitute_in_operator(input, params)?;
721            }
722        }
723        LogicalOperator::AntiJoin(anti) => {
724            substitute_in_operator(&mut anti.left, params)?;
725            substitute_in_operator(&mut anti.right, params)?;
726        }
727        LogicalOperator::Bind(bind) => {
728            substitute_in_expression(&mut bind.expression, params)?;
729            substitute_in_operator(&mut bind.input, params)?;
730        }
731        LogicalOperator::TripleScan(scan) => {
732            if let Some(input) = &mut scan.input {
733                substitute_in_operator(input, params)?;
734            }
735        }
736        LogicalOperator::Unwind(unwind) => {
737            substitute_in_expression(&mut unwind.expression, params)?;
738            substitute_in_operator(&mut unwind.input, params)?;
739        }
740        LogicalOperator::MapCollect(mc) => {
741            substitute_in_operator(&mut mc.input, params)?;
742        }
743        LogicalOperator::Merge(merge) => {
744            for (_, expr) in &mut merge.match_properties {
745                substitute_in_expression(expr, params)?;
746            }
747            for (_, expr) in &mut merge.on_create {
748                substitute_in_expression(expr, params)?;
749            }
750            for (_, expr) in &mut merge.on_match {
751                substitute_in_expression(expr, params)?;
752            }
753            substitute_in_operator(&mut merge.input, params)?;
754        }
755        LogicalOperator::MergeRelationship(merge_rel) => {
756            for (_, expr) in &mut merge_rel.match_properties {
757                substitute_in_expression(expr, params)?;
758            }
759            for (_, expr) in &mut merge_rel.on_create {
760                substitute_in_expression(expr, params)?;
761            }
762            for (_, expr) in &mut merge_rel.on_match {
763                substitute_in_expression(expr, params)?;
764            }
765            substitute_in_operator(&mut merge_rel.input, params)?;
766        }
767        LogicalOperator::AddLabel(add_label) => {
768            substitute_in_operator(&mut add_label.input, params)?;
769        }
770        LogicalOperator::RemoveLabel(remove_label) => {
771            substitute_in_operator(&mut remove_label.input, params)?;
772        }
773        LogicalOperator::ShortestPath(sp) => {
774            substitute_in_operator(&mut sp.input, params)?;
775        }
776        // SPARQL Update operators
777        LogicalOperator::InsertTriple(insert) => {
778            if let Some(ref mut input) = insert.input {
779                substitute_in_operator(input, params)?;
780            }
781        }
782        LogicalOperator::DeleteTriple(delete) => {
783            if let Some(ref mut input) = delete.input {
784                substitute_in_operator(input, params)?;
785            }
786        }
787        LogicalOperator::Modify(modify) => {
788            substitute_in_operator(&mut modify.where_clause, params)?;
789        }
790        LogicalOperator::ClearGraph(_)
791        | LogicalOperator::CreateGraph(_)
792        | LogicalOperator::DropGraph(_)
793        | LogicalOperator::LoadGraph(_)
794        | LogicalOperator::CopyGraph(_)
795        | LogicalOperator::MoveGraph(_)
796        | LogicalOperator::AddGraph(_) => {}
797        LogicalOperator::HorizontalAggregate(op) => {
798            substitute_in_operator(&mut op.input, params)?;
799        }
800        LogicalOperator::Empty => {}
801        LogicalOperator::VectorScan(scan) => {
802            substitute_in_expression(&mut scan.query_vector, params)?;
803            if let Some(ref mut input) = scan.input {
804                substitute_in_operator(input, params)?;
805            }
806        }
807        LogicalOperator::VectorJoin(join) => {
808            substitute_in_expression(&mut join.query_vector, params)?;
809            substitute_in_operator(&mut join.input, params)?;
810        }
811        LogicalOperator::Except(except) => {
812            substitute_in_operator(&mut except.left, params)?;
813            substitute_in_operator(&mut except.right, params)?;
814        }
815        LogicalOperator::Intersect(intersect) => {
816            substitute_in_operator(&mut intersect.left, params)?;
817            substitute_in_operator(&mut intersect.right, params)?;
818        }
819        LogicalOperator::Otherwise(otherwise) => {
820            substitute_in_operator(&mut otherwise.left, params)?;
821            substitute_in_operator(&mut otherwise.right, params)?;
822        }
823        LogicalOperator::Apply(apply) => {
824            substitute_in_operator(&mut apply.input, params)?;
825            substitute_in_operator(&mut apply.subplan, params)?;
826        }
827        // ParameterScan has no expressions to substitute
828        LogicalOperator::ParameterScan(_) => {}
829        LogicalOperator::MultiWayJoin(mwj) => {
830            for input in &mut mwj.inputs {
831                substitute_in_operator(input, params)?;
832            }
833            for cond in &mut mwj.conditions {
834                substitute_in_expression(&mut cond.left, params)?;
835                substitute_in_expression(&mut cond.right, params)?;
836            }
837        }
838        // DDL operators have no expressions to substitute
839        LogicalOperator::CreatePropertyGraph(_) => {}
840        // Procedure calls: arguments could contain parameters but we handle at execution time
841        LogicalOperator::CallProcedure(_) => {}
842        // LoadData: file path is a literal, no parameter substitution needed
843        LogicalOperator::LoadData(_) => {}
844    }
845    Ok(())
846}
847
848/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
849fn resolve_count_param(
850    count: &mut crate::query::plan::CountExpr,
851    params: &QueryParams,
852) -> Result<()> {
853    use crate::query::plan::CountExpr;
854    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
855
856    if let CountExpr::Parameter(name) = count {
857        let value = params.get(name.as_str()).ok_or_else(|| {
858            Error::Query(QueryError::new(
859                QueryErrorKind::Semantic,
860                format!("Missing parameter for SKIP/LIMIT: ${name}"),
861            ))
862        })?;
863        let n = match value {
864            Value::Int64(i) if *i >= 0 => *i as usize,
865            Value::Int64(i) => {
866                return Err(Error::Query(QueryError::new(
867                    QueryErrorKind::Semantic,
868                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
869                )));
870            }
871            other => {
872                return Err(Error::Query(QueryError::new(
873                    QueryErrorKind::Semantic,
874                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
875                )));
876            }
877        };
878        *count = CountExpr::Literal(n);
879    }
880    Ok(())
881}
882
883/// Substitutes parameters in an expression with their values.
884fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
885    use crate::query::plan::LogicalExpression;
886
887    match expr {
888        LogicalExpression::Parameter(name) => {
889            if let Some(value) = params.get(name) {
890                *expr = LogicalExpression::Literal(value.clone());
891            } else {
892                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
893            }
894        }
895        LogicalExpression::Binary { left, right, .. } => {
896            substitute_in_expression(left, params)?;
897            substitute_in_expression(right, params)?;
898        }
899        LogicalExpression::Unary { operand, .. } => {
900            substitute_in_expression(operand, params)?;
901        }
902        LogicalExpression::FunctionCall { args, .. } => {
903            for arg in args {
904                substitute_in_expression(arg, params)?;
905            }
906        }
907        LogicalExpression::List(items) => {
908            for item in items {
909                substitute_in_expression(item, params)?;
910            }
911        }
912        LogicalExpression::Map(pairs) => {
913            for (_, value) in pairs {
914                substitute_in_expression(value, params)?;
915            }
916        }
917        LogicalExpression::IndexAccess { base, index } => {
918            substitute_in_expression(base, params)?;
919            substitute_in_expression(index, params)?;
920        }
921        LogicalExpression::SliceAccess { base, start, end } => {
922            substitute_in_expression(base, params)?;
923            if let Some(s) = start {
924                substitute_in_expression(s, params)?;
925            }
926            if let Some(e) = end {
927                substitute_in_expression(e, params)?;
928            }
929        }
930        LogicalExpression::Case {
931            operand,
932            when_clauses,
933            else_clause,
934        } => {
935            if let Some(op) = operand {
936                substitute_in_expression(op, params)?;
937            }
938            for (cond, result) in when_clauses {
939                substitute_in_expression(cond, params)?;
940                substitute_in_expression(result, params)?;
941            }
942            if let Some(el) = else_clause {
943                substitute_in_expression(el, params)?;
944            }
945        }
946        LogicalExpression::Property { .. }
947        | LogicalExpression::Variable(_)
948        | LogicalExpression::Literal(_)
949        | LogicalExpression::Labels(_)
950        | LogicalExpression::Type(_)
951        | LogicalExpression::Id(_) => {}
952        LogicalExpression::ListComprehension {
953            list_expr,
954            filter_expr,
955            map_expr,
956            ..
957        } => {
958            substitute_in_expression(list_expr, params)?;
959            if let Some(filter) = filter_expr {
960                substitute_in_expression(filter, params)?;
961            }
962            substitute_in_expression(map_expr, params)?;
963        }
964        LogicalExpression::ListPredicate {
965            list_expr,
966            predicate,
967            ..
968        } => {
969            substitute_in_expression(list_expr, params)?;
970            substitute_in_expression(predicate, params)?;
971        }
972        LogicalExpression::ExistsSubquery(_)
973        | LogicalExpression::CountSubquery(_)
974        | LogicalExpression::ValueSubquery(_) => {
975            // Subqueries would need recursive parameter substitution
976        }
977        LogicalExpression::PatternComprehension { projection, .. } => {
978            substitute_in_expression(projection, params)?;
979        }
980        LogicalExpression::MapProjection { entries, .. } => {
981            for entry in entries {
982                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
983                    substitute_in_expression(expr, params)?;
984                }
985            }
986        }
987        LogicalExpression::Reduce {
988            initial,
989            list,
990            expression,
991            ..
992        } => {
993            substitute_in_expression(initial, params)?;
994            substitute_in_expression(list, params)?;
995            substitute_in_expression(expression, params)?;
996        }
997    }
998    Ok(())
999}
1000
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh, empty LPG store wrapped in an `Arc`, as every test needs one.
    fn new_store() -> Arc<LpgStore> {
        Arc::new(LpgStore::new().expect("creating an LPG store for a test should succeed"))
    }

    /// `is_lpg` is true for the LPG languages and false for SPARQL.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built over a new store sees zero nodes.
    #[test]
    fn test_processor_creation() {
        let processor = QueryProcessor::for_lpg(new_store());
        // `assert_eq!` reports the actual count on failure, unlike `assert!(a == b)`.
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A basic GQL `MATCH ... RETURN` yields one row per matching node.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = new_store();
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    /// The same basic query works through the Cypher front end.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = new_store();
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// A `$param` in a WHERE clause is replaced by the supplied value.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = new_store();
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter
        let params = HashMap::from([("min_age".to_string(), Value::Int64(30))]);

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Should return 2 people (ages 35 and 45)
        assert_eq!(result.row_count(), 2);
    }

    /// Referencing a parameter that was not supplied is an error, not a silent NULL.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = new_store();
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter but empty params map (missing the required param)
        let params: HashMap<String, Value> = HashMap::new();
        let err = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .expect_err("a query with an unbound parameter should fail");

        // Should fail with missing parameter error
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {err}"
        );
    }

    /// Parameter substitution in a WHERE clause that compares against a property,
    /// with the property itself projected in RETURN.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        let store = new_store();
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let params = HashMap::from([("threshold".to_string(), Value::Int64(15))]);

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=20 matches > 15
        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    /// Several parameters combined with AND in one WHERE clause.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        let store = new_store();
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let params = HashMap::from([
            ("min_age".to_string(), Value::Int64(30)),
            ("min_score".to_string(), Value::Int64(75)),
        ]);

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the person with age=35, score=90 matches both conditions
        assert_eq!(result.row_count(), 1);
    }

    /// String parameter matched by equality against one of several stored statuses.
    ///
    /// NOTE(review): despite the test name, the query uses `=` rather than an
    /// `IN` list; the name is kept so test history stays continuous.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        let store = new_store();
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        // Check if a parameter value matches any of the statuses
        let params = HashMap::from([("target".to_string(), Value::String("active".into()))]);

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// Comparing an integer property with an integer parameter.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        let store = new_store();
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        // Compare int property with int parameter
        let params = HashMap::from([("threshold".to_string(), Value::Int64(75))]);

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=100 matches > 75
        assert_eq!(result.row_count(), 1);
    }

    /// An empty result still reports the projected column names.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // Deliberately no nodes in the store.
        let processor = QueryProcessor::for_lpg(new_store());
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    /// String parameter equality, with the matched property projected in RETURN.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        let store = new_store();
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let params = HashMap::from([("target".to_string(), Value::String("beta".into()))]);

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}