Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
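//! It supports multiple query languages for LPG (GQL, Cypher, Gremlin, GraphQL,
//! SQL/PGQ) and for RDF (SPARQL, GraphQL-over-RDF); each language is compiled in
//! only when its Cargo feature is enabled, and RDF execution additionally
//! requires the `rdf` feature.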
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TxId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::lpg::LpgStore;
15
16use crate::catalog::Catalog;
17use crate::database::QueryResult;
18use crate::query::binder::Binder;
19use crate::query::executor::Executor;
20use crate::query::optimizer::Optimizer;
21use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
22use crate::query::planner::Planner;
23use crate::transaction::TransactionManager;
24
/// Supported query languages.
///
/// Every variant is gated behind its own Cargo feature, so the set of
/// variants (and therefore the set of accepted languages) depends on how the
/// crate was built. With no language feature enabled the enum has no variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL (ISO/IEC 39075:2024) - default for LPG
    #[cfg(feature = "gql")]
    Gql,
    /// openCypher 9.0
    #[cfg(feature = "cypher")]
    Cypher,
    /// Apache TinkerPop Gremlin
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL for LPG
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (SQL:2023 GRAPH_TABLE)
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL 1.1 for RDF
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL for RDF
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}

impl QueryLanguage {
    /// Returns whether this language targets LPG (vs RDF).
    ///
    /// `true` for GQL, Cypher, Gremlin, GraphQL and SQL/PGQ; `false` for
    /// SPARQL and GraphQL-over-RDF.
    #[must_use]
    pub const fn is_lpg(&self) -> bool {
        // Match on the value rather than the reference: the enum is `Copy`,
        // and a match on `&Self` is rejected by the compiler when no language
        // feature is enabled (the enum is then empty, but references are
        // always treated as inhabited).
        match *self {
            #[cfg(feature = "gql")]
            Self::Gql => true,
            #[cfg(feature = "cypher")]
            Self::Cypher => true,
            #[cfg(feature = "gremlin")]
            Self::Gremlin => true,
            #[cfg(feature = "graphql")]
            Self::GraphQL => true,
            #[cfg(feature = "sql-pgq")]
            Self::SqlPgq => true,
            #[cfg(feature = "sparql")]
            Self::Sparql => false,
            #[cfg(all(feature = "graphql", feature = "rdf"))]
            Self::GraphQLRdf => false,
        }
    }
}
73
/// Query parameters for prepared statements.
///
/// Maps a parameter name to the value that replaces it in the logical plan.
/// Keys are written without the `$` sigil: the tests in this file bind
/// `"min_age"` for a query that references `$min_age`.
pub type QueryParams = HashMap<String, Value>;
76
/// Processes queries through the full pipeline.
///
/// The processor holds shared handles to the stores, the transaction manager
/// and the catalog, and provides a unified interface for executing queries in
/// any supported language.
///
/// # Example
///
/// ```ignore
/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
///
/// let processor = QueryProcessor::for_lpg(store);
/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
/// ```
pub struct QueryProcessor {
    /// LPG store for property graph queries.
    lpg_store: Arc<LpgStore>,
    /// Transaction manager for MVCC operations; also the source of the
    /// current epoch when no transaction context is set.
    tx_manager: Arc<TransactionManager>,
    /// Catalog for schema and index metadata.
    catalog: Arc<Catalog>,
    /// Query optimizer applied to every logical plan before physical planning.
    optimizer: Optimizer,
    /// Current transaction context as `(viewing epoch, transaction id)`.
    /// `None` means LPG queries are planned without a transaction id, against
    /// the transaction manager's current epoch.
    tx_context: Option<(EpochId, TxId)>,
    /// RDF store for triple pattern queries (optional). RDF queries fail with
    /// an error while this is `None`.
    #[cfg(feature = "rdf")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
105
106impl QueryProcessor {
107    /// Creates a new query processor for LPG queries.
108    #[must_use]
109    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
110        let optimizer = Optimizer::from_store(&store);
111        Self {
112            lpg_store: store,
113            tx_manager: Arc::new(TransactionManager::new()),
114            catalog: Arc::new(Catalog::new()),
115            optimizer,
116            tx_context: None,
117            #[cfg(feature = "rdf")]
118            rdf_store: None,
119        }
120    }
121
122    /// Creates a new query processor with a transaction manager.
123    #[must_use]
124    pub fn for_lpg_with_tx(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
125        let optimizer = Optimizer::from_store(&store);
126        Self {
127            lpg_store: store,
128            tx_manager,
129            catalog: Arc::new(Catalog::new()),
130            optimizer,
131            tx_context: None,
132            #[cfg(feature = "rdf")]
133            rdf_store: None,
134        }
135    }
136
137    /// Creates a new query processor with both LPG and RDF stores.
138    #[cfg(feature = "rdf")]
139    #[must_use]
140    pub fn with_rdf(
141        lpg_store: Arc<LpgStore>,
142        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
143    ) -> Self {
144        let optimizer = Optimizer::from_store(&lpg_store);
145        Self {
146            lpg_store,
147            tx_manager: Arc::new(TransactionManager::new()),
148            catalog: Arc::new(Catalog::new()),
149            optimizer,
150            tx_context: None,
151            rdf_store: Some(rdf_store),
152        }
153    }
154
155    /// Sets the transaction context for MVCC visibility.
156    ///
157    /// This should be called when the processor is used within a transaction.
158    #[must_use]
159    pub fn with_tx_context(mut self, viewing_epoch: EpochId, tx_id: TxId) -> Self {
160        self.tx_context = Some((viewing_epoch, tx_id));
161        self
162    }
163
164    /// Sets a custom catalog.
165    #[must_use]
166    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
167        self.catalog = catalog;
168        self
169    }
170
171    /// Sets a custom optimizer.
172    #[must_use]
173    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
174        self.optimizer = optimizer;
175        self
176    }
177
178    /// Processes a query string and returns results.
179    ///
180    /// Pipeline:
181    /// 1. Parse (language-specific parser → AST)
182    /// 2. Translate (AST → LogicalPlan)
183    /// 3. Bind (semantic validation)
184    /// 4. Optimize (filter pushdown, join reorder, etc.)
185    /// 5. Plan (logical → physical operators)
186    /// 6. Execute (run operators, collect results)
187    ///
188    /// # Arguments
189    ///
190    /// * `query` - The query string
191    /// * `language` - Which query language to use
192    /// * `params` - Optional query parameters for prepared statements
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if any stage of the pipeline fails.
197    pub fn process(
198        &self,
199        query: &str,
200        language: QueryLanguage,
201        params: Option<&QueryParams>,
202    ) -> Result<QueryResult> {
203        if language.is_lpg() {
204            self.process_lpg(query, language, params)
205        } else {
206            #[cfg(feature = "rdf")]
207            {
208                self.process_rdf(query, language, params)
209            }
210            #[cfg(not(feature = "rdf"))]
211            {
212                Err(Error::Internal(
213                    "RDF support not enabled. Compile with --features rdf".to_string(),
214                ))
215            }
216        }
217    }
218
219    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
220    fn process_lpg(
221        &self,
222        query: &str,
223        language: QueryLanguage,
224        params: Option<&QueryParams>,
225    ) -> Result<QueryResult> {
226        let start_time = std::time::Instant::now();
227
228        // 1. Parse and translate to logical plan
229        let mut logical_plan = self.translate_lpg(query, language)?;
230
231        // 2. Substitute parameters if provided
232        if let Some(params) = params {
233            substitute_params(&mut logical_plan, params)?;
234        }
235
236        // 3. Semantic validation
237        let mut binder = Binder::new();
238        let _binding_context = binder.bind(&logical_plan)?;
239
240        // 4. Optimize the plan
241        let optimized_plan = self.optimizer.optimize(logical_plan)?;
242
243        // 5. Convert to physical plan with transaction context
244        let planner = if let Some((epoch, tx_id)) = self.tx_context {
245            Planner::with_context(
246                Arc::clone(&self.lpg_store),
247                Arc::clone(&self.tx_manager),
248                Some(tx_id),
249                epoch,
250            )
251        } else {
252            Planner::with_context(
253                Arc::clone(&self.lpg_store),
254                Arc::clone(&self.tx_manager),
255                None,
256                self.tx_manager.current_epoch(),
257            )
258        };
259        let mut physical_plan = planner.plan(&optimized_plan)?;
260
261        // 6. Execute and collect results
262        let executor = Executor::with_columns(physical_plan.columns.clone());
263        let mut result = executor.execute(physical_plan.operator.as_mut())?;
264
265        // Add execution metrics
266        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
267        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
268        result.execution_time_ms = Some(elapsed_ms);
269        result.rows_scanned = Some(rows_scanned);
270
271        Ok(result)
272    }
273
274    /// Translates an LPG query to a logical plan.
275    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
276        match language {
277            #[cfg(feature = "gql")]
278            QueryLanguage::Gql => {
279                use crate::query::gql_translator;
280                gql_translator::translate(query)
281            }
282            #[cfg(feature = "cypher")]
283            QueryLanguage::Cypher => {
284                use crate::query::cypher_translator;
285                cypher_translator::translate(query)
286            }
287            #[cfg(feature = "gremlin")]
288            QueryLanguage::Gremlin => {
289                use crate::query::gremlin_translator;
290                gremlin_translator::translate(query)
291            }
292            #[cfg(feature = "graphql")]
293            QueryLanguage::GraphQL => {
294                use crate::query::graphql_translator;
295                graphql_translator::translate(query)
296            }
297            #[cfg(feature = "sql-pgq")]
298            QueryLanguage::SqlPgq => {
299                use crate::query::sql_pgq_translator;
300                sql_pgq_translator::translate(query)
301            }
302            #[allow(unreachable_patterns)]
303            _ => Err(Error::Internal(format!(
304                "Language {:?} is not an LPG language",
305                language
306            ))),
307        }
308    }
309
310    /// Processes an RDF query (SPARQL, GraphQL-RDF).
311    #[cfg(feature = "rdf")]
312    fn process_rdf(
313        &self,
314        query: &str,
315        language: QueryLanguage,
316        _params: Option<&QueryParams>,
317    ) -> Result<QueryResult> {
318        use crate::query::planner_rdf::RdfPlanner;
319
320        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
321            Error::Internal("RDF store not configured for this processor".to_string())
322        })?;
323
324        // 1. Parse and translate to logical plan
325        let logical_plan = self.translate_rdf(query, language)?;
326
327        // 2. Semantic validation
328        let mut binder = Binder::new();
329        let _binding_context = binder.bind(&logical_plan)?;
330
331        // 3. Optimize the plan
332        let optimized_plan = self.optimizer.optimize(logical_plan)?;
333
334        // 4. Convert to physical plan (using RDF planner)
335        let planner = RdfPlanner::new(Arc::clone(rdf_store));
336        let mut physical_plan = planner.plan(&optimized_plan)?;
337
338        // 5. Execute and collect results
339        let executor = Executor::with_columns(physical_plan.columns.clone());
340        executor.execute(physical_plan.operator.as_mut())
341    }
342
343    /// Translates an RDF query to a logical plan.
344    #[cfg(feature = "rdf")]
345    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
346        match language {
347            #[cfg(feature = "sparql")]
348            QueryLanguage::Sparql => {
349                use crate::query::sparql_translator;
350                sparql_translator::translate(query)
351            }
352            #[cfg(all(feature = "graphql", feature = "rdf"))]
353            QueryLanguage::GraphQLRdf => {
354                use crate::query::graphql_rdf_translator;
355                // Default namespace for GraphQL-RDF queries
356                graphql_rdf_translator::translate(query, "http://example.org/")
357            }
358            _ => Err(Error::Internal(format!(
359                "Language {:?} is not an RDF language",
360                language
361            ))),
362        }
363    }
364
    /// Returns a reference to the LPG store.
    ///
    /// The handle is borrowed; clone the `Arc` to keep it beyond the
    /// processor's lifetime.
    #[must_use]
    pub fn lpg_store(&self) -> &Arc<LpgStore> {
        &self.lpg_store
    }
370
    /// Returns a reference to the catalog currently attached to the processor.
    #[must_use]
    pub fn catalog(&self) -> &Arc<Catalog> {
        &self.catalog
    }
376
    /// Returns a reference to the optimizer applied to every logical plan.
    #[must_use]
    pub fn optimizer(&self) -> &Optimizer {
        &self.optimizer
    }
382
    /// Returns a reference to the RDF store (if configured).
    ///
    /// `None` when the processor was built without an RDF store.
    #[cfg(feature = "rdf")]
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }
389}
390
impl QueryProcessor {
    /// Returns a reference to the transaction manager.
    ///
    /// This is the manager passed to the planner for LPG queries.
    #[must_use]
    pub fn tx_manager(&self) -> &Arc<TransactionManager> {
        &self.tx_manager
    }
}
398
/// Substitutes parameters in a logical plan with their values.
///
/// Walks the plan starting at its root operator and rewrites it in place.
///
/// # Errors
///
/// Returns an error when the plan references a parameter that is not present
/// in `params`.
fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
    substitute_in_operator(&mut plan.root, params)
}
403
404/// Recursively substitutes parameters in an operator.
405fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
406    use crate::query::plan::*;
407
408    match op {
409        LogicalOperator::Filter(filter) => {
410            substitute_in_expression(&mut filter.predicate, params)?;
411            substitute_in_operator(&mut filter.input, params)?;
412        }
413        LogicalOperator::Return(ret) => {
414            for item in &mut ret.items {
415                substitute_in_expression(&mut item.expression, params)?;
416            }
417            substitute_in_operator(&mut ret.input, params)?;
418        }
419        LogicalOperator::Project(proj) => {
420            for p in &mut proj.projections {
421                substitute_in_expression(&mut p.expression, params)?;
422            }
423            substitute_in_operator(&mut proj.input, params)?;
424        }
425        LogicalOperator::NodeScan(scan) => {
426            if let Some(input) = &mut scan.input {
427                substitute_in_operator(input, params)?;
428            }
429        }
430        LogicalOperator::EdgeScan(scan) => {
431            if let Some(input) = &mut scan.input {
432                substitute_in_operator(input, params)?;
433            }
434        }
435        LogicalOperator::Expand(expand) => {
436            substitute_in_operator(&mut expand.input, params)?;
437        }
438        LogicalOperator::Join(join) => {
439            substitute_in_operator(&mut join.left, params)?;
440            substitute_in_operator(&mut join.right, params)?;
441            for cond in &mut join.conditions {
442                substitute_in_expression(&mut cond.left, params)?;
443                substitute_in_expression(&mut cond.right, params)?;
444            }
445        }
446        LogicalOperator::LeftJoin(join) => {
447            substitute_in_operator(&mut join.left, params)?;
448            substitute_in_operator(&mut join.right, params)?;
449            if let Some(cond) = &mut join.condition {
450                substitute_in_expression(cond, params)?;
451            }
452        }
453        LogicalOperator::Aggregate(agg) => {
454            for expr in &mut agg.group_by {
455                substitute_in_expression(expr, params)?;
456            }
457            for agg_expr in &mut agg.aggregates {
458                if let Some(expr) = &mut agg_expr.expression {
459                    substitute_in_expression(expr, params)?;
460                }
461            }
462            substitute_in_operator(&mut agg.input, params)?;
463        }
464        LogicalOperator::Sort(sort) => {
465            for key in &mut sort.keys {
466                substitute_in_expression(&mut key.expression, params)?;
467            }
468            substitute_in_operator(&mut sort.input, params)?;
469        }
470        LogicalOperator::Limit(limit) => {
471            substitute_in_operator(&mut limit.input, params)?;
472        }
473        LogicalOperator::Skip(skip) => {
474            substitute_in_operator(&mut skip.input, params)?;
475        }
476        LogicalOperator::Distinct(distinct) => {
477            substitute_in_operator(&mut distinct.input, params)?;
478        }
479        LogicalOperator::CreateNode(create) => {
480            for (_, expr) in &mut create.properties {
481                substitute_in_expression(expr, params)?;
482            }
483            if let Some(input) = &mut create.input {
484                substitute_in_operator(input, params)?;
485            }
486        }
487        LogicalOperator::CreateEdge(create) => {
488            for (_, expr) in &mut create.properties {
489                substitute_in_expression(expr, params)?;
490            }
491            substitute_in_operator(&mut create.input, params)?;
492        }
493        LogicalOperator::DeleteNode(delete) => {
494            substitute_in_operator(&mut delete.input, params)?;
495        }
496        LogicalOperator::DeleteEdge(delete) => {
497            substitute_in_operator(&mut delete.input, params)?;
498        }
499        LogicalOperator::SetProperty(set) => {
500            for (_, expr) in &mut set.properties {
501                substitute_in_expression(expr, params)?;
502            }
503            substitute_in_operator(&mut set.input, params)?;
504        }
505        LogicalOperator::Union(union) => {
506            for input in &mut union.inputs {
507                substitute_in_operator(input, params)?;
508            }
509        }
510        LogicalOperator::AntiJoin(anti) => {
511            substitute_in_operator(&mut anti.left, params)?;
512            substitute_in_operator(&mut anti.right, params)?;
513        }
514        LogicalOperator::Bind(bind) => {
515            substitute_in_expression(&mut bind.expression, params)?;
516            substitute_in_operator(&mut bind.input, params)?;
517        }
518        LogicalOperator::TripleScan(scan) => {
519            if let Some(input) = &mut scan.input {
520                substitute_in_operator(input, params)?;
521            }
522        }
523        LogicalOperator::Unwind(unwind) => {
524            substitute_in_expression(&mut unwind.expression, params)?;
525            substitute_in_operator(&mut unwind.input, params)?;
526        }
527        LogicalOperator::Merge(merge) => {
528            for (_, expr) in &mut merge.match_properties {
529                substitute_in_expression(expr, params)?;
530            }
531            for (_, expr) in &mut merge.on_create {
532                substitute_in_expression(expr, params)?;
533            }
534            for (_, expr) in &mut merge.on_match {
535                substitute_in_expression(expr, params)?;
536            }
537            substitute_in_operator(&mut merge.input, params)?;
538        }
539        LogicalOperator::AddLabel(add_label) => {
540            substitute_in_operator(&mut add_label.input, params)?;
541        }
542        LogicalOperator::RemoveLabel(remove_label) => {
543            substitute_in_operator(&mut remove_label.input, params)?;
544        }
545        LogicalOperator::ShortestPath(sp) => {
546            substitute_in_operator(&mut sp.input, params)?;
547        }
548        // SPARQL Update operators
549        LogicalOperator::InsertTriple(insert) => {
550            if let Some(ref mut input) = insert.input {
551                substitute_in_operator(input, params)?;
552            }
553        }
554        LogicalOperator::DeleteTriple(delete) => {
555            if let Some(ref mut input) = delete.input {
556                substitute_in_operator(input, params)?;
557            }
558        }
559        LogicalOperator::Modify(modify) => {
560            substitute_in_operator(&mut modify.where_clause, params)?;
561        }
562        LogicalOperator::ClearGraph(_)
563        | LogicalOperator::CreateGraph(_)
564        | LogicalOperator::DropGraph(_)
565        | LogicalOperator::LoadGraph(_)
566        | LogicalOperator::CopyGraph(_)
567        | LogicalOperator::MoveGraph(_)
568        | LogicalOperator::AddGraph(_) => {}
569        LogicalOperator::Empty => {}
570        LogicalOperator::VectorScan(scan) => {
571            substitute_in_expression(&mut scan.query_vector, params)?;
572            if let Some(ref mut input) = scan.input {
573                substitute_in_operator(input, params)?;
574            }
575        }
576        LogicalOperator::VectorJoin(join) => {
577            substitute_in_expression(&mut join.query_vector, params)?;
578            substitute_in_operator(&mut join.input, params)?;
579        }
580        // DDL operators have no expressions to substitute
581        LogicalOperator::CreatePropertyGraph(_) => {}
582        // Procedure calls: arguments could contain parameters but we handle at execution time
583        LogicalOperator::CallProcedure(_) => {}
584    }
585    Ok(())
586}
587
588/// Substitutes parameters in an expression with their values.
589fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
590    use crate::query::plan::LogicalExpression;
591
592    match expr {
593        LogicalExpression::Parameter(name) => {
594            if let Some(value) = params.get(name) {
595                *expr = LogicalExpression::Literal(value.clone());
596            } else {
597                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
598            }
599        }
600        LogicalExpression::Binary { left, right, .. } => {
601            substitute_in_expression(left, params)?;
602            substitute_in_expression(right, params)?;
603        }
604        LogicalExpression::Unary { operand, .. } => {
605            substitute_in_expression(operand, params)?;
606        }
607        LogicalExpression::FunctionCall { args, .. } => {
608            for arg in args {
609                substitute_in_expression(arg, params)?;
610            }
611        }
612        LogicalExpression::List(items) => {
613            for item in items {
614                substitute_in_expression(item, params)?;
615            }
616        }
617        LogicalExpression::Map(pairs) => {
618            for (_, value) in pairs {
619                substitute_in_expression(value, params)?;
620            }
621        }
622        LogicalExpression::IndexAccess { base, index } => {
623            substitute_in_expression(base, params)?;
624            substitute_in_expression(index, params)?;
625        }
626        LogicalExpression::SliceAccess { base, start, end } => {
627            substitute_in_expression(base, params)?;
628            if let Some(s) = start {
629                substitute_in_expression(s, params)?;
630            }
631            if let Some(e) = end {
632                substitute_in_expression(e, params)?;
633            }
634        }
635        LogicalExpression::Case {
636            operand,
637            when_clauses,
638            else_clause,
639        } => {
640            if let Some(op) = operand {
641                substitute_in_expression(op, params)?;
642            }
643            for (cond, result) in when_clauses {
644                substitute_in_expression(cond, params)?;
645                substitute_in_expression(result, params)?;
646            }
647            if let Some(el) = else_clause {
648                substitute_in_expression(el, params)?;
649            }
650        }
651        LogicalExpression::Property { .. }
652        | LogicalExpression::Variable(_)
653        | LogicalExpression::Literal(_)
654        | LogicalExpression::Labels(_)
655        | LogicalExpression::Type(_)
656        | LogicalExpression::Id(_) => {}
657        LogicalExpression::ListComprehension {
658            list_expr,
659            filter_expr,
660            map_expr,
661            ..
662        } => {
663            substitute_in_expression(list_expr, params)?;
664            if let Some(filter) = filter_expr {
665                substitute_in_expression(filter, params)?;
666            }
667            substitute_in_expression(map_expr, params)?;
668        }
669        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
670            // Subqueries would need recursive parameter substitution
671        }
672    }
673    Ok(())
674}
675
#[cfg(test)]
mod tests {
    //! Tests for `QueryLanguage` classification, processor construction, and
    //! end-to-end query execution with and without parameters.

    use super::*;

    #[test]
    fn test_query_language_is_lpg() {
        // Every compiled-in variant is checked, not only GQL/Cypher/SPARQL.
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "gremlin")]
        assert!(QueryLanguage::Gremlin.is_lpg());
        #[cfg(feature = "graphql")]
        assert!(QueryLanguage::GraphQL.is_lpg());
        #[cfg(feature = "sql-pgq")]
        assert!(QueryLanguage::SqlPgq.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
        #[cfg(all(feature = "graphql", feature = "rdf"))]
        assert!(!QueryLanguage::GraphQLRdf.is_lpg());
    }

    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new());
        let processor = QueryProcessor::for_lpg(store);
        // `assert_eq!` reports the actual count on failure.
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter
        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Should return 2 people (ages 35 and 45)
        assert_eq!(result.row_count(), 2);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // Query with parameter but empty params map (missing the required param)
        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        // Should fail with missing parameter error
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        // Tests parameter substitution in WHERE clause with property comparison
        let store = Arc::new(LpgStore::new());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=20 matches > 15
        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        // Tests multiple parameters in WHERE clause with AND
        let store = Arc::new(LpgStore::new());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the person with age=35, score=90 matches both conditions
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        // Despite the name, this exercises a string *equality* filter against
        // a parameter (`n.status = $target`); no IN-list syntax is involved.
        let store = Arc::new(LpgStore::new());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        // Exactly one of the three statuses equals the parameter value.
        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        // Tests that same-type parameter comparisons work correctly
        let store = Arc::new(LpgStore::new());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        // Compare int property with int parameter
        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only value=100 matches > 75
        assert_eq!(result.row_count(), 1);
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        // Tests that empty results still have correct column names
        let store = Arc::new(LpgStore::new());
        // Don't create any nodes

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        // Tests string parameter equality comparison
        let store = Arc::new(LpgStore::new());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}