1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15use grafeo_core::graph::lpg::LpgStore;
16use grafeo_core::graph::{GraphStore, GraphStoreMut};
17
18use crate::catalog::Catalog;
19use crate::database::QueryResult;
20use crate::query::binder::Binder;
21use crate::query::executor::Executor;
22use crate::query::optimizer::Optimizer;
23use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
24use crate::query::planner::Planner;
25use crate::transaction::TransactionManager;
26
/// Query languages accepted by the query processor.
///
/// Every variant is gated behind a Cargo feature, so the set of variants that
/// exist depends on how the crate was built.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL; available with the `gql` feature.
    #[cfg(feature = "gql")]
    Gql,
    /// Cypher; available with the `cypher` feature.
    #[cfg(feature = "cypher")]
    Cypher,
    /// Gremlin; available with the `gremlin` feature.
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL over the property graph; available with the `graphql` feature.
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ; available with the `sql-pgq` feature.
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL; available with the `sparql` feature.
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL over RDF; available when both `graphql` and `rdf` are enabled.
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
53
54impl QueryLanguage {
55 #[must_use]
57 pub const fn is_lpg(&self) -> bool {
58 match self {
59 #[cfg(feature = "gql")]
60 Self::Gql => true,
61 #[cfg(feature = "cypher")]
62 Self::Cypher => true,
63 #[cfg(feature = "gremlin")]
64 Self::Gremlin => true,
65 #[cfg(feature = "graphql")]
66 Self::GraphQL => true,
67 #[cfg(feature = "sql-pgq")]
68 Self::SqlPgq => true,
69 #[cfg(feature = "sparql")]
70 Self::Sparql => false,
71 #[cfg(all(feature = "graphql", feature = "rdf"))]
72 Self::GraphQLRdf => false,
73 }
74 }
75}
76
77pub type QueryParams = HashMap<String, Value>;
79
80pub struct QueryProcessor {
100 lpg_store: Arc<LpgStore>,
102 graph_store: Arc<dyn GraphStore>,
104 write_store: Option<Arc<dyn GraphStoreMut>>,
106 transaction_manager: Arc<TransactionManager>,
108 catalog: Arc<Catalog>,
110 optimizer: Optimizer,
112 transaction_context: Option<(EpochId, TransactionId)>,
114 #[cfg(feature = "rdf")]
116 rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
117}
118
119impl QueryProcessor {
120 #[must_use]
122 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
123 let optimizer = Optimizer::from_store(&store);
124 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
125 let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
126 Self {
127 lpg_store: store,
128 graph_store,
129 write_store,
130 transaction_manager: Arc::new(TransactionManager::new()),
131 catalog: Arc::new(Catalog::new()),
132 optimizer,
133 transaction_context: None,
134 #[cfg(feature = "rdf")]
135 rdf_store: None,
136 }
137 }
138
139 #[must_use]
141 pub fn for_lpg_with_transaction(
142 store: Arc<LpgStore>,
143 transaction_manager: Arc<TransactionManager>,
144 ) -> Self {
145 let optimizer = Optimizer::from_store(&store);
146 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
147 let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
148 Self {
149 lpg_store: store,
150 graph_store,
151 write_store,
152 transaction_manager,
153 catalog: Arc::new(Catalog::new()),
154 optimizer,
155 transaction_context: None,
156 #[cfg(feature = "rdf")]
157 rdf_store: None,
158 }
159 }
160
161 pub fn for_graph_store_with_transaction(
167 store: Arc<dyn GraphStoreMut>,
168 transaction_manager: Arc<TransactionManager>,
169 ) -> Result<Self> {
170 let optimizer = Optimizer::from_graph_store(&*store);
171 let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
172 Ok(Self {
173 lpg_store: Arc::new(LpgStore::new()?),
174 graph_store: read_store,
175 write_store: Some(store),
176 transaction_manager,
177 catalog: Arc::new(Catalog::new()),
178 optimizer,
179 transaction_context: None,
180 #[cfg(feature = "rdf")]
181 rdf_store: None,
182 })
183 }
184
185 pub fn for_stores_with_transaction(
191 read_store: Arc<dyn GraphStore>,
192 write_store: Option<Arc<dyn GraphStoreMut>>,
193 transaction_manager: Arc<TransactionManager>,
194 ) -> Result<Self> {
195 let optimizer = Optimizer::from_graph_store(&*read_store);
196 Ok(Self {
197 lpg_store: Arc::new(LpgStore::new()?),
198 graph_store: read_store,
199 write_store,
200 transaction_manager,
201 catalog: Arc::new(Catalog::new()),
202 optimizer,
203 transaction_context: None,
204 #[cfg(feature = "rdf")]
205 rdf_store: None,
206 })
207 }
208
209 #[must_use]
213 pub fn with_transaction_context(
214 mut self,
215 viewing_epoch: EpochId,
216 transaction_id: TransactionId,
217 ) -> Self {
218 self.transaction_context = Some((viewing_epoch, transaction_id));
219 self
220 }
221
222 #[must_use]
224 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
225 self.catalog = catalog;
226 self
227 }
228
229 #[must_use]
231 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
232 self.optimizer = optimizer;
233 self
234 }
235
236 pub fn process(
256 &self,
257 query: &str,
258 language: QueryLanguage,
259 params: Option<&QueryParams>,
260 ) -> Result<QueryResult> {
261 if language.is_lpg() {
262 self.process_lpg(query, language, params)
263 } else {
264 #[cfg(feature = "rdf")]
265 {
266 self.process_rdf(query, language, params)
267 }
268 #[cfg(not(feature = "rdf"))]
269 {
270 Err(Error::Internal(
271 "RDF support not enabled. Compile with --features rdf".to_string(),
272 ))
273 }
274 }
275 }
276
277 fn process_lpg(
279 &self,
280 query: &str,
281 language: QueryLanguage,
282 params: Option<&QueryParams>,
283 ) -> Result<QueryResult> {
284 #[cfg(not(target_arch = "wasm32"))]
285 let start_time = std::time::Instant::now();
286
287 let mut logical_plan = self.translate_lpg(query, language)?;
289
290 let has_defaults = !logical_plan.default_params.is_empty();
292 if params.is_some() || has_defaults {
293 let merged = if has_defaults {
294 let mut merged = logical_plan.default_params.clone();
295 if let Some(params) = params {
296 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
297 }
298 merged
299 } else {
300 params.cloned().unwrap_or_default()
301 };
302 substitute_params(&mut logical_plan, &merged)?;
303 }
304
305 let mut binder = Binder::new();
307 let _binding_context = binder.bind(&logical_plan)?;
308
309 let optimized_plan = self.optimizer.optimize(logical_plan)?;
311
312 if optimized_plan.explain {
314 let mut plan = optimized_plan;
315 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
316 return Ok(explain_result(&plan));
317 }
318
319 let is_read_only =
323 !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
324 let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
325 Planner::with_context(
326 Arc::clone(&self.graph_store),
327 self.write_store.as_ref().map(Arc::clone),
328 Arc::clone(&self.transaction_manager),
329 Some(transaction_id),
330 epoch,
331 )
332 } else {
333 Planner::with_context(
334 Arc::clone(&self.graph_store),
335 self.write_store.as_ref().map(Arc::clone),
336 Arc::clone(&self.transaction_manager),
337 None,
338 self.transaction_manager.current_epoch(),
339 )
340 }
341 .with_read_only(is_read_only);
342 let mut physical_plan = planner.plan(&optimized_plan)?;
343
344 let executor = Executor::with_columns(physical_plan.columns.clone());
346 let mut result = executor.execute(physical_plan.operator.as_mut())?;
347
348 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
351 {
352 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
353 result.execution_time_ms = Some(elapsed_ms);
354 }
355 result.rows_scanned = Some(rows_scanned);
356
357 Ok(result)
358 }
359
360 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
362 let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
363 match language {
364 #[cfg(feature = "gql")]
365 QueryLanguage::Gql => {
366 use crate::query::translators::gql;
367 gql::translate(query)
368 }
369 #[cfg(feature = "cypher")]
370 QueryLanguage::Cypher => {
371 use crate::query::translators::cypher;
372 cypher::translate(query)
373 }
374 #[cfg(feature = "gremlin")]
375 QueryLanguage::Gremlin => {
376 use crate::query::translators::gremlin;
377 gremlin::translate(query)
378 }
379 #[cfg(feature = "graphql")]
380 QueryLanguage::GraphQL => {
381 use crate::query::translators::graphql;
382 graphql::translate(query)
383 }
384 #[cfg(feature = "sql-pgq")]
385 QueryLanguage::SqlPgq => {
386 use crate::query::translators::sql_pgq;
387 sql_pgq::translate(query)
388 }
389 #[allow(unreachable_patterns)]
390 _ => Err(Error::Internal(format!(
391 "Language {:?} is not an LPG language",
392 language
393 ))),
394 }
395 }
396
397 #[must_use]
399 pub fn lpg_store(&self) -> &Arc<LpgStore> {
400 &self.lpg_store
401 }
402
403 #[must_use]
405 pub fn catalog(&self) -> &Arc<Catalog> {
406 &self.catalog
407 }
408
409 #[must_use]
411 pub fn optimizer(&self) -> &Optimizer {
412 &self.optimizer
413 }
414}
415
416impl QueryProcessor {
417 #[must_use]
419 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
420 &self.transaction_manager
421 }
422}
423
#[cfg(feature = "rdf")]
impl QueryProcessor {
    /// Creates a processor with both an LPG store and an RDF store.
    ///
    /// Configured exactly like [`QueryProcessor::for_lpg`], with the RDF store
    /// attached on top.
    #[must_use]
    pub fn with_rdf(
        lpg_store: Arc<LpgStore>,
        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
    ) -> Self {
        let mut processor = Self::for_lpg(lpg_store);
        processor.rdf_store = Some(rdf_store);
        processor
    }

    /// Returns the RDF store, if one was configured.
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }

    /// Runs the RDF pipeline: translate, substitute parameters, bind,
    /// optimize, then either return the EXPLAIN output or plan and execute.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` when no RDF store is configured, or the error
    /// from whichever pipeline stage fails.
    fn process_rdf(
        &self,
        query: &str,
        language: QueryLanguage,
        params: Option<&QueryParams>,
    ) -> Result<QueryResult> {
        use crate::query::planner::rdf::RdfPlanner;

        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
            Error::Internal("RDF store not configured for this processor".to_string())
        })?;

        let mut logical_plan = self.translate_rdf(query, language)?;

        // Plan defaults are the base; caller-supplied values override them.
        // With no defaults the caller's map is borrowed as-is, not cloned.
        if logical_plan.default_params.is_empty() {
            if let Some(params) = params {
                substitute_params(&mut logical_plan, params)?;
            }
        } else {
            let mut merged = logical_plan.default_params.clone();
            if let Some(params) = params {
                merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
            }
            substitute_params(&mut logical_plan, &merged)?;
        }

        // Binding is run for validation; the resulting context is not used.
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let optimized_plan = self.optimizer.optimize(logical_plan)?;

        // EXPLAIN: return the plan rendering without executing.
        if optimized_plan.explain {
            return Ok(explain_result(&optimized_plan));
        }

        let planner = RdfPlanner::new(Arc::clone(rdf_store));
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    }

    /// Translates `query` into a logical plan using the RDF translator for
    /// `language`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` when `language` has no RDF translator in this
    /// build, or whatever error the translator itself reports.
    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
        match language {
            #[cfg(feature = "sparql")]
            QueryLanguage::Sparql => {
                use crate::query::translators::sparql;
                sparql::translate(query)
            }
            #[cfg(all(feature = "graphql", feature = "rdf"))]
            QueryLanguage::GraphQLRdf => {
                use crate::query::translators::graphql_rdf;
                // NOTE(review): the namespace passed to the translator is
                // hard-coded; confirm whether it should be configurable.
                graphql_rdf::translate(query, "http://example.org/")
            }
            // Unreachable when only RDF languages are compiled in; mirrors
            // the wildcard arm in `translate_lpg`.
            #[allow(unreachable_patterns)]
            _ => Err(Error::Internal(format!(
                "Language {:?} is not an RDF language",
                language
            ))),
        }
    }
}
530
531pub(crate) fn annotate_pushdown_hints(
536 op: &mut LogicalOperator,
537 store: &dyn grafeo_core::graph::GraphStore,
538) {
539 #[allow(clippy::wildcard_imports)]
540 use crate::query::plan::*;
541
542 match op {
543 LogicalOperator::Filter(filter) => {
544 annotate_pushdown_hints(&mut filter.input, store);
546
547 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
549 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
550 }
551 }
552 LogicalOperator::NodeScan(op) => {
553 if let Some(input) = &mut op.input {
554 annotate_pushdown_hints(input, store);
555 }
556 }
557 LogicalOperator::EdgeScan(op) => {
558 if let Some(input) = &mut op.input {
559 annotate_pushdown_hints(input, store);
560 }
561 }
562 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
563 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
564 LogicalOperator::Join(op) => {
565 annotate_pushdown_hints(&mut op.left, store);
566 annotate_pushdown_hints(&mut op.right, store);
567 }
568 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
569 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
570 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
571 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
572 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
573 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
574 LogicalOperator::Union(op) => {
575 for input in &mut op.inputs {
576 annotate_pushdown_hints(input, store);
577 }
578 }
579 LogicalOperator::Apply(op) => {
580 annotate_pushdown_hints(&mut op.input, store);
581 annotate_pushdown_hints(&mut op.subplan, store);
582 }
583 LogicalOperator::Otherwise(op) => {
584 annotate_pushdown_hints(&mut op.left, store);
585 annotate_pushdown_hints(&mut op.right, store);
586 }
587 _ => {}
588 }
589}
590
591fn infer_pushdown(
593 predicate: &LogicalExpression,
594 scan: &crate::query::plan::NodeScanOp,
595 store: &dyn grafeo_core::graph::GraphStore,
596) -> Option<crate::query::plan::PushdownHint> {
597 #[allow(clippy::wildcard_imports)]
598 use crate::query::plan::*;
599
600 match predicate {
601 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
603 if let Some(prop) = extract_property_name(left, &scan.variable)
604 .or_else(|| extract_property_name(right, &scan.variable))
605 {
606 if store.has_property_index(&prop) {
607 return Some(PushdownHint::IndexLookup { property: prop });
608 }
609 if scan.label.is_some() {
610 return Some(PushdownHint::LabelFirst);
611 }
612 }
613 None
614 }
615 LogicalExpression::Binary {
617 left,
618 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
619 right,
620 } => {
621 if let Some(prop) = extract_property_name(left, &scan.variable)
622 .or_else(|| extract_property_name(right, &scan.variable))
623 {
624 if store.has_property_index(&prop) {
625 return Some(PushdownHint::RangeScan { property: prop });
626 }
627 if scan.label.is_some() {
628 return Some(PushdownHint::LabelFirst);
629 }
630 }
631 None
632 }
633 LogicalExpression::Binary {
635 left,
636 op: BinaryOp::And,
637 ..
638 } => infer_pushdown(left, scan, store),
639 _ => {
640 if scan.label.is_some() {
642 Some(PushdownHint::LabelFirst)
643 } else {
644 None
645 }
646 }
647 }
648}
649
650fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
653 if let LogicalExpression::Property { variable, property } = expr
654 && variable == scan_var
655 {
656 Some(property.clone())
657 } else {
658 None
659 }
660}
661
662pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
664 let tree_text = plan.root.explain_tree();
665 QueryResult {
666 columns: vec!["plan".to_string()],
667 column_types: vec![grafeo_common::types::LogicalType::String],
668 rows: vec![vec![Value::String(tree_text.into())]],
669 execution_time_ms: None,
670 rows_scanned: None,
671 status_message: None,
672 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
673 }
674}
675
676pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
682 substitute_in_operator(&mut plan.root, params)
683}
684
685fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
687 #[allow(clippy::wildcard_imports)]
688 use crate::query::plan::*;
689
690 match op {
691 LogicalOperator::Filter(filter) => {
692 substitute_in_expression(&mut filter.predicate, params)?;
693 substitute_in_operator(&mut filter.input, params)?;
694 }
695 LogicalOperator::Return(ret) => {
696 for item in &mut ret.items {
697 substitute_in_expression(&mut item.expression, params)?;
698 }
699 substitute_in_operator(&mut ret.input, params)?;
700 }
701 LogicalOperator::Project(proj) => {
702 for p in &mut proj.projections {
703 substitute_in_expression(&mut p.expression, params)?;
704 }
705 substitute_in_operator(&mut proj.input, params)?;
706 }
707 LogicalOperator::NodeScan(scan) => {
708 if let Some(input) = &mut scan.input {
709 substitute_in_operator(input, params)?;
710 }
711 }
712 LogicalOperator::EdgeScan(scan) => {
713 if let Some(input) = &mut scan.input {
714 substitute_in_operator(input, params)?;
715 }
716 }
717 LogicalOperator::Expand(expand) => {
718 substitute_in_operator(&mut expand.input, params)?;
719 }
720 LogicalOperator::Join(join) => {
721 substitute_in_operator(&mut join.left, params)?;
722 substitute_in_operator(&mut join.right, params)?;
723 for cond in &mut join.conditions {
724 substitute_in_expression(&mut cond.left, params)?;
725 substitute_in_expression(&mut cond.right, params)?;
726 }
727 }
728 LogicalOperator::LeftJoin(join) => {
729 substitute_in_operator(&mut join.left, params)?;
730 substitute_in_operator(&mut join.right, params)?;
731 if let Some(cond) = &mut join.condition {
732 substitute_in_expression(cond, params)?;
733 }
734 }
735 LogicalOperator::Aggregate(agg) => {
736 for expr in &mut agg.group_by {
737 substitute_in_expression(expr, params)?;
738 }
739 for agg_expr in &mut agg.aggregates {
740 if let Some(expr) = &mut agg_expr.expression {
741 substitute_in_expression(expr, params)?;
742 }
743 }
744 substitute_in_operator(&mut agg.input, params)?;
745 }
746 LogicalOperator::Sort(sort) => {
747 for key in &mut sort.keys {
748 substitute_in_expression(&mut key.expression, params)?;
749 }
750 substitute_in_operator(&mut sort.input, params)?;
751 }
752 LogicalOperator::Limit(limit) => {
753 resolve_count_param(&mut limit.count, params)?;
754 substitute_in_operator(&mut limit.input, params)?;
755 }
756 LogicalOperator::Skip(skip) => {
757 resolve_count_param(&mut skip.count, params)?;
758 substitute_in_operator(&mut skip.input, params)?;
759 }
760 LogicalOperator::Distinct(distinct) => {
761 substitute_in_operator(&mut distinct.input, params)?;
762 }
763 LogicalOperator::CreateNode(create) => {
764 for (_, expr) in &mut create.properties {
765 substitute_in_expression(expr, params)?;
766 }
767 if let Some(input) = &mut create.input {
768 substitute_in_operator(input, params)?;
769 }
770 }
771 LogicalOperator::CreateEdge(create) => {
772 for (_, expr) in &mut create.properties {
773 substitute_in_expression(expr, params)?;
774 }
775 substitute_in_operator(&mut create.input, params)?;
776 }
777 LogicalOperator::DeleteNode(delete) => {
778 substitute_in_operator(&mut delete.input, params)?;
779 }
780 LogicalOperator::DeleteEdge(delete) => {
781 substitute_in_operator(&mut delete.input, params)?;
782 }
783 LogicalOperator::SetProperty(set) => {
784 for (_, expr) in &mut set.properties {
785 substitute_in_expression(expr, params)?;
786 }
787 substitute_in_operator(&mut set.input, params)?;
788 }
789 LogicalOperator::Union(union) => {
790 for input in &mut union.inputs {
791 substitute_in_operator(input, params)?;
792 }
793 }
794 LogicalOperator::AntiJoin(anti) => {
795 substitute_in_operator(&mut anti.left, params)?;
796 substitute_in_operator(&mut anti.right, params)?;
797 }
798 LogicalOperator::Bind(bind) => {
799 substitute_in_expression(&mut bind.expression, params)?;
800 substitute_in_operator(&mut bind.input, params)?;
801 }
802 LogicalOperator::TripleScan(scan) => {
803 if let Some(input) = &mut scan.input {
804 substitute_in_operator(input, params)?;
805 }
806 }
807 LogicalOperator::Unwind(unwind) => {
808 substitute_in_expression(&mut unwind.expression, params)?;
809 substitute_in_operator(&mut unwind.input, params)?;
810 }
811 LogicalOperator::MapCollect(mc) => {
812 substitute_in_operator(&mut mc.input, params)?;
813 }
814 LogicalOperator::Merge(merge) => {
815 for (_, expr) in &mut merge.match_properties {
816 substitute_in_expression(expr, params)?;
817 }
818 for (_, expr) in &mut merge.on_create {
819 substitute_in_expression(expr, params)?;
820 }
821 for (_, expr) in &mut merge.on_match {
822 substitute_in_expression(expr, params)?;
823 }
824 substitute_in_operator(&mut merge.input, params)?;
825 }
826 LogicalOperator::MergeRelationship(merge_rel) => {
827 for (_, expr) in &mut merge_rel.match_properties {
828 substitute_in_expression(expr, params)?;
829 }
830 for (_, expr) in &mut merge_rel.on_create {
831 substitute_in_expression(expr, params)?;
832 }
833 for (_, expr) in &mut merge_rel.on_match {
834 substitute_in_expression(expr, params)?;
835 }
836 substitute_in_operator(&mut merge_rel.input, params)?;
837 }
838 LogicalOperator::AddLabel(add_label) => {
839 substitute_in_operator(&mut add_label.input, params)?;
840 }
841 LogicalOperator::RemoveLabel(remove_label) => {
842 substitute_in_operator(&mut remove_label.input, params)?;
843 }
844 LogicalOperator::ShortestPath(sp) => {
845 substitute_in_operator(&mut sp.input, params)?;
846 }
847 LogicalOperator::InsertTriple(insert) => {
849 if let Some(ref mut input) = insert.input {
850 substitute_in_operator(input, params)?;
851 }
852 }
853 LogicalOperator::DeleteTriple(delete) => {
854 if let Some(ref mut input) = delete.input {
855 substitute_in_operator(input, params)?;
856 }
857 }
858 LogicalOperator::Modify(modify) => {
859 substitute_in_operator(&mut modify.where_clause, params)?;
860 }
861 LogicalOperator::ClearGraph(_)
862 | LogicalOperator::CreateGraph(_)
863 | LogicalOperator::DropGraph(_)
864 | LogicalOperator::LoadGraph(_)
865 | LogicalOperator::CopyGraph(_)
866 | LogicalOperator::MoveGraph(_)
867 | LogicalOperator::AddGraph(_) => {}
868 LogicalOperator::HorizontalAggregate(op) => {
869 substitute_in_operator(&mut op.input, params)?;
870 }
871 LogicalOperator::Empty => {}
872 LogicalOperator::VectorScan(scan) => {
873 substitute_in_expression(&mut scan.query_vector, params)?;
874 if let Some(ref mut input) = scan.input {
875 substitute_in_operator(input, params)?;
876 }
877 }
878 LogicalOperator::VectorJoin(join) => {
879 substitute_in_expression(&mut join.query_vector, params)?;
880 substitute_in_operator(&mut join.input, params)?;
881 }
882 LogicalOperator::Except(except) => {
883 substitute_in_operator(&mut except.left, params)?;
884 substitute_in_operator(&mut except.right, params)?;
885 }
886 LogicalOperator::Intersect(intersect) => {
887 substitute_in_operator(&mut intersect.left, params)?;
888 substitute_in_operator(&mut intersect.right, params)?;
889 }
890 LogicalOperator::Otherwise(otherwise) => {
891 substitute_in_operator(&mut otherwise.left, params)?;
892 substitute_in_operator(&mut otherwise.right, params)?;
893 }
894 LogicalOperator::Apply(apply) => {
895 substitute_in_operator(&mut apply.input, params)?;
896 substitute_in_operator(&mut apply.subplan, params)?;
897 }
898 LogicalOperator::ParameterScan(_) => {}
900 LogicalOperator::MultiWayJoin(mwj) => {
901 for input in &mut mwj.inputs {
902 substitute_in_operator(input, params)?;
903 }
904 for cond in &mut mwj.conditions {
905 substitute_in_expression(&mut cond.left, params)?;
906 substitute_in_expression(&mut cond.right, params)?;
907 }
908 }
909 LogicalOperator::CreatePropertyGraph(_) => {}
911 LogicalOperator::CallProcedure(_) => {}
913 LogicalOperator::LoadData(_) => {}
915 }
916 Ok(())
917}
918
919fn resolve_count_param(
921 count: &mut crate::query::plan::CountExpr,
922 params: &QueryParams,
923) -> Result<()> {
924 use crate::query::plan::CountExpr;
925 use grafeo_common::utils::error::{QueryError, QueryErrorKind};
926
927 if let CountExpr::Parameter(name) = count {
928 let value = params.get(name.as_str()).ok_or_else(|| {
929 Error::Query(QueryError::new(
930 QueryErrorKind::Semantic,
931 format!("Missing parameter for SKIP/LIMIT: ${name}"),
932 ))
933 })?;
934 let n = match value {
935 Value::Int64(i) if *i >= 0 => *i as usize,
936 Value::Int64(i) => {
937 return Err(Error::Query(QueryError::new(
938 QueryErrorKind::Semantic,
939 format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
940 )));
941 }
942 other => {
943 return Err(Error::Query(QueryError::new(
944 QueryErrorKind::Semantic,
945 format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
946 )));
947 }
948 };
949 *count = CountExpr::Literal(n);
950 }
951 Ok(())
952}
953
954fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
956 use crate::query::plan::LogicalExpression;
957
958 match expr {
959 LogicalExpression::Parameter(name) => {
960 if let Some(value) = params.get(name) {
961 *expr = LogicalExpression::Literal(value.clone());
962 } else {
963 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
964 }
965 }
966 LogicalExpression::Binary { left, right, .. } => {
967 substitute_in_expression(left, params)?;
968 substitute_in_expression(right, params)?;
969 }
970 LogicalExpression::Unary { operand, .. } => {
971 substitute_in_expression(operand, params)?;
972 }
973 LogicalExpression::FunctionCall { args, .. } => {
974 for arg in args {
975 substitute_in_expression(arg, params)?;
976 }
977 }
978 LogicalExpression::List(items) => {
979 for item in items {
980 substitute_in_expression(item, params)?;
981 }
982 }
983 LogicalExpression::Map(pairs) => {
984 for (_, value) in pairs {
985 substitute_in_expression(value, params)?;
986 }
987 }
988 LogicalExpression::IndexAccess { base, index } => {
989 substitute_in_expression(base, params)?;
990 substitute_in_expression(index, params)?;
991 }
992 LogicalExpression::SliceAccess { base, start, end } => {
993 substitute_in_expression(base, params)?;
994 if let Some(s) = start {
995 substitute_in_expression(s, params)?;
996 }
997 if let Some(e) = end {
998 substitute_in_expression(e, params)?;
999 }
1000 }
1001 LogicalExpression::Case {
1002 operand,
1003 when_clauses,
1004 else_clause,
1005 } => {
1006 if let Some(op) = operand {
1007 substitute_in_expression(op, params)?;
1008 }
1009 for (cond, result) in when_clauses {
1010 substitute_in_expression(cond, params)?;
1011 substitute_in_expression(result, params)?;
1012 }
1013 if let Some(el) = else_clause {
1014 substitute_in_expression(el, params)?;
1015 }
1016 }
1017 LogicalExpression::Property { .. }
1018 | LogicalExpression::Variable(_)
1019 | LogicalExpression::Literal(_)
1020 | LogicalExpression::Labels(_)
1021 | LogicalExpression::Type(_)
1022 | LogicalExpression::Id(_) => {}
1023 LogicalExpression::ListComprehension {
1024 list_expr,
1025 filter_expr,
1026 map_expr,
1027 ..
1028 } => {
1029 substitute_in_expression(list_expr, params)?;
1030 if let Some(filter) = filter_expr {
1031 substitute_in_expression(filter, params)?;
1032 }
1033 substitute_in_expression(map_expr, params)?;
1034 }
1035 LogicalExpression::ListPredicate {
1036 list_expr,
1037 predicate,
1038 ..
1039 } => {
1040 substitute_in_expression(list_expr, params)?;
1041 substitute_in_expression(predicate, params)?;
1042 }
1043 LogicalExpression::ExistsSubquery(_)
1044 | LogicalExpression::CountSubquery(_)
1045 | LogicalExpression::ValueSubquery(_) => {
1046 }
1048 LogicalExpression::PatternComprehension { projection, .. } => {
1049 substitute_in_expression(projection, params)?;
1050 }
1051 LogicalExpression::MapProjection { entries, .. } => {
1052 for entry in entries {
1053 if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
1054 substitute_in_expression(expr, params)?;
1055 }
1056 }
1057 }
1058 LogicalExpression::Reduce {
1059 initial,
1060 list,
1061 expression,
1062 ..
1063 } => {
1064 substitute_in_expression(initial, params)?;
1065 substitute_in_expression(list, params)?;
1066 substitute_in_expression(expression, params)?;
1067 }
1068 }
1069 Ok(())
1070}
1071
/// Unit tests for language classification, processor construction, and
/// parameter substitution through the GQL and Cypher front ends.
#[cfg(test)]
mod tests {
    use super::*;

    /// LPG languages report `true`, SPARQL reports `false`.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built over a new store exposes that (empty) store.
    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A label scan in GQL returns one row per matching node.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    /// A label scan in Cypher returns one row per matching node.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// A parameter in a WHERE clause is substituted before execution.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Ages 35 and 45 pass the filter.
        assert_eq!(result.row_count(), 2);
    }

    /// Referencing a parameter that was not supplied is an error.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    /// A parameterized filter combined with a property projection.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    /// Two parameters joined by AND are both substituted.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the node with age 35 and score 90 satisfies both conditions.
        assert_eq!(result.row_count(), 1);
    }

    /// A string parameter compared for equality against a status property.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// An integer parameter compared against an integer property.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the node with value 100 exceeds the threshold.
        assert_eq!(result.row_count(), 1);
    }

    /// An empty result still carries the projected column names.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    /// A string parameter selects exactly the node with the matching name.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}