1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15use grafeo_core::graph::GraphStoreMut;
16use grafeo_core::graph::lpg::LpgStore;
17
18use crate::catalog::Catalog;
19use crate::database::QueryResult;
20use crate::query::binder::Binder;
21use crate::query::executor::Executor;
22use crate::query::optimizer::Optimizer;
23use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
24use crate::query::planner::Planner;
25use crate::transaction::TransactionManager;
26
/// Query languages the processor can be asked to run.
///
/// Every variant is gated behind a Cargo feature, so the set of available
/// languages depends on how the crate was compiled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL (feature `gql`); classified as an LPG language by `is_lpg`.
    #[cfg(feature = "gql")]
    Gql,
    /// Cypher (feature `cypher`); classified as an LPG language by `is_lpg`.
    #[cfg(feature = "cypher")]
    Cypher,
    /// Gremlin (feature `gremlin`); classified as an LPG language by `is_lpg`.
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL over the LPG store (feature `graphql`).
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ (feature `sql-pgq`); classified as an LPG language by `is_lpg`.
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL (feature `sparql`); routed to the RDF pipeline.
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL over the RDF store (features `graphql` and `rdf` together).
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
52
53impl QueryLanguage {
54 #[must_use]
56 pub const fn is_lpg(&self) -> bool {
57 match self {
58 #[cfg(feature = "gql")]
59 Self::Gql => true,
60 #[cfg(feature = "cypher")]
61 Self::Cypher => true,
62 #[cfg(feature = "gremlin")]
63 Self::Gremlin => true,
64 #[cfg(feature = "graphql")]
65 Self::GraphQL => true,
66 #[cfg(feature = "sql-pgq")]
67 Self::SqlPgq => true,
68 #[cfg(feature = "sparql")]
69 Self::Sparql => false,
70 #[cfg(all(feature = "graphql", feature = "rdf"))]
71 Self::GraphQLRdf => false,
72 }
73 }
74}
75
/// Named query parameters: maps a parameter name (written `$name` in the
/// query text, stored here without the `$`) to the value substituted for it.
pub type QueryParams = HashMap<String, Value>;
78
/// Runs queries end to end: translate, substitute parameters, bind,
/// optimize, plan, and execute.
pub struct QueryProcessor {
    /// LPG store handle exposed through `lpg_store()`.
    lpg_store: Arc<LpgStore>,
    /// Store handed to the planner and consulted for EXPLAIN pushdown hints.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Transaction manager passed to the planner; also supplies the current
    /// epoch when no explicit transaction context is set.
    transaction_manager: Arc<TransactionManager>,
    /// Catalog handle exposed through `catalog()`.
    catalog: Arc<Catalog>,
    /// Optimizer applied to every logical plan before physical planning.
    optimizer: Optimizer,
    /// Optional `(viewing epoch, transaction id)` pair; when present the
    /// planner runs inside that transaction.
    transaction_context: Option<(EpochId, TransactionId)>,
    /// RDF store used by the RDF pipeline; `None` unless configured.
    #[cfg(feature = "rdf")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
115
116impl QueryProcessor {
117 #[must_use]
119 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
120 let optimizer = Optimizer::from_store(&store);
121 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
122 Self {
123 lpg_store: store,
124 graph_store,
125 transaction_manager: Arc::new(TransactionManager::new()),
126 catalog: Arc::new(Catalog::new()),
127 optimizer,
128 transaction_context: None,
129 #[cfg(feature = "rdf")]
130 rdf_store: None,
131 }
132 }
133
134 #[must_use]
136 pub fn for_lpg_with_transaction(
137 store: Arc<LpgStore>,
138 transaction_manager: Arc<TransactionManager>,
139 ) -> Self {
140 let optimizer = Optimizer::from_store(&store);
141 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
142 Self {
143 lpg_store: store,
144 graph_store,
145 transaction_manager,
146 catalog: Arc::new(Catalog::new()),
147 optimizer,
148 transaction_context: None,
149 #[cfg(feature = "rdf")]
150 rdf_store: None,
151 }
152 }
153
154 pub fn for_graph_store_with_transaction(
160 store: Arc<dyn GraphStoreMut>,
161 transaction_manager: Arc<TransactionManager>,
162 ) -> Result<Self> {
163 let optimizer = Optimizer::from_graph_store(&*store);
164 Ok(Self {
165 lpg_store: Arc::new(LpgStore::new()?),
166 graph_store: store,
167 transaction_manager,
168 catalog: Arc::new(Catalog::new()),
169 optimizer,
170 transaction_context: None,
171 #[cfg(feature = "rdf")]
172 rdf_store: None,
173 })
174 }
175
176 #[must_use]
180 pub fn with_transaction_context(
181 mut self,
182 viewing_epoch: EpochId,
183 transaction_id: TransactionId,
184 ) -> Self {
185 self.transaction_context = Some((viewing_epoch, transaction_id));
186 self
187 }
188
189 #[must_use]
191 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
192 self.catalog = catalog;
193 self
194 }
195
196 #[must_use]
198 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
199 self.optimizer = optimizer;
200 self
201 }
202
203 pub fn process(
223 &self,
224 query: &str,
225 language: QueryLanguage,
226 params: Option<&QueryParams>,
227 ) -> Result<QueryResult> {
228 if language.is_lpg() {
229 self.process_lpg(query, language, params)
230 } else {
231 #[cfg(feature = "rdf")]
232 {
233 self.process_rdf(query, language, params)
234 }
235 #[cfg(not(feature = "rdf"))]
236 {
237 Err(Error::Internal(
238 "RDF support not enabled. Compile with --features rdf".to_string(),
239 ))
240 }
241 }
242 }
243
244 fn process_lpg(
246 &self,
247 query: &str,
248 language: QueryLanguage,
249 params: Option<&QueryParams>,
250 ) -> Result<QueryResult> {
251 #[cfg(not(target_arch = "wasm32"))]
252 let start_time = std::time::Instant::now();
253
254 let mut logical_plan = self.translate_lpg(query, language)?;
256
257 if let Some(params) = params {
259 substitute_params(&mut logical_plan, params)?;
260 }
261
262 let mut binder = Binder::new();
264 let _binding_context = binder.bind(&logical_plan)?;
265
266 let optimized_plan = self.optimizer.optimize(logical_plan)?;
268
269 if optimized_plan.explain {
271 let mut plan = optimized_plan;
272 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
273 return Ok(explain_result(&plan));
274 }
275
276 let is_read_only =
280 !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
281 let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
282 Planner::with_context(
283 Arc::clone(&self.graph_store),
284 Arc::clone(&self.transaction_manager),
285 Some(transaction_id),
286 epoch,
287 )
288 } else {
289 Planner::with_context(
290 Arc::clone(&self.graph_store),
291 Arc::clone(&self.transaction_manager),
292 None,
293 self.transaction_manager.current_epoch(),
294 )
295 }
296 .with_read_only(is_read_only);
297 let mut physical_plan = planner.plan(&optimized_plan)?;
298
299 let executor = Executor::with_columns(physical_plan.columns.clone());
301 let mut result = executor.execute(physical_plan.operator.as_mut())?;
302
303 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
306 {
307 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
308 result.execution_time_ms = Some(elapsed_ms);
309 }
310 result.rows_scanned = Some(rows_scanned);
311
312 Ok(result)
313 }
314
315 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
317 let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
318 match language {
319 #[cfg(feature = "gql")]
320 QueryLanguage::Gql => {
321 use crate::query::translators::gql;
322 gql::translate(query)
323 }
324 #[cfg(feature = "cypher")]
325 QueryLanguage::Cypher => {
326 use crate::query::translators::cypher;
327 cypher::translate(query)
328 }
329 #[cfg(feature = "gremlin")]
330 QueryLanguage::Gremlin => {
331 use crate::query::translators::gremlin;
332 gremlin::translate(query)
333 }
334 #[cfg(feature = "graphql")]
335 QueryLanguage::GraphQL => {
336 use crate::query::translators::graphql;
337 graphql::translate(query)
338 }
339 #[cfg(feature = "sql-pgq")]
340 QueryLanguage::SqlPgq => {
341 use crate::query::translators::sql_pgq;
342 sql_pgq::translate(query)
343 }
344 #[allow(unreachable_patterns)]
345 _ => Err(Error::Internal(format!(
346 "Language {:?} is not an LPG language",
347 language
348 ))),
349 }
350 }
351
352 #[must_use]
354 pub fn lpg_store(&self) -> &Arc<LpgStore> {
355 &self.lpg_store
356 }
357
358 #[must_use]
360 pub fn catalog(&self) -> &Arc<Catalog> {
361 &self.catalog
362 }
363
364 #[must_use]
366 pub fn optimizer(&self) -> &Optimizer {
367 &self.optimizer
368 }
369}
370
371impl QueryProcessor {
372 #[must_use]
374 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
375 &self.transaction_manager
376 }
377}
378
#[cfg(feature = "rdf")]
impl QueryProcessor {
    /// Creates a processor that can serve both LPG and RDF queries.
    ///
    /// The optimizer is derived from `lpg_store`; the transaction manager and
    /// catalog are freshly created.
    #[must_use]
    pub fn with_rdf(
        lpg_store: Arc<LpgStore>,
        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
    ) -> Self {
        let optimizer = Optimizer::from_store(&lpg_store);
        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
        Self {
            lpg_store,
            graph_store,
            transaction_manager: Arc::new(TransactionManager::new()),
            catalog: Arc::new(Catalog::new()),
            optimizer,
            transaction_context: None,
            rdf_store: Some(rdf_store),
        }
    }

    /// Returns the RDF store, if one was configured.
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }

    /// RDF pipeline: translate → substitute parameters → bind → optimize →
    /// (EXPLAIN shortcut) → plan with `RdfPlanner` → execute.
    ///
    /// The executor's result is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` when no RDF store is configured, plus any
    /// error from the pipeline stages.
    fn process_rdf(
        &self,
        query: &str,
        language: QueryLanguage,
        params: Option<&QueryParams>,
    ) -> Result<QueryResult> {
        use crate::query::planner::rdf::RdfPlanner;

        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
            Error::Internal("RDF store not configured for this processor".to_string())
        })?;

        let mut logical_plan = self.translate_rdf(query, language)?;

        if let Some(params) = params {
            substitute_params(&mut logical_plan, params)?;
        }

        // Binding is run for validation; its context is not consumed here.
        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let optimized_plan = self.optimizer.optimize(logical_plan)?;

        // EXPLAIN: return the plan text instead of executing.
        if optimized_plan.explain {
            return Ok(explain_result(&optimized_plan));
        }

        let planner = RdfPlanner::new(Arc::clone(rdf_store));
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    }

    /// Translates `query` into a logical plan using the translator for the
    /// given RDF `language`.
    ///
    /// # Errors
    ///
    /// Returns the translator's error, or `Error::Internal` when `language`
    /// is not an RDF language.
    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
        // Same parse span as the LPG translator so both paths trace alike.
        let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
        match language {
            #[cfg(feature = "sparql")]
            QueryLanguage::Sparql => {
                use crate::query::translators::sparql;
                sparql::translate(query)
            }
            #[cfg(all(feature = "graphql", feature = "rdf"))]
            QueryLanguage::GraphQLRdf => {
                use crate::query::translators::graphql_rdf;
                // NOTE(review): the namespace argument is hard-coded; confirm
                // whether callers need to configure it.
                graphql_rdf::translate(query, "http://example.org/")
            }
            // When only RDF languages are compiled in, the arms above are
            // exhaustive and this arm would otherwise trip `unreachable_patterns`.
            #[allow(unreachable_patterns)]
            _ => Err(Error::Internal(format!(
                "Language {:?} is not an RDF language",
                language
            ))),
        }
    }
}
473
474pub(crate) fn annotate_pushdown_hints(
479 op: &mut LogicalOperator,
480 store: &dyn grafeo_core::graph::GraphStore,
481) {
482 #[allow(clippy::wildcard_imports)]
483 use crate::query::plan::*;
484
485 match op {
486 LogicalOperator::Filter(filter) => {
487 annotate_pushdown_hints(&mut filter.input, store);
489
490 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
492 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
493 }
494 }
495 LogicalOperator::NodeScan(op) => {
496 if let Some(input) = &mut op.input {
497 annotate_pushdown_hints(input, store);
498 }
499 }
500 LogicalOperator::EdgeScan(op) => {
501 if let Some(input) = &mut op.input {
502 annotate_pushdown_hints(input, store);
503 }
504 }
505 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
506 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
507 LogicalOperator::Join(op) => {
508 annotate_pushdown_hints(&mut op.left, store);
509 annotate_pushdown_hints(&mut op.right, store);
510 }
511 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
512 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
513 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
514 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
515 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
516 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
517 LogicalOperator::Union(op) => {
518 for input in &mut op.inputs {
519 annotate_pushdown_hints(input, store);
520 }
521 }
522 LogicalOperator::Apply(op) => {
523 annotate_pushdown_hints(&mut op.input, store);
524 annotate_pushdown_hints(&mut op.subplan, store);
525 }
526 LogicalOperator::Otherwise(op) => {
527 annotate_pushdown_hints(&mut op.left, store);
528 annotate_pushdown_hints(&mut op.right, store);
529 }
530 _ => {}
531 }
532}
533
534fn infer_pushdown(
536 predicate: &LogicalExpression,
537 scan: &crate::query::plan::NodeScanOp,
538 store: &dyn grafeo_core::graph::GraphStore,
539) -> Option<crate::query::plan::PushdownHint> {
540 #[allow(clippy::wildcard_imports)]
541 use crate::query::plan::*;
542
543 match predicate {
544 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
546 if let Some(prop) = extract_property_name(left, &scan.variable)
547 .or_else(|| extract_property_name(right, &scan.variable))
548 {
549 if store.has_property_index(&prop) {
550 return Some(PushdownHint::IndexLookup { property: prop });
551 }
552 if scan.label.is_some() {
553 return Some(PushdownHint::LabelFirst);
554 }
555 }
556 None
557 }
558 LogicalExpression::Binary {
560 left,
561 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
562 right,
563 } => {
564 if let Some(prop) = extract_property_name(left, &scan.variable)
565 .or_else(|| extract_property_name(right, &scan.variable))
566 {
567 if store.has_property_index(&prop) {
568 return Some(PushdownHint::RangeScan { property: prop });
569 }
570 if scan.label.is_some() {
571 return Some(PushdownHint::LabelFirst);
572 }
573 }
574 None
575 }
576 LogicalExpression::Binary {
578 left,
579 op: BinaryOp::And,
580 ..
581 } => infer_pushdown(left, scan, store),
582 _ => {
583 if scan.label.is_some() {
585 Some(PushdownHint::LabelFirst)
586 } else {
587 None
588 }
589 }
590 }
591}
592
593fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
596 if let LogicalExpression::Property { variable, property } = expr
597 && variable == scan_var
598 {
599 Some(property.clone())
600 } else {
601 None
602 }
603}
604
605pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
607 let tree_text = plan.root.explain_tree();
608 QueryResult {
609 columns: vec!["plan".to_string()],
610 column_types: vec![grafeo_common::types::LogicalType::String],
611 rows: vec![vec![Value::String(tree_text.into())]],
612 execution_time_ms: None,
613 rows_scanned: None,
614 status_message: None,
615 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
616 }
617}
618
619pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
621 substitute_in_operator(&mut plan.root, params)
622}
623
624fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
626 #[allow(clippy::wildcard_imports)]
627 use crate::query::plan::*;
628
629 match op {
630 LogicalOperator::Filter(filter) => {
631 substitute_in_expression(&mut filter.predicate, params)?;
632 substitute_in_operator(&mut filter.input, params)?;
633 }
634 LogicalOperator::Return(ret) => {
635 for item in &mut ret.items {
636 substitute_in_expression(&mut item.expression, params)?;
637 }
638 substitute_in_operator(&mut ret.input, params)?;
639 }
640 LogicalOperator::Project(proj) => {
641 for p in &mut proj.projections {
642 substitute_in_expression(&mut p.expression, params)?;
643 }
644 substitute_in_operator(&mut proj.input, params)?;
645 }
646 LogicalOperator::NodeScan(scan) => {
647 if let Some(input) = &mut scan.input {
648 substitute_in_operator(input, params)?;
649 }
650 }
651 LogicalOperator::EdgeScan(scan) => {
652 if let Some(input) = &mut scan.input {
653 substitute_in_operator(input, params)?;
654 }
655 }
656 LogicalOperator::Expand(expand) => {
657 substitute_in_operator(&mut expand.input, params)?;
658 }
659 LogicalOperator::Join(join) => {
660 substitute_in_operator(&mut join.left, params)?;
661 substitute_in_operator(&mut join.right, params)?;
662 for cond in &mut join.conditions {
663 substitute_in_expression(&mut cond.left, params)?;
664 substitute_in_expression(&mut cond.right, params)?;
665 }
666 }
667 LogicalOperator::LeftJoin(join) => {
668 substitute_in_operator(&mut join.left, params)?;
669 substitute_in_operator(&mut join.right, params)?;
670 if let Some(cond) = &mut join.condition {
671 substitute_in_expression(cond, params)?;
672 }
673 }
674 LogicalOperator::Aggregate(agg) => {
675 for expr in &mut agg.group_by {
676 substitute_in_expression(expr, params)?;
677 }
678 for agg_expr in &mut agg.aggregates {
679 if let Some(expr) = &mut agg_expr.expression {
680 substitute_in_expression(expr, params)?;
681 }
682 }
683 substitute_in_operator(&mut agg.input, params)?;
684 }
685 LogicalOperator::Sort(sort) => {
686 for key in &mut sort.keys {
687 substitute_in_expression(&mut key.expression, params)?;
688 }
689 substitute_in_operator(&mut sort.input, params)?;
690 }
691 LogicalOperator::Limit(limit) => {
692 resolve_count_param(&mut limit.count, params)?;
693 substitute_in_operator(&mut limit.input, params)?;
694 }
695 LogicalOperator::Skip(skip) => {
696 resolve_count_param(&mut skip.count, params)?;
697 substitute_in_operator(&mut skip.input, params)?;
698 }
699 LogicalOperator::Distinct(distinct) => {
700 substitute_in_operator(&mut distinct.input, params)?;
701 }
702 LogicalOperator::CreateNode(create) => {
703 for (_, expr) in &mut create.properties {
704 substitute_in_expression(expr, params)?;
705 }
706 if let Some(input) = &mut create.input {
707 substitute_in_operator(input, params)?;
708 }
709 }
710 LogicalOperator::CreateEdge(create) => {
711 for (_, expr) in &mut create.properties {
712 substitute_in_expression(expr, params)?;
713 }
714 substitute_in_operator(&mut create.input, params)?;
715 }
716 LogicalOperator::DeleteNode(delete) => {
717 substitute_in_operator(&mut delete.input, params)?;
718 }
719 LogicalOperator::DeleteEdge(delete) => {
720 substitute_in_operator(&mut delete.input, params)?;
721 }
722 LogicalOperator::SetProperty(set) => {
723 for (_, expr) in &mut set.properties {
724 substitute_in_expression(expr, params)?;
725 }
726 substitute_in_operator(&mut set.input, params)?;
727 }
728 LogicalOperator::Union(union) => {
729 for input in &mut union.inputs {
730 substitute_in_operator(input, params)?;
731 }
732 }
733 LogicalOperator::AntiJoin(anti) => {
734 substitute_in_operator(&mut anti.left, params)?;
735 substitute_in_operator(&mut anti.right, params)?;
736 }
737 LogicalOperator::Bind(bind) => {
738 substitute_in_expression(&mut bind.expression, params)?;
739 substitute_in_operator(&mut bind.input, params)?;
740 }
741 LogicalOperator::TripleScan(scan) => {
742 if let Some(input) = &mut scan.input {
743 substitute_in_operator(input, params)?;
744 }
745 }
746 LogicalOperator::Unwind(unwind) => {
747 substitute_in_expression(&mut unwind.expression, params)?;
748 substitute_in_operator(&mut unwind.input, params)?;
749 }
750 LogicalOperator::MapCollect(mc) => {
751 substitute_in_operator(&mut mc.input, params)?;
752 }
753 LogicalOperator::Merge(merge) => {
754 for (_, expr) in &mut merge.match_properties {
755 substitute_in_expression(expr, params)?;
756 }
757 for (_, expr) in &mut merge.on_create {
758 substitute_in_expression(expr, params)?;
759 }
760 for (_, expr) in &mut merge.on_match {
761 substitute_in_expression(expr, params)?;
762 }
763 substitute_in_operator(&mut merge.input, params)?;
764 }
765 LogicalOperator::MergeRelationship(merge_rel) => {
766 for (_, expr) in &mut merge_rel.match_properties {
767 substitute_in_expression(expr, params)?;
768 }
769 for (_, expr) in &mut merge_rel.on_create {
770 substitute_in_expression(expr, params)?;
771 }
772 for (_, expr) in &mut merge_rel.on_match {
773 substitute_in_expression(expr, params)?;
774 }
775 substitute_in_operator(&mut merge_rel.input, params)?;
776 }
777 LogicalOperator::AddLabel(add_label) => {
778 substitute_in_operator(&mut add_label.input, params)?;
779 }
780 LogicalOperator::RemoveLabel(remove_label) => {
781 substitute_in_operator(&mut remove_label.input, params)?;
782 }
783 LogicalOperator::ShortestPath(sp) => {
784 substitute_in_operator(&mut sp.input, params)?;
785 }
786 LogicalOperator::InsertTriple(insert) => {
788 if let Some(ref mut input) = insert.input {
789 substitute_in_operator(input, params)?;
790 }
791 }
792 LogicalOperator::DeleteTriple(delete) => {
793 if let Some(ref mut input) = delete.input {
794 substitute_in_operator(input, params)?;
795 }
796 }
797 LogicalOperator::Modify(modify) => {
798 substitute_in_operator(&mut modify.where_clause, params)?;
799 }
800 LogicalOperator::ClearGraph(_)
801 | LogicalOperator::CreateGraph(_)
802 | LogicalOperator::DropGraph(_)
803 | LogicalOperator::LoadGraph(_)
804 | LogicalOperator::CopyGraph(_)
805 | LogicalOperator::MoveGraph(_)
806 | LogicalOperator::AddGraph(_) => {}
807 LogicalOperator::HorizontalAggregate(op) => {
808 substitute_in_operator(&mut op.input, params)?;
809 }
810 LogicalOperator::Empty => {}
811 LogicalOperator::VectorScan(scan) => {
812 substitute_in_expression(&mut scan.query_vector, params)?;
813 if let Some(ref mut input) = scan.input {
814 substitute_in_operator(input, params)?;
815 }
816 }
817 LogicalOperator::VectorJoin(join) => {
818 substitute_in_expression(&mut join.query_vector, params)?;
819 substitute_in_operator(&mut join.input, params)?;
820 }
821 LogicalOperator::Except(except) => {
822 substitute_in_operator(&mut except.left, params)?;
823 substitute_in_operator(&mut except.right, params)?;
824 }
825 LogicalOperator::Intersect(intersect) => {
826 substitute_in_operator(&mut intersect.left, params)?;
827 substitute_in_operator(&mut intersect.right, params)?;
828 }
829 LogicalOperator::Otherwise(otherwise) => {
830 substitute_in_operator(&mut otherwise.left, params)?;
831 substitute_in_operator(&mut otherwise.right, params)?;
832 }
833 LogicalOperator::Apply(apply) => {
834 substitute_in_operator(&mut apply.input, params)?;
835 substitute_in_operator(&mut apply.subplan, params)?;
836 }
837 LogicalOperator::ParameterScan(_) => {}
839 LogicalOperator::MultiWayJoin(mwj) => {
840 for input in &mut mwj.inputs {
841 substitute_in_operator(input, params)?;
842 }
843 for cond in &mut mwj.conditions {
844 substitute_in_expression(&mut cond.left, params)?;
845 substitute_in_expression(&mut cond.right, params)?;
846 }
847 }
848 LogicalOperator::CreatePropertyGraph(_) => {}
850 LogicalOperator::CallProcedure(_) => {}
852 LogicalOperator::LoadData(_) => {}
854 }
855 Ok(())
856}
857
858fn resolve_count_param(
860 count: &mut crate::query::plan::CountExpr,
861 params: &QueryParams,
862) -> Result<()> {
863 use crate::query::plan::CountExpr;
864 use grafeo_common::utils::error::{QueryError, QueryErrorKind};
865
866 if let CountExpr::Parameter(name) = count {
867 let value = params.get(name.as_str()).ok_or_else(|| {
868 Error::Query(QueryError::new(
869 QueryErrorKind::Semantic,
870 format!("Missing parameter for SKIP/LIMIT: ${name}"),
871 ))
872 })?;
873 let n = match value {
874 Value::Int64(i) if *i >= 0 => *i as usize,
875 Value::Int64(i) => {
876 return Err(Error::Query(QueryError::new(
877 QueryErrorKind::Semantic,
878 format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
879 )));
880 }
881 other => {
882 return Err(Error::Query(QueryError::new(
883 QueryErrorKind::Semantic,
884 format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
885 )));
886 }
887 };
888 *count = CountExpr::Literal(n);
889 }
890 Ok(())
891}
892
893fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
895 use crate::query::plan::LogicalExpression;
896
897 match expr {
898 LogicalExpression::Parameter(name) => {
899 if let Some(value) = params.get(name) {
900 *expr = LogicalExpression::Literal(value.clone());
901 } else {
902 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
903 }
904 }
905 LogicalExpression::Binary { left, right, .. } => {
906 substitute_in_expression(left, params)?;
907 substitute_in_expression(right, params)?;
908 }
909 LogicalExpression::Unary { operand, .. } => {
910 substitute_in_expression(operand, params)?;
911 }
912 LogicalExpression::FunctionCall { args, .. } => {
913 for arg in args {
914 substitute_in_expression(arg, params)?;
915 }
916 }
917 LogicalExpression::List(items) => {
918 for item in items {
919 substitute_in_expression(item, params)?;
920 }
921 }
922 LogicalExpression::Map(pairs) => {
923 for (_, value) in pairs {
924 substitute_in_expression(value, params)?;
925 }
926 }
927 LogicalExpression::IndexAccess { base, index } => {
928 substitute_in_expression(base, params)?;
929 substitute_in_expression(index, params)?;
930 }
931 LogicalExpression::SliceAccess { base, start, end } => {
932 substitute_in_expression(base, params)?;
933 if let Some(s) = start {
934 substitute_in_expression(s, params)?;
935 }
936 if let Some(e) = end {
937 substitute_in_expression(e, params)?;
938 }
939 }
940 LogicalExpression::Case {
941 operand,
942 when_clauses,
943 else_clause,
944 } => {
945 if let Some(op) = operand {
946 substitute_in_expression(op, params)?;
947 }
948 for (cond, result) in when_clauses {
949 substitute_in_expression(cond, params)?;
950 substitute_in_expression(result, params)?;
951 }
952 if let Some(el) = else_clause {
953 substitute_in_expression(el, params)?;
954 }
955 }
956 LogicalExpression::Property { .. }
957 | LogicalExpression::Variable(_)
958 | LogicalExpression::Literal(_)
959 | LogicalExpression::Labels(_)
960 | LogicalExpression::Type(_)
961 | LogicalExpression::Id(_) => {}
962 LogicalExpression::ListComprehension {
963 list_expr,
964 filter_expr,
965 map_expr,
966 ..
967 } => {
968 substitute_in_expression(list_expr, params)?;
969 if let Some(filter) = filter_expr {
970 substitute_in_expression(filter, params)?;
971 }
972 substitute_in_expression(map_expr, params)?;
973 }
974 LogicalExpression::ListPredicate {
975 list_expr,
976 predicate,
977 ..
978 } => {
979 substitute_in_expression(list_expr, params)?;
980 substitute_in_expression(predicate, params)?;
981 }
982 LogicalExpression::ExistsSubquery(_)
983 | LogicalExpression::CountSubquery(_)
984 | LogicalExpression::ValueSubquery(_) => {
985 }
987 LogicalExpression::PatternComprehension { projection, .. } => {
988 substitute_in_expression(projection, params)?;
989 }
990 LogicalExpression::MapProjection { entries, .. } => {
991 for entry in entries {
992 if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
993 substitute_in_expression(expr, params)?;
994 }
995 }
996 }
997 LogicalExpression::Reduce {
998 initial,
999 list,
1000 expression,
1001 ..
1002 } => {
1003 substitute_in_expression(initial, params)?;
1004 substitute_in_expression(list, params)?;
1005 substitute_in_expression(expression, params)?;
1006 }
1007 }
1008 Ok(())
1009}
1010
#[cfg(test)]
mod tests {
    use super::*;

    /// `is_lpg` classifies GQL/Cypher as LPG and SPARQL as non-LPG.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built over an empty store exposes that empty store.
    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A plain GQL MATCH returns one row per matching node.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    /// A plain Cypher MATCH returns one row per matching node.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// A `$param` in a WHERE clause is substituted before execution.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Ages 35 and 45 pass the filter.
        assert_eq!(result.row_count(), 2);
    }

    /// Referencing a parameter that was not supplied is an error.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    /// A parameter compared against a property filters and projects correctly.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    /// Several parameters joined with AND are all substituted.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the (35, 90) node satisfies both conditions.
        assert_eq!(result.row_count(), 1);
    }

    /// A string parameter used in an equality filter selects the single match.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// An integer parameter compared with an integer property behaves as expected.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the node with value 100 exceeds the threshold.
        assert_eq!(result.row_count(), 1);
    }

    /// Column names are reported even when the query matches nothing.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    /// A string parameter in an equality filter returns the matching value.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}