Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TxId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::lpg::LpgStore;
15
16use crate::catalog::Catalog;
17use crate::database::QueryResult;
18use crate::query::binder::Binder;
19use crate::query::executor::Executor;
20use crate::query::optimizer::Optimizer;
21use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
22use crate::query::planner::Planner;
23use crate::transaction::TransactionManager;
24
25/// Supported query languages.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27pub enum QueryLanguage {
28    /// GQL (ISO/IEC 39075:2024) - default for LPG
29    #[cfg(feature = "gql")]
30    Gql,
31    /// openCypher 9.0
32    #[cfg(feature = "cypher")]
33    Cypher,
34    /// Apache TinkerPop Gremlin
35    #[cfg(feature = "gremlin")]
36    Gremlin,
37    /// GraphQL for LPG
38    #[cfg(feature = "graphql")]
39    GraphQL,
40    /// SQL/PGQ (SQL:2023 GRAPH_TABLE)
41    #[cfg(feature = "sql-pgq")]
42    SqlPgq,
43    /// SPARQL 1.1 for RDF
44    #[cfg(feature = "sparql")]
45    Sparql,
46    /// GraphQL for RDF
47    #[cfg(all(feature = "graphql", feature = "rdf"))]
48    GraphQLRdf,
49}
50
51impl QueryLanguage {
52    /// Returns whether this language targets LPG (vs RDF).
53    #[must_use]
54    pub const fn is_lpg(&self) -> bool {
55        match self {
56            #[cfg(feature = "gql")]
57            Self::Gql => true,
58            #[cfg(feature = "cypher")]
59            Self::Cypher => true,
60            #[cfg(feature = "gremlin")]
61            Self::Gremlin => true,
62            #[cfg(feature = "graphql")]
63            Self::GraphQL => true,
64            #[cfg(feature = "sql-pgq")]
65            Self::SqlPgq => true,
66            #[cfg(feature = "sparql")]
67            Self::Sparql => false,
68            #[cfg(all(feature = "graphql", feature = "rdf"))]
69            Self::GraphQLRdf => false,
70        }
71    }
72}
73
74/// Query parameters for prepared statements.
75pub type QueryParams = HashMap<String, Value>;
76
77/// Processes queries through the full pipeline.
78///
79/// The processor holds references to the stores and provides a unified
80/// interface for executing queries in any supported language.
81///
82/// # Example
83///
84/// ```no_run
85/// # use std::sync::Arc;
86/// # use grafeo_core::graph::lpg::LpgStore;
87/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
88///
89/// # fn main() -> grafeo_common::utils::error::Result<()> {
90/// let store = Arc::new(LpgStore::new());
91/// let processor = QueryProcessor::for_lpg(store);
92/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
93/// # Ok(())
94/// # }
95/// ```
96pub struct QueryProcessor {
97    /// LPG store for property graph queries.
98    lpg_store: Arc<LpgStore>,
99    /// Transaction manager for MVCC operations.
100    tx_manager: Arc<TransactionManager>,
101    /// Catalog for schema and index metadata.
102    catalog: Arc<Catalog>,
103    /// Query optimizer.
104    optimizer: Optimizer,
105    /// Current transaction context (if any).
106    tx_context: Option<(EpochId, TxId)>,
107    /// RDF store for triple pattern queries (optional).
108    #[cfg(feature = "rdf")]
109    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
110}
111
112impl QueryProcessor {
113    /// Creates a new query processor for LPG queries.
114    #[must_use]
115    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
116        let optimizer = Optimizer::from_store(&store);
117        Self {
118            lpg_store: store,
119            tx_manager: Arc::new(TransactionManager::new()),
120            catalog: Arc::new(Catalog::new()),
121            optimizer,
122            tx_context: None,
123            #[cfg(feature = "rdf")]
124            rdf_store: None,
125        }
126    }
127
128    /// Creates a new query processor with a transaction manager.
129    #[must_use]
130    pub fn for_lpg_with_tx(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
131        let optimizer = Optimizer::from_store(&store);
132        Self {
133            lpg_store: store,
134            tx_manager,
135            catalog: Arc::new(Catalog::new()),
136            optimizer,
137            tx_context: None,
138            #[cfg(feature = "rdf")]
139            rdf_store: None,
140        }
141    }
142
143    /// Creates a new query processor with both LPG and RDF stores.
144    #[cfg(feature = "rdf")]
145    #[must_use]
146    pub fn with_rdf(
147        lpg_store: Arc<LpgStore>,
148        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
149    ) -> Self {
150        let optimizer = Optimizer::from_store(&lpg_store);
151        Self {
152            lpg_store,
153            tx_manager: Arc::new(TransactionManager::new()),
154            catalog: Arc::new(Catalog::new()),
155            optimizer,
156            tx_context: None,
157            rdf_store: Some(rdf_store),
158        }
159    }
160
161    /// Sets the transaction context for MVCC visibility.
162    ///
163    /// This should be called when the processor is used within a transaction.
164    #[must_use]
165    pub fn with_tx_context(mut self, viewing_epoch: EpochId, tx_id: TxId) -> Self {
166        self.tx_context = Some((viewing_epoch, tx_id));
167        self
168    }
169
170    /// Sets a custom catalog.
171    #[must_use]
172    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
173        self.catalog = catalog;
174        self
175    }
176
177    /// Sets a custom optimizer.
178    #[must_use]
179    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
180        self.optimizer = optimizer;
181        self
182    }
183
184    /// Processes a query string and returns results.
185    ///
186    /// Pipeline:
187    /// 1. Parse (language-specific parser → AST)
188    /// 2. Translate (AST → LogicalPlan)
189    /// 3. Bind (semantic validation)
190    /// 4. Optimize (filter pushdown, join reorder, etc.)
191    /// 5. Plan (logical → physical operators)
192    /// 6. Execute (run operators, collect results)
193    ///
194    /// # Arguments
195    ///
196    /// * `query` - The query string
197    /// * `language` - Which query language to use
198    /// * `params` - Optional query parameters for prepared statements
199    ///
200    /// # Errors
201    ///
202    /// Returns an error if any stage of the pipeline fails.
203    pub fn process(
204        &self,
205        query: &str,
206        language: QueryLanguage,
207        params: Option<&QueryParams>,
208    ) -> Result<QueryResult> {
209        if language.is_lpg() {
210            self.process_lpg(query, language, params)
211        } else {
212            #[cfg(feature = "rdf")]
213            {
214                self.process_rdf(query, language, params)
215            }
216            #[cfg(not(feature = "rdf"))]
217            {
218                Err(Error::Internal(
219                    "RDF support not enabled. Compile with --features rdf".to_string(),
220                ))
221            }
222        }
223    }
224
225    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
226    fn process_lpg(
227        &self,
228        query: &str,
229        language: QueryLanguage,
230        params: Option<&QueryParams>,
231    ) -> Result<QueryResult> {
232        let start_time = std::time::Instant::now();
233
234        // 1. Parse and translate to logical plan
235        let mut logical_plan = self.translate_lpg(query, language)?;
236
237        // 2. Substitute parameters if provided
238        if let Some(params) = params {
239            substitute_params(&mut logical_plan, params)?;
240        }
241
242        // 3. Semantic validation
243        let mut binder = Binder::new();
244        let _binding_context = binder.bind(&logical_plan)?;
245
246        // 4. Optimize the plan
247        let optimized_plan = self.optimizer.optimize(logical_plan)?;
248
249        // 5. Convert to physical plan with transaction context
250        let planner = if let Some((epoch, tx_id)) = self.tx_context {
251            Planner::with_context(
252                Arc::clone(&self.lpg_store),
253                Arc::clone(&self.tx_manager),
254                Some(tx_id),
255                epoch,
256            )
257        } else {
258            Planner::with_context(
259                Arc::clone(&self.lpg_store),
260                Arc::clone(&self.tx_manager),
261                None,
262                self.tx_manager.current_epoch(),
263            )
264        };
265        let mut physical_plan = planner.plan(&optimized_plan)?;
266
267        // 6. Execute and collect results
268        let executor = Executor::with_columns(physical_plan.columns.clone());
269        let mut result = executor.execute(physical_plan.operator.as_mut())?;
270
271        // Add execution metrics
272        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
273        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
274        result.execution_time_ms = Some(elapsed_ms);
275        result.rows_scanned = Some(rows_scanned);
276
277        Ok(result)
278    }
279
280    /// Translates an LPG query to a logical plan.
281    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
282        match language {
283            #[cfg(feature = "gql")]
284            QueryLanguage::Gql => {
285                use crate::query::gql_translator;
286                gql_translator::translate(query)
287            }
288            #[cfg(feature = "cypher")]
289            QueryLanguage::Cypher => {
290                use crate::query::cypher_translator;
291                cypher_translator::translate(query)
292            }
293            #[cfg(feature = "gremlin")]
294            QueryLanguage::Gremlin => {
295                use crate::query::gremlin_translator;
296                gremlin_translator::translate(query)
297            }
298            #[cfg(feature = "graphql")]
299            QueryLanguage::GraphQL => {
300                use crate::query::graphql_translator;
301                graphql_translator::translate(query)
302            }
303            #[cfg(feature = "sql-pgq")]
304            QueryLanguage::SqlPgq => {
305                use crate::query::sql_pgq_translator;
306                sql_pgq_translator::translate(query)
307            }
308            #[allow(unreachable_patterns)]
309            _ => Err(Error::Internal(format!(
310                "Language {:?} is not an LPG language",
311                language
312            ))),
313        }
314    }
315
316    /// Processes an RDF query (SPARQL, GraphQL-RDF).
317    #[cfg(feature = "rdf")]
318    fn process_rdf(
319        &self,
320        query: &str,
321        language: QueryLanguage,
322        _params: Option<&QueryParams>,
323    ) -> Result<QueryResult> {
324        use crate::query::planner_rdf::RdfPlanner;
325
326        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
327            Error::Internal("RDF store not configured for this processor".to_string())
328        })?;
329
330        // 1. Parse and translate to logical plan
331        let logical_plan = self.translate_rdf(query, language)?;
332
333        // 2. Semantic validation
334        let mut binder = Binder::new();
335        let _binding_context = binder.bind(&logical_plan)?;
336
337        // 3. Optimize the plan
338        let optimized_plan = self.optimizer.optimize(logical_plan)?;
339
340        // 4. Convert to physical plan (using RDF planner)
341        let planner = RdfPlanner::new(Arc::clone(rdf_store));
342        let mut physical_plan = planner.plan(&optimized_plan)?;
343
344        // 5. Execute and collect results
345        let executor = Executor::with_columns(physical_plan.columns.clone());
346        executor.execute(physical_plan.operator.as_mut())
347    }
348
349    /// Translates an RDF query to a logical plan.
350    #[cfg(feature = "rdf")]
351    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
352        match language {
353            #[cfg(feature = "sparql")]
354            QueryLanguage::Sparql => {
355                use crate::query::sparql_translator;
356                sparql_translator::translate(query)
357            }
358            #[cfg(all(feature = "graphql", feature = "rdf"))]
359            QueryLanguage::GraphQLRdf => {
360                use crate::query::graphql_rdf_translator;
361                // Default namespace for GraphQL-RDF queries
362                graphql_rdf_translator::translate(query, "http://example.org/")
363            }
364            _ => Err(Error::Internal(format!(
365                "Language {:?} is not an RDF language",
366                language
367            ))),
368        }
369    }
370
371    /// Returns a reference to the LPG store.
372    #[must_use]
373    pub fn lpg_store(&self) -> &Arc<LpgStore> {
374        &self.lpg_store
375    }
376
377    /// Returns a reference to the catalog.
378    #[must_use]
379    pub fn catalog(&self) -> &Arc<Catalog> {
380        &self.catalog
381    }
382
383    /// Returns a reference to the optimizer.
384    #[must_use]
385    pub fn optimizer(&self) -> &Optimizer {
386        &self.optimizer
387    }
388
389    /// Returns a reference to the RDF store (if configured).
390    #[cfg(feature = "rdf")]
391    #[must_use]
392    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
393        self.rdf_store.as_ref()
394    }
395}
396
397impl QueryProcessor {
398    /// Returns a reference to the transaction manager.
399    #[must_use]
400    pub fn tx_manager(&self) -> &Arc<TransactionManager> {
401        &self.tx_manager
402    }
403}
404
405/// Substitutes parameters in a logical plan with their values.
406fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
407    substitute_in_operator(&mut plan.root, params)
408}
409
410/// Recursively substitutes parameters in an operator.
411fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
412    use crate::query::plan::*;
413
414    match op {
415        LogicalOperator::Filter(filter) => {
416            substitute_in_expression(&mut filter.predicate, params)?;
417            substitute_in_operator(&mut filter.input, params)?;
418        }
419        LogicalOperator::Return(ret) => {
420            for item in &mut ret.items {
421                substitute_in_expression(&mut item.expression, params)?;
422            }
423            substitute_in_operator(&mut ret.input, params)?;
424        }
425        LogicalOperator::Project(proj) => {
426            for p in &mut proj.projections {
427                substitute_in_expression(&mut p.expression, params)?;
428            }
429            substitute_in_operator(&mut proj.input, params)?;
430        }
431        LogicalOperator::NodeScan(scan) => {
432            if let Some(input) = &mut scan.input {
433                substitute_in_operator(input, params)?;
434            }
435        }
436        LogicalOperator::EdgeScan(scan) => {
437            if let Some(input) = &mut scan.input {
438                substitute_in_operator(input, params)?;
439            }
440        }
441        LogicalOperator::Expand(expand) => {
442            substitute_in_operator(&mut expand.input, params)?;
443        }
444        LogicalOperator::Join(join) => {
445            substitute_in_operator(&mut join.left, params)?;
446            substitute_in_operator(&mut join.right, params)?;
447            for cond in &mut join.conditions {
448                substitute_in_expression(&mut cond.left, params)?;
449                substitute_in_expression(&mut cond.right, params)?;
450            }
451        }
452        LogicalOperator::LeftJoin(join) => {
453            substitute_in_operator(&mut join.left, params)?;
454            substitute_in_operator(&mut join.right, params)?;
455            if let Some(cond) = &mut join.condition {
456                substitute_in_expression(cond, params)?;
457            }
458        }
459        LogicalOperator::Aggregate(agg) => {
460            for expr in &mut agg.group_by {
461                substitute_in_expression(expr, params)?;
462            }
463            for agg_expr in &mut agg.aggregates {
464                if let Some(expr) = &mut agg_expr.expression {
465                    substitute_in_expression(expr, params)?;
466                }
467            }
468            substitute_in_operator(&mut agg.input, params)?;
469        }
470        LogicalOperator::Sort(sort) => {
471            for key in &mut sort.keys {
472                substitute_in_expression(&mut key.expression, params)?;
473            }
474            substitute_in_operator(&mut sort.input, params)?;
475        }
476        LogicalOperator::Limit(limit) => {
477            substitute_in_operator(&mut limit.input, params)?;
478        }
479        LogicalOperator::Skip(skip) => {
480            substitute_in_operator(&mut skip.input, params)?;
481        }
482        LogicalOperator::Distinct(distinct) => {
483            substitute_in_operator(&mut distinct.input, params)?;
484        }
485        LogicalOperator::CreateNode(create) => {
486            for (_, expr) in &mut create.properties {
487                substitute_in_expression(expr, params)?;
488            }
489            if let Some(input) = &mut create.input {
490                substitute_in_operator(input, params)?;
491            }
492        }
493        LogicalOperator::CreateEdge(create) => {
494            for (_, expr) in &mut create.properties {
495                substitute_in_expression(expr, params)?;
496            }
497            substitute_in_operator(&mut create.input, params)?;
498        }
499        LogicalOperator::DeleteNode(delete) => {
500            substitute_in_operator(&mut delete.input, params)?;
501        }
502        LogicalOperator::DeleteEdge(delete) => {
503            substitute_in_operator(&mut delete.input, params)?;
504        }
505        LogicalOperator::SetProperty(set) => {
506            for (_, expr) in &mut set.properties {
507                substitute_in_expression(expr, params)?;
508            }
509            substitute_in_operator(&mut set.input, params)?;
510        }
511        LogicalOperator::Union(union) => {
512            for input in &mut union.inputs {
513                substitute_in_operator(input, params)?;
514            }
515        }
516        LogicalOperator::AntiJoin(anti) => {
517            substitute_in_operator(&mut anti.left, params)?;
518            substitute_in_operator(&mut anti.right, params)?;
519        }
520        LogicalOperator::Bind(bind) => {
521            substitute_in_expression(&mut bind.expression, params)?;
522            substitute_in_operator(&mut bind.input, params)?;
523        }
524        LogicalOperator::TripleScan(scan) => {
525            if let Some(input) = &mut scan.input {
526                substitute_in_operator(input, params)?;
527            }
528        }
529        LogicalOperator::Unwind(unwind) => {
530            substitute_in_expression(&mut unwind.expression, params)?;
531            substitute_in_operator(&mut unwind.input, params)?;
532        }
533        LogicalOperator::Merge(merge) => {
534            for (_, expr) in &mut merge.match_properties {
535                substitute_in_expression(expr, params)?;
536            }
537            for (_, expr) in &mut merge.on_create {
538                substitute_in_expression(expr, params)?;
539            }
540            for (_, expr) in &mut merge.on_match {
541                substitute_in_expression(expr, params)?;
542            }
543            substitute_in_operator(&mut merge.input, params)?;
544        }
545        LogicalOperator::AddLabel(add_label) => {
546            substitute_in_operator(&mut add_label.input, params)?;
547        }
548        LogicalOperator::RemoveLabel(remove_label) => {
549            substitute_in_operator(&mut remove_label.input, params)?;
550        }
551        LogicalOperator::ShortestPath(sp) => {
552            substitute_in_operator(&mut sp.input, params)?;
553        }
554        // SPARQL Update operators
555        LogicalOperator::InsertTriple(insert) => {
556            if let Some(ref mut input) = insert.input {
557                substitute_in_operator(input, params)?;
558            }
559        }
560        LogicalOperator::DeleteTriple(delete) => {
561            if let Some(ref mut input) = delete.input {
562                substitute_in_operator(input, params)?;
563            }
564        }
565        LogicalOperator::Modify(modify) => {
566            substitute_in_operator(&mut modify.where_clause, params)?;
567        }
568        LogicalOperator::ClearGraph(_)
569        | LogicalOperator::CreateGraph(_)
570        | LogicalOperator::DropGraph(_)
571        | LogicalOperator::LoadGraph(_)
572        | LogicalOperator::CopyGraph(_)
573        | LogicalOperator::MoveGraph(_)
574        | LogicalOperator::AddGraph(_) => {}
575        LogicalOperator::Empty => {}
576        LogicalOperator::VectorScan(scan) => {
577            substitute_in_expression(&mut scan.query_vector, params)?;
578            if let Some(ref mut input) = scan.input {
579                substitute_in_operator(input, params)?;
580            }
581        }
582        LogicalOperator::VectorJoin(join) => {
583            substitute_in_expression(&mut join.query_vector, params)?;
584            substitute_in_operator(&mut join.input, params)?;
585        }
586        // DDL operators have no expressions to substitute
587        LogicalOperator::CreatePropertyGraph(_) => {}
588        // Procedure calls: arguments could contain parameters but we handle at execution time
589        LogicalOperator::CallProcedure(_) => {}
590    }
591    Ok(())
592}
593
594/// Substitutes parameters in an expression with their values.
595fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
596    use crate::query::plan::LogicalExpression;
597
598    match expr {
599        LogicalExpression::Parameter(name) => {
600            if let Some(value) = params.get(name) {
601                *expr = LogicalExpression::Literal(value.clone());
602            } else {
603                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
604            }
605        }
606        LogicalExpression::Binary { left, right, .. } => {
607            substitute_in_expression(left, params)?;
608            substitute_in_expression(right, params)?;
609        }
610        LogicalExpression::Unary { operand, .. } => {
611            substitute_in_expression(operand, params)?;
612        }
613        LogicalExpression::FunctionCall { args, .. } => {
614            for arg in args {
615                substitute_in_expression(arg, params)?;
616            }
617        }
618        LogicalExpression::List(items) => {
619            for item in items {
620                substitute_in_expression(item, params)?;
621            }
622        }
623        LogicalExpression::Map(pairs) => {
624            for (_, value) in pairs {
625                substitute_in_expression(value, params)?;
626            }
627        }
628        LogicalExpression::IndexAccess { base, index } => {
629            substitute_in_expression(base, params)?;
630            substitute_in_expression(index, params)?;
631        }
632        LogicalExpression::SliceAccess { base, start, end } => {
633            substitute_in_expression(base, params)?;
634            if let Some(s) = start {
635                substitute_in_expression(s, params)?;
636            }
637            if let Some(e) = end {
638                substitute_in_expression(e, params)?;
639            }
640        }
641        LogicalExpression::Case {
642            operand,
643            when_clauses,
644            else_clause,
645        } => {
646            if let Some(op) = operand {
647                substitute_in_expression(op, params)?;
648            }
649            for (cond, result) in when_clauses {
650                substitute_in_expression(cond, params)?;
651                substitute_in_expression(result, params)?;
652            }
653            if let Some(el) = else_clause {
654                substitute_in_expression(el, params)?;
655            }
656        }
657        LogicalExpression::Property { .. }
658        | LogicalExpression::Variable(_)
659        | LogicalExpression::Literal(_)
660        | LogicalExpression::Labels(_)
661        | LogicalExpression::Type(_)
662        | LogicalExpression::Id(_) => {}
663        LogicalExpression::ListComprehension {
664            list_expr,
665            filter_expr,
666            map_expr,
667            ..
668        } => {
669            substitute_in_expression(list_expr, params)?;
670            if let Some(filter) = filter_expr {
671                substitute_in_expression(filter, params)?;
672            }
673            substitute_in_expression(map_expr, params)?;
674        }
675        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
676            // Subqueries would need recursive parameter substitution
677        }
678    }
679    Ok(())
680}
681
682#[cfg(test)]
683mod tests {
684    use super::*;
685
686    #[test]
687    fn test_query_language_is_lpg() {
688        #[cfg(feature = "gql")]
689        assert!(QueryLanguage::Gql.is_lpg());
690        #[cfg(feature = "cypher")]
691        assert!(QueryLanguage::Cypher.is_lpg());
692        #[cfg(feature = "sparql")]
693        assert!(!QueryLanguage::Sparql.is_lpg());
694    }
695
696    #[test]
697    fn test_processor_creation() {
698        let store = Arc::new(LpgStore::new());
699        let processor = QueryProcessor::for_lpg(store);
700        assert!(processor.lpg_store().node_count() == 0);
701    }
702
703    #[cfg(feature = "gql")]
704    #[test]
705    fn test_process_simple_gql() {
706        let store = Arc::new(LpgStore::new());
707        store.create_node(&["Person"]);
708        store.create_node(&["Person"]);
709
710        let processor = QueryProcessor::for_lpg(store);
711        let result = processor
712            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
713            .unwrap();
714
715        assert_eq!(result.row_count(), 2);
716        assert_eq!(result.columns[0], "n");
717    }
718
719    #[cfg(feature = "cypher")]
720    #[test]
721    fn test_process_simple_cypher() {
722        let store = Arc::new(LpgStore::new());
723        store.create_node(&["Person"]);
724
725        let processor = QueryProcessor::for_lpg(store);
726        let result = processor
727            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
728            .unwrap();
729
730        assert_eq!(result.row_count(), 1);
731    }
732
733    #[cfg(feature = "gql")]
734    #[test]
735    fn test_process_with_params() {
736        let store = Arc::new(LpgStore::new());
737        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
738        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
739        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
740
741        let processor = QueryProcessor::for_lpg(store);
742
743        // Query with parameter
744        let mut params = HashMap::new();
745        params.insert("min_age".to_string(), Value::Int64(30));
746
747        let result = processor
748            .process(
749                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
750                QueryLanguage::Gql,
751                Some(&params),
752            )
753            .unwrap();
754
755        // Should return 2 people (ages 35 and 45)
756        assert_eq!(result.row_count(), 2);
757    }
758
759    #[cfg(feature = "gql")]
760    #[test]
761    fn test_missing_param_error() {
762        let store = Arc::new(LpgStore::new());
763        store.create_node(&["Person"]);
764
765        let processor = QueryProcessor::for_lpg(store);
766
767        // Query with parameter but empty params map (missing the required param)
768        let params: HashMap<String, Value> = HashMap::new();
769        let result = processor.process(
770            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
771            QueryLanguage::Gql,
772            Some(&params),
773        );
774
775        // Should fail with missing parameter error
776        assert!(result.is_err());
777        let err = result.unwrap_err();
778        assert!(
779            err.to_string().contains("Missing parameter"),
780            "Expected 'Missing parameter' error, got: {}",
781            err
782        );
783    }
784
785    #[cfg(feature = "gql")]
786    #[test]
787    fn test_params_in_filter_with_property() {
788        // Tests parameter substitution in WHERE clause with property comparison
789        let store = Arc::new(LpgStore::new());
790        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
791        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
792
793        let processor = QueryProcessor::for_lpg(store);
794
795        let mut params = HashMap::new();
796        params.insert("threshold".to_string(), Value::Int64(15));
797
798        let result = processor
799            .process(
800                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
801                QueryLanguage::Gql,
802                Some(&params),
803            )
804            .unwrap();
805
806        // Only value=20 matches > 15
807        assert_eq!(result.row_count(), 1);
808        let row = &result.rows[0];
809        assert_eq!(row[0], Value::Int64(20));
810    }
811
812    #[cfg(feature = "gql")]
813    #[test]
814    fn test_params_in_multiple_where_conditions() {
815        // Tests multiple parameters in WHERE clause with AND
816        let store = Arc::new(LpgStore::new());
817        store.create_node_with_props(
818            &["Person"],
819            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
820        );
821        store.create_node_with_props(
822            &["Person"],
823            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
824        );
825        store.create_node_with_props(
826            &["Person"],
827            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
828        );
829
830        let processor = QueryProcessor::for_lpg(store);
831
832        let mut params = HashMap::new();
833        params.insert("min_age".to_string(), Value::Int64(30));
834        params.insert("min_score".to_string(), Value::Int64(75));
835
836        let result = processor
837            .process(
838                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
839                QueryLanguage::Gql,
840                Some(&params),
841            )
842            .unwrap();
843
844        // Only the person with age=35, score=90 matches both conditions
845        assert_eq!(result.row_count(), 1);
846    }
847
848    #[cfg(feature = "gql")]
849    #[test]
850    fn test_params_with_in_list() {
851        // Tests parameter as a value checked against IN list
852        let store = Arc::new(LpgStore::new());
853        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
854        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
855        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
856
857        let processor = QueryProcessor::for_lpg(store);
858
859        // Check if a parameter value matches any of the statuses
860        let mut params = HashMap::new();
861        params.insert("target".to_string(), Value::String("active".into()));
862
863        let result = processor
864            .process(
865                "MATCH (n:Item) WHERE n.status = $target RETURN n",
866                QueryLanguage::Gql,
867                Some(&params),
868            )
869            .unwrap();
870
871        assert_eq!(result.row_count(), 1);
872    }
873
874    #[cfg(feature = "gql")]
875    #[test]
876    fn test_params_same_type_comparison() {
877        // Tests that same-type parameter comparisons work correctly
878        let store = Arc::new(LpgStore::new());
879        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
880        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
881
882        let processor = QueryProcessor::for_lpg(store);
883
884        // Compare int property with int parameter
885        let mut params = HashMap::new();
886        params.insert("threshold".to_string(), Value::Int64(75));
887
888        let result = processor
889            .process(
890                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
891                QueryLanguage::Gql,
892                Some(&params),
893            )
894            .unwrap();
895
896        // Only value=100 matches > 75
897        assert_eq!(result.row_count(), 1);
898    }
899
900    #[cfg(feature = "gql")]
901    #[test]
902    fn test_process_empty_result_has_columns() {
903        // Tests that empty results still have correct column names
904        let store = Arc::new(LpgStore::new());
905        // Don't create any nodes
906
907        let processor = QueryProcessor::for_lpg(store);
908        let result = processor
909            .process(
910                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
911                QueryLanguage::Gql,
912                None,
913            )
914            .unwrap();
915
916        assert_eq!(result.row_count(), 0);
917        assert_eq!(result.columns.len(), 2);
918        assert_eq!(result.columns[0], "name");
919        assert_eq!(result.columns[1], "age");
920    }
921
922    #[cfg(feature = "gql")]
923    #[test]
924    fn test_params_string_equality() {
925        // Tests string parameter equality comparison
926        let store = Arc::new(LpgStore::new());
927        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
928        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
929        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
930
931        let processor = QueryProcessor::for_lpg(store);
932
933        let mut params = HashMap::new();
934        params.insert("target".to_string(), Value::String("beta".into()));
935
936        let result = processor
937            .process(
938                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
939                QueryLanguage::Gql,
940                Some(&params),
941            )
942            .unwrap();
943
944        assert_eq!(result.row_count(), 1);
945        assert_eq!(result.rows[0][0], Value::String("beta".into()));
946    }
947}