Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15#[cfg(feature = "lpg")]
16use grafeo_core::graph::lpg::LpgStore;
17use grafeo_core::graph::{GraphStoreMut, GraphStoreSearch};
18
19use crate::catalog::Catalog;
20use crate::database::QueryResult;
21use crate::query::binder::Binder;
22use crate::query::executor::Executor;
23use crate::query::optimizer::Optimizer;
24use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
25use crate::query::planner::Planner;
26use crate::transaction::TransactionManager;
27
/// Supported query languages.
///
/// Every variant is compiled in only when its Cargo feature is enabled, so the
/// set of languages that exists at runtime depends on the build configuration.
/// [`QueryLanguage::is_lpg`] tells the processor which pipeline (LPG or RDF)
/// a given language is routed to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024) - default for LPG
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL for LPG
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 GRAPH_TABLE)
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 for RDF
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL for RDF (requires both the `graphql` and `triple-store` features)
    #[cfg(all(feature = "graphql", feature = "triple-store"))]
    GraphQLRdf,
}
54
55impl QueryLanguage {
56    /// Returns whether this language targets LPG (vs RDF).
57    #[must_use]
58    pub const fn is_lpg(&self) -> bool {
59        match self {
60            #[cfg(feature = "gql")]
61            Self::Gql => true,
62            #[cfg(feature = "cypher")]
63            Self::Cypher => true,
64            #[cfg(feature = "gremlin")]
65            Self::Gremlin => true,
66            #[cfg(feature = "graphql")]
67            Self::GraphQL => true,
68            #[cfg(feature = "sql-pgq")]
69            Self::SqlPgq => true,
70            #[cfg(feature = "sparql")]
71            Self::Sparql => false,
72            #[cfg(all(feature = "graphql", feature = "triple-store"))]
73            Self::GraphQLRdf => false,
74            #[allow(unreachable_patterns)]
75            _ => false,
76        }
77    }
78}
79
/// Query parameters for prepared statements.
///
/// Maps a parameter name to the [`Value`] substituted for it in the logical
/// plan (see [`substitute_params`]).
pub type QueryParams = HashMap<String, Value>;
82
/// Processes queries through the full pipeline.
///
/// The processor holds references to the stores and provides a unified
/// interface for executing queries in any supported language.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// # use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
///
/// # fn main() -> grafeo_common::utils::error::Result<()> {
/// let store = Arc::new(LpgStore::new().unwrap());
/// let processor = QueryProcessor::for_lpg(store);
/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
/// # Ok(())
/// # }
/// ```
pub struct QueryProcessor {
    /// LPG store for property graph queries.
    ///
    /// NOTE: the constructors that accept trait-object stores
    /// (`for_graph_store_with_transaction`, `for_stores_with_transaction`)
    /// fill this field with a freshly created `LpgStore`, not the store that
    /// actually backs `graph_store`.
    #[cfg(feature = "lpg")]
    lpg_store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends (read path).
    graph_store: Arc<dyn GraphStoreSearch>,
    /// Writable graph store (None when read-only).
    write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager for MVCC operations.
    transaction_manager: Arc<TransactionManager>,
    /// Catalog for schema and index metadata.
    catalog: Arc<Catalog>,
    /// Query optimizer used by the LPG pipeline.
    optimizer: Optimizer,
    /// Current transaction context (if any): the viewing epoch and the
    /// transaction id, as set by `with_transaction_context`.
    transaction_context: Option<(EpochId, TransactionId)>,
    /// RDF store for triple pattern queries (optional).
    #[cfg(feature = "triple-store")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
122
123impl QueryProcessor {
124    /// Creates a new query processor for LPG queries.
125    #[cfg(feature = "lpg")]
126    #[must_use]
127    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
128        let optimizer = Optimizer::from_store(&store);
129        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
130        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
131        Self {
132            lpg_store: store,
133            graph_store,
134            write_store,
135            transaction_manager: Arc::new(TransactionManager::new()),
136            catalog: Arc::new(Catalog::new()),
137            optimizer,
138            transaction_context: None,
139            #[cfg(feature = "triple-store")]
140            rdf_store: None,
141        }
142    }
143
144    /// Creates a new query processor with a transaction manager.
145    #[cfg(feature = "lpg")]
146    #[must_use]
147    pub fn for_lpg_with_transaction(
148        store: Arc<LpgStore>,
149        transaction_manager: Arc<TransactionManager>,
150    ) -> Self {
151        let optimizer = Optimizer::from_store(&store);
152        let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
153        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
154        Self {
155            lpg_store: store,
156            graph_store,
157            write_store,
158            transaction_manager,
159            catalog: Arc::new(Catalog::new()),
160            optimizer,
161            transaction_context: None,
162            #[cfg(feature = "triple-store")]
163            rdf_store: None,
164        }
165    }
166
167    /// Creates a query processor backed by any `GraphStoreMut` implementation.
168    ///
169    /// # Errors
170    ///
171    /// Returns an error if the internal arena allocation fails (out of memory).
172    pub fn for_graph_store_with_transaction(
173        store: Arc<dyn GraphStoreMut>,
174        transaction_manager: Arc<TransactionManager>,
175    ) -> Result<Self> {
176        let optimizer = Optimizer::from_graph_store(&*store);
177        let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
178        Ok(Self {
179            #[cfg(feature = "lpg")]
180            lpg_store: Arc::new(LpgStore::new()?),
181            graph_store: read_store,
182            write_store: Some(store),
183            transaction_manager,
184            catalog: Arc::new(Catalog::new()),
185            optimizer,
186            transaction_context: None,
187            #[cfg(feature = "triple-store")]
188            rdf_store: None,
189        })
190    }
191
192    /// Creates a query processor from split read/write stores.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if the internal arena allocation fails (out of memory).
197    pub fn for_stores_with_transaction(
198        read_store: Arc<dyn GraphStoreSearch>,
199        write_store: Option<Arc<dyn GraphStoreMut>>,
200        transaction_manager: Arc<TransactionManager>,
201    ) -> Result<Self> {
202        let optimizer = Optimizer::from_graph_store(&*read_store);
203        Ok(Self {
204            #[cfg(feature = "lpg")]
205            lpg_store: Arc::new(LpgStore::new()?),
206            graph_store: read_store,
207            write_store,
208            transaction_manager,
209            catalog: Arc::new(Catalog::new()),
210            optimizer,
211            transaction_context: None,
212            #[cfg(feature = "triple-store")]
213            rdf_store: None,
214        })
215    }
216
217    /// Sets the transaction context for MVCC visibility.
218    ///
219    /// This should be called when the processor is used within a transaction.
220    #[must_use]
221    pub fn with_transaction_context(
222        mut self,
223        viewing_epoch: EpochId,
224        transaction_id: TransactionId,
225    ) -> Self {
226        self.transaction_context = Some((viewing_epoch, transaction_id));
227        self
228    }
229
230    /// Sets a custom catalog.
231    #[must_use]
232    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
233        self.catalog = catalog;
234        self
235    }
236
237    /// Sets a custom optimizer.
238    #[must_use]
239    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
240        self.optimizer = optimizer;
241        self
242    }
243
244    /// Processes a query string and returns results.
245    ///
246    /// Pipeline:
247    /// 1. Parse (language-specific parser → AST)
248    /// 2. Translate (AST → LogicalPlan)
249    /// 3. Bind (semantic validation)
250    /// 4. Optimize (filter pushdown, join reorder, etc.)
251    /// 5. Plan (logical → physical operators)
252    /// 6. Execute (run operators, collect results)
253    ///
254    /// # Arguments
255    ///
256    /// * `query` - The query string
257    /// * `language` - Which query language to use
258    /// * `params` - Optional query parameters for prepared statements
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if any stage of the pipeline fails.
263    pub fn process(
264        &self,
265        query: &str,
266        language: QueryLanguage,
267        params: Option<&QueryParams>,
268    ) -> Result<QueryResult> {
269        if language.is_lpg() {
270            self.process_lpg(query, language, params)
271        } else {
272            #[cfg(feature = "triple-store")]
273            {
274                self.process_rdf(query, language, params)
275            }
276            #[cfg(not(feature = "triple-store"))]
277            {
278                Err(Error::Internal(
279                    "RDF support not enabled. Compile with --features rdf".to_string(),
280                ))
281            }
282        }
283    }
284
285    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
286    fn process_lpg(
287        &self,
288        query: &str,
289        language: QueryLanguage,
290        params: Option<&QueryParams>,
291    ) -> Result<QueryResult> {
292        #[cfg(not(target_arch = "wasm32"))]
293        let start_time = std::time::Instant::now();
294
295        // 1. Parse and translate to logical plan
296        let mut logical_plan = self.translate_lpg(query, language)?;
297
298        // 2. Substitute parameters if provided (merge defaults from the plan first)
299        let has_defaults = !logical_plan.default_params.is_empty();
300        if params.is_some() || has_defaults {
301            let merged = if has_defaults {
302                let mut merged = logical_plan.default_params.clone();
303                if let Some(params) = params {
304                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
305                }
306                merged
307            } else {
308                params.cloned().unwrap_or_default()
309            };
310            substitute_params(&mut logical_plan, &merged)?;
311        }
312
313        // 3. Semantic validation
314        let mut binder = Binder::new();
315        let _binding_context = binder.bind(&logical_plan)?;
316
317        // 4. Optimize the plan
318        let optimized_plan = self.optimizer.optimize(logical_plan)?;
319
320        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
321        if optimized_plan.explain {
322            let mut plan = optimized_plan;
323            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
324            return Ok(explain_result(&plan));
325        }
326
327        // 5. Convert to physical plan with transaction context
328        // Read-only fast path: safe when no mutations AND no active transaction
329        // (an active transaction may have prior uncommitted writes from earlier statements)
330        let is_read_only =
331            !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
332        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
333            Planner::with_context(
334                Arc::clone(&self.graph_store),
335                self.write_store.as_ref().map(Arc::clone),
336                Arc::clone(&self.transaction_manager),
337                Some(transaction_id),
338                epoch,
339            )
340        } else {
341            Planner::with_context(
342                Arc::clone(&self.graph_store),
343                self.write_store.as_ref().map(Arc::clone),
344                Arc::clone(&self.transaction_manager),
345                None,
346                self.transaction_manager.current_epoch(),
347            )
348        }
349        .with_read_only(is_read_only);
350        let mut physical_plan = planner.plan(&optimized_plan)?;
351
352        // 6. Execute and collect results
353        let executor = Executor::with_columns(physical_plan.columns.clone());
354        let mut result = executor.execute(physical_plan.operator.as_mut())?;
355
356        // Add execution metrics
357        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
358        #[cfg(not(target_arch = "wasm32"))]
359        {
360            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
361            result.execution_time_ms = Some(elapsed_ms);
362        }
363        result.rows_scanned = Some(rows_scanned);
364
365        Ok(result)
366    }
367
368    /// Translates an LPG query to a logical plan.
369    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
370        let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
371        match language {
372            #[cfg(feature = "gql")]
373            QueryLanguage::Gql => {
374                use crate::query::translators::gql;
375                gql::translate(query)
376            }
377            #[cfg(feature = "cypher")]
378            QueryLanguage::Cypher => {
379                use crate::query::translators::cypher;
380                cypher::translate(query)
381            }
382            #[cfg(feature = "gremlin")]
383            QueryLanguage::Gremlin => {
384                use crate::query::translators::gremlin;
385                gremlin::translate(query)
386            }
387            #[cfg(feature = "graphql")]
388            QueryLanguage::GraphQL => {
389                use crate::query::translators::graphql;
390                graphql::translate(query)
391            }
392            #[cfg(feature = "sql-pgq")]
393            QueryLanguage::SqlPgq => {
394                use crate::query::translators::sql_pgq;
395                sql_pgq::translate(query)
396            }
397            #[allow(unreachable_patterns)]
398            _ => Err(Error::Internal(format!(
399                "Language {:?} is not an LPG language",
400                language
401            ))),
402        }
403    }
404
405    /// Returns a reference to the LPG store.
406    #[cfg(feature = "lpg")]
407    #[must_use]
408    pub fn lpg_store(&self) -> &Arc<LpgStore> {
409        &self.lpg_store
410    }
411
412    /// Returns a reference to the catalog.
413    #[must_use]
414    pub fn catalog(&self) -> &Arc<Catalog> {
415        &self.catalog
416    }
417
418    /// Returns a reference to the optimizer.
419    #[must_use]
420    pub fn optimizer(&self) -> &Optimizer {
421        &self.optimizer
422    }
423}
424
425impl QueryProcessor {
426    /// Returns a reference to the transaction manager.
427    #[must_use]
428    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
429        &self.transaction_manager
430    }
431}
432
// =========================================================================
// RDF-specific methods (gated behind the `triple-store` feature)
// =========================================================================
436
437#[cfg(feature = "triple-store")]
438impl QueryProcessor {
439    /// Creates a new query processor with both LPG and RDF stores.
440    #[cfg(feature = "lpg")]
441    #[must_use]
442    pub fn with_rdf(
443        lpg_store: Arc<LpgStore>,
444        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
445    ) -> Self {
446        let optimizer = Optimizer::from_store(&lpg_store);
447        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreSearch>;
448        let write_store = Some(Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>);
449        Self {
450            lpg_store,
451            graph_store,
452            write_store,
453            transaction_manager: Arc::new(TransactionManager::new()),
454            catalog: Arc::new(Catalog::new()),
455            optimizer,
456            transaction_context: None,
457            rdf_store: Some(rdf_store),
458        }
459    }
460
461    /// Returns a reference to the RDF store (if configured).
462    #[must_use]
463    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
464        self.rdf_store.as_ref()
465    }
466
467    /// Processes an RDF query (SPARQL, GraphQL-RDF).
468    fn process_rdf(
469        &self,
470        query: &str,
471        language: QueryLanguage,
472        params: Option<&QueryParams>,
473    ) -> Result<QueryResult> {
474        use crate::query::planner::rdf::RdfPlanner;
475
476        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
477            Error::Internal("RDF store not configured for this processor".to_string())
478        })?;
479
480        // 1. Parse and translate to logical plan
481        let mut logical_plan = self.translate_rdf(query, language)?;
482
483        // 2. Substitute parameters if provided (merge defaults from the plan first)
484        let has_defaults = !logical_plan.default_params.is_empty();
485        if params.is_some() || has_defaults {
486            let merged = if has_defaults {
487                let mut merged = logical_plan.default_params.clone();
488                if let Some(params) = params {
489                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
490                }
491                merged
492            } else {
493                params.cloned().unwrap_or_default()
494            };
495            substitute_params(&mut logical_plan, &merged)?;
496        }
497
498        // 3. Semantic validation
499        let mut binder = Binder::new();
500        let _binding_context = binder.bind(&logical_plan)?;
501
502        // 3. Optimize the plan (use RDF statistics for cost-based optimization)
503        let rdf_optimizer = {
504            let stats = rdf_store.get_or_collect_statistics();
505            Optimizer::from_rdf_statistics((*stats).clone())
506        };
507        let optimized_plan = rdf_optimizer.optimize(logical_plan)?;
508
509        // 3a. EXPLAIN: return the plan tree without executing.
510        // Includes physical operator names by planning with profiling to collect entries.
511        if optimized_plan.explain {
512            let planner = RdfPlanner::new(Arc::clone(rdf_store));
513            let (_, entries) = planner.plan_profiled(&optimized_plan)?;
514            return Ok(physical_explain_result(&optimized_plan, entries));
515        }
516
517        // 3b. EXPLAIN ANALYZE (PROFILE): execute with instrumentation, report stats.
518        if optimized_plan.profile {
519            let planner = RdfPlanner::new(Arc::clone(rdf_store));
520            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
521
522            let start = std::time::Instant::now();
523            let executor = Executor::with_columns(physical_plan.columns.clone());
524            let _result = executor.execute(physical_plan.operator.as_mut())?;
525            let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
526
527            let tree = crate::query::profile::build_profile_tree(
528                &optimized_plan.root,
529                &mut entries.into_iter(),
530            );
531            return Ok(crate::query::profile::profile_result(&tree, elapsed_ms));
532        }
533
534        // 4. Convert to physical plan (using RDF planner)
535        let planner = RdfPlanner::new(Arc::clone(rdf_store));
536        let mut physical_plan = planner.plan(&optimized_plan)?;
537
538        // 5. Execute and collect results
539        let executor = Executor::with_columns(physical_plan.columns.clone());
540        executor.execute(physical_plan.operator.as_mut())
541    }
542
543    /// Translates an RDF query to a logical plan.
544    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
545        match language {
546            #[cfg(feature = "sparql")]
547            QueryLanguage::Sparql => {
548                use crate::query::translators::sparql;
549                sparql::translate(query)
550            }
551            #[cfg(all(feature = "graphql", feature = "triple-store"))]
552            QueryLanguage::GraphQLRdf => {
553                use crate::query::translators::graphql_rdf;
554                // Default namespace for GraphQL-RDF queries
555                graphql_rdf::translate(query, "http://example.org/")
556            }
557            _ => Err(Error::Internal(format!(
558                "Language {:?} is not an RDF language",
559                language
560            ))),
561        }
562    }
563}
564
565/// Annotates filter operators in the plan with pushdown hints.
566///
567/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
568/// whether a property index exists for equality predicates.
569pub(crate) fn annotate_pushdown_hints(
570    op: &mut LogicalOperator,
571    store: &dyn grafeo_core::graph::GraphStore,
572) {
573    #[allow(clippy::wildcard_imports)]
574    use crate::query::plan::*;
575
576    match op {
577        LogicalOperator::Filter(filter) => {
578            // Recurse into children first
579            annotate_pushdown_hints(&mut filter.input, store);
580
581            // Annotate this filter if it sits on top of a NodeScan
582            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
583                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
584            }
585        }
586        LogicalOperator::NodeScan(op) => {
587            if let Some(input) = &mut op.input {
588                annotate_pushdown_hints(input, store);
589            }
590        }
591        LogicalOperator::EdgeScan(op) => {
592            if let Some(input) = &mut op.input {
593                annotate_pushdown_hints(input, store);
594            }
595        }
596        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
597        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
598        LogicalOperator::Join(op) => {
599            annotate_pushdown_hints(&mut op.left, store);
600            annotate_pushdown_hints(&mut op.right, store);
601        }
602        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
603        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
604        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
605        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
606        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
607        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
608        LogicalOperator::Union(op) => {
609            for input in &mut op.inputs {
610                annotate_pushdown_hints(input, store);
611            }
612        }
613        LogicalOperator::Apply(op) => {
614            annotate_pushdown_hints(&mut op.input, store);
615            annotate_pushdown_hints(&mut op.subplan, store);
616        }
617        LogicalOperator::Otherwise(op) => {
618            annotate_pushdown_hints(&mut op.left, store);
619            annotate_pushdown_hints(&mut op.right, store);
620        }
621        _ => {}
622    }
623}
624
625/// Infers the pushdown strategy for a filter predicate over a node scan.
626fn infer_pushdown(
627    predicate: &LogicalExpression,
628    scan: &crate::query::plan::NodeScanOp,
629    store: &dyn grafeo_core::graph::GraphStore,
630) -> Option<crate::query::plan::PushdownHint> {
631    #[allow(clippy::wildcard_imports)]
632    use crate::query::plan::*;
633
634    match predicate {
635        // Equality: n.prop = value
636        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
637            if let Some(prop) = extract_property_name(left, &scan.variable)
638                .or_else(|| extract_property_name(right, &scan.variable))
639            {
640                if store.has_property_index(&prop) {
641                    return Some(PushdownHint::IndexLookup { property: prop });
642                }
643                if scan.label.is_some() {
644                    return Some(PushdownHint::LabelFirst);
645                }
646            }
647            None
648        }
649        // Range: n.prop > value, n.prop < value, etc.
650        LogicalExpression::Binary {
651            left,
652            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
653            right,
654        } => {
655            if let Some(prop) = extract_property_name(left, &scan.variable)
656                .or_else(|| extract_property_name(right, &scan.variable))
657            {
658                if store.has_property_index(&prop) {
659                    return Some(PushdownHint::RangeScan { property: prop });
660                }
661                if scan.label.is_some() {
662                    return Some(PushdownHint::LabelFirst);
663                }
664            }
665            None
666        }
667        // AND: check the left side (first conjunct) for pushdown
668        LogicalExpression::Binary {
669            left,
670            op: BinaryOp::And,
671            ..
672        } => infer_pushdown(left, scan, store),
673        _ => {
674            // Any other predicate on a labeled scan gets label-first
675            if scan.label.is_some() {
676                Some(PushdownHint::LabelFirst)
677            } else {
678                None
679            }
680        }
681    }
682}
683
684/// Extracts the property name if the expression is `Property { variable, property }`
685/// and the variable matches the scan variable.
686fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
687    if let LogicalExpression::Property { variable, property } = expr
688        && variable == scan_var
689    {
690        Some(property.clone())
691    } else {
692        None
693    }
694}
695
696/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
697pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
698    let tree_text = plan.root.explain_tree();
699    QueryResult {
700        columns: vec!["plan".to_string()],
701        column_types: vec![grafeo_common::types::LogicalType::String],
702        rows: vec![vec![Value::String(tree_text.into())]],
703        execution_time_ms: None,
704        rows_scanned: None,
705        status_message: None,
706        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
707    }
708}
709
/// Builds the physical EXPLAIN result: the logical plan is paired with the
/// profile `entries` into a tree, which is rendered as indented text and
/// returned as a one-row, one-column (`plan`) `QueryResult`.
#[cfg(feature = "triple-store")]
pub(crate) fn physical_explain_result(
    plan: &LogicalPlan,
    entries: Vec<crate::query::profile::ProfileEntry>,
) -> QueryResult {
    use grafeo_common::types::LogicalType;
    use grafeo_common::utils::GqlStatus;

    let mut entry_iter = entries.into_iter();
    let profile_tree = crate::query::profile::build_profile_tree(&plan.root, &mut entry_iter);

    let mut rendered = String::new();
    format_physical_node(&mut rendered, &profile_tree, 0);

    QueryResult {
        columns: vec![String::from("plan")],
        column_types: vec![LogicalType::String],
        rows: vec![vec![Value::String(rendered.into())]],
        execution_time_ms: None,
        rows_scanned: None,
        status_message: None,
        gql_status: GqlStatus::SUCCESS,
    }
}
732
/// Appends `node` and all of its descendants to `out`, one line per node
/// (`<name> <label>`), indented by two spaces per nesting level.
#[cfg(feature = "triple-store")]
fn format_physical_node(out: &mut String, node: &crate::query::profile::ProfileNode, depth: usize) {
    use std::fmt::Write;

    for _ in 0..depth {
        out.push_str("  ");
    }
    // A formatting error is deliberately ignored, as before.
    let _ = writeln!(out, "{} {}", node.name, node.label);

    let child_depth = depth + 1;
    for child in &node.children {
        format_physical_node(out, child, child_depth);
    }
}
743
744/// Substitutes parameters in a logical plan with their values.
745///
746/// # Errors
747///
748/// Returns an error if a referenced parameter is not found in `params`.
749pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
750    substitute_in_operator(&mut plan.root, params)
751}
752
753/// Recursively substitutes parameters in an operator.
754fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
755    #[allow(clippy::wildcard_imports)]
756    use crate::query::plan::*;
757
758    match op {
759        LogicalOperator::Filter(filter) => {
760            substitute_in_expression(&mut filter.predicate, params)?;
761            substitute_in_operator(&mut filter.input, params)?;
762        }
763        LogicalOperator::Return(ret) => {
764            for item in &mut ret.items {
765                substitute_in_expression(&mut item.expression, params)?;
766            }
767            substitute_in_operator(&mut ret.input, params)?;
768        }
769        LogicalOperator::Project(proj) => {
770            for p in &mut proj.projections {
771                substitute_in_expression(&mut p.expression, params)?;
772            }
773            substitute_in_operator(&mut proj.input, params)?;
774        }
775        LogicalOperator::NodeScan(scan) => {
776            if let Some(input) = &mut scan.input {
777                substitute_in_operator(input, params)?;
778            }
779        }
780        LogicalOperator::EdgeScan(scan) => {
781            if let Some(input) = &mut scan.input {
782                substitute_in_operator(input, params)?;
783            }
784        }
785        LogicalOperator::Expand(expand) => {
786            substitute_in_operator(&mut expand.input, params)?;
787        }
788        LogicalOperator::Join(join) => {
789            substitute_in_operator(&mut join.left, params)?;
790            substitute_in_operator(&mut join.right, params)?;
791            for cond in &mut join.conditions {
792                substitute_in_expression(&mut cond.left, params)?;
793                substitute_in_expression(&mut cond.right, params)?;
794            }
795        }
796        LogicalOperator::LeftJoin(join) => {
797            substitute_in_operator(&mut join.left, params)?;
798            substitute_in_operator(&mut join.right, params)?;
799            if let Some(cond) = &mut join.condition {
800                substitute_in_expression(cond, params)?;
801            }
802        }
803        LogicalOperator::Aggregate(agg) => {
804            for expr in &mut agg.group_by {
805                substitute_in_expression(expr, params)?;
806            }
807            for agg_expr in &mut agg.aggregates {
808                if let Some(expr) = &mut agg_expr.expression {
809                    substitute_in_expression(expr, params)?;
810                }
811            }
812            substitute_in_operator(&mut agg.input, params)?;
813        }
814        LogicalOperator::Sort(sort) => {
815            for key in &mut sort.keys {
816                substitute_in_expression(&mut key.expression, params)?;
817            }
818            substitute_in_operator(&mut sort.input, params)?;
819        }
820        LogicalOperator::Limit(limit) => {
821            resolve_count_param(&mut limit.count, params)?;
822            substitute_in_operator(&mut limit.input, params)?;
823        }
824        LogicalOperator::Skip(skip) => {
825            resolve_count_param(&mut skip.count, params)?;
826            substitute_in_operator(&mut skip.input, params)?;
827        }
828        LogicalOperator::Distinct(distinct) => {
829            substitute_in_operator(&mut distinct.input, params)?;
830        }
831        LogicalOperator::CreateNode(create) => {
832            for (_, expr) in &mut create.properties {
833                substitute_in_expression(expr, params)?;
834            }
835            if let Some(input) = &mut create.input {
836                substitute_in_operator(input, params)?;
837            }
838        }
839        LogicalOperator::CreateEdge(create) => {
840            for (_, expr) in &mut create.properties {
841                substitute_in_expression(expr, params)?;
842            }
843            substitute_in_operator(&mut create.input, params)?;
844        }
845        LogicalOperator::DeleteNode(delete) => {
846            substitute_in_operator(&mut delete.input, params)?;
847        }
848        LogicalOperator::DeleteEdge(delete) => {
849            substitute_in_operator(&mut delete.input, params)?;
850        }
851        LogicalOperator::SetProperty(set) => {
852            for (_, expr) in &mut set.properties {
853                substitute_in_expression(expr, params)?;
854            }
855            substitute_in_operator(&mut set.input, params)?;
856        }
857        LogicalOperator::Union(union) => {
858            for input in &mut union.inputs {
859                substitute_in_operator(input, params)?;
860            }
861        }
862        LogicalOperator::AntiJoin(anti) => {
863            substitute_in_operator(&mut anti.left, params)?;
864            substitute_in_operator(&mut anti.right, params)?;
865        }
866        LogicalOperator::Bind(bind) => {
867            substitute_in_expression(&mut bind.expression, params)?;
868            substitute_in_operator(&mut bind.input, params)?;
869        }
870        LogicalOperator::TripleScan(scan) => {
871            if let Some(input) = &mut scan.input {
872                substitute_in_operator(input, params)?;
873            }
874        }
875        LogicalOperator::Unwind(unwind) => {
876            substitute_in_expression(&mut unwind.expression, params)?;
877            substitute_in_operator(&mut unwind.input, params)?;
878        }
879        LogicalOperator::MapCollect(mc) => {
880            substitute_in_operator(&mut mc.input, params)?;
881        }
882        LogicalOperator::Merge(merge) => {
883            for (_, expr) in &mut merge.match_properties {
884                substitute_in_expression(expr, params)?;
885            }
886            for (_, expr) in &mut merge.on_create {
887                substitute_in_expression(expr, params)?;
888            }
889            for (_, expr) in &mut merge.on_match {
890                substitute_in_expression(expr, params)?;
891            }
892            substitute_in_operator(&mut merge.input, params)?;
893        }
894        LogicalOperator::MergeRelationship(merge_rel) => {
895            for (_, expr) in &mut merge_rel.match_properties {
896                substitute_in_expression(expr, params)?;
897            }
898            for (_, expr) in &mut merge_rel.on_create {
899                substitute_in_expression(expr, params)?;
900            }
901            for (_, expr) in &mut merge_rel.on_match {
902                substitute_in_expression(expr, params)?;
903            }
904            substitute_in_operator(&mut merge_rel.input, params)?;
905        }
906        LogicalOperator::AddLabel(add_label) => {
907            substitute_in_operator(&mut add_label.input, params)?;
908        }
909        LogicalOperator::RemoveLabel(remove_label) => {
910            substitute_in_operator(&mut remove_label.input, params)?;
911        }
912        LogicalOperator::ShortestPath(sp) => {
913            substitute_in_operator(&mut sp.input, params)?;
914        }
915        // SPARQL Update operators
916        LogicalOperator::InsertTriple(insert) => {
917            if let Some(ref mut input) = insert.input {
918                substitute_in_operator(input, params)?;
919            }
920        }
921        LogicalOperator::DeleteTriple(delete) => {
922            if let Some(ref mut input) = delete.input {
923                substitute_in_operator(input, params)?;
924            }
925        }
926        LogicalOperator::Modify(modify) => {
927            substitute_in_operator(&mut modify.where_clause, params)?;
928        }
929        LogicalOperator::ClearGraph(_)
930        | LogicalOperator::CreateGraph(_)
931        | LogicalOperator::DropGraph(_)
932        | LogicalOperator::LoadGraph(_)
933        | LogicalOperator::CopyGraph(_)
934        | LogicalOperator::MoveGraph(_)
935        | LogicalOperator::AddGraph(_) => {}
936        LogicalOperator::HorizontalAggregate(op) => {
937            substitute_in_operator(&mut op.input, params)?;
938        }
939        LogicalOperator::Empty => {}
940        LogicalOperator::VectorScan(scan) => {
941            substitute_in_expression(&mut scan.query_vector, params)?;
942            if let Some(ref mut input) = scan.input {
943                substitute_in_operator(input, params)?;
944            }
945        }
946        LogicalOperator::VectorJoin(join) => {
947            substitute_in_expression(&mut join.query_vector, params)?;
948            substitute_in_operator(&mut join.input, params)?;
949        }
950        LogicalOperator::TextScan(scan) => {
951            substitute_in_expression(&mut scan.query, params)?;
952        }
953        LogicalOperator::Except(except) => {
954            substitute_in_operator(&mut except.left, params)?;
955            substitute_in_operator(&mut except.right, params)?;
956        }
957        LogicalOperator::Intersect(intersect) => {
958            substitute_in_operator(&mut intersect.left, params)?;
959            substitute_in_operator(&mut intersect.right, params)?;
960        }
961        LogicalOperator::Otherwise(otherwise) => {
962            substitute_in_operator(&mut otherwise.left, params)?;
963            substitute_in_operator(&mut otherwise.right, params)?;
964        }
965        LogicalOperator::Apply(apply) => {
966            substitute_in_operator(&mut apply.input, params)?;
967            substitute_in_operator(&mut apply.subplan, params)?;
968        }
969        // ParameterScan has no expressions to substitute
970        LogicalOperator::ParameterScan(_) => {}
971        LogicalOperator::MultiWayJoin(mwj) => {
972            for input in &mut mwj.inputs {
973                substitute_in_operator(input, params)?;
974            }
975            for cond in &mut mwj.conditions {
976                substitute_in_expression(&mut cond.left, params)?;
977                substitute_in_expression(&mut cond.right, params)?;
978            }
979        }
980        // DDL operators have no expressions to substitute
981        LogicalOperator::CreatePropertyGraph(_) => {}
982        // Procedure calls: arguments could contain parameters but we handle at execution time
983        LogicalOperator::CallProcedure(_) => {}
984        // LoadData: file path is a literal, no parameter substitution needed
985        LogicalOperator::LoadData(_) => {}
986        // Construct: template uses variables, substitute in the WHERE input
987        LogicalOperator::Construct(construct) => {
988            substitute_in_operator(&mut construct.input, params)?;
989        }
990    }
991    Ok(())
992}
993
994/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
995fn resolve_count_param(
996    count: &mut crate::query::plan::CountExpr,
997    params: &QueryParams,
998) -> Result<()> {
999    use crate::query::plan::CountExpr;
1000    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
1001
1002    if let CountExpr::Parameter(name) = count {
1003        let value = params.get(name.as_str()).ok_or_else(|| {
1004            Error::Query(QueryError::new(
1005                QueryErrorKind::Semantic,
1006                format!("Missing parameter for SKIP/LIMIT: ${name}"),
1007            ))
1008        })?;
1009        let n = match value {
1010            // reason: guard ensures *i >= 0
1011            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1012            Value::Int64(i) if *i >= 0 => *i as usize,
1013            Value::Int64(i) => {
1014                return Err(Error::Query(QueryError::new(
1015                    QueryErrorKind::Semantic,
1016                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
1017                )));
1018            }
1019            other => {
1020                return Err(Error::Query(QueryError::new(
1021                    QueryErrorKind::Semantic,
1022                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
1023                )));
1024            }
1025        };
1026        *count = CountExpr::Literal(n);
1027    }
1028    Ok(())
1029}
1030
1031/// Substitutes parameters in an expression with their values.
1032fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
1033    use crate::query::plan::LogicalExpression;
1034
1035    match expr {
1036        LogicalExpression::Parameter(name) => {
1037            if let Some(value) = params.get(name) {
1038                *expr = LogicalExpression::Literal(value.clone());
1039            } else {
1040                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
1041            }
1042        }
1043        LogicalExpression::Binary { left, right, .. } => {
1044            substitute_in_expression(left, params)?;
1045            substitute_in_expression(right, params)?;
1046        }
1047        LogicalExpression::Unary { operand, .. } => {
1048            substitute_in_expression(operand, params)?;
1049        }
1050        LogicalExpression::FunctionCall { args, .. } => {
1051            for arg in args {
1052                substitute_in_expression(arg, params)?;
1053            }
1054        }
1055        LogicalExpression::List(items) => {
1056            for item in items {
1057                substitute_in_expression(item, params)?;
1058            }
1059        }
1060        LogicalExpression::Map(pairs) => {
1061            for (_, value) in pairs {
1062                substitute_in_expression(value, params)?;
1063            }
1064        }
1065        LogicalExpression::IndexAccess { base, index } => {
1066            substitute_in_expression(base, params)?;
1067            substitute_in_expression(index, params)?;
1068        }
1069        LogicalExpression::SliceAccess { base, start, end } => {
1070            substitute_in_expression(base, params)?;
1071            if let Some(s) = start {
1072                substitute_in_expression(s, params)?;
1073            }
1074            if let Some(e) = end {
1075                substitute_in_expression(e, params)?;
1076            }
1077        }
1078        LogicalExpression::Case {
1079            operand,
1080            when_clauses,
1081            else_clause,
1082        } => {
1083            if let Some(op) = operand {
1084                substitute_in_expression(op, params)?;
1085            }
1086            for (cond, result) in when_clauses {
1087                substitute_in_expression(cond, params)?;
1088                substitute_in_expression(result, params)?;
1089            }
1090            if let Some(el) = else_clause {
1091                substitute_in_expression(el, params)?;
1092            }
1093        }
1094        LogicalExpression::Property { .. }
1095        | LogicalExpression::Variable(_)
1096        | LogicalExpression::Literal(_)
1097        | LogicalExpression::Labels(_)
1098        | LogicalExpression::Type(_)
1099        | LogicalExpression::Id(_) => {}
1100        LogicalExpression::ListComprehension {
1101            list_expr,
1102            filter_expr,
1103            map_expr,
1104            ..
1105        } => {
1106            substitute_in_expression(list_expr, params)?;
1107            if let Some(filter) = filter_expr {
1108                substitute_in_expression(filter, params)?;
1109            }
1110            substitute_in_expression(map_expr, params)?;
1111        }
1112        LogicalExpression::ListPredicate {
1113            list_expr,
1114            predicate,
1115            ..
1116        } => {
1117            substitute_in_expression(list_expr, params)?;
1118            substitute_in_expression(predicate, params)?;
1119        }
1120        LogicalExpression::ExistsSubquery(subplan)
1121        | LogicalExpression::CountSubquery(subplan)
1122        | LogicalExpression::ValueSubquery(subplan) => {
1123            substitute_in_operator(subplan, params)?;
1124        }
1125        LogicalExpression::PatternComprehension { projection, .. } => {
1126            substitute_in_expression(projection, params)?;
1127        }
1128        LogicalExpression::MapProjection { entries, .. } => {
1129            for entry in entries {
1130                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
1131                    substitute_in_expression(expr, params)?;
1132                }
1133            }
1134        }
1135        LogicalExpression::Reduce {
1136            initial,
1137            list,
1138            expression,
1139            ..
1140        } => {
1141            substitute_in_expression(initial, params)?;
1142            substitute_in_expression(list, params)?;
1143            substitute_in_expression(expression, params)?;
1144        }
1145    }
1146    Ok(())
1147}
1148
1149#[cfg(test)]
1150mod tests {
1151    use super::*;
1152
1153    #[test]
1154    fn test_query_language_is_lpg() {
1155        #[cfg(feature = "gql")]
1156        assert!(QueryLanguage::Gql.is_lpg());
1157        #[cfg(feature = "cypher")]
1158        assert!(QueryLanguage::Cypher.is_lpg());
1159        #[cfg(feature = "sparql")]
1160        assert!(!QueryLanguage::Sparql.is_lpg());
1161    }
1162
1163    #[test]
1164    fn test_processor_creation() {
1165        let store = Arc::new(LpgStore::new().unwrap());
1166        let processor = QueryProcessor::for_lpg(store);
1167        assert!(processor.lpg_store().node_count() == 0);
1168    }
1169
1170    #[cfg(feature = "gql")]
1171    #[test]
1172    fn test_process_simple_gql() {
1173        let store = Arc::new(LpgStore::new().unwrap());
1174        store.create_node(&["Person"]);
1175        store.create_node(&["Person"]);
1176
1177        let processor = QueryProcessor::for_lpg(store);
1178        let result = processor
1179            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
1180            .unwrap();
1181
1182        assert_eq!(result.row_count(), 2);
1183        assert_eq!(result.columns[0], "n");
1184    }
1185
1186    #[cfg(feature = "cypher")]
1187    #[test]
1188    fn test_process_simple_cypher() {
1189        let store = Arc::new(LpgStore::new().unwrap());
1190        store.create_node(&["Person"]);
1191
1192        let processor = QueryProcessor::for_lpg(store);
1193        let result = processor
1194            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
1195            .unwrap();
1196
1197        assert_eq!(result.row_count(), 1);
1198    }
1199
1200    #[cfg(feature = "gql")]
1201    #[test]
1202    fn test_process_with_params() {
1203        let store = Arc::new(LpgStore::new().unwrap());
1204        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1205        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1206        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1207
1208        let processor = QueryProcessor::for_lpg(store);
1209
1210        // Query with parameter
1211        let mut params = HashMap::new();
1212        params.insert("min_age".to_string(), Value::Int64(30));
1213
1214        let result = processor
1215            .process(
1216                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1217                QueryLanguage::Gql,
1218                Some(&params),
1219            )
1220            .unwrap();
1221
1222        // Should return 2 people (ages 35 and 45)
1223        assert_eq!(result.row_count(), 2);
1224    }
1225
1226    #[cfg(feature = "gql")]
1227    #[test]
1228    fn test_missing_param_error() {
1229        let store = Arc::new(LpgStore::new().unwrap());
1230        store.create_node(&["Person"]);
1231
1232        let processor = QueryProcessor::for_lpg(store);
1233
1234        // Query with parameter but empty params map (missing the required param)
1235        let params: HashMap<String, Value> = HashMap::new();
1236        let result = processor.process(
1237            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1238            QueryLanguage::Gql,
1239            Some(&params),
1240        );
1241
1242        // Should fail with missing parameter error
1243        assert!(result.is_err());
1244        let err = result.unwrap_err();
1245        assert!(
1246            err.to_string().contains("Missing parameter"),
1247            "Expected 'Missing parameter' error, got: {}",
1248            err
1249        );
1250    }
1251
1252    #[cfg(feature = "gql")]
1253    #[test]
1254    fn test_params_in_filter_with_property() {
1255        // Tests parameter substitution in WHERE clause with property comparison
1256        let store = Arc::new(LpgStore::new().unwrap());
1257        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
1258        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
1259
1260        let processor = QueryProcessor::for_lpg(store);
1261
1262        let mut params = HashMap::new();
1263        params.insert("threshold".to_string(), Value::Int64(15));
1264
1265        let result = processor
1266            .process(
1267                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
1268                QueryLanguage::Gql,
1269                Some(&params),
1270            )
1271            .unwrap();
1272
1273        // Only value=20 matches > 15
1274        assert_eq!(result.row_count(), 1);
1275        let row = &result.rows[0];
1276        assert_eq!(row[0], Value::Int64(20));
1277    }
1278
1279    #[cfg(feature = "gql")]
1280    #[test]
1281    fn test_params_in_multiple_where_conditions() {
1282        // Tests multiple parameters in WHERE clause with AND
1283        let store = Arc::new(LpgStore::new().unwrap());
1284        store.create_node_with_props(
1285            &["Person"],
1286            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
1287        );
1288        store.create_node_with_props(
1289            &["Person"],
1290            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
1291        );
1292        store.create_node_with_props(
1293            &["Person"],
1294            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
1295        );
1296
1297        let processor = QueryProcessor::for_lpg(store);
1298
1299        let mut params = HashMap::new();
1300        params.insert("min_age".to_string(), Value::Int64(30));
1301        params.insert("min_score".to_string(), Value::Int64(75));
1302
1303        let result = processor
1304            .process(
1305                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
1306                QueryLanguage::Gql,
1307                Some(&params),
1308            )
1309            .unwrap();
1310
1311        // Only the person with age=35, score=90 matches both conditions
1312        assert_eq!(result.row_count(), 1);
1313    }
1314
1315    #[cfg(feature = "gql")]
1316    #[test]
1317    fn test_params_with_in_list() {
1318        // Tests parameter as a value checked against IN list
1319        let store = Arc::new(LpgStore::new().unwrap());
1320        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
1321        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
1322        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
1323
1324        let processor = QueryProcessor::for_lpg(store);
1325
1326        // Check if a parameter value matches any of the statuses
1327        let mut params = HashMap::new();
1328        params.insert("target".to_string(), Value::String("active".into()));
1329
1330        let result = processor
1331            .process(
1332                "MATCH (n:Item) WHERE n.status = $target RETURN n",
1333                QueryLanguage::Gql,
1334                Some(&params),
1335            )
1336            .unwrap();
1337
1338        assert_eq!(result.row_count(), 1);
1339    }
1340
1341    #[cfg(feature = "gql")]
1342    #[test]
1343    fn test_params_same_type_comparison() {
1344        // Tests that same-type parameter comparisons work correctly
1345        let store = Arc::new(LpgStore::new().unwrap());
1346        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
1347        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
1348
1349        let processor = QueryProcessor::for_lpg(store);
1350
1351        // Compare int property with int parameter
1352        let mut params = HashMap::new();
1353        params.insert("threshold".to_string(), Value::Int64(75));
1354
1355        let result = processor
1356            .process(
1357                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
1358                QueryLanguage::Gql,
1359                Some(&params),
1360            )
1361            .unwrap();
1362
1363        // Only value=100 matches > 75
1364        assert_eq!(result.row_count(), 1);
1365    }
1366
1367    #[cfg(feature = "gql")]
1368    #[test]
1369    fn test_process_empty_result_has_columns() {
1370        // Tests that empty results still have correct column names
1371        let store = Arc::new(LpgStore::new().unwrap());
1372        // Don't create any nodes
1373
1374        let processor = QueryProcessor::for_lpg(store);
1375        let result = processor
1376            .process(
1377                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
1378                QueryLanguage::Gql,
1379                None,
1380            )
1381            .unwrap();
1382
1383        assert_eq!(result.row_count(), 0);
1384        assert_eq!(result.columns.len(), 2);
1385        assert_eq!(result.columns[0], "name");
1386        assert_eq!(result.columns[1], "age");
1387    }
1388
1389    #[cfg(feature = "gql")]
1390    #[test]
1391    fn test_params_string_equality() {
1392        // Tests string parameter equality comparison
1393        let store = Arc::new(LpgStore::new().unwrap());
1394        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
1395        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
1396        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
1397
1398        let processor = QueryProcessor::for_lpg(store);
1399
1400        let mut params = HashMap::new();
1401        params.insert("target".to_string(), Value::String("beta".into()));
1402
1403        let result = processor
1404            .process(
1405                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
1406                QueryLanguage::Gql,
1407                Some(&params),
1408            )
1409            .unwrap();
1410
1411        assert_eq!(result.row_count(), 1);
1412        assert_eq!(result.rows[0][0], Value::String("beta".into()));
1413    }
1414
1415    #[cfg(feature = "cypher")]
1416    #[test]
1417    fn test_params_in_exists_subquery() {
1418        // Regression: parameters inside EXISTS/COUNT/VALUE subqueries were not
1419        // substituted, causing type mismatches or silently wrong results.
1420        let store = Arc::new(LpgStore::new().unwrap());
1421        let alix =
1422            store.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
1423        let gus =
1424            store.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
1425        let _jules =
1426            store.create_node_with_props(&["Person"], [("name", Value::String("Jules".into()))]);
1427
1428        // Alix follows Gus (but not Jules)
1429        store.create_edge(alix, gus, "FOLLOWS");
1430
1431        let processor = QueryProcessor::for_lpg(store);
1432
1433        // Find people NOT followed by the viewer ($viewer)
1434        let mut params = HashMap::new();
1435        params.insert("viewer".to_string(), Value::String("Alix".into()));
1436
1437        let result = processor
1438            .process(
1439                "MATCH (p:Person) \
1440                 WHERE p.name <> $viewer \
1441                   AND NOT EXISTS { MATCH (v:Person)-[:FOLLOWS]->(p) WHERE v.name = $viewer } \
1442                 RETURN p.name ORDER BY p.name",
1443                QueryLanguage::Cypher,
1444                Some(&params),
1445            )
1446            .unwrap();
1447
1448        // Alix follows Gus, so only Jules should be returned
1449        assert_eq!(result.row_count(), 1);
1450        assert_eq!(result.rows[0][0], Value::String("Jules".into()));
1451    }
1452}