Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TxId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::lpg::LpgStore;
15
16use crate::catalog::Catalog;
17use crate::database::QueryResult;
18use crate::query::binder::Binder;
19use crate::query::executor::Executor;
20use crate::query::optimizer::Optimizer;
21use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
22use crate::query::planner::Planner;
23use crate::transaction::TransactionManager;
24
25/// Supported query languages.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27pub enum QueryLanguage {
28    /// GQL (ISO/IEC 39075:2024) - default for LPG
29    #[cfg(feature = "gql")]
30    Gql,
31    /// openCypher 9.0
32    #[cfg(feature = "cypher")]
33    Cypher,
34    /// Apache TinkerPop Gremlin
35    #[cfg(feature = "gremlin")]
36    Gremlin,
37    /// GraphQL for LPG
38    #[cfg(feature = "graphql")]
39    GraphQL,
40    /// SPARQL 1.1 for RDF
41    #[cfg(feature = "sparql")]
42    Sparql,
43    /// GraphQL for RDF
44    #[cfg(all(feature = "graphql", feature = "rdf"))]
45    GraphQLRdf,
46}
47
48impl QueryLanguage {
49    /// Returns whether this language targets LPG (vs RDF).
50    #[must_use]
51    pub const fn is_lpg(&self) -> bool {
52        match self {
53            #[cfg(feature = "gql")]
54            Self::Gql => true,
55            #[cfg(feature = "cypher")]
56            Self::Cypher => true,
57            #[cfg(feature = "gremlin")]
58            Self::Gremlin => true,
59            #[cfg(feature = "graphql")]
60            Self::GraphQL => true,
61            #[cfg(feature = "sparql")]
62            Self::Sparql => false,
63            #[cfg(all(feature = "graphql", feature = "rdf"))]
64            Self::GraphQLRdf => false,
65        }
66    }
67}
68
69/// Query parameters for prepared statements.
70pub type QueryParams = HashMap<String, Value>;
71
72/// Processes queries through the full pipeline.
73///
74/// The processor holds references to the stores and provides a unified
75/// interface for executing queries in any supported language.
76///
77/// # Example
78///
79/// ```ignore
80/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
81///
82/// let processor = QueryProcessor::for_lpg(store);
83/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
84/// ```
85pub struct QueryProcessor {
86    /// LPG store for property graph queries.
87    lpg_store: Arc<LpgStore>,
88    /// Transaction manager for MVCC operations.
89    tx_manager: Arc<TransactionManager>,
90    /// Catalog for schema and index metadata.
91    catalog: Arc<Catalog>,
92    /// Query optimizer.
93    optimizer: Optimizer,
94    /// Current transaction context (if any).
95    tx_context: Option<(EpochId, TxId)>,
96    /// RDF store for triple pattern queries (optional).
97    #[cfg(feature = "rdf")]
98    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
99}
100
101impl QueryProcessor {
102    /// Creates a new query processor for LPG queries.
103    #[must_use]
104    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
105        Self {
106            lpg_store: store,
107            tx_manager: Arc::new(TransactionManager::new()),
108            catalog: Arc::new(Catalog::new()),
109            optimizer: Optimizer::new(),
110            tx_context: None,
111            #[cfg(feature = "rdf")]
112            rdf_store: None,
113        }
114    }
115
116    /// Creates a new query processor with a transaction manager.
117    #[must_use]
118    pub fn for_lpg_with_tx(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
119        Self {
120            lpg_store: store,
121            tx_manager,
122            catalog: Arc::new(Catalog::new()),
123            optimizer: Optimizer::new(),
124            tx_context: None,
125            #[cfg(feature = "rdf")]
126            rdf_store: None,
127        }
128    }
129
130    /// Creates a new query processor with both LPG and RDF stores.
131    #[cfg(feature = "rdf")]
132    #[must_use]
133    pub fn with_rdf(
134        lpg_store: Arc<LpgStore>,
135        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
136    ) -> Self {
137        Self {
138            lpg_store,
139            tx_manager: Arc::new(TransactionManager::new()),
140            catalog: Arc::new(Catalog::new()),
141            optimizer: Optimizer::new(),
142            tx_context: None,
143            rdf_store: Some(rdf_store),
144        }
145    }
146
147    /// Sets the transaction context for MVCC visibility.
148    ///
149    /// This should be called when the processor is used within a transaction.
150    #[must_use]
151    pub fn with_tx_context(mut self, viewing_epoch: EpochId, tx_id: TxId) -> Self {
152        self.tx_context = Some((viewing_epoch, tx_id));
153        self
154    }
155
156    /// Sets a custom catalog.
157    #[must_use]
158    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
159        self.catalog = catalog;
160        self
161    }
162
163    /// Sets a custom optimizer.
164    #[must_use]
165    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
166        self.optimizer = optimizer;
167        self
168    }
169
170    /// Processes a query string and returns results.
171    ///
172    /// Pipeline:
173    /// 1. Parse (language-specific parser → AST)
174    /// 2. Translate (AST → LogicalPlan)
175    /// 3. Bind (semantic validation)
176    /// 4. Optimize (filter pushdown, join reorder, etc.)
177    /// 5. Plan (logical → physical operators)
178    /// 6. Execute (run operators, collect results)
179    ///
180    /// # Arguments
181    ///
182    /// * `query` - The query string
183    /// * `language` - Which query language to use
184    /// * `params` - Optional query parameters for prepared statements
185    ///
186    /// # Errors
187    ///
188    /// Returns an error if any stage of the pipeline fails.
189    pub fn process(
190        &self,
191        query: &str,
192        language: QueryLanguage,
193        params: Option<&QueryParams>,
194    ) -> Result<QueryResult> {
195        if language.is_lpg() {
196            self.process_lpg(query, language, params)
197        } else {
198            #[cfg(feature = "rdf")]
199            {
200                self.process_rdf(query, language, params)
201            }
202            #[cfg(not(feature = "rdf"))]
203            {
204                Err(Error::Internal(
205                    "RDF support not enabled. Compile with --features rdf".to_string(),
206                ))
207            }
208        }
209    }
210
211    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
212    fn process_lpg(
213        &self,
214        query: &str,
215        language: QueryLanguage,
216        params: Option<&QueryParams>,
217    ) -> Result<QueryResult> {
218        let start_time = std::time::Instant::now();
219
220        // 1. Parse and translate to logical plan
221        let mut logical_plan = self.translate_lpg(query, language)?;
222
223        // 2. Substitute parameters if provided
224        if let Some(params) = params {
225            substitute_params(&mut logical_plan, params)?;
226        }
227
228        // 3. Semantic validation
229        let mut binder = Binder::new();
230        let _binding_context = binder.bind(&logical_plan)?;
231
232        // 4. Optimize the plan
233        let optimized_plan = self.optimizer.optimize(logical_plan)?;
234
235        // 5. Convert to physical plan with transaction context
236        let planner = if let Some((epoch, tx_id)) = self.tx_context {
237            Planner::with_context(
238                Arc::clone(&self.lpg_store),
239                Arc::clone(&self.tx_manager),
240                Some(tx_id),
241                epoch,
242            )
243        } else {
244            Planner::with_context(
245                Arc::clone(&self.lpg_store),
246                Arc::clone(&self.tx_manager),
247                None,
248                self.tx_manager.current_epoch(),
249            )
250        };
251        let mut physical_plan = planner.plan(&optimized_plan)?;
252
253        // 6. Execute and collect results
254        let executor = Executor::with_columns(physical_plan.columns.clone());
255        let mut result = executor.execute(physical_plan.operator.as_mut())?;
256
257        // Add execution metrics
258        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
259        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
260        result.execution_time_ms = Some(elapsed_ms);
261        result.rows_scanned = Some(rows_scanned);
262
263        Ok(result)
264    }
265
266    /// Translates an LPG query to a logical plan.
267    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
268        match language {
269            #[cfg(feature = "gql")]
270            QueryLanguage::Gql => {
271                use crate::query::gql_translator;
272                gql_translator::translate(query)
273            }
274            #[cfg(feature = "cypher")]
275            QueryLanguage::Cypher => {
276                use crate::query::cypher_translator;
277                cypher_translator::translate(query)
278            }
279            #[cfg(feature = "gremlin")]
280            QueryLanguage::Gremlin => {
281                use crate::query::gremlin_translator;
282                gremlin_translator::translate(query)
283            }
284            #[cfg(feature = "graphql")]
285            QueryLanguage::GraphQL => {
286                use crate::query::graphql_translator;
287                graphql_translator::translate(query)
288            }
289            #[allow(unreachable_patterns)]
290            _ => Err(Error::Internal(format!(
291                "Language {:?} is not an LPG language",
292                language
293            ))),
294        }
295    }
296
297    /// Processes an RDF query (SPARQL, GraphQL-RDF).
298    #[cfg(feature = "rdf")]
299    fn process_rdf(
300        &self,
301        query: &str,
302        language: QueryLanguage,
303        _params: Option<&QueryParams>,
304    ) -> Result<QueryResult> {
305        use crate::query::planner_rdf::RdfPlanner;
306
307        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
308            Error::Internal("RDF store not configured for this processor".to_string())
309        })?;
310
311        // 1. Parse and translate to logical plan
312        let logical_plan = self.translate_rdf(query, language)?;
313
314        // 2. Semantic validation
315        let mut binder = Binder::new();
316        let _binding_context = binder.bind(&logical_plan)?;
317
318        // 3. Optimize the plan
319        let optimized_plan = self.optimizer.optimize(logical_plan)?;
320
321        // 4. Convert to physical plan (using RDF planner)
322        let planner = RdfPlanner::new(Arc::clone(rdf_store));
323        let mut physical_plan = planner.plan(&optimized_plan)?;
324
325        // 5. Execute and collect results
326        let executor = Executor::with_columns(physical_plan.columns.clone());
327        executor.execute(physical_plan.operator.as_mut())
328    }
329
330    /// Translates an RDF query to a logical plan.
331    #[cfg(feature = "rdf")]
332    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
333        match language {
334            #[cfg(feature = "sparql")]
335            QueryLanguage::Sparql => {
336                use crate::query::sparql_translator;
337                sparql_translator::translate(query)
338            }
339            #[cfg(all(feature = "graphql", feature = "rdf"))]
340            QueryLanguage::GraphQLRdf => {
341                use crate::query::graphql_rdf_translator;
342                // Default namespace for GraphQL-RDF queries
343                graphql_rdf_translator::translate(query, "http://example.org/")
344            }
345            _ => Err(Error::Internal(format!(
346                "Language {:?} is not an RDF language",
347                language
348            ))),
349        }
350    }
351
352    /// Returns a reference to the LPG store.
353    #[must_use]
354    pub fn lpg_store(&self) -> &Arc<LpgStore> {
355        &self.lpg_store
356    }
357
358    /// Returns a reference to the catalog.
359    #[must_use]
360    pub fn catalog(&self) -> &Arc<Catalog> {
361        &self.catalog
362    }
363
364    /// Returns a reference to the optimizer.
365    #[must_use]
366    pub fn optimizer(&self) -> &Optimizer {
367        &self.optimizer
368    }
369
370    /// Returns a reference to the RDF store (if configured).
371    #[cfg(feature = "rdf")]
372    #[must_use]
373    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
374        self.rdf_store.as_ref()
375    }
376}
377
378// Legacy compatibility: Keep the old API working
379impl QueryProcessor {
380    /// Creates a new query processor (legacy API).
381    ///
382    /// This creates a processor with an empty in-memory store.
383    /// Prefer using [`QueryProcessor::for_lpg`] with an explicit store.
384    #[must_use]
385    #[deprecated(since = "0.1.0", note = "Use QueryProcessor::for_lpg() instead")]
386    pub fn new() -> Self {
387        Self::for_lpg(Arc::new(LpgStore::new()))
388    }
389
390    /// Processes a query using default GQL language (legacy API).
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the query fails.
395    #[cfg(feature = "gql")]
396    #[deprecated(since = "0.1.0", note = "Use process() with explicit language")]
397    pub fn process_legacy(&self, query: &str) -> Result<QueryResult> {
398        self.process(query, QueryLanguage::Gql, None)
399    }
400
401    /// Returns a reference to the transaction manager.
402    #[must_use]
403    pub fn tx_manager(&self) -> &Arc<TransactionManager> {
404        &self.tx_manager
405    }
406}
407
408impl Default for QueryProcessor {
409    fn default() -> Self {
410        Self::for_lpg(Arc::new(LpgStore::new()))
411    }
412}
413
414/// Substitutes parameters in a logical plan with their values.
415fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
416    substitute_in_operator(&mut plan.root, params)
417}
418
419/// Recursively substitutes parameters in an operator.
420fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
421    use crate::query::plan::*;
422
423    match op {
424        LogicalOperator::Filter(filter) => {
425            substitute_in_expression(&mut filter.predicate, params)?;
426            substitute_in_operator(&mut filter.input, params)?;
427        }
428        LogicalOperator::Return(ret) => {
429            for item in &mut ret.items {
430                substitute_in_expression(&mut item.expression, params)?;
431            }
432            substitute_in_operator(&mut ret.input, params)?;
433        }
434        LogicalOperator::Project(proj) => {
435            for p in &mut proj.projections {
436                substitute_in_expression(&mut p.expression, params)?;
437            }
438            substitute_in_operator(&mut proj.input, params)?;
439        }
440        LogicalOperator::NodeScan(scan) => {
441            if let Some(input) = &mut scan.input {
442                substitute_in_operator(input, params)?;
443            }
444        }
445        LogicalOperator::EdgeScan(scan) => {
446            if let Some(input) = &mut scan.input {
447                substitute_in_operator(input, params)?;
448            }
449        }
450        LogicalOperator::Expand(expand) => {
451            substitute_in_operator(&mut expand.input, params)?;
452        }
453        LogicalOperator::Join(join) => {
454            substitute_in_operator(&mut join.left, params)?;
455            substitute_in_operator(&mut join.right, params)?;
456            for cond in &mut join.conditions {
457                substitute_in_expression(&mut cond.left, params)?;
458                substitute_in_expression(&mut cond.right, params)?;
459            }
460        }
461        LogicalOperator::LeftJoin(join) => {
462            substitute_in_operator(&mut join.left, params)?;
463            substitute_in_operator(&mut join.right, params)?;
464            if let Some(cond) = &mut join.condition {
465                substitute_in_expression(cond, params)?;
466            }
467        }
468        LogicalOperator::Aggregate(agg) => {
469            for expr in &mut agg.group_by {
470                substitute_in_expression(expr, params)?;
471            }
472            for agg_expr in &mut agg.aggregates {
473                if let Some(expr) = &mut agg_expr.expression {
474                    substitute_in_expression(expr, params)?;
475                }
476            }
477            substitute_in_operator(&mut agg.input, params)?;
478        }
479        LogicalOperator::Sort(sort) => {
480            for key in &mut sort.keys {
481                substitute_in_expression(&mut key.expression, params)?;
482            }
483            substitute_in_operator(&mut sort.input, params)?;
484        }
485        LogicalOperator::Limit(limit) => {
486            substitute_in_operator(&mut limit.input, params)?;
487        }
488        LogicalOperator::Skip(skip) => {
489            substitute_in_operator(&mut skip.input, params)?;
490        }
491        LogicalOperator::Distinct(distinct) => {
492            substitute_in_operator(&mut distinct.input, params)?;
493        }
494        LogicalOperator::CreateNode(create) => {
495            for (_, expr) in &mut create.properties {
496                substitute_in_expression(expr, params)?;
497            }
498            if let Some(input) = &mut create.input {
499                substitute_in_operator(input, params)?;
500            }
501        }
502        LogicalOperator::CreateEdge(create) => {
503            for (_, expr) in &mut create.properties {
504                substitute_in_expression(expr, params)?;
505            }
506            substitute_in_operator(&mut create.input, params)?;
507        }
508        LogicalOperator::DeleteNode(delete) => {
509            substitute_in_operator(&mut delete.input, params)?;
510        }
511        LogicalOperator::DeleteEdge(delete) => {
512            substitute_in_operator(&mut delete.input, params)?;
513        }
514        LogicalOperator::SetProperty(set) => {
515            for (_, expr) in &mut set.properties {
516                substitute_in_expression(expr, params)?;
517            }
518            substitute_in_operator(&mut set.input, params)?;
519        }
520        LogicalOperator::Union(union) => {
521            for input in &mut union.inputs {
522                substitute_in_operator(input, params)?;
523            }
524        }
525        LogicalOperator::AntiJoin(anti) => {
526            substitute_in_operator(&mut anti.left, params)?;
527            substitute_in_operator(&mut anti.right, params)?;
528        }
529        LogicalOperator::Bind(bind) => {
530            substitute_in_expression(&mut bind.expression, params)?;
531            substitute_in_operator(&mut bind.input, params)?;
532        }
533        LogicalOperator::TripleScan(scan) => {
534            if let Some(input) = &mut scan.input {
535                substitute_in_operator(input, params)?;
536            }
537        }
538        LogicalOperator::Unwind(unwind) => {
539            substitute_in_expression(&mut unwind.expression, params)?;
540            substitute_in_operator(&mut unwind.input, params)?;
541        }
542        LogicalOperator::Merge(merge) => {
543            for (_, expr) in &mut merge.match_properties {
544                substitute_in_expression(expr, params)?;
545            }
546            for (_, expr) in &mut merge.on_create {
547                substitute_in_expression(expr, params)?;
548            }
549            for (_, expr) in &mut merge.on_match {
550                substitute_in_expression(expr, params)?;
551            }
552            substitute_in_operator(&mut merge.input, params)?;
553        }
554        LogicalOperator::AddLabel(add_label) => {
555            substitute_in_operator(&mut add_label.input, params)?;
556        }
557        LogicalOperator::RemoveLabel(remove_label) => {
558            substitute_in_operator(&mut remove_label.input, params)?;
559        }
560        LogicalOperator::ShortestPath(sp) => {
561            substitute_in_operator(&mut sp.input, params)?;
562        }
563        // SPARQL Update operators
564        LogicalOperator::InsertTriple(insert) => {
565            if let Some(ref mut input) = insert.input {
566                substitute_in_operator(input, params)?;
567            }
568        }
569        LogicalOperator::DeleteTriple(delete) => {
570            if let Some(ref mut input) = delete.input {
571                substitute_in_operator(input, params)?;
572            }
573        }
574        LogicalOperator::Modify(modify) => {
575            substitute_in_operator(&mut modify.where_clause, params)?;
576        }
577        LogicalOperator::ClearGraph(_)
578        | LogicalOperator::CreateGraph(_)
579        | LogicalOperator::DropGraph(_)
580        | LogicalOperator::LoadGraph(_)
581        | LogicalOperator::CopyGraph(_)
582        | LogicalOperator::MoveGraph(_)
583        | LogicalOperator::AddGraph(_) => {}
584        LogicalOperator::Empty => {}
585        LogicalOperator::VectorScan(scan) => {
586            substitute_in_expression(&mut scan.query_vector, params)?;
587            if let Some(ref mut input) = scan.input {
588                substitute_in_operator(input, params)?;
589            }
590        }
591        LogicalOperator::VectorJoin(join) => {
592            substitute_in_expression(&mut join.query_vector, params)?;
593            substitute_in_operator(&mut join.input, params)?;
594        }
595    }
596    Ok(())
597}
598
599/// Substitutes parameters in an expression with their values.
600fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
601    use crate::query::plan::LogicalExpression;
602
603    match expr {
604        LogicalExpression::Parameter(name) => {
605            if let Some(value) = params.get(name) {
606                *expr = LogicalExpression::Literal(value.clone());
607            } else {
608                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
609            }
610        }
611        LogicalExpression::Binary { left, right, .. } => {
612            substitute_in_expression(left, params)?;
613            substitute_in_expression(right, params)?;
614        }
615        LogicalExpression::Unary { operand, .. } => {
616            substitute_in_expression(operand, params)?;
617        }
618        LogicalExpression::FunctionCall { args, .. } => {
619            for arg in args {
620                substitute_in_expression(arg, params)?;
621            }
622        }
623        LogicalExpression::List(items) => {
624            for item in items {
625                substitute_in_expression(item, params)?;
626            }
627        }
628        LogicalExpression::Map(pairs) => {
629            for (_, value) in pairs {
630                substitute_in_expression(value, params)?;
631            }
632        }
633        LogicalExpression::IndexAccess { base, index } => {
634            substitute_in_expression(base, params)?;
635            substitute_in_expression(index, params)?;
636        }
637        LogicalExpression::SliceAccess { base, start, end } => {
638            substitute_in_expression(base, params)?;
639            if let Some(s) = start {
640                substitute_in_expression(s, params)?;
641            }
642            if let Some(e) = end {
643                substitute_in_expression(e, params)?;
644            }
645        }
646        LogicalExpression::Case {
647            operand,
648            when_clauses,
649            else_clause,
650        } => {
651            if let Some(op) = operand {
652                substitute_in_expression(op, params)?;
653            }
654            for (cond, result) in when_clauses {
655                substitute_in_expression(cond, params)?;
656                substitute_in_expression(result, params)?;
657            }
658            if let Some(el) = else_clause {
659                substitute_in_expression(el, params)?;
660            }
661        }
662        LogicalExpression::Property { .. }
663        | LogicalExpression::Variable(_)
664        | LogicalExpression::Literal(_)
665        | LogicalExpression::Labels(_)
666        | LogicalExpression::Type(_)
667        | LogicalExpression::Id(_) => {}
668        LogicalExpression::ListComprehension {
669            list_expr,
670            filter_expr,
671            map_expr,
672            ..
673        } => {
674            substitute_in_expression(list_expr, params)?;
675            if let Some(filter) = filter_expr {
676                substitute_in_expression(filter, params)?;
677            }
678            substitute_in_expression(map_expr, params)?;
679        }
680        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
681            // Subqueries would need recursive parameter substitution
682        }
683    }
684    Ok(())
685}
686
687#[cfg(test)]
688mod tests {
689    use super::*;
690
691    #[test]
692    fn test_query_language_is_lpg() {
693        #[cfg(feature = "gql")]
694        assert!(QueryLanguage::Gql.is_lpg());
695        #[cfg(feature = "cypher")]
696        assert!(QueryLanguage::Cypher.is_lpg());
697        #[cfg(feature = "sparql")]
698        assert!(!QueryLanguage::Sparql.is_lpg());
699    }
700
701    #[test]
702    fn test_processor_creation() {
703        let store = Arc::new(LpgStore::new());
704        let processor = QueryProcessor::for_lpg(store);
705        assert!(processor.lpg_store().node_count() == 0);
706    }
707
708    #[cfg(feature = "gql")]
709    #[test]
710    fn test_process_simple_gql() {
711        let store = Arc::new(LpgStore::new());
712        store.create_node(&["Person"]);
713        store.create_node(&["Person"]);
714
715        let processor = QueryProcessor::for_lpg(store);
716        let result = processor
717            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
718            .unwrap();
719
720        assert_eq!(result.row_count(), 2);
721        assert_eq!(result.columns[0], "n");
722    }
723
724    #[cfg(feature = "cypher")]
725    #[test]
726    fn test_process_simple_cypher() {
727        let store = Arc::new(LpgStore::new());
728        store.create_node(&["Person"]);
729
730        let processor = QueryProcessor::for_lpg(store);
731        let result = processor
732            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
733            .unwrap();
734
735        assert_eq!(result.row_count(), 1);
736    }
737
738    #[cfg(feature = "gql")]
739    #[test]
740    fn test_process_with_params() {
741        let store = Arc::new(LpgStore::new());
742        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
743        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
744        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
745
746        let processor = QueryProcessor::for_lpg(store);
747
748        // Query with parameter
749        let mut params = HashMap::new();
750        params.insert("min_age".to_string(), Value::Int64(30));
751
752        let result = processor
753            .process(
754                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
755                QueryLanguage::Gql,
756                Some(&params),
757            )
758            .unwrap();
759
760        // Should return 2 people (ages 35 and 45)
761        assert_eq!(result.row_count(), 2);
762    }
763
764    #[cfg(feature = "gql")]
765    #[test]
766    fn test_missing_param_error() {
767        let store = Arc::new(LpgStore::new());
768        store.create_node(&["Person"]);
769
770        let processor = QueryProcessor::for_lpg(store);
771
772        // Query with parameter but empty params map (missing the required param)
773        let params: HashMap<String, Value> = HashMap::new();
774        let result = processor.process(
775            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
776            QueryLanguage::Gql,
777            Some(&params),
778        );
779
780        // Should fail with missing parameter error
781        assert!(result.is_err());
782        let err = result.unwrap_err();
783        assert!(
784            err.to_string().contains("Missing parameter"),
785            "Expected 'Missing parameter' error, got: {}",
786            err
787        );
788    }
789
790    #[cfg(feature = "gql")]
791    #[test]
792    fn test_params_in_filter_with_property() {
793        // Tests parameter substitution in WHERE clause with property comparison
794        let store = Arc::new(LpgStore::new());
795        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
796        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
797
798        let processor = QueryProcessor::for_lpg(store);
799
800        let mut params = HashMap::new();
801        params.insert("threshold".to_string(), Value::Int64(15));
802
803        let result = processor
804            .process(
805                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
806                QueryLanguage::Gql,
807                Some(&params),
808            )
809            .unwrap();
810
811        // Only value=20 matches > 15
812        assert_eq!(result.row_count(), 1);
813        let row = &result.rows[0];
814        assert_eq!(row[0], Value::Int64(20));
815    }
816
817    #[cfg(feature = "gql")]
818    #[test]
819    fn test_params_in_multiple_where_conditions() {
820        // Tests multiple parameters in WHERE clause with AND
821        let store = Arc::new(LpgStore::new());
822        store.create_node_with_props(
823            &["Person"],
824            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
825        );
826        store.create_node_with_props(
827            &["Person"],
828            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
829        );
830        store.create_node_with_props(
831            &["Person"],
832            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
833        );
834
835        let processor = QueryProcessor::for_lpg(store);
836
837        let mut params = HashMap::new();
838        params.insert("min_age".to_string(), Value::Int64(30));
839        params.insert("min_score".to_string(), Value::Int64(75));
840
841        let result = processor
842            .process(
843                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
844                QueryLanguage::Gql,
845                Some(&params),
846            )
847            .unwrap();
848
849        // Only the person with age=35, score=90 matches both conditions
850        assert_eq!(result.row_count(), 1);
851    }
852
853    #[cfg(feature = "gql")]
854    #[test]
855    fn test_params_with_in_list() {
856        // Tests parameter as a value checked against IN list
857        let store = Arc::new(LpgStore::new());
858        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
859        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
860        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
861
862        let processor = QueryProcessor::for_lpg(store);
863
864        // Check if a parameter value matches any of the statuses
865        let mut params = HashMap::new();
866        params.insert("target".to_string(), Value::String("active".into()));
867
868        let result = processor
869            .process(
870                "MATCH (n:Item) WHERE n.status = $target RETURN n",
871                QueryLanguage::Gql,
872                Some(&params),
873            )
874            .unwrap();
875
876        assert_eq!(result.row_count(), 1);
877    }
878
879    #[cfg(feature = "gql")]
880    #[test]
881    fn test_params_same_type_comparison() {
882        // Tests that same-type parameter comparisons work correctly
883        let store = Arc::new(LpgStore::new());
884        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
885        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
886
887        let processor = QueryProcessor::for_lpg(store);
888
889        // Compare int property with int parameter
890        let mut params = HashMap::new();
891        params.insert("threshold".to_string(), Value::Int64(75));
892
893        let result = processor
894            .process(
895                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
896                QueryLanguage::Gql,
897                Some(&params),
898            )
899            .unwrap();
900
901        // Only value=100 matches > 75
902        assert_eq!(result.row_count(), 1);
903    }
904
905    #[cfg(feature = "gql")]
906    #[test]
907    fn test_process_empty_result_has_columns() {
908        // Tests that empty results still have correct column names
909        let store = Arc::new(LpgStore::new());
910        // Don't create any nodes
911
912        let processor = QueryProcessor::for_lpg(store);
913        let result = processor
914            .process(
915                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
916                QueryLanguage::Gql,
917                None,
918            )
919            .unwrap();
920
921        assert_eq!(result.row_count(), 0);
922        assert_eq!(result.columns.len(), 2);
923        assert_eq!(result.columns[0], "name");
924        assert_eq!(result.columns[1], "age");
925    }
926
927    #[cfg(feature = "gql")]
928    #[test]
929    fn test_params_string_equality() {
930        // Tests string parameter equality comparison
931        let store = Arc::new(LpgStore::new());
932        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
933        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
934        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
935
936        let processor = QueryProcessor::for_lpg(store);
937
938        let mut params = HashMap::new();
939        params.insert("target".to_string(), Value::String("beta".into()));
940
941        let result = processor
942            .process(
943                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
944                QueryLanguage::Gql,
945                Some(&params),
946            )
947            .unwrap();
948
949        assert_eq!(result.row_count(), 1);
950        assert_eq!(result.rows[0][0], Value::String("beta".into()));
951    }
952}