Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15#[cfg(feature = "lpg")]
16use grafeo_core::graph::lpg::LpgStore;
17use grafeo_core::graph::{GraphStore, GraphStoreMut};
18
19use crate::catalog::Catalog;
20use crate::database::QueryResult;
21use crate::query::binder::Binder;
22use crate::query::executor::Executor;
23use crate::query::optimizer::Optimizer;
24use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
25use crate::query::planner::Planner;
26use crate::transaction::TransactionManager;
27
28/// Supported query languages.
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
30#[non_exhaustive]
31pub enum QueryLanguage {
32    /// GQL (ISO/IEC 39075:2024) - default for LPG
33    #[cfg(feature = "gql")]
34    Gql,
35    /// openCypher 9.0
36    #[cfg(feature = "cypher")]
37    Cypher,
38    /// Apache TinkerPop Gremlin
39    #[cfg(feature = "gremlin")]
40    Gremlin,
41    /// GraphQL for LPG
42    #[cfg(feature = "graphql")]
43    GraphQL,
44    /// SQL/PGQ (SQL:2023 GRAPH_TABLE)
45    #[cfg(feature = "sql-pgq")]
46    SqlPgq,
47    /// SPARQL 1.1 for RDF
48    #[cfg(feature = "sparql")]
49    Sparql,
50    /// GraphQL for RDF
51    #[cfg(all(feature = "graphql", feature = "triple-store"))]
52    GraphQLRdf,
53}
54
55impl QueryLanguage {
56    /// Returns whether this language targets LPG (vs RDF).
57    #[must_use]
58    pub const fn is_lpg(&self) -> bool {
59        match self {
60            #[cfg(feature = "gql")]
61            Self::Gql => true,
62            #[cfg(feature = "cypher")]
63            Self::Cypher => true,
64            #[cfg(feature = "gremlin")]
65            Self::Gremlin => true,
66            #[cfg(feature = "graphql")]
67            Self::GraphQL => true,
68            #[cfg(feature = "sql-pgq")]
69            Self::SqlPgq => true,
70            #[cfg(feature = "sparql")]
71            Self::Sparql => false,
72            #[cfg(all(feature = "graphql", feature = "triple-store"))]
73            Self::GraphQLRdf => false,
74        }
75    }
76}
77
78/// Query parameters for prepared statements.
79pub type QueryParams = HashMap<String, Value>;
80
81/// Processes queries through the full pipeline.
82///
83/// The processor holds references to the stores and provides a unified
84/// interface for executing queries in any supported language.
85///
86/// # Example
87///
88/// ```no_run
89/// # use std::sync::Arc;
90/// # use grafeo_core::graph::lpg::LpgStore;
91/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
92///
93/// # fn main() -> grafeo_common::utils::error::Result<()> {
94/// let store = Arc::new(LpgStore::new().unwrap());
95/// let processor = QueryProcessor::for_lpg(store);
96/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
97/// # Ok(())
98/// # }
99/// ```
100pub struct QueryProcessor {
101    /// LPG store for property graph queries.
102    #[cfg(feature = "lpg")]
103    lpg_store: Arc<LpgStore>,
104    /// Graph store trait object for pluggable storage backends (read path).
105    graph_store: Arc<dyn GraphStore>,
106    /// Writable graph store (None when read-only).
107    write_store: Option<Arc<dyn GraphStoreMut>>,
108    /// Transaction manager for MVCC operations.
109    transaction_manager: Arc<TransactionManager>,
110    /// Catalog for schema and index metadata.
111    catalog: Arc<Catalog>,
112    /// Query optimizer.
113    optimizer: Optimizer,
114    /// Current transaction context (if any).
115    transaction_context: Option<(EpochId, TransactionId)>,
116    /// RDF store for triple pattern queries (optional).
117    #[cfg(feature = "triple-store")]
118    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
119}
120
121impl QueryProcessor {
122    /// Creates a new query processor for LPG queries.
123    #[cfg(feature = "lpg")]
124    #[must_use]
125    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
126        let optimizer = Optimizer::from_store(&store);
127        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
128        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
129        Self {
130            lpg_store: store,
131            graph_store,
132            write_store,
133            transaction_manager: Arc::new(TransactionManager::new()),
134            catalog: Arc::new(Catalog::new()),
135            optimizer,
136            transaction_context: None,
137            #[cfg(feature = "triple-store")]
138            rdf_store: None,
139        }
140    }
141
142    /// Creates a new query processor with a transaction manager.
143    #[cfg(feature = "lpg")]
144    #[must_use]
145    pub fn for_lpg_with_transaction(
146        store: Arc<LpgStore>,
147        transaction_manager: Arc<TransactionManager>,
148    ) -> Self {
149        let optimizer = Optimizer::from_store(&store);
150        let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
151        let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
152        Self {
153            lpg_store: store,
154            graph_store,
155            write_store,
156            transaction_manager,
157            catalog: Arc::new(Catalog::new()),
158            optimizer,
159            transaction_context: None,
160            #[cfg(feature = "triple-store")]
161            rdf_store: None,
162        }
163    }
164
165    /// Creates a query processor backed by any `GraphStoreMut` implementation.
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if the internal arena allocation fails (out of memory).
170    pub fn for_graph_store_with_transaction(
171        store: Arc<dyn GraphStoreMut>,
172        transaction_manager: Arc<TransactionManager>,
173    ) -> Result<Self> {
174        let optimizer = Optimizer::from_graph_store(&*store);
175        let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
176        Ok(Self {
177            #[cfg(feature = "lpg")]
178            lpg_store: Arc::new(LpgStore::new()?),
179            graph_store: read_store,
180            write_store: Some(store),
181            transaction_manager,
182            catalog: Arc::new(Catalog::new()),
183            optimizer,
184            transaction_context: None,
185            #[cfg(feature = "triple-store")]
186            rdf_store: None,
187        })
188    }
189
190    /// Creates a query processor from split read/write stores.
191    ///
192    /// # Errors
193    ///
194    /// Returns an error if the internal arena allocation fails (out of memory).
195    pub fn for_stores_with_transaction(
196        read_store: Arc<dyn GraphStore>,
197        write_store: Option<Arc<dyn GraphStoreMut>>,
198        transaction_manager: Arc<TransactionManager>,
199    ) -> Result<Self> {
200        let optimizer = Optimizer::from_graph_store(&*read_store);
201        Ok(Self {
202            #[cfg(feature = "lpg")]
203            lpg_store: Arc::new(LpgStore::new()?),
204            graph_store: read_store,
205            write_store,
206            transaction_manager,
207            catalog: Arc::new(Catalog::new()),
208            optimizer,
209            transaction_context: None,
210            #[cfg(feature = "triple-store")]
211            rdf_store: None,
212        })
213    }
214
215    /// Sets the transaction context for MVCC visibility.
216    ///
217    /// This should be called when the processor is used within a transaction.
218    #[must_use]
219    pub fn with_transaction_context(
220        mut self,
221        viewing_epoch: EpochId,
222        transaction_id: TransactionId,
223    ) -> Self {
224        self.transaction_context = Some((viewing_epoch, transaction_id));
225        self
226    }
227
228    /// Sets a custom catalog.
229    #[must_use]
230    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
231        self.catalog = catalog;
232        self
233    }
234
235    /// Sets a custom optimizer.
236    #[must_use]
237    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
238        self.optimizer = optimizer;
239        self
240    }
241
242    /// Processes a query string and returns results.
243    ///
244    /// Pipeline:
245    /// 1. Parse (language-specific parser → AST)
246    /// 2. Translate (AST → LogicalPlan)
247    /// 3. Bind (semantic validation)
248    /// 4. Optimize (filter pushdown, join reorder, etc.)
249    /// 5. Plan (logical → physical operators)
250    /// 6. Execute (run operators, collect results)
251    ///
252    /// # Arguments
253    ///
254    /// * `query` - The query string
255    /// * `language` - Which query language to use
256    /// * `params` - Optional query parameters for prepared statements
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if any stage of the pipeline fails.
261    pub fn process(
262        &self,
263        query: &str,
264        language: QueryLanguage,
265        params: Option<&QueryParams>,
266    ) -> Result<QueryResult> {
267        if language.is_lpg() {
268            self.process_lpg(query, language, params)
269        } else {
270            #[cfg(feature = "triple-store")]
271            {
272                self.process_rdf(query, language, params)
273            }
274            #[cfg(not(feature = "triple-store"))]
275            {
276                Err(Error::Internal(
277                    "RDF support not enabled. Compile with --features rdf".to_string(),
278                ))
279            }
280        }
281    }
282
283    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
284    fn process_lpg(
285        &self,
286        query: &str,
287        language: QueryLanguage,
288        params: Option<&QueryParams>,
289    ) -> Result<QueryResult> {
290        #[cfg(not(target_arch = "wasm32"))]
291        let start_time = std::time::Instant::now();
292
293        // 1. Parse and translate to logical plan
294        let mut logical_plan = self.translate_lpg(query, language)?;
295
296        // 2. Substitute parameters if provided (merge defaults from the plan first)
297        let has_defaults = !logical_plan.default_params.is_empty();
298        if params.is_some() || has_defaults {
299            let merged = if has_defaults {
300                let mut merged = logical_plan.default_params.clone();
301                if let Some(params) = params {
302                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
303                }
304                merged
305            } else {
306                params.cloned().unwrap_or_default()
307            };
308            substitute_params(&mut logical_plan, &merged)?;
309        }
310
311        // 3. Semantic validation
312        let mut binder = Binder::new();
313        let _binding_context = binder.bind(&logical_plan)?;
314
315        // 4. Optimize the plan
316        let optimized_plan = self.optimizer.optimize(logical_plan)?;
317
318        // 4a. EXPLAIN: annotate pushdown hints and return the plan tree
319        if optimized_plan.explain {
320            let mut plan = optimized_plan;
321            annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
322            return Ok(explain_result(&plan));
323        }
324
325        // 5. Convert to physical plan with transaction context
326        // Read-only fast path: safe when no mutations AND no active transaction
327        // (an active transaction may have prior uncommitted writes from earlier statements)
328        let is_read_only =
329            !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
330        let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
331            Planner::with_context(
332                Arc::clone(&self.graph_store),
333                self.write_store.as_ref().map(Arc::clone),
334                Arc::clone(&self.transaction_manager),
335                Some(transaction_id),
336                epoch,
337            )
338        } else {
339            Planner::with_context(
340                Arc::clone(&self.graph_store),
341                self.write_store.as_ref().map(Arc::clone),
342                Arc::clone(&self.transaction_manager),
343                None,
344                self.transaction_manager.current_epoch(),
345            )
346        }
347        .with_read_only(is_read_only);
348        let mut physical_plan = planner.plan(&optimized_plan)?;
349
350        // 6. Execute and collect results
351        let executor = Executor::with_columns(physical_plan.columns.clone());
352        let mut result = executor.execute(physical_plan.operator.as_mut())?;
353
354        // Add execution metrics
355        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
356        #[cfg(not(target_arch = "wasm32"))]
357        {
358            let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
359            result.execution_time_ms = Some(elapsed_ms);
360        }
361        result.rows_scanned = Some(rows_scanned);
362
363        Ok(result)
364    }
365
366    /// Translates an LPG query to a logical plan.
367    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
368        let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
369        match language {
370            #[cfg(feature = "gql")]
371            QueryLanguage::Gql => {
372                use crate::query::translators::gql;
373                gql::translate(query)
374            }
375            #[cfg(feature = "cypher")]
376            QueryLanguage::Cypher => {
377                use crate::query::translators::cypher;
378                cypher::translate(query)
379            }
380            #[cfg(feature = "gremlin")]
381            QueryLanguage::Gremlin => {
382                use crate::query::translators::gremlin;
383                gremlin::translate(query)
384            }
385            #[cfg(feature = "graphql")]
386            QueryLanguage::GraphQL => {
387                use crate::query::translators::graphql;
388                graphql::translate(query)
389            }
390            #[cfg(feature = "sql-pgq")]
391            QueryLanguage::SqlPgq => {
392                use crate::query::translators::sql_pgq;
393                sql_pgq::translate(query)
394            }
395            #[allow(unreachable_patterns)]
396            _ => Err(Error::Internal(format!(
397                "Language {:?} is not an LPG language",
398                language
399            ))),
400        }
401    }
402
403    /// Returns a reference to the LPG store.
404    #[cfg(feature = "lpg")]
405    #[must_use]
406    pub fn lpg_store(&self) -> &Arc<LpgStore> {
407        &self.lpg_store
408    }
409
410    /// Returns a reference to the catalog.
411    #[must_use]
412    pub fn catalog(&self) -> &Arc<Catalog> {
413        &self.catalog
414    }
415
416    /// Returns a reference to the optimizer.
417    #[must_use]
418    pub fn optimizer(&self) -> &Optimizer {
419        &self.optimizer
420    }
421}
422
423impl QueryProcessor {
424    /// Returns a reference to the transaction manager.
425    #[must_use]
426    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
427        &self.transaction_manager
428    }
429}
430
431// =========================================================================
432// RDF-specific methods (gated behind `rdf` feature)
433// =========================================================================
434
435#[cfg(feature = "triple-store")]
436impl QueryProcessor {
437    /// Creates a new query processor with both LPG and RDF stores.
438    #[cfg(feature = "lpg")]
439    #[must_use]
440    pub fn with_rdf(
441        lpg_store: Arc<LpgStore>,
442        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
443    ) -> Self {
444        let optimizer = Optimizer::from_store(&lpg_store);
445        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStore>;
446        let write_store = Some(Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>);
447        Self {
448            lpg_store,
449            graph_store,
450            write_store,
451            transaction_manager: Arc::new(TransactionManager::new()),
452            catalog: Arc::new(Catalog::new()),
453            optimizer,
454            transaction_context: None,
455            rdf_store: Some(rdf_store),
456        }
457    }
458
459    /// Returns a reference to the RDF store (if configured).
460    #[must_use]
461    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
462        self.rdf_store.as_ref()
463    }
464
465    /// Processes an RDF query (SPARQL, GraphQL-RDF).
466    fn process_rdf(
467        &self,
468        query: &str,
469        language: QueryLanguage,
470        params: Option<&QueryParams>,
471    ) -> Result<QueryResult> {
472        use crate::query::planner::rdf::RdfPlanner;
473
474        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
475            Error::Internal("RDF store not configured for this processor".to_string())
476        })?;
477
478        // 1. Parse and translate to logical plan
479        let mut logical_plan = self.translate_rdf(query, language)?;
480
481        // 2. Substitute parameters if provided (merge defaults from the plan first)
482        let has_defaults = !logical_plan.default_params.is_empty();
483        if params.is_some() || has_defaults {
484            let merged = if has_defaults {
485                let mut merged = logical_plan.default_params.clone();
486                if let Some(params) = params {
487                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
488                }
489                merged
490            } else {
491                params.cloned().unwrap_or_default()
492            };
493            substitute_params(&mut logical_plan, &merged)?;
494        }
495
496        // 3. Semantic validation
497        let mut binder = Binder::new();
498        let _binding_context = binder.bind(&logical_plan)?;
499
500        // 3. Optimize the plan (use RDF statistics for cost-based optimization)
501        let rdf_optimizer = {
502            let stats = rdf_store.get_or_collect_statistics();
503            Optimizer::from_rdf_statistics((*stats).clone())
504        };
505        let optimized_plan = rdf_optimizer.optimize(logical_plan)?;
506
507        // 3a. EXPLAIN: return the plan tree without executing.
508        // Includes physical operator names by planning with profiling to collect entries.
509        if optimized_plan.explain {
510            let planner = RdfPlanner::new(Arc::clone(rdf_store));
511            let (_, entries) = planner.plan_profiled(&optimized_plan)?;
512            return Ok(physical_explain_result(&optimized_plan, entries));
513        }
514
515        // 3b. EXPLAIN ANALYZE (PROFILE): execute with instrumentation, report stats.
516        if optimized_plan.profile {
517            let planner = RdfPlanner::new(Arc::clone(rdf_store));
518            let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
519
520            let start = std::time::Instant::now();
521            let executor = Executor::with_columns(physical_plan.columns.clone());
522            let _result = executor.execute(physical_plan.operator.as_mut())?;
523            let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
524
525            let tree = crate::query::profile::build_profile_tree(
526                &optimized_plan.root,
527                &mut entries.into_iter(),
528            );
529            return Ok(crate::query::profile::profile_result(&tree, elapsed_ms));
530        }
531
532        // 4. Convert to physical plan (using RDF planner)
533        let planner = RdfPlanner::new(Arc::clone(rdf_store));
534        let mut physical_plan = planner.plan(&optimized_plan)?;
535
536        // 5. Execute and collect results
537        let executor = Executor::with_columns(physical_plan.columns.clone());
538        executor.execute(physical_plan.operator.as_mut())
539    }
540
541    /// Translates an RDF query to a logical plan.
542    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
543        match language {
544            #[cfg(feature = "sparql")]
545            QueryLanguage::Sparql => {
546                use crate::query::translators::sparql;
547                sparql::translate(query)
548            }
549            #[cfg(all(feature = "graphql", feature = "triple-store"))]
550            QueryLanguage::GraphQLRdf => {
551                use crate::query::translators::graphql_rdf;
552                // Default namespace for GraphQL-RDF queries
553                graphql_rdf::translate(query, "http://example.org/")
554            }
555            _ => Err(Error::Internal(format!(
556                "Language {:?} is not an RDF language",
557                language
558            ))),
559        }
560    }
561}
562
563/// Annotates filter operators in the plan with pushdown hints.
564///
565/// Walks the plan tree looking for `Filter -> NodeScan` patterns and checks
566/// whether a property index exists for equality predicates.
567pub(crate) fn annotate_pushdown_hints(
568    op: &mut LogicalOperator,
569    store: &dyn grafeo_core::graph::GraphStore,
570) {
571    #[allow(clippy::wildcard_imports)]
572    use crate::query::plan::*;
573
574    match op {
575        LogicalOperator::Filter(filter) => {
576            // Recurse into children first
577            annotate_pushdown_hints(&mut filter.input, store);
578
579            // Annotate this filter if it sits on top of a NodeScan
580            if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
581                filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
582            }
583        }
584        LogicalOperator::NodeScan(op) => {
585            if let Some(input) = &mut op.input {
586                annotate_pushdown_hints(input, store);
587            }
588        }
589        LogicalOperator::EdgeScan(op) => {
590            if let Some(input) = &mut op.input {
591                annotate_pushdown_hints(input, store);
592            }
593        }
594        LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
595        LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
596        LogicalOperator::Join(op) => {
597            annotate_pushdown_hints(&mut op.left, store);
598            annotate_pushdown_hints(&mut op.right, store);
599        }
600        LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
601        LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
602        LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
603        LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
604        LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
605        LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
606        LogicalOperator::Union(op) => {
607            for input in &mut op.inputs {
608                annotate_pushdown_hints(input, store);
609            }
610        }
611        LogicalOperator::Apply(op) => {
612            annotate_pushdown_hints(&mut op.input, store);
613            annotate_pushdown_hints(&mut op.subplan, store);
614        }
615        LogicalOperator::Otherwise(op) => {
616            annotate_pushdown_hints(&mut op.left, store);
617            annotate_pushdown_hints(&mut op.right, store);
618        }
619        _ => {}
620    }
621}
622
623/// Infers the pushdown strategy for a filter predicate over a node scan.
624fn infer_pushdown(
625    predicate: &LogicalExpression,
626    scan: &crate::query::plan::NodeScanOp,
627    store: &dyn grafeo_core::graph::GraphStore,
628) -> Option<crate::query::plan::PushdownHint> {
629    #[allow(clippy::wildcard_imports)]
630    use crate::query::plan::*;
631
632    match predicate {
633        // Equality: n.prop = value
634        LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
635            if let Some(prop) = extract_property_name(left, &scan.variable)
636                .or_else(|| extract_property_name(right, &scan.variable))
637            {
638                if store.has_property_index(&prop) {
639                    return Some(PushdownHint::IndexLookup { property: prop });
640                }
641                if scan.label.is_some() {
642                    return Some(PushdownHint::LabelFirst);
643                }
644            }
645            None
646        }
647        // Range: n.prop > value, n.prop < value, etc.
648        LogicalExpression::Binary {
649            left,
650            op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
651            right,
652        } => {
653            if let Some(prop) = extract_property_name(left, &scan.variable)
654                .or_else(|| extract_property_name(right, &scan.variable))
655            {
656                if store.has_property_index(&prop) {
657                    return Some(PushdownHint::RangeScan { property: prop });
658                }
659                if scan.label.is_some() {
660                    return Some(PushdownHint::LabelFirst);
661                }
662            }
663            None
664        }
665        // AND: check the left side (first conjunct) for pushdown
666        LogicalExpression::Binary {
667            left,
668            op: BinaryOp::And,
669            ..
670        } => infer_pushdown(left, scan, store),
671        _ => {
672            // Any other predicate on a labeled scan gets label-first
673            if scan.label.is_some() {
674                Some(PushdownHint::LabelFirst)
675            } else {
676                None
677            }
678        }
679    }
680}
681
682/// Extracts the property name if the expression is `Property { variable, property }`
683/// and the variable matches the scan variable.
684fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
685    if let LogicalExpression::Property { variable, property } = expr
686        && variable == scan_var
687    {
688        Some(property.clone())
689    } else {
690        None
691    }
692}
693
694/// Builds a `QueryResult` containing the EXPLAIN plan tree text.
695pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
696    let tree_text = plan.root.explain_tree();
697    QueryResult {
698        columns: vec!["plan".to_string()],
699        column_types: vec![grafeo_common::types::LogicalType::String],
700        rows: vec![vec![Value::String(tree_text.into())]],
701        execution_time_ms: None,
702        rows_scanned: None,
703        status_message: None,
704        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
705    }
706}
707
708/// Formats a physical EXPLAIN result showing both the logical plan and the
709/// physical operator names mapped to each logical operator.
710pub(crate) fn physical_explain_result(
711    plan: &LogicalPlan,
712    entries: Vec<crate::query::profile::ProfileEntry>,
713) -> QueryResult {
714    let tree = crate::query::profile::build_profile_tree(&plan.root, &mut entries.into_iter());
715
716    let mut output = String::new();
717    format_physical_node(&mut output, &tree, 0);
718
719    QueryResult {
720        columns: vec!["plan".to_string()],
721        column_types: vec![grafeo_common::types::LogicalType::String],
722        rows: vec![vec![Value::String(output.into())]],
723        execution_time_ms: None,
724        rows_scanned: None,
725        status_message: None,
726        gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
727    }
728}
729
730/// Recursively formats a physical plan node showing operator name and label.
731fn format_physical_node(out: &mut String, node: &crate::query::profile::ProfileNode, depth: usize) {
732    use std::fmt::Write;
733    let indent = "  ".repeat(depth);
734    let _ = writeln!(out, "{indent}{} {}", node.name, node.label);
735    for child in &node.children {
736        format_physical_node(out, child, depth + 1);
737    }
738}
739
740/// Substitutes parameters in a logical plan with their values.
741///
742/// # Errors
743///
744/// Returns an error if a referenced parameter is not found in `params`.
745pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
746    substitute_in_operator(&mut plan.root, params)
747}
748
749/// Recursively substitutes parameters in an operator.
750fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
751    #[allow(clippy::wildcard_imports)]
752    use crate::query::plan::*;
753
754    match op {
755        LogicalOperator::Filter(filter) => {
756            substitute_in_expression(&mut filter.predicate, params)?;
757            substitute_in_operator(&mut filter.input, params)?;
758        }
759        LogicalOperator::Return(ret) => {
760            for item in &mut ret.items {
761                substitute_in_expression(&mut item.expression, params)?;
762            }
763            substitute_in_operator(&mut ret.input, params)?;
764        }
765        LogicalOperator::Project(proj) => {
766            for p in &mut proj.projections {
767                substitute_in_expression(&mut p.expression, params)?;
768            }
769            substitute_in_operator(&mut proj.input, params)?;
770        }
771        LogicalOperator::NodeScan(scan) => {
772            if let Some(input) = &mut scan.input {
773                substitute_in_operator(input, params)?;
774            }
775        }
776        LogicalOperator::EdgeScan(scan) => {
777            if let Some(input) = &mut scan.input {
778                substitute_in_operator(input, params)?;
779            }
780        }
781        LogicalOperator::Expand(expand) => {
782            substitute_in_operator(&mut expand.input, params)?;
783        }
784        LogicalOperator::Join(join) => {
785            substitute_in_operator(&mut join.left, params)?;
786            substitute_in_operator(&mut join.right, params)?;
787            for cond in &mut join.conditions {
788                substitute_in_expression(&mut cond.left, params)?;
789                substitute_in_expression(&mut cond.right, params)?;
790            }
791        }
792        LogicalOperator::LeftJoin(join) => {
793            substitute_in_operator(&mut join.left, params)?;
794            substitute_in_operator(&mut join.right, params)?;
795            if let Some(cond) = &mut join.condition {
796                substitute_in_expression(cond, params)?;
797            }
798        }
799        LogicalOperator::Aggregate(agg) => {
800            for expr in &mut agg.group_by {
801                substitute_in_expression(expr, params)?;
802            }
803            for agg_expr in &mut agg.aggregates {
804                if let Some(expr) = &mut agg_expr.expression {
805                    substitute_in_expression(expr, params)?;
806                }
807            }
808            substitute_in_operator(&mut agg.input, params)?;
809        }
810        LogicalOperator::Sort(sort) => {
811            for key in &mut sort.keys {
812                substitute_in_expression(&mut key.expression, params)?;
813            }
814            substitute_in_operator(&mut sort.input, params)?;
815        }
816        LogicalOperator::Limit(limit) => {
817            resolve_count_param(&mut limit.count, params)?;
818            substitute_in_operator(&mut limit.input, params)?;
819        }
820        LogicalOperator::Skip(skip) => {
821            resolve_count_param(&mut skip.count, params)?;
822            substitute_in_operator(&mut skip.input, params)?;
823        }
824        LogicalOperator::Distinct(distinct) => {
825            substitute_in_operator(&mut distinct.input, params)?;
826        }
827        LogicalOperator::CreateNode(create) => {
828            for (_, expr) in &mut create.properties {
829                substitute_in_expression(expr, params)?;
830            }
831            if let Some(input) = &mut create.input {
832                substitute_in_operator(input, params)?;
833            }
834        }
835        LogicalOperator::CreateEdge(create) => {
836            for (_, expr) in &mut create.properties {
837                substitute_in_expression(expr, params)?;
838            }
839            substitute_in_operator(&mut create.input, params)?;
840        }
841        LogicalOperator::DeleteNode(delete) => {
842            substitute_in_operator(&mut delete.input, params)?;
843        }
844        LogicalOperator::DeleteEdge(delete) => {
845            substitute_in_operator(&mut delete.input, params)?;
846        }
847        LogicalOperator::SetProperty(set) => {
848            for (_, expr) in &mut set.properties {
849                substitute_in_expression(expr, params)?;
850            }
851            substitute_in_operator(&mut set.input, params)?;
852        }
853        LogicalOperator::Union(union) => {
854            for input in &mut union.inputs {
855                substitute_in_operator(input, params)?;
856            }
857        }
858        LogicalOperator::AntiJoin(anti) => {
859            substitute_in_operator(&mut anti.left, params)?;
860            substitute_in_operator(&mut anti.right, params)?;
861        }
862        LogicalOperator::Bind(bind) => {
863            substitute_in_expression(&mut bind.expression, params)?;
864            substitute_in_operator(&mut bind.input, params)?;
865        }
866        LogicalOperator::TripleScan(scan) => {
867            if let Some(input) = &mut scan.input {
868                substitute_in_operator(input, params)?;
869            }
870        }
871        LogicalOperator::Unwind(unwind) => {
872            substitute_in_expression(&mut unwind.expression, params)?;
873            substitute_in_operator(&mut unwind.input, params)?;
874        }
875        LogicalOperator::MapCollect(mc) => {
876            substitute_in_operator(&mut mc.input, params)?;
877        }
878        LogicalOperator::Merge(merge) => {
879            for (_, expr) in &mut merge.match_properties {
880                substitute_in_expression(expr, params)?;
881            }
882            for (_, expr) in &mut merge.on_create {
883                substitute_in_expression(expr, params)?;
884            }
885            for (_, expr) in &mut merge.on_match {
886                substitute_in_expression(expr, params)?;
887            }
888            substitute_in_operator(&mut merge.input, params)?;
889        }
890        LogicalOperator::MergeRelationship(merge_rel) => {
891            for (_, expr) in &mut merge_rel.match_properties {
892                substitute_in_expression(expr, params)?;
893            }
894            for (_, expr) in &mut merge_rel.on_create {
895                substitute_in_expression(expr, params)?;
896            }
897            for (_, expr) in &mut merge_rel.on_match {
898                substitute_in_expression(expr, params)?;
899            }
900            substitute_in_operator(&mut merge_rel.input, params)?;
901        }
902        LogicalOperator::AddLabel(add_label) => {
903            substitute_in_operator(&mut add_label.input, params)?;
904        }
905        LogicalOperator::RemoveLabel(remove_label) => {
906            substitute_in_operator(&mut remove_label.input, params)?;
907        }
908        LogicalOperator::ShortestPath(sp) => {
909            substitute_in_operator(&mut sp.input, params)?;
910        }
911        // SPARQL Update operators
912        LogicalOperator::InsertTriple(insert) => {
913            if let Some(ref mut input) = insert.input {
914                substitute_in_operator(input, params)?;
915            }
916        }
917        LogicalOperator::DeleteTriple(delete) => {
918            if let Some(ref mut input) = delete.input {
919                substitute_in_operator(input, params)?;
920            }
921        }
922        LogicalOperator::Modify(modify) => {
923            substitute_in_operator(&mut modify.where_clause, params)?;
924        }
925        LogicalOperator::ClearGraph(_)
926        | LogicalOperator::CreateGraph(_)
927        | LogicalOperator::DropGraph(_)
928        | LogicalOperator::LoadGraph(_)
929        | LogicalOperator::CopyGraph(_)
930        | LogicalOperator::MoveGraph(_)
931        | LogicalOperator::AddGraph(_) => {}
932        LogicalOperator::HorizontalAggregate(op) => {
933            substitute_in_operator(&mut op.input, params)?;
934        }
935        LogicalOperator::Empty => {}
936        LogicalOperator::VectorScan(scan) => {
937            substitute_in_expression(&mut scan.query_vector, params)?;
938            if let Some(ref mut input) = scan.input {
939                substitute_in_operator(input, params)?;
940            }
941        }
942        LogicalOperator::VectorJoin(join) => {
943            substitute_in_expression(&mut join.query_vector, params)?;
944            substitute_in_operator(&mut join.input, params)?;
945        }
946        LogicalOperator::Except(except) => {
947            substitute_in_operator(&mut except.left, params)?;
948            substitute_in_operator(&mut except.right, params)?;
949        }
950        LogicalOperator::Intersect(intersect) => {
951            substitute_in_operator(&mut intersect.left, params)?;
952            substitute_in_operator(&mut intersect.right, params)?;
953        }
954        LogicalOperator::Otherwise(otherwise) => {
955            substitute_in_operator(&mut otherwise.left, params)?;
956            substitute_in_operator(&mut otherwise.right, params)?;
957        }
958        LogicalOperator::Apply(apply) => {
959            substitute_in_operator(&mut apply.input, params)?;
960            substitute_in_operator(&mut apply.subplan, params)?;
961        }
962        // ParameterScan has no expressions to substitute
963        LogicalOperator::ParameterScan(_) => {}
964        LogicalOperator::MultiWayJoin(mwj) => {
965            for input in &mut mwj.inputs {
966                substitute_in_operator(input, params)?;
967            }
968            for cond in &mut mwj.conditions {
969                substitute_in_expression(&mut cond.left, params)?;
970                substitute_in_expression(&mut cond.right, params)?;
971            }
972        }
973        // DDL operators have no expressions to substitute
974        LogicalOperator::CreatePropertyGraph(_) => {}
975        // Procedure calls: arguments could contain parameters but we handle at execution time
976        LogicalOperator::CallProcedure(_) => {}
977        // LoadData: file path is a literal, no parameter substitution needed
978        LogicalOperator::LoadData(_) => {}
979        // Construct: template uses variables, substitute in the WHERE input
980        LogicalOperator::Construct(construct) => {
981            substitute_in_operator(&mut construct.input, params)?;
982        }
983    }
984    Ok(())
985}
986
987/// Resolves a `CountExpr::Parameter` by looking up the parameter value.
988fn resolve_count_param(
989    count: &mut crate::query::plan::CountExpr,
990    params: &QueryParams,
991) -> Result<()> {
992    use crate::query::plan::CountExpr;
993    use grafeo_common::utils::error::{QueryError, QueryErrorKind};
994
995    if let CountExpr::Parameter(name) = count {
996        let value = params.get(name.as_str()).ok_or_else(|| {
997            Error::Query(QueryError::new(
998                QueryErrorKind::Semantic,
999                format!("Missing parameter for SKIP/LIMIT: ${name}"),
1000            ))
1001        })?;
1002        let n = match value {
1003            Value::Int64(i) if *i >= 0 => *i as usize,
1004            Value::Int64(i) => {
1005                return Err(Error::Query(QueryError::new(
1006                    QueryErrorKind::Semantic,
1007                    format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
1008                )));
1009            }
1010            other => {
1011                return Err(Error::Query(QueryError::new(
1012                    QueryErrorKind::Semantic,
1013                    format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
1014                )));
1015            }
1016        };
1017        *count = CountExpr::Literal(n);
1018    }
1019    Ok(())
1020}
1021
1022/// Substitutes parameters in an expression with their values.
1023fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
1024    use crate::query::plan::LogicalExpression;
1025
1026    match expr {
1027        LogicalExpression::Parameter(name) => {
1028            if let Some(value) = params.get(name) {
1029                *expr = LogicalExpression::Literal(value.clone());
1030            } else {
1031                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
1032            }
1033        }
1034        LogicalExpression::Binary { left, right, .. } => {
1035            substitute_in_expression(left, params)?;
1036            substitute_in_expression(right, params)?;
1037        }
1038        LogicalExpression::Unary { operand, .. } => {
1039            substitute_in_expression(operand, params)?;
1040        }
1041        LogicalExpression::FunctionCall { args, .. } => {
1042            for arg in args {
1043                substitute_in_expression(arg, params)?;
1044            }
1045        }
1046        LogicalExpression::List(items) => {
1047            for item in items {
1048                substitute_in_expression(item, params)?;
1049            }
1050        }
1051        LogicalExpression::Map(pairs) => {
1052            for (_, value) in pairs {
1053                substitute_in_expression(value, params)?;
1054            }
1055        }
1056        LogicalExpression::IndexAccess { base, index } => {
1057            substitute_in_expression(base, params)?;
1058            substitute_in_expression(index, params)?;
1059        }
1060        LogicalExpression::SliceAccess { base, start, end } => {
1061            substitute_in_expression(base, params)?;
1062            if let Some(s) = start {
1063                substitute_in_expression(s, params)?;
1064            }
1065            if let Some(e) = end {
1066                substitute_in_expression(e, params)?;
1067            }
1068        }
1069        LogicalExpression::Case {
1070            operand,
1071            when_clauses,
1072            else_clause,
1073        } => {
1074            if let Some(op) = operand {
1075                substitute_in_expression(op, params)?;
1076            }
1077            for (cond, result) in when_clauses {
1078                substitute_in_expression(cond, params)?;
1079                substitute_in_expression(result, params)?;
1080            }
1081            if let Some(el) = else_clause {
1082                substitute_in_expression(el, params)?;
1083            }
1084        }
1085        LogicalExpression::Property { .. }
1086        | LogicalExpression::Variable(_)
1087        | LogicalExpression::Literal(_)
1088        | LogicalExpression::Labels(_)
1089        | LogicalExpression::Type(_)
1090        | LogicalExpression::Id(_) => {}
1091        LogicalExpression::ListComprehension {
1092            list_expr,
1093            filter_expr,
1094            map_expr,
1095            ..
1096        } => {
1097            substitute_in_expression(list_expr, params)?;
1098            if let Some(filter) = filter_expr {
1099                substitute_in_expression(filter, params)?;
1100            }
1101            substitute_in_expression(map_expr, params)?;
1102        }
1103        LogicalExpression::ListPredicate {
1104            list_expr,
1105            predicate,
1106            ..
1107        } => {
1108            substitute_in_expression(list_expr, params)?;
1109            substitute_in_expression(predicate, params)?;
1110        }
1111        LogicalExpression::ExistsSubquery(_)
1112        | LogicalExpression::CountSubquery(_)
1113        | LogicalExpression::ValueSubquery(_) => {
1114            // Subqueries would need recursive parameter substitution
1115        }
1116        LogicalExpression::PatternComprehension { projection, .. } => {
1117            substitute_in_expression(projection, params)?;
1118        }
1119        LogicalExpression::MapProjection { entries, .. } => {
1120            for entry in entries {
1121                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
1122                    substitute_in_expression(expr, params)?;
1123                }
1124            }
1125        }
1126        LogicalExpression::Reduce {
1127            initial,
1128            list,
1129            expression,
1130            ..
1131        } => {
1132            substitute_in_expression(initial, params)?;
1133            substitute_in_expression(list, params)?;
1134            substitute_in_expression(expression, params)?;
1135        }
1136    }
1137    Ok(())
1138}
1139
1140#[cfg(test)]
1141mod tests {
1142    use super::*;
1143
1144    #[test]
1145    fn test_query_language_is_lpg() {
1146        #[cfg(feature = "gql")]
1147        assert!(QueryLanguage::Gql.is_lpg());
1148        #[cfg(feature = "cypher")]
1149        assert!(QueryLanguage::Cypher.is_lpg());
1150        #[cfg(feature = "sparql")]
1151        assert!(!QueryLanguage::Sparql.is_lpg());
1152    }
1153
1154    #[test]
1155    fn test_processor_creation() {
1156        let store = Arc::new(LpgStore::new().unwrap());
1157        let processor = QueryProcessor::for_lpg(store);
1158        assert!(processor.lpg_store().node_count() == 0);
1159    }
1160
1161    #[cfg(feature = "gql")]
1162    #[test]
1163    fn test_process_simple_gql() {
1164        let store = Arc::new(LpgStore::new().unwrap());
1165        store.create_node(&["Person"]);
1166        store.create_node(&["Person"]);
1167
1168        let processor = QueryProcessor::for_lpg(store);
1169        let result = processor
1170            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
1171            .unwrap();
1172
1173        assert_eq!(result.row_count(), 2);
1174        assert_eq!(result.columns[0], "n");
1175    }
1176
1177    #[cfg(feature = "cypher")]
1178    #[test]
1179    fn test_process_simple_cypher() {
1180        let store = Arc::new(LpgStore::new().unwrap());
1181        store.create_node(&["Person"]);
1182
1183        let processor = QueryProcessor::for_lpg(store);
1184        let result = processor
1185            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
1186            .unwrap();
1187
1188        assert_eq!(result.row_count(), 1);
1189    }
1190
1191    #[cfg(feature = "gql")]
1192    #[test]
1193    fn test_process_with_params() {
1194        let store = Arc::new(LpgStore::new().unwrap());
1195        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1196        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1197        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1198
1199        let processor = QueryProcessor::for_lpg(store);
1200
1201        // Query with parameter
1202        let mut params = HashMap::new();
1203        params.insert("min_age".to_string(), Value::Int64(30));
1204
1205        let result = processor
1206            .process(
1207                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1208                QueryLanguage::Gql,
1209                Some(&params),
1210            )
1211            .unwrap();
1212
1213        // Should return 2 people (ages 35 and 45)
1214        assert_eq!(result.row_count(), 2);
1215    }
1216
1217    #[cfg(feature = "gql")]
1218    #[test]
1219    fn test_missing_param_error() {
1220        let store = Arc::new(LpgStore::new().unwrap());
1221        store.create_node(&["Person"]);
1222
1223        let processor = QueryProcessor::for_lpg(store);
1224
1225        // Query with parameter but empty params map (missing the required param)
1226        let params: HashMap<String, Value> = HashMap::new();
1227        let result = processor.process(
1228            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1229            QueryLanguage::Gql,
1230            Some(&params),
1231        );
1232
1233        // Should fail with missing parameter error
1234        assert!(result.is_err());
1235        let err = result.unwrap_err();
1236        assert!(
1237            err.to_string().contains("Missing parameter"),
1238            "Expected 'Missing parameter' error, got: {}",
1239            err
1240        );
1241    }
1242
1243    #[cfg(feature = "gql")]
1244    #[test]
1245    fn test_params_in_filter_with_property() {
1246        // Tests parameter substitution in WHERE clause with property comparison
1247        let store = Arc::new(LpgStore::new().unwrap());
1248        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
1249        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
1250
1251        let processor = QueryProcessor::for_lpg(store);
1252
1253        let mut params = HashMap::new();
1254        params.insert("threshold".to_string(), Value::Int64(15));
1255
1256        let result = processor
1257            .process(
1258                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
1259                QueryLanguage::Gql,
1260                Some(&params),
1261            )
1262            .unwrap();
1263
1264        // Only value=20 matches > 15
1265        assert_eq!(result.row_count(), 1);
1266        let row = &result.rows[0];
1267        assert_eq!(row[0], Value::Int64(20));
1268    }
1269
1270    #[cfg(feature = "gql")]
1271    #[test]
1272    fn test_params_in_multiple_where_conditions() {
1273        // Tests multiple parameters in WHERE clause with AND
1274        let store = Arc::new(LpgStore::new().unwrap());
1275        store.create_node_with_props(
1276            &["Person"],
1277            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
1278        );
1279        store.create_node_with_props(
1280            &["Person"],
1281            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
1282        );
1283        store.create_node_with_props(
1284            &["Person"],
1285            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
1286        );
1287
1288        let processor = QueryProcessor::for_lpg(store);
1289
1290        let mut params = HashMap::new();
1291        params.insert("min_age".to_string(), Value::Int64(30));
1292        params.insert("min_score".to_string(), Value::Int64(75));
1293
1294        let result = processor
1295            .process(
1296                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
1297                QueryLanguage::Gql,
1298                Some(&params),
1299            )
1300            .unwrap();
1301
1302        // Only the person with age=35, score=90 matches both conditions
1303        assert_eq!(result.row_count(), 1);
1304    }
1305
1306    #[cfg(feature = "gql")]
1307    #[test]
1308    fn test_params_with_in_list() {
1309        // Tests parameter as a value checked against IN list
1310        let store = Arc::new(LpgStore::new().unwrap());
1311        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
1312        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
1313        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
1314
1315        let processor = QueryProcessor::for_lpg(store);
1316
1317        // Check if a parameter value matches any of the statuses
1318        let mut params = HashMap::new();
1319        params.insert("target".to_string(), Value::String("active".into()));
1320
1321        let result = processor
1322            .process(
1323                "MATCH (n:Item) WHERE n.status = $target RETURN n",
1324                QueryLanguage::Gql,
1325                Some(&params),
1326            )
1327            .unwrap();
1328
1329        assert_eq!(result.row_count(), 1);
1330    }
1331
1332    #[cfg(feature = "gql")]
1333    #[test]
1334    fn test_params_same_type_comparison() {
1335        // Tests that same-type parameter comparisons work correctly
1336        let store = Arc::new(LpgStore::new().unwrap());
1337        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
1338        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
1339
1340        let processor = QueryProcessor::for_lpg(store);
1341
1342        // Compare int property with int parameter
1343        let mut params = HashMap::new();
1344        params.insert("threshold".to_string(), Value::Int64(75));
1345
1346        let result = processor
1347            .process(
1348                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
1349                QueryLanguage::Gql,
1350                Some(&params),
1351            )
1352            .unwrap();
1353
1354        // Only value=100 matches > 75
1355        assert_eq!(result.row_count(), 1);
1356    }
1357
1358    #[cfg(feature = "gql")]
1359    #[test]
1360    fn test_process_empty_result_has_columns() {
1361        // Tests that empty results still have correct column names
1362        let store = Arc::new(LpgStore::new().unwrap());
1363        // Don't create any nodes
1364
1365        let processor = QueryProcessor::for_lpg(store);
1366        let result = processor
1367            .process(
1368                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
1369                QueryLanguage::Gql,
1370                None,
1371            )
1372            .unwrap();
1373
1374        assert_eq!(result.row_count(), 0);
1375        assert_eq!(result.columns.len(), 2);
1376        assert_eq!(result.columns[0], "name");
1377        assert_eq!(result.columns[1], "age");
1378    }
1379
1380    #[cfg(feature = "gql")]
1381    #[test]
1382    fn test_params_string_equality() {
1383        // Tests string parameter equality comparison
1384        let store = Arc::new(LpgStore::new().unwrap());
1385        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
1386        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
1387        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
1388
1389        let processor = QueryProcessor::for_lpg(store);
1390
1391        let mut params = HashMap::new();
1392        params.insert("target".to_string(), Value::String("beta".into()));
1393
1394        let result = processor
1395            .process(
1396                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
1397                QueryLanguage::Gql,
1398                Some(&params),
1399            )
1400            .unwrap();
1401
1402        assert_eq!(result.row_count(), 1);
1403        assert_eq!(result.rows[0][0], Value::String("beta".into()));
1404    }
1405}