Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TxId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::lpg::LpgStore;
15
16use crate::catalog::Catalog;
17use crate::database::QueryResult;
18use crate::query::binder::Binder;
19use crate::query::executor::Executor;
20use crate::query::optimizer::Optimizer;
21use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
22use crate::query::planner::Planner;
23use crate::transaction::TransactionManager;
24
25/// Supported query languages.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27pub enum QueryLanguage {
28    /// GQL (ISO/IEC 39075:2024) - default for LPG
29    #[cfg(feature = "gql")]
30    Gql,
31    /// openCypher 9.0
32    #[cfg(feature = "cypher")]
33    Cypher,
34    /// Apache TinkerPop Gremlin
35    #[cfg(feature = "gremlin")]
36    Gremlin,
37    /// GraphQL for LPG
38    #[cfg(feature = "graphql")]
39    GraphQL,
40    /// SPARQL 1.1 for RDF
41    #[cfg(feature = "sparql")]
42    Sparql,
43    /// GraphQL for RDF
44    #[cfg(all(feature = "graphql", feature = "rdf"))]
45    GraphQLRdf,
46}
47
48impl QueryLanguage {
49    /// Returns whether this language targets LPG (vs RDF).
50    #[must_use]
51    pub const fn is_lpg(&self) -> bool {
52        match self {
53            #[cfg(feature = "gql")]
54            Self::Gql => true,
55            #[cfg(feature = "cypher")]
56            Self::Cypher => true,
57            #[cfg(feature = "gremlin")]
58            Self::Gremlin => true,
59            #[cfg(feature = "graphql")]
60            Self::GraphQL => true,
61            #[cfg(feature = "sparql")]
62            Self::Sparql => false,
63            #[cfg(all(feature = "graphql", feature = "rdf"))]
64            Self::GraphQLRdf => false,
65        }
66    }
67}
68
69/// Query parameters for prepared statements.
70pub type QueryParams = HashMap<String, Value>;
71
72/// Processes queries through the full pipeline.
73///
74/// The processor holds references to the stores and provides a unified
75/// interface for executing queries in any supported language.
76///
77/// # Example
78///
79/// ```ignore
80/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
81///
82/// let processor = QueryProcessor::for_lpg(store);
83/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
84/// ```
85pub struct QueryProcessor {
86    /// LPG store for property graph queries.
87    lpg_store: Arc<LpgStore>,
88    /// Transaction manager for MVCC operations.
89    tx_manager: Arc<TransactionManager>,
90    /// Catalog for schema and index metadata.
91    catalog: Arc<Catalog>,
92    /// Query optimizer.
93    optimizer: Optimizer,
94    /// Current transaction context (if any).
95    tx_context: Option<(EpochId, TxId)>,
96    /// RDF store for triple pattern queries (optional).
97    #[cfg(feature = "rdf")]
98    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
99}
100
101impl QueryProcessor {
102    /// Creates a new query processor for LPG queries.
103    #[must_use]
104    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
105        Self {
106            lpg_store: store,
107            tx_manager: Arc::new(TransactionManager::new()),
108            catalog: Arc::new(Catalog::new()),
109            optimizer: Optimizer::new(),
110            tx_context: None,
111            #[cfg(feature = "rdf")]
112            rdf_store: None,
113        }
114    }
115
116    /// Creates a new query processor with a transaction manager.
117    #[must_use]
118    pub fn for_lpg_with_tx(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
119        Self {
120            lpg_store: store,
121            tx_manager,
122            catalog: Arc::new(Catalog::new()),
123            optimizer: Optimizer::new(),
124            tx_context: None,
125            #[cfg(feature = "rdf")]
126            rdf_store: None,
127        }
128    }
129
130    /// Creates a new query processor with both LPG and RDF stores.
131    #[cfg(feature = "rdf")]
132    #[must_use]
133    pub fn with_rdf(
134        lpg_store: Arc<LpgStore>,
135        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
136    ) -> Self {
137        Self {
138            lpg_store,
139            tx_manager: Arc::new(TransactionManager::new()),
140            catalog: Arc::new(Catalog::new()),
141            optimizer: Optimizer::new(),
142            tx_context: None,
143            rdf_store: Some(rdf_store),
144        }
145    }
146
147    /// Sets the transaction context for MVCC visibility.
148    ///
149    /// This should be called when the processor is used within a transaction.
150    #[must_use]
151    pub fn with_tx_context(mut self, viewing_epoch: EpochId, tx_id: TxId) -> Self {
152        self.tx_context = Some((viewing_epoch, tx_id));
153        self
154    }
155
156    /// Sets a custom catalog.
157    #[must_use]
158    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
159        self.catalog = catalog;
160        self
161    }
162
163    /// Sets a custom optimizer.
164    #[must_use]
165    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
166        self.optimizer = optimizer;
167        self
168    }
169
170    /// Processes a query string and returns results.
171    ///
172    /// Pipeline:
173    /// 1. Parse (language-specific parser → AST)
174    /// 2. Translate (AST → LogicalPlan)
175    /// 3. Bind (semantic validation)
176    /// 4. Optimize (filter pushdown, join reorder, etc.)
177    /// 5. Plan (logical → physical operators)
178    /// 6. Execute (run operators, collect results)
179    ///
180    /// # Arguments
181    ///
182    /// * `query` - The query string
183    /// * `language` - Which query language to use
184    /// * `params` - Optional query parameters for prepared statements
185    ///
186    /// # Errors
187    ///
188    /// Returns an error if any stage of the pipeline fails.
189    pub fn process(
190        &self,
191        query: &str,
192        language: QueryLanguage,
193        params: Option<&QueryParams>,
194    ) -> Result<QueryResult> {
195        if language.is_lpg() {
196            self.process_lpg(query, language, params)
197        } else {
198            #[cfg(feature = "rdf")]
199            {
200                self.process_rdf(query, language, params)
201            }
202            #[cfg(not(feature = "rdf"))]
203            {
204                Err(Error::Internal(
205                    "RDF support not enabled. Compile with --features rdf".to_string(),
206                ))
207            }
208        }
209    }
210
211    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
212    fn process_lpg(
213        &self,
214        query: &str,
215        language: QueryLanguage,
216        params: Option<&QueryParams>,
217    ) -> Result<QueryResult> {
218        // 1. Parse and translate to logical plan
219        let mut logical_plan = self.translate_lpg(query, language)?;
220
221        // 2. Substitute parameters if provided
222        if let Some(params) = params {
223            substitute_params(&mut logical_plan, params)?;
224        }
225
226        // 3. Semantic validation
227        let mut binder = Binder::new();
228        let _binding_context = binder.bind(&logical_plan)?;
229
230        // 4. Optimize the plan
231        let optimized_plan = self.optimizer.optimize(logical_plan)?;
232
233        // 5. Convert to physical plan with transaction context
234        let planner = if let Some((epoch, tx_id)) = self.tx_context {
235            Planner::with_context(
236                Arc::clone(&self.lpg_store),
237                Arc::clone(&self.tx_manager),
238                Some(tx_id),
239                epoch,
240            )
241        } else {
242            Planner::with_context(
243                Arc::clone(&self.lpg_store),
244                Arc::clone(&self.tx_manager),
245                None,
246                self.tx_manager.current_epoch(),
247            )
248        };
249        let mut physical_plan = planner.plan(&optimized_plan)?;
250
251        // 6. Execute and collect results
252        let executor = Executor::with_columns(physical_plan.columns.clone());
253        executor.execute(physical_plan.operator.as_mut())
254    }
255
256    /// Translates an LPG query to a logical plan.
257    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
258        match language {
259            #[cfg(feature = "gql")]
260            QueryLanguage::Gql => {
261                use crate::query::gql_translator;
262                gql_translator::translate(query)
263            }
264            #[cfg(feature = "cypher")]
265            QueryLanguage::Cypher => {
266                use crate::query::cypher_translator;
267                cypher_translator::translate(query)
268            }
269            #[cfg(feature = "gremlin")]
270            QueryLanguage::Gremlin => {
271                use crate::query::gremlin_translator;
272                gremlin_translator::translate(query)
273            }
274            #[cfg(feature = "graphql")]
275            QueryLanguage::GraphQL => {
276                use crate::query::graphql_translator;
277                graphql_translator::translate(query)
278            }
279            #[allow(unreachable_patterns)]
280            _ => Err(Error::Internal(format!(
281                "Language {:?} is not an LPG language",
282                language
283            ))),
284        }
285    }
286
287    /// Processes an RDF query (SPARQL, GraphQL-RDF).
288    #[cfg(feature = "rdf")]
289    fn process_rdf(
290        &self,
291        query: &str,
292        language: QueryLanguage,
293        _params: Option<&QueryParams>,
294    ) -> Result<QueryResult> {
295        use crate::query::planner_rdf::RdfPlanner;
296
297        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
298            Error::Internal("RDF store not configured for this processor".to_string())
299        })?;
300
301        // 1. Parse and translate to logical plan
302        let logical_plan = self.translate_rdf(query, language)?;
303
304        // 2. Semantic validation
305        let mut binder = Binder::new();
306        let _binding_context = binder.bind(&logical_plan)?;
307
308        // 3. Optimize the plan
309        let optimized_plan = self.optimizer.optimize(logical_plan)?;
310
311        // 4. Convert to physical plan (using RDF planner)
312        let planner = RdfPlanner::new(Arc::clone(rdf_store));
313        let mut physical_plan = planner.plan(&optimized_plan)?;
314
315        // 5. Execute and collect results
316        let executor = Executor::with_columns(physical_plan.columns.clone());
317        executor.execute(physical_plan.operator.as_mut())
318    }
319
320    /// Translates an RDF query to a logical plan.
321    #[cfg(feature = "rdf")]
322    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
323        match language {
324            #[cfg(feature = "sparql")]
325            QueryLanguage::Sparql => {
326                use crate::query::sparql_translator;
327                sparql_translator::translate(query)
328            }
329            #[cfg(all(feature = "graphql", feature = "rdf"))]
330            QueryLanguage::GraphQLRdf => {
331                use crate::query::graphql_rdf_translator;
332                // Default namespace for GraphQL-RDF queries
333                graphql_rdf_translator::translate(query, "http://example.org/")
334            }
335            _ => Err(Error::Internal(format!(
336                "Language {:?} is not an RDF language",
337                language
338            ))),
339        }
340    }
341
342    /// Returns a reference to the LPG store.
343    #[must_use]
344    pub fn lpg_store(&self) -> &Arc<LpgStore> {
345        &self.lpg_store
346    }
347
348    /// Returns a reference to the catalog.
349    #[must_use]
350    pub fn catalog(&self) -> &Arc<Catalog> {
351        &self.catalog
352    }
353
354    /// Returns a reference to the optimizer.
355    #[must_use]
356    pub fn optimizer(&self) -> &Optimizer {
357        &self.optimizer
358    }
359
360    /// Returns a reference to the RDF store (if configured).
361    #[cfg(feature = "rdf")]
362    #[must_use]
363    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
364        self.rdf_store.as_ref()
365    }
366}
367
368// Legacy compatibility: Keep the old API working
369impl QueryProcessor {
370    /// Creates a new query processor (legacy API).
371    ///
372    /// This creates a processor with an empty in-memory store.
373    /// Prefer using [`QueryProcessor::for_lpg`] with an explicit store.
374    #[must_use]
375    #[deprecated(since = "0.1.0", note = "Use QueryProcessor::for_lpg() instead")]
376    pub fn new() -> Self {
377        Self::for_lpg(Arc::new(LpgStore::new()))
378    }
379
380    /// Processes a query using default GQL language (legacy API).
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if the query fails.
385    #[cfg(feature = "gql")]
386    #[deprecated(since = "0.1.0", note = "Use process() with explicit language")]
387    pub fn process_legacy(&self, query: &str) -> Result<QueryResult> {
388        self.process(query, QueryLanguage::Gql, None)
389    }
390
391    /// Returns a reference to the transaction manager.
392    #[must_use]
393    pub fn tx_manager(&self) -> &Arc<TransactionManager> {
394        &self.tx_manager
395    }
396}
397
398impl Default for QueryProcessor {
399    fn default() -> Self {
400        Self::for_lpg(Arc::new(LpgStore::new()))
401    }
402}
403
404/// Substitutes parameters in a logical plan with their values.
405fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
406    substitute_in_operator(&mut plan.root, params)
407}
408
409/// Recursively substitutes parameters in an operator.
410fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
411    use crate::query::plan::*;
412
413    match op {
414        LogicalOperator::Filter(filter) => {
415            substitute_in_expression(&mut filter.predicate, params)?;
416            substitute_in_operator(&mut filter.input, params)?;
417        }
418        LogicalOperator::Return(ret) => {
419            for item in &mut ret.items {
420                substitute_in_expression(&mut item.expression, params)?;
421            }
422            substitute_in_operator(&mut ret.input, params)?;
423        }
424        LogicalOperator::Project(proj) => {
425            for p in &mut proj.projections {
426                substitute_in_expression(&mut p.expression, params)?;
427            }
428            substitute_in_operator(&mut proj.input, params)?;
429        }
430        LogicalOperator::NodeScan(scan) => {
431            if let Some(input) = &mut scan.input {
432                substitute_in_operator(input, params)?;
433            }
434        }
435        LogicalOperator::EdgeScan(scan) => {
436            if let Some(input) = &mut scan.input {
437                substitute_in_operator(input, params)?;
438            }
439        }
440        LogicalOperator::Expand(expand) => {
441            substitute_in_operator(&mut expand.input, params)?;
442        }
443        LogicalOperator::Join(join) => {
444            substitute_in_operator(&mut join.left, params)?;
445            substitute_in_operator(&mut join.right, params)?;
446            for cond in &mut join.conditions {
447                substitute_in_expression(&mut cond.left, params)?;
448                substitute_in_expression(&mut cond.right, params)?;
449            }
450        }
451        LogicalOperator::LeftJoin(join) => {
452            substitute_in_operator(&mut join.left, params)?;
453            substitute_in_operator(&mut join.right, params)?;
454            if let Some(cond) = &mut join.condition {
455                substitute_in_expression(cond, params)?;
456            }
457        }
458        LogicalOperator::Aggregate(agg) => {
459            for expr in &mut agg.group_by {
460                substitute_in_expression(expr, params)?;
461            }
462            for agg_expr in &mut agg.aggregates {
463                if let Some(expr) = &mut agg_expr.expression {
464                    substitute_in_expression(expr, params)?;
465                }
466            }
467            substitute_in_operator(&mut agg.input, params)?;
468        }
469        LogicalOperator::Sort(sort) => {
470            for key in &mut sort.keys {
471                substitute_in_expression(&mut key.expression, params)?;
472            }
473            substitute_in_operator(&mut sort.input, params)?;
474        }
475        LogicalOperator::Limit(limit) => {
476            substitute_in_operator(&mut limit.input, params)?;
477        }
478        LogicalOperator::Skip(skip) => {
479            substitute_in_operator(&mut skip.input, params)?;
480        }
481        LogicalOperator::Distinct(distinct) => {
482            substitute_in_operator(&mut distinct.input, params)?;
483        }
484        LogicalOperator::CreateNode(create) => {
485            for (_, expr) in &mut create.properties {
486                substitute_in_expression(expr, params)?;
487            }
488            if let Some(input) = &mut create.input {
489                substitute_in_operator(input, params)?;
490            }
491        }
492        LogicalOperator::CreateEdge(create) => {
493            for (_, expr) in &mut create.properties {
494                substitute_in_expression(expr, params)?;
495            }
496            substitute_in_operator(&mut create.input, params)?;
497        }
498        LogicalOperator::DeleteNode(delete) => {
499            substitute_in_operator(&mut delete.input, params)?;
500        }
501        LogicalOperator::DeleteEdge(delete) => {
502            substitute_in_operator(&mut delete.input, params)?;
503        }
504        LogicalOperator::SetProperty(set) => {
505            for (_, expr) in &mut set.properties {
506                substitute_in_expression(expr, params)?;
507            }
508            substitute_in_operator(&mut set.input, params)?;
509        }
510        LogicalOperator::Union(union) => {
511            for input in &mut union.inputs {
512                substitute_in_operator(input, params)?;
513            }
514        }
515        LogicalOperator::AntiJoin(anti) => {
516            substitute_in_operator(&mut anti.left, params)?;
517            substitute_in_operator(&mut anti.right, params)?;
518        }
519        LogicalOperator::Bind(bind) => {
520            substitute_in_expression(&mut bind.expression, params)?;
521            substitute_in_operator(&mut bind.input, params)?;
522        }
523        LogicalOperator::TripleScan(scan) => {
524            if let Some(input) = &mut scan.input {
525                substitute_in_operator(input, params)?;
526            }
527        }
528        LogicalOperator::Unwind(unwind) => {
529            substitute_in_expression(&mut unwind.expression, params)?;
530            substitute_in_operator(&mut unwind.input, params)?;
531        }
532        LogicalOperator::Merge(merge) => {
533            for (_, expr) in &mut merge.match_properties {
534                substitute_in_expression(expr, params)?;
535            }
536            for (_, expr) in &mut merge.on_create {
537                substitute_in_expression(expr, params)?;
538            }
539            for (_, expr) in &mut merge.on_match {
540                substitute_in_expression(expr, params)?;
541            }
542            substitute_in_operator(&mut merge.input, params)?;
543        }
544        LogicalOperator::AddLabel(add_label) => {
545            substitute_in_operator(&mut add_label.input, params)?;
546        }
547        LogicalOperator::RemoveLabel(remove_label) => {
548            substitute_in_operator(&mut remove_label.input, params)?;
549        }
550        LogicalOperator::ShortestPath(sp) => {
551            substitute_in_operator(&mut sp.input, params)?;
552        }
553        // SPARQL Update operators
554        LogicalOperator::InsertTriple(insert) => {
555            if let Some(ref mut input) = insert.input {
556                substitute_in_operator(input, params)?;
557            }
558        }
559        LogicalOperator::DeleteTriple(delete) => {
560            if let Some(ref mut input) = delete.input {
561                substitute_in_operator(input, params)?;
562            }
563        }
564        LogicalOperator::Modify(modify) => {
565            substitute_in_operator(&mut modify.where_clause, params)?;
566        }
567        LogicalOperator::ClearGraph(_)
568        | LogicalOperator::CreateGraph(_)
569        | LogicalOperator::DropGraph(_)
570        | LogicalOperator::LoadGraph(_)
571        | LogicalOperator::CopyGraph(_)
572        | LogicalOperator::MoveGraph(_)
573        | LogicalOperator::AddGraph(_) => {}
574        LogicalOperator::Empty => {}
575    }
576    Ok(())
577}
578
579/// Substitutes parameters in an expression with their values.
580fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
581    use crate::query::plan::LogicalExpression;
582
583    match expr {
584        LogicalExpression::Parameter(name) => {
585            if let Some(value) = params.get(name) {
586                *expr = LogicalExpression::Literal(value.clone());
587            } else {
588                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
589            }
590        }
591        LogicalExpression::Binary { left, right, .. } => {
592            substitute_in_expression(left, params)?;
593            substitute_in_expression(right, params)?;
594        }
595        LogicalExpression::Unary { operand, .. } => {
596            substitute_in_expression(operand, params)?;
597        }
598        LogicalExpression::FunctionCall { args, .. } => {
599            for arg in args {
600                substitute_in_expression(arg, params)?;
601            }
602        }
603        LogicalExpression::List(items) => {
604            for item in items {
605                substitute_in_expression(item, params)?;
606            }
607        }
608        LogicalExpression::Map(pairs) => {
609            for (_, value) in pairs {
610                substitute_in_expression(value, params)?;
611            }
612        }
613        LogicalExpression::IndexAccess { base, index } => {
614            substitute_in_expression(base, params)?;
615            substitute_in_expression(index, params)?;
616        }
617        LogicalExpression::SliceAccess { base, start, end } => {
618            substitute_in_expression(base, params)?;
619            if let Some(s) = start {
620                substitute_in_expression(s, params)?;
621            }
622            if let Some(e) = end {
623                substitute_in_expression(e, params)?;
624            }
625        }
626        LogicalExpression::Case {
627            operand,
628            when_clauses,
629            else_clause,
630        } => {
631            if let Some(op) = operand {
632                substitute_in_expression(op, params)?;
633            }
634            for (cond, result) in when_clauses {
635                substitute_in_expression(cond, params)?;
636                substitute_in_expression(result, params)?;
637            }
638            if let Some(el) = else_clause {
639                substitute_in_expression(el, params)?;
640            }
641        }
642        LogicalExpression::Property { .. }
643        | LogicalExpression::Variable(_)
644        | LogicalExpression::Literal(_)
645        | LogicalExpression::Labels(_)
646        | LogicalExpression::Type(_)
647        | LogicalExpression::Id(_) => {}
648        LogicalExpression::ListComprehension {
649            list_expr,
650            filter_expr,
651            map_expr,
652            ..
653        } => {
654            substitute_in_expression(list_expr, params)?;
655            if let Some(filter) = filter_expr {
656                substitute_in_expression(filter, params)?;
657            }
658            substitute_in_expression(map_expr, params)?;
659        }
660        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
661            // Subqueries would need recursive parameter substitution
662        }
663    }
664    Ok(())
665}
666
667#[cfg(test)]
668mod tests {
669    use super::*;
670
671    #[test]
672    fn test_query_language_is_lpg() {
673        #[cfg(feature = "gql")]
674        assert!(QueryLanguage::Gql.is_lpg());
675        #[cfg(feature = "cypher")]
676        assert!(QueryLanguage::Cypher.is_lpg());
677        #[cfg(feature = "sparql")]
678        assert!(!QueryLanguage::Sparql.is_lpg());
679    }
680
681    #[test]
682    fn test_processor_creation() {
683        let store = Arc::new(LpgStore::new());
684        let processor = QueryProcessor::for_lpg(store);
685        assert!(processor.lpg_store().node_count() == 0);
686    }
687
688    #[cfg(feature = "gql")]
689    #[test]
690    fn test_process_simple_gql() {
691        let store = Arc::new(LpgStore::new());
692        store.create_node(&["Person"]);
693        store.create_node(&["Person"]);
694
695        let processor = QueryProcessor::for_lpg(store);
696        let result = processor
697            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
698            .unwrap();
699
700        assert_eq!(result.row_count(), 2);
701        assert_eq!(result.columns[0], "n");
702    }
703
704    #[cfg(feature = "cypher")]
705    #[test]
706    fn test_process_simple_cypher() {
707        let store = Arc::new(LpgStore::new());
708        store.create_node(&["Person"]);
709
710        let processor = QueryProcessor::for_lpg(store);
711        let result = processor
712            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
713            .unwrap();
714
715        assert_eq!(result.row_count(), 1);
716    }
717
718    #[cfg(feature = "gql")]
719    #[test]
720    fn test_process_with_params() {
721        let store = Arc::new(LpgStore::new());
722        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
723        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
724        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
725
726        let processor = QueryProcessor::for_lpg(store);
727
728        // Query with parameter
729        let mut params = HashMap::new();
730        params.insert("min_age".to_string(), Value::Int64(30));
731
732        let result = processor
733            .process(
734                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
735                QueryLanguage::Gql,
736                Some(&params),
737            )
738            .unwrap();
739
740        // Should return 2 people (ages 35 and 45)
741        assert_eq!(result.row_count(), 2);
742    }
743
744    #[cfg(feature = "gql")]
745    #[test]
746    fn test_missing_param_error() {
747        let store = Arc::new(LpgStore::new());
748        store.create_node(&["Person"]);
749
750        let processor = QueryProcessor::for_lpg(store);
751
752        // Query with parameter but empty params map (missing the required param)
753        let params: HashMap<String, Value> = HashMap::new();
754        let result = processor.process(
755            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
756            QueryLanguage::Gql,
757            Some(&params),
758        );
759
760        // Should fail with missing parameter error
761        assert!(result.is_err());
762        let err = result.unwrap_err();
763        assert!(
764            err.to_string().contains("Missing parameter"),
765            "Expected 'Missing parameter' error, got: {}",
766            err
767        );
768    }
769}