1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TransactionId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
/// Query languages the processor can be asked to run.
///
/// Every variant is gated behind a Cargo feature, so the set of available
/// languages depends on how the crate was compiled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL (classified as an LPG language by `is_lpg`).
    #[cfg(feature = "gql")]
    Gql,
    /// Cypher (classified as an LPG language by `is_lpg`).
    #[cfg(feature = "cypher")]
    Cypher,
    /// Gremlin (classified as an LPG language by `is_lpg`).
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL (classified as an LPG language by `is_lpg`).
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (classified as an LPG language by `is_lpg`).
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL (classified as a non-LPG, i.e. RDF, language by `is_lpg`).
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL routed to the RDF pipeline; needs both the `graphql` and
    /// `rdf` features.
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
51
52impl QueryLanguage {
53 #[must_use]
55 pub const fn is_lpg(&self) -> bool {
56 match self {
57 #[cfg(feature = "gql")]
58 Self::Gql => true,
59 #[cfg(feature = "cypher")]
60 Self::Cypher => true,
61 #[cfg(feature = "gremlin")]
62 Self::Gremlin => true,
63 #[cfg(feature = "graphql")]
64 Self::GraphQL => true,
65 #[cfg(feature = "sql-pgq")]
66 Self::SqlPgq => true,
67 #[cfg(feature = "sparql")]
68 Self::Sparql => false,
69 #[cfg(all(feature = "graphql", feature = "rdf"))]
70 Self::GraphQLRdf => false,
71 }
72 }
73}
74
/// Named query parameters: maps a parameter name (the text after `$` in the
/// query) to the [`Value`] substituted for it.
pub type QueryParams = HashMap<String, Value>;
77
/// Runs query text end to end: translation to a logical plan, parameter
/// substitution, binding, optimization, physical planning, and execution.
pub struct QueryProcessor {
    /// LPG store handle exposed through `lpg_store()`.
    lpg_store: Arc<LpgStore>,
    /// Store handed to the planner and to pushdown-hint annotation.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Shared with the planner; also supplies the current epoch when no
    /// transaction context has been set.
    transaction_manager: Arc<TransactionManager>,
    /// Catalog handle exposed through `catalog()`.
    catalog: Arc<Catalog>,
    /// Optimizer applied to every logical plan before physical planning.
    optimizer: Optimizer,
    /// Viewing epoch and transaction id to plan under, if one was set with
    /// `with_transaction_context`.
    transaction_context: Option<(EpochId, TransactionId)>,
    /// RDF store used for RDF languages; `None` unless the processor was
    /// built with `with_rdf`.
    #[cfg(feature = "rdf")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
114
115impl QueryProcessor {
116 #[must_use]
118 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119 let optimizer = Optimizer::from_store(&store);
120 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121 Self {
122 lpg_store: store,
123 graph_store,
124 transaction_manager: Arc::new(TransactionManager::new()),
125 catalog: Arc::new(Catalog::new()),
126 optimizer,
127 transaction_context: None,
128 #[cfg(feature = "rdf")]
129 rdf_store: None,
130 }
131 }
132
133 #[must_use]
135 pub fn for_lpg_with_transaction(
136 store: Arc<LpgStore>,
137 transaction_manager: Arc<TransactionManager>,
138 ) -> Self {
139 let optimizer = Optimizer::from_store(&store);
140 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
141 Self {
142 lpg_store: store,
143 graph_store,
144 transaction_manager,
145 catalog: Arc::new(Catalog::new()),
146 optimizer,
147 transaction_context: None,
148 #[cfg(feature = "rdf")]
149 rdf_store: None,
150 }
151 }
152
153 pub fn for_graph_store_with_transaction(
159 store: Arc<dyn GraphStoreMut>,
160 transaction_manager: Arc<TransactionManager>,
161 ) -> Result<Self> {
162 let optimizer = Optimizer::from_graph_store(&*store);
163 Ok(Self {
164 lpg_store: Arc::new(LpgStore::new()?),
165 graph_store: store,
166 transaction_manager,
167 catalog: Arc::new(Catalog::new()),
168 optimizer,
169 transaction_context: None,
170 #[cfg(feature = "rdf")]
171 rdf_store: None,
172 })
173 }
174
175 #[must_use]
179 pub fn with_transaction_context(
180 mut self,
181 viewing_epoch: EpochId,
182 transaction_id: TransactionId,
183 ) -> Self {
184 self.transaction_context = Some((viewing_epoch, transaction_id));
185 self
186 }
187
188 #[must_use]
190 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
191 self.catalog = catalog;
192 self
193 }
194
195 #[must_use]
197 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
198 self.optimizer = optimizer;
199 self
200 }
201
202 pub fn process(
222 &self,
223 query: &str,
224 language: QueryLanguage,
225 params: Option<&QueryParams>,
226 ) -> Result<QueryResult> {
227 if language.is_lpg() {
228 self.process_lpg(query, language, params)
229 } else {
230 #[cfg(feature = "rdf")]
231 {
232 self.process_rdf(query, language, params)
233 }
234 #[cfg(not(feature = "rdf"))]
235 {
236 Err(Error::Internal(
237 "RDF support not enabled. Compile with --features rdf".to_string(),
238 ))
239 }
240 }
241 }
242
243 fn process_lpg(
245 &self,
246 query: &str,
247 language: QueryLanguage,
248 params: Option<&QueryParams>,
249 ) -> Result<QueryResult> {
250 #[cfg(not(target_arch = "wasm32"))]
251 let start_time = std::time::Instant::now();
252
253 let mut logical_plan = self.translate_lpg(query, language)?;
255
256 if let Some(params) = params {
258 substitute_params(&mut logical_plan, params)?;
259 }
260
261 let mut binder = Binder::new();
263 let _binding_context = binder.bind(&logical_plan)?;
264
265 let optimized_plan = self.optimizer.optimize(logical_plan)?;
267
268 if optimized_plan.explain {
270 let mut plan = optimized_plan;
271 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
272 return Ok(explain_result(&plan));
273 }
274
275 let is_read_only =
279 !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
280 let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
281 Planner::with_context(
282 Arc::clone(&self.graph_store),
283 Arc::clone(&self.transaction_manager),
284 Some(transaction_id),
285 epoch,
286 )
287 } else {
288 Planner::with_context(
289 Arc::clone(&self.graph_store),
290 Arc::clone(&self.transaction_manager),
291 None,
292 self.transaction_manager.current_epoch(),
293 )
294 }
295 .with_read_only(is_read_only);
296 let mut physical_plan = planner.plan(&optimized_plan)?;
297
298 let executor = Executor::with_columns(physical_plan.columns.clone());
300 let mut result = executor.execute(physical_plan.operator.as_mut())?;
301
302 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
305 {
306 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
307 result.execution_time_ms = Some(elapsed_ms);
308 }
309 result.rows_scanned = Some(rows_scanned);
310
311 Ok(result)
312 }
313
314 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
316 let _span = tracing::debug_span!("grafeo::query::parse", ?language).entered();
317 match language {
318 #[cfg(feature = "gql")]
319 QueryLanguage::Gql => {
320 use crate::query::translators::gql;
321 gql::translate(query)
322 }
323 #[cfg(feature = "cypher")]
324 QueryLanguage::Cypher => {
325 use crate::query::translators::cypher;
326 cypher::translate(query)
327 }
328 #[cfg(feature = "gremlin")]
329 QueryLanguage::Gremlin => {
330 use crate::query::translators::gremlin;
331 gremlin::translate(query)
332 }
333 #[cfg(feature = "graphql")]
334 QueryLanguage::GraphQL => {
335 use crate::query::translators::graphql;
336 graphql::translate(query)
337 }
338 #[cfg(feature = "sql-pgq")]
339 QueryLanguage::SqlPgq => {
340 use crate::query::translators::sql_pgq;
341 sql_pgq::translate(query)
342 }
343 #[allow(unreachable_patterns)]
344 _ => Err(Error::Internal(format!(
345 "Language {:?} is not an LPG language",
346 language
347 ))),
348 }
349 }
350
351 #[must_use]
353 pub fn lpg_store(&self) -> &Arc<LpgStore> {
354 &self.lpg_store
355 }
356
357 #[must_use]
359 pub fn catalog(&self) -> &Arc<Catalog> {
360 &self.catalog
361 }
362
363 #[must_use]
365 pub fn optimizer(&self) -> &Optimizer {
366 &self.optimizer
367 }
368}
369
impl QueryProcessor {
    /// Borrows the transaction manager handle this processor plans with.
    #[must_use]
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
}
377
#[cfg(feature = "rdf")]
impl QueryProcessor {
    /// Builds a processor that can serve both LPG and RDF languages.
    ///
    /// A fresh [`TransactionManager`] and [`Catalog`] are created; no
    /// transaction context is set.
    #[must_use]
    pub fn with_rdf(
        lpg_store: Arc<LpgStore>,
        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
    ) -> Self {
        Self {
            optimizer: Optimizer::from_store(&lpg_store),
            graph_store: Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>,
            lpg_store,
            transaction_manager: Arc::new(TransactionManager::new()),
            catalog: Arc::new(Catalog::new()),
            transaction_context: None,
            rdf_store: Some(rdf_store),
        }
    }

    /// Borrows the RDF store handle, if one was configured.
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }

    /// RDF pipeline: translate, substitute parameters, bind, optimize, plan
    /// with [`RdfPlanner`](crate::query::planner::rdf::RdfPlanner), execute.
    ///
    /// A plan flagged `explain` is returned as text instead of being run.
    ///
    /// # Errors
    ///
    /// Returns an internal error when no RDF store is configured, otherwise
    /// the error of whichever stage failed.
    fn process_rdf(
        &self,
        query: &str,
        language: QueryLanguage,
        params: Option<&QueryParams>,
    ) -> Result<QueryResult> {
        use crate::query::planner::rdf::RdfPlanner;

        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
            Error::Internal("RDF store not configured for this processor".to_string())
        })?;

        let mut logical_plan = self.translate_rdf(query, language)?;

        if let Some(params) = params {
            substitute_params(&mut logical_plan, params)?;
        }

        // Binding is run for its error checking; the context is not used.
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let optimized_plan = self.optimizer.optimize(logical_plan)?;

        if optimized_plan.explain {
            return Ok(explain_result(&optimized_plan));
        }

        let planner = RdfPlanner::new(Arc::clone(rdf_store));
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    }

    /// Translates RDF query text into a logical plan using the translator
    /// for `language`.
    ///
    /// # Errors
    ///
    /// Returns the translator's error, or an internal error when `language`
    /// has no RDF translator compiled in.
    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
        // Same parse span as the LPG translator, so both pipelines show up
        // under one name in traces.
        let _span = tracing::debug_span!("grafeo::query::parse", ?language).entered();
        match language {
            #[cfg(feature = "sparql")]
            QueryLanguage::Sparql => {
                use crate::query::translators::sparql;
                sparql::translate(query)
            }
            #[cfg(all(feature = "graphql", feature = "rdf"))]
            QueryLanguage::GraphQLRdf => {
                use crate::query::translators::graphql_rdf;
                // NOTE(review): the second argument is a hard-coded IRI;
                // confirm whether it should be configurable.
                graphql_rdf::translate(query, "http://example.org/")
            }
            // Unreachable when only RDF languages are compiled in; silence
            // the lint the same way `translate_lpg` does.
            #[allow(unreachable_patterns)]
            _ => Err(Error::Internal(format!(
                "Language {:?} is not an RDF language",
                language
            ))),
        }
    }
}
472
473pub(crate) fn annotate_pushdown_hints(
478 op: &mut LogicalOperator,
479 store: &dyn grafeo_core::graph::GraphStore,
480) {
481 #[allow(clippy::wildcard_imports)]
482 use crate::query::plan::*;
483
484 match op {
485 LogicalOperator::Filter(filter) => {
486 annotate_pushdown_hints(&mut filter.input, store);
488
489 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
491 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
492 }
493 }
494 LogicalOperator::NodeScan(op) => {
495 if let Some(input) = &mut op.input {
496 annotate_pushdown_hints(input, store);
497 }
498 }
499 LogicalOperator::EdgeScan(op) => {
500 if let Some(input) = &mut op.input {
501 annotate_pushdown_hints(input, store);
502 }
503 }
504 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
505 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
506 LogicalOperator::Join(op) => {
507 annotate_pushdown_hints(&mut op.left, store);
508 annotate_pushdown_hints(&mut op.right, store);
509 }
510 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
511 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
512 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
513 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
514 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
515 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
516 LogicalOperator::Union(op) => {
517 for input in &mut op.inputs {
518 annotate_pushdown_hints(input, store);
519 }
520 }
521 LogicalOperator::Apply(op) => {
522 annotate_pushdown_hints(&mut op.input, store);
523 annotate_pushdown_hints(&mut op.subplan, store);
524 }
525 LogicalOperator::Otherwise(op) => {
526 annotate_pushdown_hints(&mut op.left, store);
527 annotate_pushdown_hints(&mut op.right, store);
528 }
529 _ => {}
530 }
531}
532
533fn infer_pushdown(
535 predicate: &LogicalExpression,
536 scan: &crate::query::plan::NodeScanOp,
537 store: &dyn grafeo_core::graph::GraphStore,
538) -> Option<crate::query::plan::PushdownHint> {
539 #[allow(clippy::wildcard_imports)]
540 use crate::query::plan::*;
541
542 match predicate {
543 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
545 if let Some(prop) = extract_property_name(left, &scan.variable)
546 .or_else(|| extract_property_name(right, &scan.variable))
547 {
548 if store.has_property_index(&prop) {
549 return Some(PushdownHint::IndexLookup { property: prop });
550 }
551 if scan.label.is_some() {
552 return Some(PushdownHint::LabelFirst);
553 }
554 }
555 None
556 }
557 LogicalExpression::Binary {
559 left,
560 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
561 right,
562 } => {
563 if let Some(prop) = extract_property_name(left, &scan.variable)
564 .or_else(|| extract_property_name(right, &scan.variable))
565 {
566 if store.has_property_index(&prop) {
567 return Some(PushdownHint::RangeScan { property: prop });
568 }
569 if scan.label.is_some() {
570 return Some(PushdownHint::LabelFirst);
571 }
572 }
573 None
574 }
575 LogicalExpression::Binary {
577 left,
578 op: BinaryOp::And,
579 ..
580 } => infer_pushdown(left, scan, store),
581 _ => {
582 if scan.label.is_some() {
584 Some(PushdownHint::LabelFirst)
585 } else {
586 None
587 }
588 }
589 }
590}
591
592fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
595 if let LogicalExpression::Property { variable, property } = expr
596 && variable == scan_var
597 {
598 Some(property.clone())
599 } else {
600 None
601 }
602}
603
604pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
606 let tree_text = plan.root.explain_tree();
607 QueryResult {
608 columns: vec!["plan".to_string()],
609 column_types: vec![grafeo_common::types::LogicalType::String],
610 rows: vec![vec![Value::String(tree_text.into())]],
611 execution_time_ms: None,
612 rows_scanned: None,
613 status_message: None,
614 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
615 }
616}
617
/// Replaces parameter references in `plan` with the values in `params`,
/// starting from the plan's root operator.
///
/// # Errors
///
/// Returns an error when a referenced parameter is missing from `params`, or
/// when a SKIP/LIMIT parameter is not a non-negative integer.
pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
    substitute_in_operator(&mut plan.root, params)
}
622
/// Recursively substitutes parameters in `op` and in every operator below it.
///
/// For each operator the expressions it owns (where this function knows
/// about them) are rewritten with `substitute_in_expression`, SKIP/LIMIT
/// counts are resolved with `resolve_count_param`, and child operators are
/// visited recursively. The match is exhaustive (no wildcard arm), so adding
/// a variant to `LogicalOperator` forces a decision here.
///
/// # Errors
///
/// Stops at, and returns, the first substitution error encountered.
fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
    #[allow(clippy::wildcard_imports)]
    use crate::query::plan::*;

    match op {
        LogicalOperator::Filter(filter) => {
            substitute_in_expression(&mut filter.predicate, params)?;
            substitute_in_operator(&mut filter.input, params)?;
        }
        LogicalOperator::Return(ret) => {
            for item in &mut ret.items {
                substitute_in_expression(&mut item.expression, params)?;
            }
            substitute_in_operator(&mut ret.input, params)?;
        }
        LogicalOperator::Project(proj) => {
            for p in &mut proj.projections {
                substitute_in_expression(&mut p.expression, params)?;
            }
            substitute_in_operator(&mut proj.input, params)?;
        }
        // Scans: only the optional input is visited.
        LogicalOperator::NodeScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::EdgeScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Expand(expand) => {
            substitute_in_operator(&mut expand.input, params)?;
        }
        // Joins: both sides first, then the join conditions.
        LogicalOperator::Join(join) => {
            substitute_in_operator(&mut join.left, params)?;
            substitute_in_operator(&mut join.right, params)?;
            for cond in &mut join.conditions {
                substitute_in_expression(&mut cond.left, params)?;
                substitute_in_expression(&mut cond.right, params)?;
            }
        }
        LogicalOperator::LeftJoin(join) => {
            substitute_in_operator(&mut join.left, params)?;
            substitute_in_operator(&mut join.right, params)?;
            if let Some(cond) = &mut join.condition {
                substitute_in_expression(cond, params)?;
            }
        }
        LogicalOperator::Aggregate(agg) => {
            for expr in &mut agg.group_by {
                substitute_in_expression(expr, params)?;
            }
            for agg_expr in &mut agg.aggregates {
                // An aggregate may have no argument expression.
                if let Some(expr) = &mut agg_expr.expression {
                    substitute_in_expression(expr, params)?;
                }
            }
            substitute_in_operator(&mut agg.input, params)?;
        }
        LogicalOperator::Sort(sort) => {
            for key in &mut sort.keys {
                substitute_in_expression(&mut key.expression, params)?;
            }
            substitute_in_operator(&mut sort.input, params)?;
        }
        // LIMIT / SKIP counts may themselves be parameters.
        LogicalOperator::Limit(limit) => {
            resolve_count_param(&mut limit.count, params)?;
            substitute_in_operator(&mut limit.input, params)?;
        }
        LogicalOperator::Skip(skip) => {
            resolve_count_param(&mut skip.count, params)?;
            substitute_in_operator(&mut skip.input, params)?;
        }
        LogicalOperator::Distinct(distinct) => {
            substitute_in_operator(&mut distinct.input, params)?;
        }
        // Mutations: property value expressions, then the input.
        LogicalOperator::CreateNode(create) => {
            for (_, expr) in &mut create.properties {
                substitute_in_expression(expr, params)?;
            }
            if let Some(input) = &mut create.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::CreateEdge(create) => {
            for (_, expr) in &mut create.properties {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut create.input, params)?;
        }
        LogicalOperator::DeleteNode(delete) => {
            substitute_in_operator(&mut delete.input, params)?;
        }
        LogicalOperator::DeleteEdge(delete) => {
            substitute_in_operator(&mut delete.input, params)?;
        }
        LogicalOperator::SetProperty(set) => {
            for (_, expr) in &mut set.properties {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut set.input, params)?;
        }
        LogicalOperator::Union(union) => {
            for input in &mut union.inputs {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::AntiJoin(anti) => {
            substitute_in_operator(&mut anti.left, params)?;
            substitute_in_operator(&mut anti.right, params)?;
        }
        LogicalOperator::Bind(bind) => {
            substitute_in_expression(&mut bind.expression, params)?;
            substitute_in_operator(&mut bind.input, params)?;
        }
        LogicalOperator::TripleScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Unwind(unwind) => {
            substitute_in_expression(&mut unwind.expression, params)?;
            substitute_in_operator(&mut unwind.input, params)?;
        }
        LogicalOperator::MapCollect(mc) => {
            substitute_in_operator(&mut mc.input, params)?;
        }
        // Merge variants: match properties, ON CREATE, ON MATCH, then input.
        LogicalOperator::Merge(merge) => {
            for (_, expr) in &mut merge.match_properties {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge.on_create {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge.on_match {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut merge.input, params)?;
        }
        LogicalOperator::MergeRelationship(merge_rel) => {
            for (_, expr) in &mut merge_rel.match_properties {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge_rel.on_create {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge_rel.on_match {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut merge_rel.input, params)?;
        }
        LogicalOperator::AddLabel(add_label) => {
            substitute_in_operator(&mut add_label.input, params)?;
        }
        LogicalOperator::RemoveLabel(remove_label) => {
            substitute_in_operator(&mut remove_label.input, params)?;
        }
        LogicalOperator::ShortestPath(sp) => {
            substitute_in_operator(&mut sp.input, params)?;
        }
        // Triple mutations: only the optional input is visited.
        LogicalOperator::InsertTriple(insert) => {
            if let Some(ref mut input) = insert.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::DeleteTriple(delete) => {
            if let Some(ref mut input) = delete.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Modify(modify) => {
            substitute_in_operator(&mut modify.where_clause, params)?;
        }
        // Graph-management operators: nothing is substituted.
        LogicalOperator::ClearGraph(_)
        | LogicalOperator::CreateGraph(_)
        | LogicalOperator::DropGraph(_)
        | LogicalOperator::LoadGraph(_)
        | LogicalOperator::CopyGraph(_)
        | LogicalOperator::MoveGraph(_)
        | LogicalOperator::AddGraph(_) => {}
        LogicalOperator::HorizontalAggregate(op) => {
            substitute_in_operator(&mut op.input, params)?;
        }
        LogicalOperator::Empty => {}
        // Vector operators: the query vector can be a parameter.
        LogicalOperator::VectorScan(scan) => {
            substitute_in_expression(&mut scan.query_vector, params)?;
            if let Some(ref mut input) = scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::VectorJoin(join) => {
            substitute_in_expression(&mut join.query_vector, params)?;
            substitute_in_operator(&mut join.input, params)?;
        }
        LogicalOperator::Except(except) => {
            substitute_in_operator(&mut except.left, params)?;
            substitute_in_operator(&mut except.right, params)?;
        }
        LogicalOperator::Intersect(intersect) => {
            substitute_in_operator(&mut intersect.left, params)?;
            substitute_in_operator(&mut intersect.right, params)?;
        }
        LogicalOperator::Otherwise(otherwise) => {
            substitute_in_operator(&mut otherwise.left, params)?;
            substitute_in_operator(&mut otherwise.right, params)?;
        }
        LogicalOperator::Apply(apply) => {
            substitute_in_operator(&mut apply.input, params)?;
            substitute_in_operator(&mut apply.subplan, params)?;
        }
        LogicalOperator::ParameterScan(_) => {}
        // Multi-way join: all inputs first, then the join conditions.
        LogicalOperator::MultiWayJoin(mwj) => {
            for input in &mut mwj.inputs {
                substitute_in_operator(input, params)?;
            }
            for cond in &mut mwj.conditions {
                substitute_in_expression(&mut cond.left, params)?;
                substitute_in_expression(&mut cond.right, params)?;
            }
        }
        // NOTE(review): nothing is substituted for the three operators below;
        // confirm none of them can carry parameter expressions.
        LogicalOperator::CreatePropertyGraph(_) => {}
        LogicalOperator::CallProcedure(_) => {}
        LogicalOperator::LoadData(_) => {}
    }
    Ok(())
}
856
857fn resolve_count_param(
859 count: &mut crate::query::plan::CountExpr,
860 params: &QueryParams,
861) -> Result<()> {
862 use crate::query::plan::CountExpr;
863 use grafeo_common::utils::error::{QueryError, QueryErrorKind};
864
865 if let CountExpr::Parameter(name) = count {
866 let value = params.get(name.as_str()).ok_or_else(|| {
867 Error::Query(QueryError::new(
868 QueryErrorKind::Semantic,
869 format!("Missing parameter for SKIP/LIMIT: ${name}"),
870 ))
871 })?;
872 let n = match value {
873 Value::Int64(i) if *i >= 0 => *i as usize,
874 Value::Int64(i) => {
875 return Err(Error::Query(QueryError::new(
876 QueryErrorKind::Semantic,
877 format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
878 )));
879 }
880 other => {
881 return Err(Error::Query(QueryError::new(
882 QueryErrorKind::Semantic,
883 format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
884 )));
885 }
886 };
887 *count = CountExpr::Literal(n);
888 }
889 Ok(())
890}
891
892fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
894 use crate::query::plan::LogicalExpression;
895
896 match expr {
897 LogicalExpression::Parameter(name) => {
898 if let Some(value) = params.get(name) {
899 *expr = LogicalExpression::Literal(value.clone());
900 } else {
901 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
902 }
903 }
904 LogicalExpression::Binary { left, right, .. } => {
905 substitute_in_expression(left, params)?;
906 substitute_in_expression(right, params)?;
907 }
908 LogicalExpression::Unary { operand, .. } => {
909 substitute_in_expression(operand, params)?;
910 }
911 LogicalExpression::FunctionCall { args, .. } => {
912 for arg in args {
913 substitute_in_expression(arg, params)?;
914 }
915 }
916 LogicalExpression::List(items) => {
917 for item in items {
918 substitute_in_expression(item, params)?;
919 }
920 }
921 LogicalExpression::Map(pairs) => {
922 for (_, value) in pairs {
923 substitute_in_expression(value, params)?;
924 }
925 }
926 LogicalExpression::IndexAccess { base, index } => {
927 substitute_in_expression(base, params)?;
928 substitute_in_expression(index, params)?;
929 }
930 LogicalExpression::SliceAccess { base, start, end } => {
931 substitute_in_expression(base, params)?;
932 if let Some(s) = start {
933 substitute_in_expression(s, params)?;
934 }
935 if let Some(e) = end {
936 substitute_in_expression(e, params)?;
937 }
938 }
939 LogicalExpression::Case {
940 operand,
941 when_clauses,
942 else_clause,
943 } => {
944 if let Some(op) = operand {
945 substitute_in_expression(op, params)?;
946 }
947 for (cond, result) in when_clauses {
948 substitute_in_expression(cond, params)?;
949 substitute_in_expression(result, params)?;
950 }
951 if let Some(el) = else_clause {
952 substitute_in_expression(el, params)?;
953 }
954 }
955 LogicalExpression::Property { .. }
956 | LogicalExpression::Variable(_)
957 | LogicalExpression::Literal(_)
958 | LogicalExpression::Labels(_)
959 | LogicalExpression::Type(_)
960 | LogicalExpression::Id(_) => {}
961 LogicalExpression::ListComprehension {
962 list_expr,
963 filter_expr,
964 map_expr,
965 ..
966 } => {
967 substitute_in_expression(list_expr, params)?;
968 if let Some(filter) = filter_expr {
969 substitute_in_expression(filter, params)?;
970 }
971 substitute_in_expression(map_expr, params)?;
972 }
973 LogicalExpression::ListPredicate {
974 list_expr,
975 predicate,
976 ..
977 } => {
978 substitute_in_expression(list_expr, params)?;
979 substitute_in_expression(predicate, params)?;
980 }
981 LogicalExpression::ExistsSubquery(_)
982 | LogicalExpression::CountSubquery(_)
983 | LogicalExpression::ValueSubquery(_) => {
984 }
986 LogicalExpression::PatternComprehension { projection, .. } => {
987 substitute_in_expression(projection, params)?;
988 }
989 LogicalExpression::MapProjection { entries, .. } => {
990 for entry in entries {
991 if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
992 substitute_in_expression(expr, params)?;
993 }
994 }
995 }
996 LogicalExpression::Reduce {
997 initial,
998 list,
999 expression,
1000 ..
1001 } => {
1002 substitute_in_expression(initial, params)?;
1003 substitute_in_expression(list, params)?;
1004 substitute_in_expression(expression, params)?;
1005 }
1006 }
1007 Ok(())
1008}
1009
#[cfg(test)]
mod tests {
    use super::*;

    /// `is_lpg` separates LPG languages from SPARQL.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built over an empty store exposes that empty store.
    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A plain GQL match returns one row per matching node.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    /// A plain Cypher match returns one row per matching node.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// A `$param` in a WHERE clause is replaced by the supplied value.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Ages 35 and 45 pass the filter.
        assert_eq!(result.row_count(), 2);
    }

    /// Referencing a parameter that was not supplied is an error.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    /// Parameter comparison combined with a property projection.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    /// Two parameters joined by AND are both substituted.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the (35, 90) node satisfies both conditions.
        assert_eq!(result.row_count(), 1);
    }

    /// String parameter compared for equality against a property.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// Integer parameter compared against an integer property.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the node with value 100 exceeds the threshold.
        assert_eq!(result.row_count(), 1);
    }

    /// Column names are reported even when no rows match.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    /// String equality against a parameter returns the matching value.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}