Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15#[cfg(feature = "lpg")]
16use grafeo_core::graph::lpg::LpgStore;
17use grafeo_core::graph::{GraphStore, GraphStoreMut};
18
19use crate::catalog::Catalog;
20use crate::database::QueryResult;
21use crate::query::binder::Binder;
22use crate::query::executor::Executor;
23use crate::query::optimizer::Optimizer;
24use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
25use crate::query::planner::Planner;
26use crate::transaction::TransactionManager;
27
/// Supported query languages.
///
/// Every variant is compiled in only when its Cargo feature is enabled, so the
/// set of languages available at runtime depends on the build configuration.
/// The enum is `#[non_exhaustive]`: code outside this crate must include a
/// wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024) - default for LPG
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL for LPG
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 GRAPH_TABLE)
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 for RDF
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL for RDF (requires both the `graphql` and `triple-store` features)
    #[cfg(all(feature = "graphql", feature = "triple-store"))]
    GraphQLRdf,
}
54
impl QueryLanguage {
    /// Returns whether this language targets LPG (vs RDF).
    ///
    /// GQL, Cypher, Gremlin, GraphQL and SQL/PGQ report `true`; SPARQL and
    /// GraphQL-RDF report `false`. Each arm carries the same `cfg` gate as the
    /// variant it matches.
    #[must_use]
    pub const fn is_lpg(&self) -> bool {
        match self {
            #[cfg(feature = "gql")]
            Self::Gql => true,
            #[cfg(feature = "cypher")]
            Self::Cypher => true,
            #[cfg(feature = "gremlin")]
            Self::Gremlin => true,
            #[cfg(feature = "graphql")]
            Self::GraphQL => true,
            #[cfg(feature = "sql-pgq")]
            Self::SqlPgq => true,
            #[cfg(feature = "sparql")]
            Self::Sparql => false,
            #[cfg(all(feature = "graphql", feature = "triple-store"))]
            Self::GraphQLRdf => false,
            // Fallback arm: it is unreachable whenever every compiled-in
            // variant is already listed above, hence the lint allowance.
            #[allow(unreachable_patterns)]
            _ => false,
        }
    }
}
79
/// Query parameters for prepared statements.
///
/// Maps a parameter name to the [`Value`] substituted for it in the logical plan.
pub type QueryParams = HashMap<String, Value>;
82
/// Processes queries through the full pipeline.
///
/// The processor holds references to the stores and provides a unified
/// interface for executing queries in any supported language.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// # use grafeo_core::graph::lpg::LpgStore;
/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
///
/// # fn main() -> grafeo_common::utils::error::Result<()> {
/// let store = Arc::new(LpgStore::new().unwrap());
/// let processor = QueryProcessor::for_lpg(store);
/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
/// # Ok(())
/// # }
/// ```
pub struct QueryProcessor {
    /// LPG store for property graph queries.
    ///
    /// NOTE: when the processor is built through
    /// `for_graph_store_with_transaction` or `for_stores_with_transaction`,
    /// this is a freshly created `LpgStore`, not the store that backs
    /// `graph_store`.
    #[cfg(feature = "lpg")]
    lpg_store: Arc<LpgStore>,
    /// Graph store trait object for pluggable storage backends (read path).
    graph_store: Arc<dyn GraphStore>,
    /// Writable graph store (None when read-only).
    write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager for MVCC operations.
    ///
    /// Also supplies the current epoch when no transaction context is set.
    transaction_manager: Arc<TransactionManager>,
    /// Catalog for schema and index metadata.
    catalog: Arc<Catalog>,
    /// Query optimizer used for LPG queries (RDF queries build their own
    /// optimizer from RDF statistics).
    optimizer: Optimizer,
    /// Current transaction context (if any): the viewing epoch and the
    /// transaction id, in that order.
    transaction_context: Option<(EpochId, TransactionId)>,
    /// RDF store for triple pattern queries (optional).
    #[cfg(feature = "triple-store")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
122
123impl QueryProcessor {
124    /// Creates a new query processor for LPG queries.
125    #[cfg(feature = "lpg")]
126    #[must_use]
127    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
128        let optimizer = Optimizer::from_store(&store);
129        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
130        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
131        Self {
132            lpg_store: store,
133            graph_store,
134            write_store,
135            transaction_manager: Arc::new(TransactionManager::new()),
136            catalog: Arc::new(Catalog::new()),
137            optimizer,
138            transaction_context: None,
139            #[cfg(feature = "triple-store")]
140            rdf_store: None,
141        }
142    }
143
144    /// Creates a new query processor with a transaction manager.
145    #[cfg(feature = "lpg")]
146    #[must_use]
147    pub fn for_lpg_with_transaction(
148        store: Arc<LpgStore>,
149        transaction_manager: Arc<TransactionManager>,
150    ) -> Self {
151        let optimizer = Optimizer::from_store(&store);
152        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
153        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
154        Self {
155            lpg_store: store,
156            graph_store,
157            write_store,
158            transaction_manager,
159            catalog: Arc::new(Catalog::new()),
160            optimizer,
161            transaction_context: None,
162            #[cfg(feature = "triple-store")]
163            rdf_store: None,
164        }
165    }
166
167    /// Creates a query processor backed by any `GraphStoreMut` implementation.
168    ///
169    /// # Errors
170    ///
171    /// Returns an error if the internal arena allocation fails (out of memory).
172    pub fn for_graph_store_with_transaction(
173        store: Arc<dyn GraphStoreMut>,
174        transaction_manager: Arc<TransactionManager>,
175    ) -> Result<Self> {
176        let optimizer = Optimizer::from_graph_store(&*store);
177        let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
178        Ok(Self {
179            #[cfg(feature = "lpg")]
180            lpg_store: Arc::new(LpgStore::new()?),
181            graph_store: read_store,
182            write_store: Some(store),
183            transaction_manager,
184            catalog: Arc::new(Catalog::new()),
185            optimizer,
186            transaction_context: None,
187            #[cfg(feature = "triple-store")]
188            rdf_store: None,
189        })
190    }
191
192    /// Creates a query processor from split read/write stores.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if the internal arena allocation fails (out of memory).
197    pub fn for_stores_with_transaction(
198        read_store: Arc<dyn GraphStore>,
199        write_store: Option<Arc<dyn GraphStoreMut>>,
200        transaction_manager: Arc<TransactionManager>,
201    ) -> Result<Self> {
202        let optimizer = Optimizer::from_graph_store(&*read_store);
203        Ok(Self {
204            #[cfg(feature = "lpg")]
205            lpg_store: Arc::new(LpgStore::new()?),
206            graph_store: read_store,
207            write_store,
208            transaction_manager,
209            catalog: Arc::new(Catalog::new()),
210            optimizer,
211            transaction_context: None,
212            #[cfg(feature = "triple-store")]
213            rdf_store: None,
214        })
215    }
216
    /// Sets the transaction context for MVCC visibility.
    ///
    /// This should be called when the processor is used within a transaction.
    /// Builder-style: consumes the processor and returns it with the context
    /// stored as `(viewing_epoch, transaction_id)`. While a context is set,
    /// LPG queries are planned with that epoch and transaction id and never
    /// take the read-only fast path.
    #[must_use]
    pub fn with_transaction_context(
        mut self,
        viewing_epoch: EpochId,
        transaction_id: TransactionId,
    ) -> Self {
        self.transaction_context = Some((viewing_epoch, transaction_id));
        self
    }
229
    /// Sets a custom catalog.
    ///
    /// Builder-style: consumes the processor and returns it with the catalog
    /// created by the constructor replaced by `catalog`.
    #[must_use]
    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
        self.catalog = catalog;
        self
    }
236
    /// Sets a custom optimizer.
    ///
    /// Builder-style: consumes the processor and returns it with the
    /// store-derived optimizer replaced by `optimizer`. Only the LPG path uses
    /// this optimizer.
    #[must_use]
    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
        self.optimizer = optimizer;
        self
    }
243
244    /// Processes a query string and returns results.
245    ///
246    /// Pipeline:
247    /// 1. Parse (language-specific parser → AST)
248    /// 2. Translate (AST → LogicalPlan)
249    /// 3. Bind (semantic validation)
250    /// 4. Optimize (filter pushdown, join reorder, etc.)
251    /// 5. Plan (logical → physical operators)
252    /// 6. Execute (run operators, collect results)
253    ///
254    /// # Arguments
255    ///
256    /// * `query` - The query string
257    /// * `language` - Which query language to use
258    /// * `params` - Optional query parameters for prepared statements
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if any stage of the pipeline fails.
263    pub fn process(
264        &self,
265        query: &str,
266        language: QueryLanguage,
267        params: Option<&QueryParams>,
268    ) -> Result<QueryResult> {
269        if language.is_lpg() {
270            self.process_lpg(query, language, params)
271        } else {
272            #[cfg(feature = "triple-store")]
273            {
274                self.process_rdf(query, language, params)
275            }
276            #[cfg(not(feature = "triple-store"))]
277            {
278                Err(Error::Internal(
279                    "RDF support not enabled. Compile with --features rdf".to_string(),
280                ))
281            }
282        }
283    }
284
285    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
286    fn process_lpg(
287        &self,
288        query: &str,
289        language: QueryLanguage,
290        params: Option<&QueryParams>,
291    ) -> Result<QueryResult> {
292        #[cfg(not(target_arch = "wasm32"))]
293        let start_time = std::time::Instant::now();
294
295        // 1. Parse and translate to logical plan
296        let mut logical_plan = self.translate_lpg(query, language)?;
297
298        // 2. Substitute parameters if provided (merge defaults from the plan first)
299        let has_defaults = !logical_plan.default_params.is_empty();
300        if params.is_some() || has_defaults {
301            let merged = if has_defaults {
302                let mut merged = logical_plan.default_params.clone();
303                if let Some(params) = params {
304                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
305                }
306                merged
307            } else {
308                params.cloned().unwrap_or_default()
309            };
310            substitute_params(&mut logical_plan, &merged)?;
311        }
312
313        // 3. Semantic validation
314        let mut binder = Binder::new();
315        let _binding_context = binder.bind(&logical_plan)?;
316
317        // 4. Optimize the plan
318        let optimized_plan = self.optimizer.optimize(logical_plan)?;
319
320        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
321        if optimized_plan.explain {
322            let mut plan = optimized_plan;
323            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
324            return Ok(explain_result(&plan));
325        }
326
327        // 5. Convert to physical plan with transaction context
328        // Read-only fast path: safe when no mutations AND no active transaction
329        // (an active transaction may have prior uncommitted writes from earlier statements)
330        let is_read_only =
331            !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
332        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
333            Planner::with_context(
334                Arc::clone(&self.graph_store),
335                self.write_store.as_ref().map(Arc::clone),
336                Arc::clone(&self.transaction_manager),
337                Some(transaction_id),
338                epoch,
339            )
340        } else {
341            Planner::with_context(
342                Arc::clone(&self.graph_store),
343                self.write_store.as_ref().map(Arc::clone),
344                Arc::clone(&self.transaction_manager),
345                None,
346                self.transaction_manager.current_epoch(),
347            )
348        }
349        .with_read_only(is_read_only);
350        let mut physical_plan = planner.plan(&optimized_plan)?;
351
352        // 6. Execute and collect results
353        let executor = Executor::with_columns(physical_plan.columns.clone());
354        let mut result = executor.execute(physical_plan.operator.as_mut())?;
355
356        // Add execution metrics
357        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
358        #[cfg(not(target_arch = "wasm32"))]
359        {
360            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
361            result.execution_time_ms = Some(elapsed_ms);
362        }
363        result.rows_scanned = Some(rows_scanned);
364
365        Ok(result)
366    }
367
368    /// Translates an LPG query to a logical plan.
369    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
370        let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
371        match language {
372            #[cfg(feature = "gql")]
373            QueryLanguage::Gql => {
374                use crate::query::translators::gql;
375                gql::translate(query)
376            }
377            #[cfg(feature = "cypher")]
378            QueryLanguage::Cypher => {
379                use crate::query::translators::cypher;
380                cypher::translate(query)
381            }
382            #[cfg(feature = "gremlin")]
383            QueryLanguage::Gremlin => {
384                use crate::query::translators::gremlin;
385                gremlin::translate(query)
386            }
387            #[cfg(feature = "graphql")]
388            QueryLanguage::GraphQL => {
389                use crate::query::translators::graphql;
390                graphql::translate(query)
391            }
392            #[cfg(feature = "sql-pgq")]
393            QueryLanguage::SqlPgq => {
394                use crate::query::translators::sql_pgq;
395                sql_pgq::translate(query)
396            }
397            #[allow(unreachable_patterns)]
398            _ => Err(Error::Internal(format!(
399                "Language {:?} is not an LPG language",
400                language
401            ))),
402        }
403    }
404
    /// Returns a reference to the LPG store.
    ///
    /// NOTE: for processors built via `for_graph_store_with_transaction` or
    /// `for_stores_with_transaction` this is the `LpgStore` created inside the
    /// constructor, not the store queries actually run against.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub fn lpg_store(&self) -> &Arc<LpgStore> {
        &self.lpg_store
    }
411
    /// Returns a reference to the catalog.
    ///
    /// This is the catalog created by the constructor unless it was replaced
    /// through `with_catalog`.
    #[must_use]
    pub fn catalog(&self) -> &Arc<Catalog> {
        &self.catalog
    }
417
    /// Returns a reference to the optimizer.
    ///
    /// This is the optimizer used by the LPG pipeline; the RDF pipeline builds
    /// a separate optimizer per query.
    #[must_use]
    pub fn optimizer(&self) -> &Optimizer {
        &self.optimizer
    }
423}
424
impl QueryProcessor {
    /// Returns a reference to the transaction manager.
    ///
    /// This is either the manager passed to a `*_with_transaction` constructor
    /// or the one the constructor created itself.
    #[must_use]
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
}
432
433// =========================================================================
// RDF-specific methods (gated behind the `triple-store` feature)
435// =========================================================================
436
437#[cfg(feature = "triple-store")]
438impl QueryProcessor {
439    /// Creates a new query processor with both LPG and RDF stores.
440    #[cfg(feature = "lpg")]
441    #[must_use]
442    pub fn with_rdf(
443        lpg_store: Arc<LpgStore>,
444        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
445    ) -> Self {
446        let optimizer = Optimizer::from_store(&lpg_store);
447        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStore>;
448        let write_store = Some(Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>);
449        Self {
450            lpg_store,
451            graph_store,
452            write_store,
453            transaction_manager: Arc::new(TransactionManager::new()),
454            catalog: Arc::new(Catalog::new()),
455            optimizer,
456            transaction_context: None,
457            rdf_store: Some(rdf_store),
458        }
459    }
460
    /// Returns a reference to the RDF store (if configured).
    ///
    /// `None` unless the processor was created with `with_rdf`; the other
    /// constructors leave the RDF store unset.
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }
466
467    /// Processes an RDF query (SPARQL, GraphQL-RDF).
468    fn process_rdf(
469        &self,
470        query: &str,
471        language: QueryLanguage,
472        params: Option<&QueryParams>,
473    ) -> Result<QueryResult> {
474        use crate::query::planner::rdf::RdfPlanner;
475
476        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
477            Error::Internal("RDF store not configured for this processor".to_string())
478        })?;
479
480        // 1. Parse and translate to logical plan
481        let mut logical_plan = self.translate_rdf(query, language)?;
482
483        // 2. Substitute parameters if provided (merge defaults from the plan first)
484        let has_defaults = !logical_plan.default_params.is_empty();
485        if params.is_some() || has_defaults {
486            let merged = if has_defaults {
487                let mut merged = logical_plan.default_params.clone();
488                if let Some(params) = params {
489                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
490                }
491                merged
492            } else {
493                params.cloned().unwrap_or_default()
494            };
495            substitute_params(&mut logical_plan, &merged)?;
496        }
497
498        // 3. Semantic validation
499        let mut binder = Binder::new();
500        let _binding_context = binder.bind(&logical_plan)?;
501
502        // 3. Optimize the plan (use RDF statistics for cost-based optimization)
503        let rdf_optimizer = {
504            let stats = rdf_store.get_or_collect_statistics();
505            Optimizer::from_rdf_statistics((*stats).clone())
506        };
507        let optimized_plan = rdf_optimizer.optimize(logical_plan)?;
508
509        // 3a. EXPLAIN: return the plan tree without executing.
510        // Includes physical operator names by planning with profiling to collect entries.
511        if optimized_plan.explain {
512            let planner = RdfPlanner::new(Arc::clone(rdf_store));
513            let (_, entries) = planner.plan_profiled(&optimized_plan)?;
514            return Ok(physical_explain_result(&optimized_plan, entries));
515        }
516
517        // 3b. EXPLAIN ANALYZE (PROFILE): execute with instrumentation, report stats.
518        if optimized_plan.profile {
519            let planner = RdfPlanner::new(Arc::clone(rdf_store));
520            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
521
522            let start = std::time::Instant::now();
523            let executor = Executor::with_columns(physical_plan.columns.clone());
524            let _result = executor.execute(physical_plan.operator.as_mut())?;
525            let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
526
527            let tree = crate::query::profile::build_profile_tree(
528                &optimized_plan.root,
529                &mut entries.into_iter(),
530            );
531            return Ok(crate::query::profile::profile_result(&tree, elapsed_ms));
532        }
533
534        // 4. Convert to physical plan (using RDF planner)
535        let planner = RdfPlanner::new(Arc::clone(rdf_store));
536        let mut physical_plan = planner.plan(&optimized_plan)?;
537
538        // 5. Execute and collect results
539        let executor = Executor::with_columns(physical_plan.columns.clone());
540        executor.execute(physical_plan.operator.as_mut())
541    }
542
543    /// Translates an RDF query to a logical plan.
544    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
545        match language {
546            #[cfg(feature = "sparql")]
547            QueryLanguage::Sparql => {
548                use crate::query::translators::sparql;
549                sparql::translate(query)
550            }
551            #[cfg(all(feature = "graphql", feature = "triple-store"))]
552            QueryLanguage::GraphQLRdf => {
553                use crate::query::translators::graphql_rdf;
554                // Default namespace for GraphQL-RDF queries
555                graphql_rdf::translate(query, "http://example.org/")
556            }
557            _ => Err(Error::Internal(format!(
558                "Language {:?} is not an RDF language",
559                language
560            ))),
561        }
562    }
563}
564
565/// Annotates filter operators in the plan with pushdown hints.
566///
567/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
568/// whether a property index exists for equality predicates.
569pub(crate) fn annotate_pushdown_hints(
570    op: &mut LogicalOperator,
571    store: &dyn grafeo_core::graph::GraphStore,
572) {
573    #[allow(clippy::wildcard_imports)]
574    use crate::query::plan::*;
575
576    match op {
577        LogicalOperator::Filter(filter) => {
578            // Recurse into children first
579            annotate_pushdown_hints(&mut filter.input, store);
580
581            // Annotate this filter if it sits on top of a NodeScan
582            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
583                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
584            }
585        }
586        LogicalOperator::NodeScan(op) => {
587            if let Some(input) = &mut op.input {
588                annotate_pushdown_hints(input, store);
589            }
590        }
591        LogicalOperator::EdgeScan(op) => {
592            if let Some(input) = &mut op.input {
593                annotate_pushdown_hints(input, store);
594            }
595        }
596        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
597        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
598        LogicalOperator::Join(op) => {
599            annotate_pushdown_hints(&mut op.left, store);
600            annotate_pushdown_hints(&mut op.right, store);
601        }
602        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
603        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
604        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
605        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
606        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
607        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
608        LogicalOperator::Union(op) => {
609            for input in &mut op.inputs {
610                annotate_pushdown_hints(input, store);
611            }
612        }
613        LogicalOperator::Apply(op) => {
614            annotate_pushdown_hints(&mut op.input, store);
615            annotate_pushdown_hints(&mut op.subplan, store);
616        }
617        LogicalOperator::Otherwise(op) => {
618            annotate_pushdown_hints(&mut op.left, store);
619            annotate_pushdown_hints(&mut op.right, store);
620        }
621        _ => {}
622    }
623}
624
625/// Infers the pushdown strategy for a filter predicate over a node scan.
626fn infer_pushdown(
627    predicate: &LogicalExpression,
628    scan: &crate::query::plan::NodeScanOp,
629    store: &dyn grafeo_core::graph::GraphStore,
630) -> Option<crate::query::plan::PushdownHint> {
631    #[allow(clippy::wildcard_imports)]
632    use crate::query::plan::*;
633
634    match predicate {
635        // Equality: n.prop = value
636        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
637            if let Some(prop) = extract_property_name(left, &scan.variable)
638                .or_else(|| extract_property_name(right, &scan.variable))
639            {
640                if store.has_property_index(&prop) {
641                    return Some(PushdownHint::IndexLookup { property: prop });
642                }
643                if scan.label.is_some() {
644                    return Some(PushdownHint::LabelFirst);
645                }
646            }
647            None
648        }
649        // Range: n.prop > value, n.prop < value, etc.
650        LogicalExpression::Binary {
651            left,
652            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
653            right,
654        } => {
655            if let Some(prop) = extract_property_name(left, &scan.variable)
656                .or_else(|| extract_property_name(right, &scan.variable))
657            {
658                if store.has_property_index(&prop) {
659                    return Some(PushdownHint::RangeScan { property: prop });
660                }
661                if scan.label.is_some() {
662                    return Some(PushdownHint::LabelFirst);
663                }
664            }
665            None
666        }
667        // AND: check the left side (first conjunct) for pushdown
668        LogicalExpression::Binary {
669            left,
670            op: BinaryOp::And,
671            ..
672        } => infer_pushdown(left, scan, store),
673        _ => {
674            // Any other predicate on a labeled scan gets label-first
675            if scan.label.is_some() {
676                Some(PushdownHint::LabelFirst)
677            } else {
678                None
679            }
680        }
681    }
682}
683
684/// Extracts the property name if the expression is `Property { variable, property }`
685/// and the variable matches the scan variable.
686fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
687    if let LogicalExpression::Property { variable, property } = expr
688        && variable == scan_var
689    {
690        Some(property.clone())
691    } else {
692        None
693    }
694}
695
696/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
697pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
698    let tree_text = plan.root.explain_tree();
699    QueryResult {
700        columns: vec!["plan".to_string()],
701        column_types: vec![grafeo_common::types::LogicalType::String],
702        rows: vec![vec![Value::String(tree_text.into())]],
703        execution_time_ms: None,
704        rows_scanned: None,
705        status_message: None,
706        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
707    }
708}
709
710/// Formats a physical EXPLAIN result showing both the logical plan and the
711/// physical operator names mapped to each logical operator.
712pub(crate) fn physical_explain_result(
713    plan: &LogicalPlan,
714    entries: Vec<crate::query::profile::ProfileEntry>,
715) -> QueryResult {
716    let tree = crate::query::profile::build_profile_tree(&plan.root, &mut entries.into_iter());
717
718    let mut output = String::new();
719    format_physical_node(&mut output, &tree, 0);
720
721    QueryResult {
722        columns: vec!["plan".to_string()],
723        column_types: vec![grafeo_common::types::LogicalType::String],
724        rows: vec![vec![Value::String(output.into())]],
725        execution_time_ms: None,
726        rows_scanned: None,
727        status_message: None,
728        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
729    }
730}
731
732/// Recursively formats a physical plan node showing operator name and label.
733fn format_physical_node(out: &mut String, node: &crate::query::profile::ProfileNode, depth: usize) {
734    use std::fmt::Write;
735    let indent = "  ".repeat(depth);
736    let _ = writeln!(out, "{indent}{} {}", node.name, node.label);
737    for child in &node.children {
738        format_physical_node(out, child, depth + 1);
739    }
740}
741
742/// Substitutes parameters in a logical plan with their values.
743///
744/// # Errors
745///
746/// Returns an error if a referenced parameter is not found in `params`.
747pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
748    substitute_in_operator(&mut plan.root, params)
749}
750
751/// Recursively substitutes parameters in an operator.
752fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
753    #[allow(clippy::wildcard_imports)]
754    use crate::query::plan::*;
755
756    match op {
757        LogicalOperator::Filter(filter) => {
758            substitute_in_expression(&mut filter.predicate, params)?;
759            substitute_in_operator(&mut filter.input, params)?;
760        }
761        LogicalOperator::Return(ret) => {
762            for item in &mut ret.items {
763                substitute_in_expression(&mut item.expression, params)?;
764            }
765            substitute_in_operator(&mut ret.input, params)?;
766        }
767        LogicalOperator::Project(proj) => {
768            for p in &mut proj.projections {
769                substitute_in_expression(&mut p.expression, params)?;
770            }
771            substitute_in_operator(&mut proj.input, params)?;
772        }
773        LogicalOperator::NodeScan(scan) => {
774            if let Some(input) = &mut scan.input {
775                substitute_in_operator(input, params)?;
776            }
777        }
778        LogicalOperator::EdgeScan(scan) => {
779            if let Some(input) = &mut scan.input {
780                substitute_in_operator(input, params)?;
781            }
782        }
783        LogicalOperator::Expand(expand) => {
784            substitute_in_operator(&mut expand.input, params)?;
785        }
786        LogicalOperator::Join(join) => {
787            substitute_in_operator(&mut join.left, params)?;
788            substitute_in_operator(&mut join.right, params)?;
789            for cond in &mut join.conditions {
790                substitute_in_expression(&mut cond.left, params)?;
791                substitute_in_expression(&mut cond.right, params)?;
792            }
793        }
794        LogicalOperator::LeftJoin(join) => {
795            substitute_in_operator(&mut join.left, params)?;
796            substitute_in_operator(&mut join.right, params)?;
797            if let Some(cond) = &mut join.condition {
798                substitute_in_expression(cond, params)?;
799            }
800        }
801        LogicalOperator::Aggregate(agg) => {
802            for expr in &mut agg.group_by {
803                substitute_in_expression(expr, params)?;
804            }
805            for agg_expr in &mut agg.aggregates {
806                if let Some(expr) = &mut agg_expr.expression {
807                    substitute_in_expression(expr, params)?;
808                }
809            }
810            substitute_in_operator(&mut agg.input, params)?;
811        }
812        LogicalOperator::Sort(sort) => {
813            for key in &mut sort.keys {
814                substitute_in_expression(&mut key.expression, params)?;
815            }
816            substitute_in_operator(&mut sort.input, params)?;
817        }
818        LogicalOperator::Limit(limit) => {
819            resolve_count_param(&mut limit.count, params)?;
820            substitute_in_operator(&mut limit.input, params)?;
821        }
822        LogicalOperator::Skip(skip) => {
823            resolve_count_param(&mut skip.count, params)?;
824            substitute_in_operator(&mut skip.input, params)?;
825        }
826        LogicalOperator::Distinct(distinct) => {
827            substitute_in_operator(&mut distinct.input, params)?;
828        }
829        LogicalOperator::CreateNode(create) => {
830            for (_, expr) in &mut create.properties {
831                substitute_in_expression(expr, params)?;
832            }
833            if let Some(input) = &mut create.input {
834                substitute_in_operator(input, params)?;
835            }
836        }
837        LogicalOperator::CreateEdge(create) => {
838            for (_, expr) in &mut create.properties {
839                substitute_in_expression(expr, params)?;
840            }
841            substitute_in_operator(&mut create.input, params)?;
842        }
843        LogicalOperator::DeleteNode(delete) => {
844            substitute_in_operator(&mut delete.input, params)?;
845        }
846        LogicalOperator::DeleteEdge(delete) => {
847            substitute_in_operator(&mut delete.input, params)?;
848        }
849        LogicalOperator::SetProperty(set) => {
850            for (_, expr) in &mut set.properties {
851                substitute_in_expression(expr, params)?;
852            }
853            substitute_in_operator(&mut set.input, params)?;
854        }
855        LogicalOperator::Union(union) => {
856            for input in &mut union.inputs {
857                substitute_in_operator(input, params)?;
858            }
859        }
860        LogicalOperator::AntiJoin(anti) => {
861            substitute_in_operator(&mut anti.left, params)?;
862            substitute_in_operator(&mut anti.right, params)?;
863        }
864        LogicalOperator::Bind(bind) => {
865            substitute_in_expression(&mut bind.expression, params)?;
866            substitute_in_operator(&mut bind.input, params)?;
867        }
868        LogicalOperator::TripleScan(scan) => {
869            if let Some(input) = &mut scan.input {
870                substitute_in_operator(input, params)?;
871            }
872        }
873        LogicalOperator::Unwind(unwind) => {
874            substitute_in_expression(&mut unwind.expression, params)?;
875            substitute_in_operator(&mut unwind.input, params)?;
876        }
877        LogicalOperator::MapCollect(mc) => {
878            substitute_in_operator(&mut mc.input, params)?;
879        }
880        LogicalOperator::Merge(merge) => {
881            for (_, expr) in &mut merge.match_properties {
882                substitute_in_expression(expr, params)?;
883            }
884            for (_, expr) in &mut merge.on_create {
885                substitute_in_expression(expr, params)?;
886            }
887            for (_, expr) in &mut merge.on_match {
888                substitute_in_expression(expr, params)?;
889            }
890            substitute_in_operator(&mut merge.input, params)?;
891        }
892        LogicalOperator::MergeRelationship(merge_rel) => {
893            for (_, expr) in &mut merge_rel.match_properties {
894                substitute_in_expression(expr, params)?;
895            }
896            for (_, expr) in &mut merge_rel.on_create {
897                substitute_in_expression(expr, params)?;
898            }
899            for (_, expr) in &mut merge_rel.on_match {
900                substitute_in_expression(expr, params)?;
901            }
902            substitute_in_operator(&mut merge_rel.input, params)?;
903        }
904        LogicalOperator::AddLabel(add_label) => {
905            substitute_in_operator(&mut add_label.input, params)?;
906        }
907        LogicalOperator::RemoveLabel(remove_label) => {
908            substitute_in_operator(&mut remove_label.input, params)?;
909        }
910        LogicalOperator::ShortestPath(sp) => {
911            substitute_in_operator(&mut sp.input, params)?;
912        }
913        // SPARQL Update operators
914        LogicalOperator::InsertTriple(insert) => {
915            if let Some(ref mut input) = insert.input {
916                substitute_in_operator(input, params)?;
917            }
918        }
919        LogicalOperator::DeleteTriple(delete) => {
920            if let Some(ref mut input) = delete.input {
921                substitute_in_operator(input, params)?;
922            }
923        }
924        LogicalOperator::Modify(modify) => {
925            substitute_in_operator(&mut modify.where_clause, params)?;
926        }
927        LogicalOperator::ClearGraph(_)
928        | LogicalOperator::CreateGraph(_)
929        | LogicalOperator::DropGraph(_)
930        | LogicalOperator::LoadGraph(_)
931        | LogicalOperator::CopyGraph(_)
932        | LogicalOperator::MoveGraph(_)
933        | LogicalOperator::AddGraph(_) => {}
934        LogicalOperator::HorizontalAggregate(op) => {
935            substitute_in_operator(&mut op.input, params)?;
936        }
937        LogicalOperator::Empty => {}
938        LogicalOperator::VectorScan(scan) => {
939            substitute_in_expression(&mut scan.query_vector, params)?;
940            if let Some(ref mut input) = scan.input {
941                substitute_in_operator(input, params)?;
942            }
943        }
944        LogicalOperator::VectorJoin(join) => {
945            substitute_in_expression(&mut join.query_vector, params)?;
946            substitute_in_operator(&mut join.input, params)?;
947        }
948        LogicalOperator::Except(except) => {
949            substitute_in_operator(&mut except.left, params)?;
950            substitute_in_operator(&mut except.right, params)?;
951        }
952        LogicalOperator::Intersect(intersect) => {
953            substitute_in_operator(&mut intersect.left, params)?;
954            substitute_in_operator(&mut intersect.right, params)?;
955        }
956        LogicalOperator::Otherwise(otherwise) => {
957            substitute_in_operator(&mut otherwise.left, params)?;
958            substitute_in_operator(&mut otherwise.right, params)?;
959        }
960        LogicalOperator::Apply(apply) => {
961            substitute_in_operator(&mut apply.input, params)?;
962            substitute_in_operator(&mut apply.subplan, params)?;
963        }
964        // ParameterScan has no expressions to substitute
965        LogicalOperator::ParameterScan(_) => {}
966        LogicalOperator::MultiWayJoin(mwj) => {
967            for input in &mut mwj.inputs {
968                substitute_in_operator(input, params)?;
969            }
970            for cond in &mut mwj.conditions {
971                substitute_in_expression(&mut cond.left, params)?;
972                substitute_in_expression(&mut cond.right, params)?;
973            }
974        }
975        // DDL operators have no expressions to substitute
976        LogicalOperator::CreatePropertyGraph(_) => {}
977        // Procedure calls: arguments could contain parameters but we handle at execution time
978        LogicalOperator::CallProcedure(_) => {}
979        // LoadData: file path is a literal, no parameter substitution needed
980        LogicalOperator::LoadData(_) => {}
981        // Construct: template uses variables, substitute in the WHERE input
982        LogicalOperator::Construct(construct) => {
983            substitute_in_operator(&mut construct.input, params)?;
984        }
985    }
986    Ok(())
987}
988
989/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
990fn resolve_count_param(
991    count: &mut crate::query::plan::CountExpr,
992    params: &QueryParams,
993) -> Result<()> {
994    use crate::query::plan::CountExpr;
995    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
996
997    if let CountExpr::Parameter(name) = count {
998        let value = params.get(name.as_str()).ok_or_else(|| {
999            Error::Query(QueryError::new(
1000                QueryErrorKind::Semantic,
1001                format!("Missing parameter for SKIP/LIMIT: ${name}"),
1002            ))
1003        })?;
1004        let n = match value {
1005            // reason: guard ensures *i >= 0
1006            #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1007            Value::Int64(i) if *i >= 0 => *i as usize,
1008            Value::Int64(i) => {
1009                return Err(Error::Query(QueryError::new(
1010                    QueryErrorKind::Semantic,
1011                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
1012                )));
1013            }
1014            other => {
1015                return Err(Error::Query(QueryError::new(
1016                    QueryErrorKind::Semantic,
1017                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
1018                )));
1019            }
1020        };
1021        *count = CountExpr::Literal(n);
1022    }
1023    Ok(())
1024}
1025
1026/// Substitutes parameters in an expression with their values.
1027fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
1028    use crate::query::plan::LogicalExpression;
1029
1030    match expr {
1031        LogicalExpression::Parameter(name) => {
1032            if let Some(value) = params.get(name) {
1033                *expr = LogicalExpression::Literal(value.clone());
1034            } else {
1035                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
1036            }
1037        }
1038        LogicalExpression::Binary { left, right, .. } => {
1039            substitute_in_expression(left, params)?;
1040            substitute_in_expression(right, params)?;
1041        }
1042        LogicalExpression::Unary { operand, .. } => {
1043            substitute_in_expression(operand, params)?;
1044        }
1045        LogicalExpression::FunctionCall { args, .. } => {
1046            for arg in args {
1047                substitute_in_expression(arg, params)?;
1048            }
1049        }
1050        LogicalExpression::List(items) => {
1051            for item in items {
1052                substitute_in_expression(item, params)?;
1053            }
1054        }
1055        LogicalExpression::Map(pairs) => {
1056            for (_, value) in pairs {
1057                substitute_in_expression(value, params)?;
1058            }
1059        }
1060        LogicalExpression::IndexAccess { base, index } => {
1061            substitute_in_expression(base, params)?;
1062            substitute_in_expression(index, params)?;
1063        }
1064        LogicalExpression::SliceAccess { base, start, end } => {
1065            substitute_in_expression(base, params)?;
1066            if let Some(s) = start {
1067                substitute_in_expression(s, params)?;
1068            }
1069            if let Some(e) = end {
1070                substitute_in_expression(e, params)?;
1071            }
1072        }
1073        LogicalExpression::Case {
1074            operand,
1075            when_clauses,
1076            else_clause,
1077        } => {
1078            if let Some(op) = operand {
1079                substitute_in_expression(op, params)?;
1080            }
1081            for (cond, result) in when_clauses {
1082                substitute_in_expression(cond, params)?;
1083                substitute_in_expression(result, params)?;
1084            }
1085            if let Some(el) = else_clause {
1086                substitute_in_expression(el, params)?;
1087            }
1088        }
1089        LogicalExpression::Property { .. }
1090        | LogicalExpression::Variable(_)
1091        | LogicalExpression::Literal(_)
1092        | LogicalExpression::Labels(_)
1093        | LogicalExpression::Type(_)
1094        | LogicalExpression::Id(_) => {}
1095        LogicalExpression::ListComprehension {
1096            list_expr,
1097            filter_expr,
1098            map_expr,
1099            ..
1100        } => {
1101            substitute_in_expression(list_expr, params)?;
1102            if let Some(filter) = filter_expr {
1103                substitute_in_expression(filter, params)?;
1104            }
1105            substitute_in_expression(map_expr, params)?;
1106        }
1107        LogicalExpression::ListPredicate {
1108            list_expr,
1109            predicate,
1110            ..
1111        } => {
1112            substitute_in_expression(list_expr, params)?;
1113            substitute_in_expression(predicate, params)?;
1114        }
1115        LogicalExpression::ExistsSubquery(subplan)
1116        | LogicalExpression::CountSubquery(subplan)
1117        | LogicalExpression::ValueSubquery(subplan) => {
1118            substitute_in_operator(subplan, params)?;
1119        }
1120        LogicalExpression::PatternComprehension { projection, .. } => {
1121            substitute_in_expression(projection, params)?;
1122        }
1123        LogicalExpression::MapProjection { entries, .. } => {
1124            for entry in entries {
1125                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
1126                    substitute_in_expression(expr, params)?;
1127                }
1128            }
1129        }
1130        LogicalExpression::Reduce {
1131            initial,
1132            list,
1133            expression,
1134            ..
1135        } => {
1136            substitute_in_expression(initial, params)?;
1137            substitute_in_expression(list, params)?;
1138            substitute_in_expression(expression, params)?;
1139        }
1140    }
1141    Ok(())
1142}
1143
/// Unit tests for the query processor: language classification, basic query
/// execution, and parameter substitution (including inside subqueries).
#[cfg(test)]
mod tests {
    use super::*;

    /// GQL and Cypher are classified as LPG languages, SPARQL is not.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built over a fresh store sees zero nodes.
    // `LpgStore` is only imported when the `lpg` feature is enabled, so this
    // test must carry the same gate to keep the module compiling without it.
    #[cfg(feature = "lpg")]
    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A simple GQL `MATCH … RETURN` yields one row per matching node.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    /// The same simple query works through the Cypher front end.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// A `$param` in a WHERE clause is replaced by the supplied value.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter
        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Should return 2 people (ages 35 and 45)
        assert_eq!(result.row_count(), 2);
    }

    /// Referencing a parameter that was not supplied is reported as an error.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter but empty params map (missing the required param)
        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        // Should fail with missing parameter error
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        // Tests parameter substitution in WHERE clause with property comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=20 matches > 15
        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        // Tests multiple parameters in WHERE clause with AND
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the person with age=35, score=90 matches both conditions
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        // Despite its name, this test exercises a plain equality comparison
        // between a string property and a string parameter; the query does
        // not contain an IN list.
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        // Exactly one item has the status bound to $target
        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        // Tests that same-type parameter comparisons work correctly
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        // Compare int property with int parameter
        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=100 matches > 75
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // Tests that empty results still have correct column names
        let store = Arc::new(LpgStore::new().unwrap());
        // Don't create any nodes

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        // Tests string parameter equality comparison
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }

    #[cfg(feature = "cypher")]
    #[test]
    fn test_params_in_exists_subquery() {
        // Regression: parameters inside EXISTS/COUNT/VALUE subqueries were not
        // substituted, causing type mismatches or silently wrong results.
        let store = Arc::new(LpgStore::new().unwrap());
        let alix =
            store.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
        let gus =
            store.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
        let _jules =
            store.create_node_with_props(&["Person"], [("name", Value::String("Jules".into()))]);

        // Alix follows Gus (but not Jules)
        store.create_edge(alix, gus, "FOLLOWS");

        let processor = QueryProcessor::for_lpg(store);

        // Find people NOT followed by the viewer ($viewer)
        let mut params = HashMap::new();
        params.insert("viewer".to_string(), Value::String("Alix".into()));

        let result = processor
            .process(
                "MATCH (p:Person) \
                 WHERE p.name <> $viewer \
                   AND NOT EXISTS { MATCH (v:Person)-[:FOLLOWS]->(p) WHERE v.name = $viewer } \
                 RETURN p.name ORDER BY p.name",
                QueryLanguage::Cypher,
                Some(&params),
            )
            .unwrap();

        // Alix follows Gus, so only Jules should be returned
        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("Jules".into()));
    }
}