Skip to main content

grafeo_engine/query/
processor.rs

1//! Query processor that orchestrates the query pipeline.
2//!
3//! The `QueryProcessor` is the central component that executes queries through
4//! the full pipeline: Parse → Bind → Optimize → Plan → Execute.
5//!
6//! It supports multiple query languages (GQL, Cypher, Gremlin, GraphQL) for LPG
7//! and SPARQL for RDF (when the `rdf` feature is enabled).
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TxId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::lpg::LpgStore;
15
16use crate::catalog::Catalog;
17use crate::database::QueryResult;
18use crate::query::binder::Binder;
19use crate::query::executor::Executor;
20use crate::query::optimizer::Optimizer;
21use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
22use crate::query::planner::Planner;
23use crate::transaction::TransactionManager;
24
25/// Supported query languages.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27pub enum QueryLanguage {
28    /// GQL (ISO/IEC 39075:2024) - default for LPG
29    #[cfg(feature = "gql")]
30    Gql,
31    /// openCypher 9.0
32    #[cfg(feature = "cypher")]
33    Cypher,
34    /// Apache TinkerPop Gremlin
35    #[cfg(feature = "gremlin")]
36    Gremlin,
37    /// GraphQL for LPG
38    #[cfg(feature = "graphql")]
39    GraphQL,
40    /// SPARQL 1.1 for RDF
41    #[cfg(feature = "sparql")]
42    Sparql,
43    /// GraphQL for RDF
44    #[cfg(all(feature = "graphql", feature = "rdf"))]
45    GraphQLRdf,
46}
47
48impl QueryLanguage {
49    /// Returns whether this language targets LPG (vs RDF).
50    #[must_use]
51    pub const fn is_lpg(&self) -> bool {
52        match self {
53            #[cfg(feature = "gql")]
54            Self::Gql => true,
55            #[cfg(feature = "cypher")]
56            Self::Cypher => true,
57            #[cfg(feature = "gremlin")]
58            Self::Gremlin => true,
59            #[cfg(feature = "graphql")]
60            Self::GraphQL => true,
61            #[cfg(feature = "sparql")]
62            Self::Sparql => false,
63            #[cfg(all(feature = "graphql", feature = "rdf"))]
64            Self::GraphQLRdf => false,
65        }
66    }
67}
68
69/// Query parameters for prepared statements.
70pub type QueryParams = HashMap<String, Value>;
71
72/// Processes queries through the full pipeline.
73///
74/// The processor holds references to the stores and provides a unified
75/// interface for executing queries in any supported language.
76///
77/// # Example
78///
79/// ```ignore
80/// use grafeo_engine::query::processor::{QueryProcessor, QueryLanguage};
81///
82/// let processor = QueryProcessor::for_lpg(store);
83/// let result = processor.process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)?;
84/// ```
85pub struct QueryProcessor {
86    /// LPG store for property graph queries.
87    lpg_store: Arc<LpgStore>,
88    /// Transaction manager for MVCC operations.
89    tx_manager: Arc<TransactionManager>,
90    /// Catalog for schema and index metadata.
91    catalog: Arc<Catalog>,
92    /// Query optimizer.
93    optimizer: Optimizer,
94    /// Current transaction context (if any).
95    tx_context: Option<(EpochId, TxId)>,
96    /// RDF store for triple pattern queries (optional).
97    #[cfg(feature = "rdf")]
98    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
99}
100
101impl QueryProcessor {
102    /// Creates a new query processor for LPG queries.
103    #[must_use]
104    pub fn for_lpg(store: Arc<LpgStore>) -> Self {
105        let optimizer = Optimizer::from_store(&store);
106        Self {
107            lpg_store: store,
108            tx_manager: Arc::new(TransactionManager::new()),
109            catalog: Arc::new(Catalog::new()),
110            optimizer,
111            tx_context: None,
112            #[cfg(feature = "rdf")]
113            rdf_store: None,
114        }
115    }
116
117    /// Creates a new query processor with a transaction manager.
118    #[must_use]
119    pub fn for_lpg_with_tx(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
120        let optimizer = Optimizer::from_store(&store);
121        Self {
122            lpg_store: store,
123            tx_manager,
124            catalog: Arc::new(Catalog::new()),
125            optimizer,
126            tx_context: None,
127            #[cfg(feature = "rdf")]
128            rdf_store: None,
129        }
130    }
131
132    /// Creates a new query processor with both LPG and RDF stores.
133    #[cfg(feature = "rdf")]
134    #[must_use]
135    pub fn with_rdf(
136        lpg_store: Arc<LpgStore>,
137        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
138    ) -> Self {
139        let optimizer = Optimizer::from_store(&lpg_store);
140        Self {
141            lpg_store,
142            tx_manager: Arc::new(TransactionManager::new()),
143            catalog: Arc::new(Catalog::new()),
144            optimizer,
145            tx_context: None,
146            rdf_store: Some(rdf_store),
147        }
148    }
149
150    /// Sets the transaction context for MVCC visibility.
151    ///
152    /// This should be called when the processor is used within a transaction.
153    #[must_use]
154    pub fn with_tx_context(mut self, viewing_epoch: EpochId, tx_id: TxId) -> Self {
155        self.tx_context = Some((viewing_epoch, tx_id));
156        self
157    }
158
159    /// Sets a custom catalog.
160    #[must_use]
161    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
162        self.catalog = catalog;
163        self
164    }
165
166    /// Sets a custom optimizer.
167    #[must_use]
168    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
169        self.optimizer = optimizer;
170        self
171    }
172
173    /// Processes a query string and returns results.
174    ///
175    /// Pipeline:
176    /// 1. Parse (language-specific parser → AST)
177    /// 2. Translate (AST → LogicalPlan)
178    /// 3. Bind (semantic validation)
179    /// 4. Optimize (filter pushdown, join reorder, etc.)
180    /// 5. Plan (logical → physical operators)
181    /// 6. Execute (run operators, collect results)
182    ///
183    /// # Arguments
184    ///
185    /// * `query` - The query string
186    /// * `language` - Which query language to use
187    /// * `params` - Optional query parameters for prepared statements
188    ///
189    /// # Errors
190    ///
191    /// Returns an error if any stage of the pipeline fails.
192    pub fn process(
193        &self,
194        query: &str,
195        language: QueryLanguage,
196        params: Option<&QueryParams>,
197    ) -> Result<QueryResult> {
198        if language.is_lpg() {
199            self.process_lpg(query, language, params)
200        } else {
201            #[cfg(feature = "rdf")]
202            {
203                self.process_rdf(query, language, params)
204            }
205            #[cfg(not(feature = "rdf"))]
206            {
207                Err(Error::Internal(
208                    "RDF support not enabled. Compile with --features rdf".to_string(),
209                ))
210            }
211        }
212    }
213
214    /// Processes an LPG query (GQL, Cypher, Gremlin, GraphQL).
215    fn process_lpg(
216        &self,
217        query: &str,
218        language: QueryLanguage,
219        params: Option<&QueryParams>,
220    ) -> Result<QueryResult> {
221        let start_time = std::time::Instant::now();
222
223        // 1. Parse and translate to logical plan
224        let mut logical_plan = self.translate_lpg(query, language)?;
225
226        // 2. Substitute parameters if provided
227        if let Some(params) = params {
228            substitute_params(&mut logical_plan, params)?;
229        }
230
231        // 3. Semantic validation
232        let mut binder = Binder::new();
233        let _binding_context = binder.bind(&logical_plan)?;
234
235        // 4. Optimize the plan
236        let optimized_plan = self.optimizer.optimize(logical_plan)?;
237
238        // 5. Convert to physical plan with transaction context
239        let planner = if let Some((epoch, tx_id)) = self.tx_context {
240            Planner::with_context(
241                Arc::clone(&self.lpg_store),
242                Arc::clone(&self.tx_manager),
243                Some(tx_id),
244                epoch,
245            )
246        } else {
247            Planner::with_context(
248                Arc::clone(&self.lpg_store),
249                Arc::clone(&self.tx_manager),
250                None,
251                self.tx_manager.current_epoch(),
252            )
253        };
254        let mut physical_plan = planner.plan(&optimized_plan)?;
255
256        // 6. Execute and collect results
257        let executor = Executor::with_columns(physical_plan.columns.clone());
258        let mut result = executor.execute(physical_plan.operator.as_mut())?;
259
260        // Add execution metrics
261        let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
262        let rows_scanned = result.rows.len() as u64; // Approximate: rows returned
263        result.execution_time_ms = Some(elapsed_ms);
264        result.rows_scanned = Some(rows_scanned);
265
266        Ok(result)
267    }
268
269    /// Translates an LPG query to a logical plan.
270    fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
271        match language {
272            #[cfg(feature = "gql")]
273            QueryLanguage::Gql => {
274                use crate::query::gql_translator;
275                gql_translator::translate(query)
276            }
277            #[cfg(feature = "cypher")]
278            QueryLanguage::Cypher => {
279                use crate::query::cypher_translator;
280                cypher_translator::translate(query)
281            }
282            #[cfg(feature = "gremlin")]
283            QueryLanguage::Gremlin => {
284                use crate::query::gremlin_translator;
285                gremlin_translator::translate(query)
286            }
287            #[cfg(feature = "graphql")]
288            QueryLanguage::GraphQL => {
289                use crate::query::graphql_translator;
290                graphql_translator::translate(query)
291            }
292            #[allow(unreachable_patterns)]
293            _ => Err(Error::Internal(format!(
294                "Language {:?} is not an LPG language",
295                language
296            ))),
297        }
298    }
299
300    /// Processes an RDF query (SPARQL, GraphQL-RDF).
301    #[cfg(feature = "rdf")]
302    fn process_rdf(
303        &self,
304        query: &str,
305        language: QueryLanguage,
306        _params: Option<&QueryParams>,
307    ) -> Result<QueryResult> {
308        use crate::query::planner_rdf::RdfPlanner;
309
310        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
311            Error::Internal("RDF store not configured for this processor".to_string())
312        })?;
313
314        // 1. Parse and translate to logical plan
315        let logical_plan = self.translate_rdf(query, language)?;
316
317        // 2. Semantic validation
318        let mut binder = Binder::new();
319        let _binding_context = binder.bind(&logical_plan)?;
320
321        // 3. Optimize the plan
322        let optimized_plan = self.optimizer.optimize(logical_plan)?;
323
324        // 4. Convert to physical plan (using RDF planner)
325        let planner = RdfPlanner::new(Arc::clone(rdf_store));
326        let mut physical_plan = planner.plan(&optimized_plan)?;
327
328        // 5. Execute and collect results
329        let executor = Executor::with_columns(physical_plan.columns.clone());
330        executor.execute(physical_plan.operator.as_mut())
331    }
332
333    /// Translates an RDF query to a logical plan.
334    #[cfg(feature = "rdf")]
335    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
336        match language {
337            #[cfg(feature = "sparql")]
338            QueryLanguage::Sparql => {
339                use crate::query::sparql_translator;
340                sparql_translator::translate(query)
341            }
342            #[cfg(all(feature = "graphql", feature = "rdf"))]
343            QueryLanguage::GraphQLRdf => {
344                use crate::query::graphql_rdf_translator;
345                // Default namespace for GraphQL-RDF queries
346                graphql_rdf_translator::translate(query, "http://example.org/")
347            }
348            _ => Err(Error::Internal(format!(
349                "Language {:?} is not an RDF language",
350                language
351            ))),
352        }
353    }
354
355    /// Returns a reference to the LPG store.
356    #[must_use]
357    pub fn lpg_store(&self) -> &Arc<LpgStore> {
358        &self.lpg_store
359    }
360
361    /// Returns a reference to the catalog.
362    #[must_use]
363    pub fn catalog(&self) -> &Arc<Catalog> {
364        &self.catalog
365    }
366
367    /// Returns a reference to the optimizer.
368    #[must_use]
369    pub fn optimizer(&self) -> &Optimizer {
370        &self.optimizer
371    }
372
373    /// Returns a reference to the RDF store (if configured).
374    #[cfg(feature = "rdf")]
375    #[must_use]
376    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
377        self.rdf_store.as_ref()
378    }
379}
380
381// Legacy compatibility: Keep the old API working
382impl QueryProcessor {
383    /// Creates a new query processor (legacy API).
384    ///
385    /// This creates a processor with an empty in-memory store.
386    /// Prefer using [`QueryProcessor::for_lpg`] with an explicit store.
387    #[must_use]
388    #[deprecated(since = "0.1.0", note = "Use QueryProcessor::for_lpg() instead")]
389    pub fn new() -> Self {
390        Self::for_lpg(Arc::new(LpgStore::new()))
391    }
392
393    /// Processes a query using default GQL language (legacy API).
394    ///
395    /// # Errors
396    ///
397    /// Returns an error if the query fails.
398    #[cfg(feature = "gql")]
399    #[deprecated(since = "0.1.0", note = "Use process() with explicit language")]
400    pub fn process_legacy(&self, query: &str) -> Result<QueryResult> {
401        self.process(query, QueryLanguage::Gql, None)
402    }
403
404    /// Returns a reference to the transaction manager.
405    #[must_use]
406    pub fn tx_manager(&self) -> &Arc<TransactionManager> {
407        &self.tx_manager
408    }
409}
410
411impl Default for QueryProcessor {
412    fn default() -> Self {
413        Self::for_lpg(Arc::new(LpgStore::new()))
414    }
415}
416
417/// Substitutes parameters in a logical plan with their values.
418fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
419    substitute_in_operator(&mut plan.root, params)
420}
421
422/// Recursively substitutes parameters in an operator.
423fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
424    use crate::query::plan::*;
425
426    match op {
427        LogicalOperator::Filter(filter) => {
428            substitute_in_expression(&mut filter.predicate, params)?;
429            substitute_in_operator(&mut filter.input, params)?;
430        }
431        LogicalOperator::Return(ret) => {
432            for item in &mut ret.items {
433                substitute_in_expression(&mut item.expression, params)?;
434            }
435            substitute_in_operator(&mut ret.input, params)?;
436        }
437        LogicalOperator::Project(proj) => {
438            for p in &mut proj.projections {
439                substitute_in_expression(&mut p.expression, params)?;
440            }
441            substitute_in_operator(&mut proj.input, params)?;
442        }
443        LogicalOperator::NodeScan(scan) => {
444            if let Some(input) = &mut scan.input {
445                substitute_in_operator(input, params)?;
446            }
447        }
448        LogicalOperator::EdgeScan(scan) => {
449            if let Some(input) = &mut scan.input {
450                substitute_in_operator(input, params)?;
451            }
452        }
453        LogicalOperator::Expand(expand) => {
454            substitute_in_operator(&mut expand.input, params)?;
455        }
456        LogicalOperator::Join(join) => {
457            substitute_in_operator(&mut join.left, params)?;
458            substitute_in_operator(&mut join.right, params)?;
459            for cond in &mut join.conditions {
460                substitute_in_expression(&mut cond.left, params)?;
461                substitute_in_expression(&mut cond.right, params)?;
462            }
463        }
464        LogicalOperator::LeftJoin(join) => {
465            substitute_in_operator(&mut join.left, params)?;
466            substitute_in_operator(&mut join.right, params)?;
467            if let Some(cond) = &mut join.condition {
468                substitute_in_expression(cond, params)?;
469            }
470        }
471        LogicalOperator::Aggregate(agg) => {
472            for expr in &mut agg.group_by {
473                substitute_in_expression(expr, params)?;
474            }
475            for agg_expr in &mut agg.aggregates {
476                if let Some(expr) = &mut agg_expr.expression {
477                    substitute_in_expression(expr, params)?;
478                }
479            }
480            substitute_in_operator(&mut agg.input, params)?;
481        }
482        LogicalOperator::Sort(sort) => {
483            for key in &mut sort.keys {
484                substitute_in_expression(&mut key.expression, params)?;
485            }
486            substitute_in_operator(&mut sort.input, params)?;
487        }
488        LogicalOperator::Limit(limit) => {
489            substitute_in_operator(&mut limit.input, params)?;
490        }
491        LogicalOperator::Skip(skip) => {
492            substitute_in_operator(&mut skip.input, params)?;
493        }
494        LogicalOperator::Distinct(distinct) => {
495            substitute_in_operator(&mut distinct.input, params)?;
496        }
497        LogicalOperator::CreateNode(create) => {
498            for (_, expr) in &mut create.properties {
499                substitute_in_expression(expr, params)?;
500            }
501            if let Some(input) = &mut create.input {
502                substitute_in_operator(input, params)?;
503            }
504        }
505        LogicalOperator::CreateEdge(create) => {
506            for (_, expr) in &mut create.properties {
507                substitute_in_expression(expr, params)?;
508            }
509            substitute_in_operator(&mut create.input, params)?;
510        }
511        LogicalOperator::DeleteNode(delete) => {
512            substitute_in_operator(&mut delete.input, params)?;
513        }
514        LogicalOperator::DeleteEdge(delete) => {
515            substitute_in_operator(&mut delete.input, params)?;
516        }
517        LogicalOperator::SetProperty(set) => {
518            for (_, expr) in &mut set.properties {
519                substitute_in_expression(expr, params)?;
520            }
521            substitute_in_operator(&mut set.input, params)?;
522        }
523        LogicalOperator::Union(union) => {
524            for input in &mut union.inputs {
525                substitute_in_operator(input, params)?;
526            }
527        }
528        LogicalOperator::AntiJoin(anti) => {
529            substitute_in_operator(&mut anti.left, params)?;
530            substitute_in_operator(&mut anti.right, params)?;
531        }
532        LogicalOperator::Bind(bind) => {
533            substitute_in_expression(&mut bind.expression, params)?;
534            substitute_in_operator(&mut bind.input, params)?;
535        }
536        LogicalOperator::TripleScan(scan) => {
537            if let Some(input) = &mut scan.input {
538                substitute_in_operator(input, params)?;
539            }
540        }
541        LogicalOperator::Unwind(unwind) => {
542            substitute_in_expression(&mut unwind.expression, params)?;
543            substitute_in_operator(&mut unwind.input, params)?;
544        }
545        LogicalOperator::Merge(merge) => {
546            for (_, expr) in &mut merge.match_properties {
547                substitute_in_expression(expr, params)?;
548            }
549            for (_, expr) in &mut merge.on_create {
550                substitute_in_expression(expr, params)?;
551            }
552            for (_, expr) in &mut merge.on_match {
553                substitute_in_expression(expr, params)?;
554            }
555            substitute_in_operator(&mut merge.input, params)?;
556        }
557        LogicalOperator::AddLabel(add_label) => {
558            substitute_in_operator(&mut add_label.input, params)?;
559        }
560        LogicalOperator::RemoveLabel(remove_label) => {
561            substitute_in_operator(&mut remove_label.input, params)?;
562        }
563        LogicalOperator::ShortestPath(sp) => {
564            substitute_in_operator(&mut sp.input, params)?;
565        }
566        // SPARQL Update operators
567        LogicalOperator::InsertTriple(insert) => {
568            if let Some(ref mut input) = insert.input {
569                substitute_in_operator(input, params)?;
570            }
571        }
572        LogicalOperator::DeleteTriple(delete) => {
573            if let Some(ref mut input) = delete.input {
574                substitute_in_operator(input, params)?;
575            }
576        }
577        LogicalOperator::Modify(modify) => {
578            substitute_in_operator(&mut modify.where_clause, params)?;
579        }
580        LogicalOperator::ClearGraph(_)
581        | LogicalOperator::CreateGraph(_)
582        | LogicalOperator::DropGraph(_)
583        | LogicalOperator::LoadGraph(_)
584        | LogicalOperator::CopyGraph(_)
585        | LogicalOperator::MoveGraph(_)
586        | LogicalOperator::AddGraph(_) => {}
587        LogicalOperator::Empty => {}
588        LogicalOperator::VectorScan(scan) => {
589            substitute_in_expression(&mut scan.query_vector, params)?;
590            if let Some(ref mut input) = scan.input {
591                substitute_in_operator(input, params)?;
592            }
593        }
594        LogicalOperator::VectorJoin(join) => {
595            substitute_in_expression(&mut join.query_vector, params)?;
596            substitute_in_operator(&mut join.input, params)?;
597        }
598    }
599    Ok(())
600}
601
602/// Substitutes parameters in an expression with their values.
603fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
604    use crate::query::plan::LogicalExpression;
605
606    match expr {
607        LogicalExpression::Parameter(name) => {
608            if let Some(value) = params.get(name) {
609                *expr = LogicalExpression::Literal(value.clone());
610            } else {
611                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
612            }
613        }
614        LogicalExpression::Binary { left, right, .. } => {
615            substitute_in_expression(left, params)?;
616            substitute_in_expression(right, params)?;
617        }
618        LogicalExpression::Unary { operand, .. } => {
619            substitute_in_expression(operand, params)?;
620        }
621        LogicalExpression::FunctionCall { args, .. } => {
622            for arg in args {
623                substitute_in_expression(arg, params)?;
624            }
625        }
626        LogicalExpression::List(items) => {
627            for item in items {
628                substitute_in_expression(item, params)?;
629            }
630        }
631        LogicalExpression::Map(pairs) => {
632            for (_, value) in pairs {
633                substitute_in_expression(value, params)?;
634            }
635        }
636        LogicalExpression::IndexAccess { base, index } => {
637            substitute_in_expression(base, params)?;
638            substitute_in_expression(index, params)?;
639        }
640        LogicalExpression::SliceAccess { base, start, end } => {
641            substitute_in_expression(base, params)?;
642            if let Some(s) = start {
643                substitute_in_expression(s, params)?;
644            }
645            if let Some(e) = end {
646                substitute_in_expression(e, params)?;
647            }
648        }
649        LogicalExpression::Case {
650            operand,
651            when_clauses,
652            else_clause,
653        } => {
654            if let Some(op) = operand {
655                substitute_in_expression(op, params)?;
656            }
657            for (cond, result) in when_clauses {
658                substitute_in_expression(cond, params)?;
659                substitute_in_expression(result, params)?;
660            }
661            if let Some(el) = else_clause {
662                substitute_in_expression(el, params)?;
663            }
664        }
665        LogicalExpression::Property { .. }
666        | LogicalExpression::Variable(_)
667        | LogicalExpression::Literal(_)
668        | LogicalExpression::Labels(_)
669        | LogicalExpression::Type(_)
670        | LogicalExpression::Id(_) => {}
671        LogicalExpression::ListComprehension {
672            list_expr,
673            filter_expr,
674            map_expr,
675            ..
676        } => {
677            substitute_in_expression(list_expr, params)?;
678            if let Some(filter) = filter_expr {
679                substitute_in_expression(filter, params)?;
680            }
681            substitute_in_expression(map_expr, params)?;
682        }
683        LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
684            // Subqueries would need recursive parameter substitution
685        }
686    }
687    Ok(())
688}
689
690#[cfg(test)]
691mod tests {
692    use super::*;
693
694    #[test]
695    fn test_query_language_is_lpg() {
696        #[cfg(feature = "gql")]
697        assert!(QueryLanguage::Gql.is_lpg());
698        #[cfg(feature = "cypher")]
699        assert!(QueryLanguage::Cypher.is_lpg());
700        #[cfg(feature = "sparql")]
701        assert!(!QueryLanguage::Sparql.is_lpg());
702    }
703
704    #[test]
705    fn test_processor_creation() {
706        let store = Arc::new(LpgStore::new());
707        let processor = QueryProcessor::for_lpg(store);
708        assert!(processor.lpg_store().node_count() == 0);
709    }
710
711    #[cfg(feature = "gql")]
712    #[test]
713    fn test_process_simple_gql() {
714        let store = Arc::new(LpgStore::new());
715        store.create_node(&["Person"]);
716        store.create_node(&["Person"]);
717
718        let processor = QueryProcessor::for_lpg(store);
719        let result = processor
720            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
721            .unwrap();
722
723        assert_eq!(result.row_count(), 2);
724        assert_eq!(result.columns[0], "n");
725    }
726
727    #[cfg(feature = "cypher")]
728    #[test]
729    fn test_process_simple_cypher() {
730        let store = Arc::new(LpgStore::new());
731        store.create_node(&["Person"]);
732
733        let processor = QueryProcessor::for_lpg(store);
734        let result = processor
735            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
736            .unwrap();
737
738        assert_eq!(result.row_count(), 1);
739    }
740
741    #[cfg(feature = "gql")]
742    #[test]
743    fn test_process_with_params() {
744        let store = Arc::new(LpgStore::new());
745        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
746        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
747        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
748
749        let processor = QueryProcessor::for_lpg(store);
750
751        // Query with parameter
752        let mut params = HashMap::new();
753        params.insert("min_age".to_string(), Value::Int64(30));
754
755        let result = processor
756            .process(
757                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
758                QueryLanguage::Gql,
759                Some(&params),
760            )
761            .unwrap();
762
763        // Should return 2 people (ages 35 and 45)
764        assert_eq!(result.row_count(), 2);
765    }
766
767    #[cfg(feature = "gql")]
768    #[test]
769    fn test_missing_param_error() {
770        let store = Arc::new(LpgStore::new());
771        store.create_node(&["Person"]);
772
773        let processor = QueryProcessor::for_lpg(store);
774
775        // Query with parameter but empty params map (missing the required param)
776        let params: HashMap<String, Value> = HashMap::new();
777        let result = processor.process(
778            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
779            QueryLanguage::Gql,
780            Some(&params),
781        );
782
783        // Should fail with missing parameter error
784        assert!(result.is_err());
785        let err = result.unwrap_err();
786        assert!(
787            err.to_string().contains("Missing parameter"),
788            "Expected 'Missing parameter' error, got: {}",
789            err
790        );
791    }
792
793    #[cfg(feature = "gql")]
794    #[test]
795    fn test_params_in_filter_with_property() {
796        // Tests parameter substitution in WHERE clause with property comparison
797        let store = Arc::new(LpgStore::new());
798        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
799        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
800
801        let processor = QueryProcessor::for_lpg(store);
802
803        let mut params = HashMap::new();
804        params.insert("threshold".to_string(), Value::Int64(15));
805
806        let result = processor
807            .process(
808                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
809                QueryLanguage::Gql,
810                Some(&params),
811            )
812            .unwrap();
813
814        // Only value=20 matches > 15
815        assert_eq!(result.row_count(), 1);
816        let row = &result.rows[0];
817        assert_eq!(row[0], Value::Int64(20));
818    }
819
820    #[cfg(feature = "gql")]
821    #[test]
822    fn test_params_in_multiple_where_conditions() {
823        // Tests multiple parameters in WHERE clause with AND
824        let store = Arc::new(LpgStore::new());
825        store.create_node_with_props(
826            &["Person"],
827            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
828        );
829        store.create_node_with_props(
830            &["Person"],
831            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
832        );
833        store.create_node_with_props(
834            &["Person"],
835            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
836        );
837
838        let processor = QueryProcessor::for_lpg(store);
839
840        let mut params = HashMap::new();
841        params.insert("min_age".to_string(), Value::Int64(30));
842        params.insert("min_score".to_string(), Value::Int64(75));
843
844        let result = processor
845            .process(
846                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
847                QueryLanguage::Gql,
848                Some(&params),
849            )
850            .unwrap();
851
852        // Only the person with age=35, score=90 matches both conditions
853        assert_eq!(result.row_count(), 1);
854    }
855
856    #[cfg(feature = "gql")]
857    #[test]
858    fn test_params_with_in_list() {
859        // Tests parameter as a value checked against IN list
860        let store = Arc::new(LpgStore::new());
861        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
862        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
863        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
864
865        let processor = QueryProcessor::for_lpg(store);
866
867        // Check if a parameter value matches any of the statuses
868        let mut params = HashMap::new();
869        params.insert("target".to_string(), Value::String("active".into()));
870
871        let result = processor
872            .process(
873                "MATCH (n:Item) WHERE n.status = $target RETURN n",
874                QueryLanguage::Gql,
875                Some(&params),
876            )
877            .unwrap();
878
879        assert_eq!(result.row_count(), 1);
880    }
881
882    #[cfg(feature = "gql")]
883    #[test]
884    fn test_params_same_type_comparison() {
885        // Tests that same-type parameter comparisons work correctly
886        let store = Arc::new(LpgStore::new());
887        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
888        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
889
890        let processor = QueryProcessor::for_lpg(store);
891
892        // Compare int property with int parameter
893        let mut params = HashMap::new();
894        params.insert("threshold".to_string(), Value::Int64(75));
895
896        let result = processor
897            .process(
898                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
899                QueryLanguage::Gql,
900                Some(&params),
901            )
902            .unwrap();
903
904        // Only value=100 matches > 75
905        assert_eq!(result.row_count(), 1);
906    }
907
908    #[cfg(feature = "gql")]
909    #[test]
910    fn test_process_empty_result_has_columns() {
911        // Tests that empty results still have correct column names
912        let store = Arc::new(LpgStore::new());
913        // Don't create any nodes
914
915        let processor = QueryProcessor::for_lpg(store);
916        let result = processor
917            .process(
918                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
919                QueryLanguage::Gql,
920                None,
921            )
922            .unwrap();
923
924        assert_eq!(result.row_count(), 0);
925        assert_eq!(result.columns.len(), 2);
926        assert_eq!(result.columns[0], "name");
927        assert_eq!(result.columns[1], "age");
928    }
929
930    #[cfg(feature = "gql")]
931    #[test]
932    fn test_params_string_equality() {
933        // Tests string parameter equality comparison
934        let store = Arc::new(LpgStore::new());
935        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
936        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
937        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
938
939        let processor = QueryProcessor::for_lpg(store);
940
941        let mut params = HashMap::new();
942        params.insert("target".to_string(), Value::String("beta".into()));
943
944        let result = processor
945            .process(
946                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
947                QueryLanguage::Gql,
948                Some(&params),
949            )
950            .unwrap();
951
952        assert_eq!(result.row_count(), 1);
953        assert_eq!(result.rows[0][0], Value::String("beta".into()));
954    }
955}