1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TxId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
/// Query languages the processor can be asked to run.
///
/// Every variant is gated behind a Cargo feature, so the set of variants
/// depends on the build; with no language feature enabled the enum has no
/// variants at all.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL; classified as an LPG language by [`QueryLanguage::is_lpg`].
    #[cfg(feature = "gql")]
    Gql,
    /// Cypher; classified as an LPG language.
    #[cfg(feature = "cypher")]
    Cypher,
    /// Gremlin; classified as an LPG language.
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL; classified as an LPG language.
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ; classified as an LPG language.
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL; not an LPG language.
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL over RDF; not an LPG language.
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}

impl QueryLanguage {
    /// Returns `true` when this language is an LPG (labeled property graph)
    /// language and `false` otherwise.
    #[must_use]
    pub const fn is_lpg(&self) -> bool {
        // Match on the value rather than on the reference: when no language
        // feature is enabled every arm is compiled out, and an arm-less
        // `match` is only accepted for an uninhabited scrutinee. The enum
        // itself is uninhabited in that build, a reference to it is not.
        match *self {
            #[cfg(feature = "gql")]
            Self::Gql => true,
            #[cfg(feature = "cypher")]
            Self::Cypher => true,
            #[cfg(feature = "gremlin")]
            Self::Gremlin => true,
            #[cfg(feature = "graphql")]
            Self::GraphQL => true,
            #[cfg(feature = "sql-pgq")]
            Self::SqlPgq => true,
            #[cfg(feature = "sparql")]
            Self::Sparql => false,
            #[cfg(all(feature = "graphql", feature = "rdf"))]
            Self::GraphQLRdf => false,
        }
    }
}
74
/// Named query parameters: maps a parameter name (written `$name` in the
/// query text) to the [`Value`] substituted for it in the logical plan.
pub type QueryParams = HashMap<String, Value>;
77
/// Runs a query end to end: translation to a logical plan, binding,
/// optimization, physical planning and execution.
pub struct QueryProcessor {
    /// LPG store handle returned by `lpg_store()`.
    lpg_store: Arc<LpgStore>,
    /// Store the LPG planner is built on; also consulted for EXPLAIN
    /// pushdown hints.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Transaction manager handed to the planner; supplies the current epoch
    /// when no transaction context is set.
    tx_manager: Arc<TransactionManager>,
    /// Catalog handle returned by `catalog()`.
    catalog: Arc<Catalog>,
    /// Optimizer applied to every logical plan after binding.
    optimizer: Optimizer,
    /// Optional `(viewing epoch, transaction id)` pair set through
    /// `with_tx_context`.
    tx_context: Option<(EpochId, TxId)>,
    /// RDF store used for non-LPG languages; `None` unless the processor was
    /// built with `with_rdf`.
    #[cfg(feature = "rdf")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
114
115impl QueryProcessor {
116 #[must_use]
118 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119 let optimizer = Optimizer::from_store(&store);
120 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121 Self {
122 lpg_store: store,
123 graph_store,
124 tx_manager: Arc::new(TransactionManager::new()),
125 catalog: Arc::new(Catalog::new()),
126 optimizer,
127 tx_context: None,
128 #[cfg(feature = "rdf")]
129 rdf_store: None,
130 }
131 }
132
133 #[must_use]
135 pub fn for_lpg_with_tx(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
136 let optimizer = Optimizer::from_store(&store);
137 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
138 Self {
139 lpg_store: store,
140 graph_store,
141 tx_manager,
142 catalog: Arc::new(Catalog::new()),
143 optimizer,
144 tx_context: None,
145 #[cfg(feature = "rdf")]
146 rdf_store: None,
147 }
148 }
149
150 #[must_use]
152 pub fn for_graph_store_with_tx(
153 store: Arc<dyn GraphStoreMut>,
154 tx_manager: Arc<TransactionManager>,
155 ) -> Self {
156 let optimizer = Optimizer::from_graph_store(&*store);
157 Self {
158 lpg_store: Arc::new(LpgStore::new().expect("arena allocation for dummy LpgStore")), graph_store: store,
160 tx_manager,
161 catalog: Arc::new(Catalog::new()),
162 optimizer,
163 tx_context: None,
164 #[cfg(feature = "rdf")]
165 rdf_store: None,
166 }
167 }
168
169 #[cfg(feature = "rdf")]
171 #[must_use]
172 pub fn with_rdf(
173 lpg_store: Arc<LpgStore>,
174 rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
175 ) -> Self {
176 let optimizer = Optimizer::from_store(&lpg_store);
177 let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
178 Self {
179 lpg_store,
180 graph_store,
181 tx_manager: Arc::new(TransactionManager::new()),
182 catalog: Arc::new(Catalog::new()),
183 optimizer,
184 tx_context: None,
185 rdf_store: Some(rdf_store),
186 }
187 }
188
189 #[must_use]
193 pub fn with_tx_context(mut self, viewing_epoch: EpochId, tx_id: TxId) -> Self {
194 self.tx_context = Some((viewing_epoch, tx_id));
195 self
196 }
197
198 #[must_use]
200 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
201 self.catalog = catalog;
202 self
203 }
204
205 #[must_use]
207 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
208 self.optimizer = optimizer;
209 self
210 }
211
212 pub fn process(
232 &self,
233 query: &str,
234 language: QueryLanguage,
235 params: Option<&QueryParams>,
236 ) -> Result<QueryResult> {
237 if language.is_lpg() {
238 self.process_lpg(query, language, params)
239 } else {
240 #[cfg(feature = "rdf")]
241 {
242 self.process_rdf(query, language, params)
243 }
244 #[cfg(not(feature = "rdf"))]
245 {
246 Err(Error::Internal(
247 "RDF support not enabled. Compile with --features rdf".to_string(),
248 ))
249 }
250 }
251 }
252
253 fn process_lpg(
255 &self,
256 query: &str,
257 language: QueryLanguage,
258 params: Option<&QueryParams>,
259 ) -> Result<QueryResult> {
260 #[cfg(not(target_arch = "wasm32"))]
261 let start_time = std::time::Instant::now();
262
263 let mut logical_plan = self.translate_lpg(query, language)?;
265
266 if let Some(params) = params {
268 substitute_params(&mut logical_plan, params)?;
269 }
270
271 let mut binder = Binder::new();
273 let _binding_context = binder.bind(&logical_plan)?;
274
275 let optimized_plan = self.optimizer.optimize(logical_plan)?;
277
278 if optimized_plan.explain {
280 let mut plan = optimized_plan;
281 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
282 return Ok(explain_result(&plan));
283 }
284
285 let planner = if let Some((epoch, tx_id)) = self.tx_context {
287 Planner::with_context(
288 Arc::clone(&self.graph_store),
289 Arc::clone(&self.tx_manager),
290 Some(tx_id),
291 epoch,
292 )
293 } else {
294 Planner::with_context(
295 Arc::clone(&self.graph_store),
296 Arc::clone(&self.tx_manager),
297 None,
298 self.tx_manager.current_epoch(),
299 )
300 };
301 let mut physical_plan = planner.plan(&optimized_plan)?;
302
303 let executor = Executor::with_columns(physical_plan.columns.clone());
305 let mut result = executor.execute(physical_plan.operator.as_mut())?;
306
307 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
310 {
311 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
312 result.execution_time_ms = Some(elapsed_ms);
313 }
314 result.rows_scanned = Some(rows_scanned);
315
316 Ok(result)
317 }
318
319 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
321 match language {
322 #[cfg(feature = "gql")]
323 QueryLanguage::Gql => {
324 use crate::query::translators::gql;
325 gql::translate(query)
326 }
327 #[cfg(feature = "cypher")]
328 QueryLanguage::Cypher => {
329 use crate::query::translators::cypher;
330 cypher::translate(query)
331 }
332 #[cfg(feature = "gremlin")]
333 QueryLanguage::Gremlin => {
334 use crate::query::translators::gremlin;
335 gremlin::translate(query)
336 }
337 #[cfg(feature = "graphql")]
338 QueryLanguage::GraphQL => {
339 use crate::query::translators::graphql;
340 graphql::translate(query)
341 }
342 #[cfg(feature = "sql-pgq")]
343 QueryLanguage::SqlPgq => {
344 use crate::query::translators::sql_pgq;
345 sql_pgq::translate(query)
346 }
347 #[allow(unreachable_patterns)]
348 _ => Err(Error::Internal(format!(
349 "Language {:?} is not an LPG language",
350 language
351 ))),
352 }
353 }
354
355 #[cfg(feature = "rdf")]
357 fn process_rdf(
358 &self,
359 query: &str,
360 language: QueryLanguage,
361 _params: Option<&QueryParams>,
362 ) -> Result<QueryResult> {
363 use crate::query::planner::rdf::RdfPlanner;
364
365 let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
366 Error::Internal("RDF store not configured for this processor".to_string())
367 })?;
368
369 let logical_plan = self.translate_rdf(query, language)?;
371
372 let mut binder = Binder::new();
374 let _binding_context = binder.bind(&logical_plan)?;
375
376 let optimized_plan = self.optimizer.optimize(logical_plan)?;
378
379 if optimized_plan.explain {
381 return Ok(explain_result(&optimized_plan));
382 }
383
384 let planner = RdfPlanner::new(Arc::clone(rdf_store));
386 let mut physical_plan = planner.plan(&optimized_plan)?;
387
388 let executor = Executor::with_columns(physical_plan.columns.clone());
390 executor.execute(physical_plan.operator.as_mut())
391 }
392
393 #[cfg(feature = "rdf")]
395 fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
396 match language {
397 #[cfg(feature = "sparql")]
398 QueryLanguage::Sparql => {
399 use crate::query::translators::sparql;
400 sparql::translate(query)
401 }
402 #[cfg(all(feature = "graphql", feature = "rdf"))]
403 QueryLanguage::GraphQLRdf => {
404 use crate::query::translators::graphql_rdf;
405 graphql_rdf::translate(query, "http://example.org/")
407 }
408 _ => Err(Error::Internal(format!(
409 "Language {:?} is not an RDF language",
410 language
411 ))),
412 }
413 }
414
415 #[must_use]
417 pub fn lpg_store(&self) -> &Arc<LpgStore> {
418 &self.lpg_store
419 }
420
421 #[must_use]
423 pub fn catalog(&self) -> &Arc<Catalog> {
424 &self.catalog
425 }
426
427 #[must_use]
429 pub fn optimizer(&self) -> &Optimizer {
430 &self.optimizer
431 }
432
433 #[cfg(feature = "rdf")]
435 #[must_use]
436 pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
437 self.rdf_store.as_ref()
438 }
439}
440
441impl QueryProcessor {
442 #[must_use]
444 pub fn tx_manager(&self) -> &Arc<TransactionManager> {
445 &self.tx_manager
446 }
447}
448
449pub(crate) fn annotate_pushdown_hints(
454 op: &mut LogicalOperator,
455 store: &dyn grafeo_core::graph::GraphStore,
456) {
457 use crate::query::plan::*;
458
459 match op {
460 LogicalOperator::Filter(filter) => {
461 annotate_pushdown_hints(&mut filter.input, store);
463
464 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
466 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
467 }
468 }
469 LogicalOperator::NodeScan(op) => {
470 if let Some(input) = &mut op.input {
471 annotate_pushdown_hints(input, store);
472 }
473 }
474 LogicalOperator::EdgeScan(op) => {
475 if let Some(input) = &mut op.input {
476 annotate_pushdown_hints(input, store);
477 }
478 }
479 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
480 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
481 LogicalOperator::Join(op) => {
482 annotate_pushdown_hints(&mut op.left, store);
483 annotate_pushdown_hints(&mut op.right, store);
484 }
485 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
486 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
487 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
488 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
489 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
490 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
491 LogicalOperator::Union(op) => {
492 for input in &mut op.inputs {
493 annotate_pushdown_hints(input, store);
494 }
495 }
496 LogicalOperator::Apply(op) => {
497 annotate_pushdown_hints(&mut op.input, store);
498 annotate_pushdown_hints(&mut op.subplan, store);
499 }
500 LogicalOperator::Otherwise(op) => {
501 annotate_pushdown_hints(&mut op.left, store);
502 annotate_pushdown_hints(&mut op.right, store);
503 }
504 _ => {}
505 }
506}
507
508fn infer_pushdown(
510 predicate: &LogicalExpression,
511 scan: &crate::query::plan::NodeScanOp,
512 store: &dyn grafeo_core::graph::GraphStore,
513) -> Option<crate::query::plan::PushdownHint> {
514 use crate::query::plan::*;
515
516 match predicate {
517 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
519 if let Some(prop) = extract_property_name(left, &scan.variable)
520 .or_else(|| extract_property_name(right, &scan.variable))
521 {
522 if store.has_property_index(&prop) {
523 return Some(PushdownHint::IndexLookup { property: prop });
524 }
525 if scan.label.is_some() {
526 return Some(PushdownHint::LabelFirst);
527 }
528 }
529 None
530 }
531 LogicalExpression::Binary {
533 left,
534 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
535 right,
536 } => {
537 if let Some(prop) = extract_property_name(left, &scan.variable)
538 .or_else(|| extract_property_name(right, &scan.variable))
539 {
540 if store.has_property_index(&prop) {
541 return Some(PushdownHint::RangeScan { property: prop });
542 }
543 if scan.label.is_some() {
544 return Some(PushdownHint::LabelFirst);
545 }
546 }
547 None
548 }
549 LogicalExpression::Binary {
551 left,
552 op: BinaryOp::And,
553 ..
554 } => infer_pushdown(left, scan, store),
555 _ => {
556 if scan.label.is_some() {
558 Some(PushdownHint::LabelFirst)
559 } else {
560 None
561 }
562 }
563 }
564}
565
566fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
569 if let LogicalExpression::Property { variable, property } = expr
570 && variable == scan_var
571 {
572 Some(property.clone())
573 } else {
574 None
575 }
576}
577
578pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
580 let tree_text = plan.root.explain_tree();
581 QueryResult {
582 columns: vec!["plan".to_string()],
583 column_types: vec![grafeo_common::types::LogicalType::String],
584 rows: vec![vec![Value::String(tree_text.into())]],
585 execution_time_ms: None,
586 rows_scanned: None,
587 status_message: None,
588 }
589}
590
591fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
593 substitute_in_operator(&mut plan.root, params)
594}
595
596fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
598 use crate::query::plan::*;
599
600 match op {
601 LogicalOperator::Filter(filter) => {
602 substitute_in_expression(&mut filter.predicate, params)?;
603 substitute_in_operator(&mut filter.input, params)?;
604 }
605 LogicalOperator::Return(ret) => {
606 for item in &mut ret.items {
607 substitute_in_expression(&mut item.expression, params)?;
608 }
609 substitute_in_operator(&mut ret.input, params)?;
610 }
611 LogicalOperator::Project(proj) => {
612 for p in &mut proj.projections {
613 substitute_in_expression(&mut p.expression, params)?;
614 }
615 substitute_in_operator(&mut proj.input, params)?;
616 }
617 LogicalOperator::NodeScan(scan) => {
618 if let Some(input) = &mut scan.input {
619 substitute_in_operator(input, params)?;
620 }
621 }
622 LogicalOperator::EdgeScan(scan) => {
623 if let Some(input) = &mut scan.input {
624 substitute_in_operator(input, params)?;
625 }
626 }
627 LogicalOperator::Expand(expand) => {
628 substitute_in_operator(&mut expand.input, params)?;
629 }
630 LogicalOperator::Join(join) => {
631 substitute_in_operator(&mut join.left, params)?;
632 substitute_in_operator(&mut join.right, params)?;
633 for cond in &mut join.conditions {
634 substitute_in_expression(&mut cond.left, params)?;
635 substitute_in_expression(&mut cond.right, params)?;
636 }
637 }
638 LogicalOperator::LeftJoin(join) => {
639 substitute_in_operator(&mut join.left, params)?;
640 substitute_in_operator(&mut join.right, params)?;
641 if let Some(cond) = &mut join.condition {
642 substitute_in_expression(cond, params)?;
643 }
644 }
645 LogicalOperator::Aggregate(agg) => {
646 for expr in &mut agg.group_by {
647 substitute_in_expression(expr, params)?;
648 }
649 for agg_expr in &mut agg.aggregates {
650 if let Some(expr) = &mut agg_expr.expression {
651 substitute_in_expression(expr, params)?;
652 }
653 }
654 substitute_in_operator(&mut agg.input, params)?;
655 }
656 LogicalOperator::Sort(sort) => {
657 for key in &mut sort.keys {
658 substitute_in_expression(&mut key.expression, params)?;
659 }
660 substitute_in_operator(&mut sort.input, params)?;
661 }
662 LogicalOperator::Limit(limit) => {
663 substitute_in_operator(&mut limit.input, params)?;
664 }
665 LogicalOperator::Skip(skip) => {
666 substitute_in_operator(&mut skip.input, params)?;
667 }
668 LogicalOperator::Distinct(distinct) => {
669 substitute_in_operator(&mut distinct.input, params)?;
670 }
671 LogicalOperator::CreateNode(create) => {
672 for (_, expr) in &mut create.properties {
673 substitute_in_expression(expr, params)?;
674 }
675 if let Some(input) = &mut create.input {
676 substitute_in_operator(input, params)?;
677 }
678 }
679 LogicalOperator::CreateEdge(create) => {
680 for (_, expr) in &mut create.properties {
681 substitute_in_expression(expr, params)?;
682 }
683 substitute_in_operator(&mut create.input, params)?;
684 }
685 LogicalOperator::DeleteNode(delete) => {
686 substitute_in_operator(&mut delete.input, params)?;
687 }
688 LogicalOperator::DeleteEdge(delete) => {
689 substitute_in_operator(&mut delete.input, params)?;
690 }
691 LogicalOperator::SetProperty(set) => {
692 for (_, expr) in &mut set.properties {
693 substitute_in_expression(expr, params)?;
694 }
695 substitute_in_operator(&mut set.input, params)?;
696 }
697 LogicalOperator::Union(union) => {
698 for input in &mut union.inputs {
699 substitute_in_operator(input, params)?;
700 }
701 }
702 LogicalOperator::AntiJoin(anti) => {
703 substitute_in_operator(&mut anti.left, params)?;
704 substitute_in_operator(&mut anti.right, params)?;
705 }
706 LogicalOperator::Bind(bind) => {
707 substitute_in_expression(&mut bind.expression, params)?;
708 substitute_in_operator(&mut bind.input, params)?;
709 }
710 LogicalOperator::TripleScan(scan) => {
711 if let Some(input) = &mut scan.input {
712 substitute_in_operator(input, params)?;
713 }
714 }
715 LogicalOperator::Unwind(unwind) => {
716 substitute_in_expression(&mut unwind.expression, params)?;
717 substitute_in_operator(&mut unwind.input, params)?;
718 }
719 LogicalOperator::MapCollect(mc) => {
720 substitute_in_operator(&mut mc.input, params)?;
721 }
722 LogicalOperator::Merge(merge) => {
723 for (_, expr) in &mut merge.match_properties {
724 substitute_in_expression(expr, params)?;
725 }
726 for (_, expr) in &mut merge.on_create {
727 substitute_in_expression(expr, params)?;
728 }
729 for (_, expr) in &mut merge.on_match {
730 substitute_in_expression(expr, params)?;
731 }
732 substitute_in_operator(&mut merge.input, params)?;
733 }
734 LogicalOperator::MergeRelationship(merge_rel) => {
735 for (_, expr) in &mut merge_rel.match_properties {
736 substitute_in_expression(expr, params)?;
737 }
738 for (_, expr) in &mut merge_rel.on_create {
739 substitute_in_expression(expr, params)?;
740 }
741 for (_, expr) in &mut merge_rel.on_match {
742 substitute_in_expression(expr, params)?;
743 }
744 substitute_in_operator(&mut merge_rel.input, params)?;
745 }
746 LogicalOperator::AddLabel(add_label) => {
747 substitute_in_operator(&mut add_label.input, params)?;
748 }
749 LogicalOperator::RemoveLabel(remove_label) => {
750 substitute_in_operator(&mut remove_label.input, params)?;
751 }
752 LogicalOperator::ShortestPath(sp) => {
753 substitute_in_operator(&mut sp.input, params)?;
754 }
755 LogicalOperator::InsertTriple(insert) => {
757 if let Some(ref mut input) = insert.input {
758 substitute_in_operator(input, params)?;
759 }
760 }
761 LogicalOperator::DeleteTriple(delete) => {
762 if let Some(ref mut input) = delete.input {
763 substitute_in_operator(input, params)?;
764 }
765 }
766 LogicalOperator::Modify(modify) => {
767 substitute_in_operator(&mut modify.where_clause, params)?;
768 }
769 LogicalOperator::ClearGraph(_)
770 | LogicalOperator::CreateGraph(_)
771 | LogicalOperator::DropGraph(_)
772 | LogicalOperator::LoadGraph(_)
773 | LogicalOperator::CopyGraph(_)
774 | LogicalOperator::MoveGraph(_)
775 | LogicalOperator::AddGraph(_) => {}
776 LogicalOperator::HorizontalAggregate(op) => {
777 substitute_in_operator(&mut op.input, params)?;
778 }
779 LogicalOperator::Empty => {}
780 LogicalOperator::VectorScan(scan) => {
781 substitute_in_expression(&mut scan.query_vector, params)?;
782 if let Some(ref mut input) = scan.input {
783 substitute_in_operator(input, params)?;
784 }
785 }
786 LogicalOperator::VectorJoin(join) => {
787 substitute_in_expression(&mut join.query_vector, params)?;
788 substitute_in_operator(&mut join.input, params)?;
789 }
790 LogicalOperator::Except(except) => {
791 substitute_in_operator(&mut except.left, params)?;
792 substitute_in_operator(&mut except.right, params)?;
793 }
794 LogicalOperator::Intersect(intersect) => {
795 substitute_in_operator(&mut intersect.left, params)?;
796 substitute_in_operator(&mut intersect.right, params)?;
797 }
798 LogicalOperator::Otherwise(otherwise) => {
799 substitute_in_operator(&mut otherwise.left, params)?;
800 substitute_in_operator(&mut otherwise.right, params)?;
801 }
802 LogicalOperator::Apply(apply) => {
803 substitute_in_operator(&mut apply.input, params)?;
804 substitute_in_operator(&mut apply.subplan, params)?;
805 }
806 LogicalOperator::ParameterScan(_) => {}
808 LogicalOperator::MultiWayJoin(mwj) => {
809 for input in &mut mwj.inputs {
810 substitute_in_operator(input, params)?;
811 }
812 for cond in &mut mwj.conditions {
813 substitute_in_expression(&mut cond.left, params)?;
814 substitute_in_expression(&mut cond.right, params)?;
815 }
816 }
817 LogicalOperator::CreatePropertyGraph(_) => {}
819 LogicalOperator::CallProcedure(_) => {}
821 }
822 Ok(())
823}
824
825fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
827 use crate::query::plan::LogicalExpression;
828
829 match expr {
830 LogicalExpression::Parameter(name) => {
831 if let Some(value) = params.get(name) {
832 *expr = LogicalExpression::Literal(value.clone());
833 } else {
834 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
835 }
836 }
837 LogicalExpression::Binary { left, right, .. } => {
838 substitute_in_expression(left, params)?;
839 substitute_in_expression(right, params)?;
840 }
841 LogicalExpression::Unary { operand, .. } => {
842 substitute_in_expression(operand, params)?;
843 }
844 LogicalExpression::FunctionCall { args, .. } => {
845 for arg in args {
846 substitute_in_expression(arg, params)?;
847 }
848 }
849 LogicalExpression::List(items) => {
850 for item in items {
851 substitute_in_expression(item, params)?;
852 }
853 }
854 LogicalExpression::Map(pairs) => {
855 for (_, value) in pairs {
856 substitute_in_expression(value, params)?;
857 }
858 }
859 LogicalExpression::IndexAccess { base, index } => {
860 substitute_in_expression(base, params)?;
861 substitute_in_expression(index, params)?;
862 }
863 LogicalExpression::SliceAccess { base, start, end } => {
864 substitute_in_expression(base, params)?;
865 if let Some(s) = start {
866 substitute_in_expression(s, params)?;
867 }
868 if let Some(e) = end {
869 substitute_in_expression(e, params)?;
870 }
871 }
872 LogicalExpression::Case {
873 operand,
874 when_clauses,
875 else_clause,
876 } => {
877 if let Some(op) = operand {
878 substitute_in_expression(op, params)?;
879 }
880 for (cond, result) in when_clauses {
881 substitute_in_expression(cond, params)?;
882 substitute_in_expression(result, params)?;
883 }
884 if let Some(el) = else_clause {
885 substitute_in_expression(el, params)?;
886 }
887 }
888 LogicalExpression::Property { .. }
889 | LogicalExpression::Variable(_)
890 | LogicalExpression::Literal(_)
891 | LogicalExpression::Labels(_)
892 | LogicalExpression::Type(_)
893 | LogicalExpression::Id(_) => {}
894 LogicalExpression::ListComprehension {
895 list_expr,
896 filter_expr,
897 map_expr,
898 ..
899 } => {
900 substitute_in_expression(list_expr, params)?;
901 if let Some(filter) = filter_expr {
902 substitute_in_expression(filter, params)?;
903 }
904 substitute_in_expression(map_expr, params)?;
905 }
906 LogicalExpression::ListPredicate {
907 list_expr,
908 predicate,
909 ..
910 } => {
911 substitute_in_expression(list_expr, params)?;
912 substitute_in_expression(predicate, params)?;
913 }
914 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
915 }
917 LogicalExpression::PatternComprehension { projection, .. } => {
918 substitute_in_expression(projection, params)?;
919 }
920 LogicalExpression::MapProjection { entries, .. } => {
921 for entry in entries {
922 if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
923 substitute_in_expression(expr, params)?;
924 }
925 }
926 }
927 LogicalExpression::Reduce {
928 initial,
929 list,
930 expression,
931 ..
932 } => {
933 substitute_in_expression(initial, params)?;
934 substitute_in_expression(list, params)?;
935 substitute_in_expression(expression, params)?;
936 }
937 }
938 Ok(())
939}
940
#[cfg(test)]
mod tests {
    use super::*;

    /// `is_lpg` classifies GQL and Cypher as LPG languages and SPARQL as not.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built over a new store exposes that (empty) store.
    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A label match in GQL returns one row per matching node.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    /// The same label match works through the Cypher translator.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// A `$min_age` parameter is substituted before the filter is evaluated.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Ages 35 and 45 pass the filter.
        assert_eq!(result.row_count(), 2);
    }

    /// Referencing a parameter that was not supplied is reported as an error.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // Deliberately empty: `$min_age` has no value.
        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    /// A parameter compared against a property filters and projects correctly.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    /// Two parameters combined with `AND` are both substituted.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the node with age 35 / score 90 satisfies both conditions.
        assert_eq!(result.row_count(), 1);
    }

    /// A string parameter used in an equality filter selects the matching node.
    // NOTE(review): despite the name, the query uses `=` rather than an IN list.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// An integer parameter compared with an integer property of the same type.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the node with value 100 exceeds the threshold.
        assert_eq!(result.row_count(), 1);
    }

    /// A query with no matches still reports its column names.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    /// A string parameter in an equality filter returns the matching value.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}