1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TransactionId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub enum QueryLanguage {
29 #[cfg(feature = "gql")]
31 Gql,
32 #[cfg(feature = "cypher")]
34 Cypher,
35 #[cfg(feature = "gremlin")]
37 Gremlin,
38 #[cfg(feature = "graphql")]
40 GraphQL,
41 #[cfg(feature = "sql-pgq")]
43 SqlPgq,
44 #[cfg(feature = "sparql")]
46 Sparql,
47 #[cfg(all(feature = "graphql", feature = "rdf"))]
49 GraphQLRdf,
50}
51
52impl QueryLanguage {
53 #[must_use]
55 pub const fn is_lpg(&self) -> bool {
56 match self {
57 #[cfg(feature = "gql")]
58 Self::Gql => true,
59 #[cfg(feature = "cypher")]
60 Self::Cypher => true,
61 #[cfg(feature = "gremlin")]
62 Self::Gremlin => true,
63 #[cfg(feature = "graphql")]
64 Self::GraphQL => true,
65 #[cfg(feature = "sql-pgq")]
66 Self::SqlPgq => true,
67 #[cfg(feature = "sparql")]
68 Self::Sparql => false,
69 #[cfg(all(feature = "graphql", feature = "rdf"))]
70 Self::GraphQLRdf => false,
71 }
72 }
73}
74
75pub type QueryParams = HashMap<String, Value>;
77
78pub struct QueryProcessor {
98 lpg_store: Arc<LpgStore>,
100 graph_store: Arc<dyn GraphStoreMut>,
102 transaction_manager: Arc<TransactionManager>,
104 catalog: Arc<Catalog>,
106 optimizer: Optimizer,
108 transaction_context: Option<(EpochId, TransactionId)>,
110 #[cfg(feature = "rdf")]
112 rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
113}
114
115impl QueryProcessor {
116 #[must_use]
118 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119 let optimizer = Optimizer::from_store(&store);
120 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121 Self {
122 lpg_store: store,
123 graph_store,
124 transaction_manager: Arc::new(TransactionManager::new()),
125 catalog: Arc::new(Catalog::new()),
126 optimizer,
127 transaction_context: None,
128 #[cfg(feature = "rdf")]
129 rdf_store: None,
130 }
131 }
132
133 #[must_use]
135 pub fn for_lpg_with_transaction(
136 store: Arc<LpgStore>,
137 transaction_manager: Arc<TransactionManager>,
138 ) -> Self {
139 let optimizer = Optimizer::from_store(&store);
140 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
141 Self {
142 lpg_store: store,
143 graph_store,
144 transaction_manager,
145 catalog: Arc::new(Catalog::new()),
146 optimizer,
147 transaction_context: None,
148 #[cfg(feature = "rdf")]
149 rdf_store: None,
150 }
151 }
152
153 pub fn for_graph_store_with_transaction(
159 store: Arc<dyn GraphStoreMut>,
160 transaction_manager: Arc<TransactionManager>,
161 ) -> Result<Self> {
162 let optimizer = Optimizer::from_graph_store(&*store);
163 Ok(Self {
164 lpg_store: Arc::new(LpgStore::new()?),
165 graph_store: store,
166 transaction_manager,
167 catalog: Arc::new(Catalog::new()),
168 optimizer,
169 transaction_context: None,
170 #[cfg(feature = "rdf")]
171 rdf_store: None,
172 })
173 }
174
175 #[must_use]
179 pub fn with_transaction_context(
180 mut self,
181 viewing_epoch: EpochId,
182 transaction_id: TransactionId,
183 ) -> Self {
184 self.transaction_context = Some((viewing_epoch, transaction_id));
185 self
186 }
187
188 #[must_use]
190 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
191 self.catalog = catalog;
192 self
193 }
194
195 #[must_use]
197 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
198 self.optimizer = optimizer;
199 self
200 }
201
202 pub fn process(
222 &self,
223 query: &str,
224 language: QueryLanguage,
225 params: Option<&QueryParams>,
226 ) -> Result<QueryResult> {
227 if language.is_lpg() {
228 self.process_lpg(query, language, params)
229 } else {
230 #[cfg(feature = "rdf")]
231 {
232 self.process_rdf(query, language, params)
233 }
234 #[cfg(not(feature = "rdf"))]
235 {
236 Err(Error::Internal(
237 "RDF support not enabled. Compile with --features rdf".to_string(),
238 ))
239 }
240 }
241 }
242
243 fn process_lpg(
245 &self,
246 query: &str,
247 language: QueryLanguage,
248 params: Option<&QueryParams>,
249 ) -> Result<QueryResult> {
250 #[cfg(not(target_arch = "wasm32"))]
251 let start_time = std::time::Instant::now();
252
253 let mut logical_plan = self.translate_lpg(query, language)?;
255
256 if let Some(params) = params {
258 substitute_params(&mut logical_plan, params)?;
259 }
260
261 let mut binder = Binder::new();
263 let _binding_context = binder.bind(&logical_plan)?;
264
265 let optimized_plan = self.optimizer.optimize(logical_plan)?;
267
268 if optimized_plan.explain {
270 let mut plan = optimized_plan;
271 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
272 return Ok(explain_result(&plan));
273 }
274
275 let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
277 Planner::with_context(
278 Arc::clone(&self.graph_store),
279 Arc::clone(&self.transaction_manager),
280 Some(transaction_id),
281 epoch,
282 )
283 } else {
284 Planner::with_context(
285 Arc::clone(&self.graph_store),
286 Arc::clone(&self.transaction_manager),
287 None,
288 self.transaction_manager.current_epoch(),
289 )
290 };
291 let mut physical_plan = planner.plan(&optimized_plan)?;
292
293 let executor = Executor::with_columns(physical_plan.columns.clone());
295 let mut result = executor.execute(physical_plan.operator.as_mut())?;
296
297 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
300 {
301 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
302 result.execution_time_ms = Some(elapsed_ms);
303 }
304 result.rows_scanned = Some(rows_scanned);
305
306 Ok(result)
307 }
308
309 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
311 match language {
312 #[cfg(feature = "gql")]
313 QueryLanguage::Gql => {
314 use crate::query::translators::gql;
315 gql::translate(query)
316 }
317 #[cfg(feature = "cypher")]
318 QueryLanguage::Cypher => {
319 use crate::query::translators::cypher;
320 cypher::translate(query)
321 }
322 #[cfg(feature = "gremlin")]
323 QueryLanguage::Gremlin => {
324 use crate::query::translators::gremlin;
325 gremlin::translate(query)
326 }
327 #[cfg(feature = "graphql")]
328 QueryLanguage::GraphQL => {
329 use crate::query::translators::graphql;
330 graphql::translate(query)
331 }
332 #[cfg(feature = "sql-pgq")]
333 QueryLanguage::SqlPgq => {
334 use crate::query::translators::sql_pgq;
335 sql_pgq::translate(query)
336 }
337 #[allow(unreachable_patterns)]
338 _ => Err(Error::Internal(format!(
339 "Language {:?} is not an LPG language",
340 language
341 ))),
342 }
343 }
344
345 #[must_use]
347 pub fn lpg_store(&self) -> &Arc<LpgStore> {
348 &self.lpg_store
349 }
350
351 #[must_use]
353 pub fn catalog(&self) -> &Arc<Catalog> {
354 &self.catalog
355 }
356
357 #[must_use]
359 pub fn optimizer(&self) -> &Optimizer {
360 &self.optimizer
361 }
362}
363
364impl QueryProcessor {
365 #[must_use]
367 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
368 &self.transaction_manager
369 }
370}
371
372#[cfg(feature = "rdf")]
377impl QueryProcessor {
378 #[must_use]
380 pub fn with_rdf(
381 lpg_store: Arc<LpgStore>,
382 rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
383 ) -> Self {
384 let optimizer = Optimizer::from_store(&lpg_store);
385 let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
386 Self {
387 lpg_store,
388 graph_store,
389 transaction_manager: Arc::new(TransactionManager::new()),
390 catalog: Arc::new(Catalog::new()),
391 optimizer,
392 transaction_context: None,
393 rdf_store: Some(rdf_store),
394 }
395 }
396
397 #[must_use]
399 pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
400 self.rdf_store.as_ref()
401 }
402
403 fn process_rdf(
405 &self,
406 query: &str,
407 language: QueryLanguage,
408 params: Option<&QueryParams>,
409 ) -> Result<QueryResult> {
410 use crate::query::planner::rdf::RdfPlanner;
411
412 let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
413 Error::Internal("RDF store not configured for this processor".to_string())
414 })?;
415
416 let mut logical_plan = self.translate_rdf(query, language)?;
418
419 if let Some(params) = params {
421 substitute_params(&mut logical_plan, params)?;
422 }
423
424 let mut binder = Binder::new();
426 let _binding_context = binder.bind(&logical_plan)?;
427
428 let optimized_plan = self.optimizer.optimize(logical_plan)?;
430
431 if optimized_plan.explain {
433 return Ok(explain_result(&optimized_plan));
434 }
435
436 let planner = RdfPlanner::new(Arc::clone(rdf_store));
438 let mut physical_plan = planner.plan(&optimized_plan)?;
439
440 let executor = Executor::with_columns(physical_plan.columns.clone());
442 executor.execute(physical_plan.operator.as_mut())
443 }
444
445 fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
447 match language {
448 #[cfg(feature = "sparql")]
449 QueryLanguage::Sparql => {
450 use crate::query::translators::sparql;
451 sparql::translate(query)
452 }
453 #[cfg(all(feature = "graphql", feature = "rdf"))]
454 QueryLanguage::GraphQLRdf => {
455 use crate::query::translators::graphql_rdf;
456 graphql_rdf::translate(query, "http://example.org/")
458 }
459 _ => Err(Error::Internal(format!(
460 "Language {:?} is not an RDF language",
461 language
462 ))),
463 }
464 }
465}
466
467pub(crate) fn annotate_pushdown_hints(
472 op: &mut LogicalOperator,
473 store: &dyn grafeo_core::graph::GraphStore,
474) {
475 #[allow(clippy::wildcard_imports)]
476 use crate::query::plan::*;
477
478 match op {
479 LogicalOperator::Filter(filter) => {
480 annotate_pushdown_hints(&mut filter.input, store);
482
483 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
485 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
486 }
487 }
488 LogicalOperator::NodeScan(op) => {
489 if let Some(input) = &mut op.input {
490 annotate_pushdown_hints(input, store);
491 }
492 }
493 LogicalOperator::EdgeScan(op) => {
494 if let Some(input) = &mut op.input {
495 annotate_pushdown_hints(input, store);
496 }
497 }
498 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
499 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
500 LogicalOperator::Join(op) => {
501 annotate_pushdown_hints(&mut op.left, store);
502 annotate_pushdown_hints(&mut op.right, store);
503 }
504 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
505 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
506 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
507 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
508 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
509 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
510 LogicalOperator::Union(op) => {
511 for input in &mut op.inputs {
512 annotate_pushdown_hints(input, store);
513 }
514 }
515 LogicalOperator::Apply(op) => {
516 annotate_pushdown_hints(&mut op.input, store);
517 annotate_pushdown_hints(&mut op.subplan, store);
518 }
519 LogicalOperator::Otherwise(op) => {
520 annotate_pushdown_hints(&mut op.left, store);
521 annotate_pushdown_hints(&mut op.right, store);
522 }
523 _ => {}
524 }
525}
526
527fn infer_pushdown(
529 predicate: &LogicalExpression,
530 scan: &crate::query::plan::NodeScanOp,
531 store: &dyn grafeo_core::graph::GraphStore,
532) -> Option<crate::query::plan::PushdownHint> {
533 #[allow(clippy::wildcard_imports)]
534 use crate::query::plan::*;
535
536 match predicate {
537 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
539 if let Some(prop) = extract_property_name(left, &scan.variable)
540 .or_else(|| extract_property_name(right, &scan.variable))
541 {
542 if store.has_property_index(&prop) {
543 return Some(PushdownHint::IndexLookup { property: prop });
544 }
545 if scan.label.is_some() {
546 return Some(PushdownHint::LabelFirst);
547 }
548 }
549 None
550 }
551 LogicalExpression::Binary {
553 left,
554 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
555 right,
556 } => {
557 if let Some(prop) = extract_property_name(left, &scan.variable)
558 .or_else(|| extract_property_name(right, &scan.variable))
559 {
560 if store.has_property_index(&prop) {
561 return Some(PushdownHint::RangeScan { property: prop });
562 }
563 if scan.label.is_some() {
564 return Some(PushdownHint::LabelFirst);
565 }
566 }
567 None
568 }
569 LogicalExpression::Binary {
571 left,
572 op: BinaryOp::And,
573 ..
574 } => infer_pushdown(left, scan, store),
575 _ => {
576 if scan.label.is_some() {
578 Some(PushdownHint::LabelFirst)
579 } else {
580 None
581 }
582 }
583 }
584}
585
586fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
589 if let LogicalExpression::Property { variable, property } = expr
590 && variable == scan_var
591 {
592 Some(property.clone())
593 } else {
594 None
595 }
596}
597
598pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
600 let tree_text = plan.root.explain_tree();
601 QueryResult {
602 columns: vec!["plan".to_string()],
603 column_types: vec![grafeo_common::types::LogicalType::String],
604 rows: vec![vec![Value::String(tree_text.into())]],
605 execution_time_ms: None,
606 rows_scanned: None,
607 status_message: None,
608 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
609 }
610}
611
612pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
614 substitute_in_operator(&mut plan.root, params)
615}
616
617fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
619 #[allow(clippy::wildcard_imports)]
620 use crate::query::plan::*;
621
622 match op {
623 LogicalOperator::Filter(filter) => {
624 substitute_in_expression(&mut filter.predicate, params)?;
625 substitute_in_operator(&mut filter.input, params)?;
626 }
627 LogicalOperator::Return(ret) => {
628 for item in &mut ret.items {
629 substitute_in_expression(&mut item.expression, params)?;
630 }
631 substitute_in_operator(&mut ret.input, params)?;
632 }
633 LogicalOperator::Project(proj) => {
634 for p in &mut proj.projections {
635 substitute_in_expression(&mut p.expression, params)?;
636 }
637 substitute_in_operator(&mut proj.input, params)?;
638 }
639 LogicalOperator::NodeScan(scan) => {
640 if let Some(input) = &mut scan.input {
641 substitute_in_operator(input, params)?;
642 }
643 }
644 LogicalOperator::EdgeScan(scan) => {
645 if let Some(input) = &mut scan.input {
646 substitute_in_operator(input, params)?;
647 }
648 }
649 LogicalOperator::Expand(expand) => {
650 substitute_in_operator(&mut expand.input, params)?;
651 }
652 LogicalOperator::Join(join) => {
653 substitute_in_operator(&mut join.left, params)?;
654 substitute_in_operator(&mut join.right, params)?;
655 for cond in &mut join.conditions {
656 substitute_in_expression(&mut cond.left, params)?;
657 substitute_in_expression(&mut cond.right, params)?;
658 }
659 }
660 LogicalOperator::LeftJoin(join) => {
661 substitute_in_operator(&mut join.left, params)?;
662 substitute_in_operator(&mut join.right, params)?;
663 if let Some(cond) = &mut join.condition {
664 substitute_in_expression(cond, params)?;
665 }
666 }
667 LogicalOperator::Aggregate(agg) => {
668 for expr in &mut agg.group_by {
669 substitute_in_expression(expr, params)?;
670 }
671 for agg_expr in &mut agg.aggregates {
672 if let Some(expr) = &mut agg_expr.expression {
673 substitute_in_expression(expr, params)?;
674 }
675 }
676 substitute_in_operator(&mut agg.input, params)?;
677 }
678 LogicalOperator::Sort(sort) => {
679 for key in &mut sort.keys {
680 substitute_in_expression(&mut key.expression, params)?;
681 }
682 substitute_in_operator(&mut sort.input, params)?;
683 }
684 LogicalOperator::Limit(limit) => {
685 resolve_count_param(&mut limit.count, params)?;
686 substitute_in_operator(&mut limit.input, params)?;
687 }
688 LogicalOperator::Skip(skip) => {
689 resolve_count_param(&mut skip.count, params)?;
690 substitute_in_operator(&mut skip.input, params)?;
691 }
692 LogicalOperator::Distinct(distinct) => {
693 substitute_in_operator(&mut distinct.input, params)?;
694 }
695 LogicalOperator::CreateNode(create) => {
696 for (_, expr) in &mut create.properties {
697 substitute_in_expression(expr, params)?;
698 }
699 if let Some(input) = &mut create.input {
700 substitute_in_operator(input, params)?;
701 }
702 }
703 LogicalOperator::CreateEdge(create) => {
704 for (_, expr) in &mut create.properties {
705 substitute_in_expression(expr, params)?;
706 }
707 substitute_in_operator(&mut create.input, params)?;
708 }
709 LogicalOperator::DeleteNode(delete) => {
710 substitute_in_operator(&mut delete.input, params)?;
711 }
712 LogicalOperator::DeleteEdge(delete) => {
713 substitute_in_operator(&mut delete.input, params)?;
714 }
715 LogicalOperator::SetProperty(set) => {
716 for (_, expr) in &mut set.properties {
717 substitute_in_expression(expr, params)?;
718 }
719 substitute_in_operator(&mut set.input, params)?;
720 }
721 LogicalOperator::Union(union) => {
722 for input in &mut union.inputs {
723 substitute_in_operator(input, params)?;
724 }
725 }
726 LogicalOperator::AntiJoin(anti) => {
727 substitute_in_operator(&mut anti.left, params)?;
728 substitute_in_operator(&mut anti.right, params)?;
729 }
730 LogicalOperator::Bind(bind) => {
731 substitute_in_expression(&mut bind.expression, params)?;
732 substitute_in_operator(&mut bind.input, params)?;
733 }
734 LogicalOperator::TripleScan(scan) => {
735 if let Some(input) = &mut scan.input {
736 substitute_in_operator(input, params)?;
737 }
738 }
739 LogicalOperator::Unwind(unwind) => {
740 substitute_in_expression(&mut unwind.expression, params)?;
741 substitute_in_operator(&mut unwind.input, params)?;
742 }
743 LogicalOperator::MapCollect(mc) => {
744 substitute_in_operator(&mut mc.input, params)?;
745 }
746 LogicalOperator::Merge(merge) => {
747 for (_, expr) in &mut merge.match_properties {
748 substitute_in_expression(expr, params)?;
749 }
750 for (_, expr) in &mut merge.on_create {
751 substitute_in_expression(expr, params)?;
752 }
753 for (_, expr) in &mut merge.on_match {
754 substitute_in_expression(expr, params)?;
755 }
756 substitute_in_operator(&mut merge.input, params)?;
757 }
758 LogicalOperator::MergeRelationship(merge_rel) => {
759 for (_, expr) in &mut merge_rel.match_properties {
760 substitute_in_expression(expr, params)?;
761 }
762 for (_, expr) in &mut merge_rel.on_create {
763 substitute_in_expression(expr, params)?;
764 }
765 for (_, expr) in &mut merge_rel.on_match {
766 substitute_in_expression(expr, params)?;
767 }
768 substitute_in_operator(&mut merge_rel.input, params)?;
769 }
770 LogicalOperator::AddLabel(add_label) => {
771 substitute_in_operator(&mut add_label.input, params)?;
772 }
773 LogicalOperator::RemoveLabel(remove_label) => {
774 substitute_in_operator(&mut remove_label.input, params)?;
775 }
776 LogicalOperator::ShortestPath(sp) => {
777 substitute_in_operator(&mut sp.input, params)?;
778 }
779 LogicalOperator::InsertTriple(insert) => {
781 if let Some(ref mut input) = insert.input {
782 substitute_in_operator(input, params)?;
783 }
784 }
785 LogicalOperator::DeleteTriple(delete) => {
786 if let Some(ref mut input) = delete.input {
787 substitute_in_operator(input, params)?;
788 }
789 }
790 LogicalOperator::Modify(modify) => {
791 substitute_in_operator(&mut modify.where_clause, params)?;
792 }
793 LogicalOperator::ClearGraph(_)
794 | LogicalOperator::CreateGraph(_)
795 | LogicalOperator::DropGraph(_)
796 | LogicalOperator::LoadGraph(_)
797 | LogicalOperator::CopyGraph(_)
798 | LogicalOperator::MoveGraph(_)
799 | LogicalOperator::AddGraph(_) => {}
800 LogicalOperator::HorizontalAggregate(op) => {
801 substitute_in_operator(&mut op.input, params)?;
802 }
803 LogicalOperator::Empty => {}
804 LogicalOperator::VectorScan(scan) => {
805 substitute_in_expression(&mut scan.query_vector, params)?;
806 if let Some(ref mut input) = scan.input {
807 substitute_in_operator(input, params)?;
808 }
809 }
810 LogicalOperator::VectorJoin(join) => {
811 substitute_in_expression(&mut join.query_vector, params)?;
812 substitute_in_operator(&mut join.input, params)?;
813 }
814 LogicalOperator::Except(except) => {
815 substitute_in_operator(&mut except.left, params)?;
816 substitute_in_operator(&mut except.right, params)?;
817 }
818 LogicalOperator::Intersect(intersect) => {
819 substitute_in_operator(&mut intersect.left, params)?;
820 substitute_in_operator(&mut intersect.right, params)?;
821 }
822 LogicalOperator::Otherwise(otherwise) => {
823 substitute_in_operator(&mut otherwise.left, params)?;
824 substitute_in_operator(&mut otherwise.right, params)?;
825 }
826 LogicalOperator::Apply(apply) => {
827 substitute_in_operator(&mut apply.input, params)?;
828 substitute_in_operator(&mut apply.subplan, params)?;
829 }
830 LogicalOperator::ParameterScan(_) => {}
832 LogicalOperator::MultiWayJoin(mwj) => {
833 for input in &mut mwj.inputs {
834 substitute_in_operator(input, params)?;
835 }
836 for cond in &mut mwj.conditions {
837 substitute_in_expression(&mut cond.left, params)?;
838 substitute_in_expression(&mut cond.right, params)?;
839 }
840 }
841 LogicalOperator::CreatePropertyGraph(_) => {}
843 LogicalOperator::CallProcedure(_) => {}
845 LogicalOperator::LoadData(_) => {}
847 }
848 Ok(())
849}
850
851fn resolve_count_param(
853 count: &mut crate::query::plan::CountExpr,
854 params: &QueryParams,
855) -> Result<()> {
856 use crate::query::plan::CountExpr;
857 use grafeo_common::utils::error::{QueryError, QueryErrorKind};
858
859 if let CountExpr::Parameter(name) = count {
860 let value = params.get(name.as_str()).ok_or_else(|| {
861 Error::Query(QueryError::new(
862 QueryErrorKind::Semantic,
863 format!("Missing parameter for SKIP/LIMIT: ${name}"),
864 ))
865 })?;
866 let n = match value {
867 Value::Int64(i) if *i >= 0 => *i as usize,
868 Value::Int64(i) => {
869 return Err(Error::Query(QueryError::new(
870 QueryErrorKind::Semantic,
871 format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
872 )));
873 }
874 other => {
875 return Err(Error::Query(QueryError::new(
876 QueryErrorKind::Semantic,
877 format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
878 )));
879 }
880 };
881 *count = CountExpr::Literal(n);
882 }
883 Ok(())
884}
885
886fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
888 use crate::query::plan::LogicalExpression;
889
890 match expr {
891 LogicalExpression::Parameter(name) => {
892 if let Some(value) = params.get(name) {
893 *expr = LogicalExpression::Literal(value.clone());
894 } else {
895 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
896 }
897 }
898 LogicalExpression::Binary { left, right, .. } => {
899 substitute_in_expression(left, params)?;
900 substitute_in_expression(right, params)?;
901 }
902 LogicalExpression::Unary { operand, .. } => {
903 substitute_in_expression(operand, params)?;
904 }
905 LogicalExpression::FunctionCall { args, .. } => {
906 for arg in args {
907 substitute_in_expression(arg, params)?;
908 }
909 }
910 LogicalExpression::List(items) => {
911 for item in items {
912 substitute_in_expression(item, params)?;
913 }
914 }
915 LogicalExpression::Map(pairs) => {
916 for (_, value) in pairs {
917 substitute_in_expression(value, params)?;
918 }
919 }
920 LogicalExpression::IndexAccess { base, index } => {
921 substitute_in_expression(base, params)?;
922 substitute_in_expression(index, params)?;
923 }
924 LogicalExpression::SliceAccess { base, start, end } => {
925 substitute_in_expression(base, params)?;
926 if let Some(s) = start {
927 substitute_in_expression(s, params)?;
928 }
929 if let Some(e) = end {
930 substitute_in_expression(e, params)?;
931 }
932 }
933 LogicalExpression::Case {
934 operand,
935 when_clauses,
936 else_clause,
937 } => {
938 if let Some(op) = operand {
939 substitute_in_expression(op, params)?;
940 }
941 for (cond, result) in when_clauses {
942 substitute_in_expression(cond, params)?;
943 substitute_in_expression(result, params)?;
944 }
945 if let Some(el) = else_clause {
946 substitute_in_expression(el, params)?;
947 }
948 }
949 LogicalExpression::Property { .. }
950 | LogicalExpression::Variable(_)
951 | LogicalExpression::Literal(_)
952 | LogicalExpression::Labels(_)
953 | LogicalExpression::Type(_)
954 | LogicalExpression::Id(_) => {}
955 LogicalExpression::ListComprehension {
956 list_expr,
957 filter_expr,
958 map_expr,
959 ..
960 } => {
961 substitute_in_expression(list_expr, params)?;
962 if let Some(filter) = filter_expr {
963 substitute_in_expression(filter, params)?;
964 }
965 substitute_in_expression(map_expr, params)?;
966 }
967 LogicalExpression::ListPredicate {
968 list_expr,
969 predicate,
970 ..
971 } => {
972 substitute_in_expression(list_expr, params)?;
973 substitute_in_expression(predicate, params)?;
974 }
975 LogicalExpression::ExistsSubquery(_)
976 | LogicalExpression::CountSubquery(_)
977 | LogicalExpression::ValueSubquery(_) => {
978 }
980 LogicalExpression::PatternComprehension { projection, .. } => {
981 substitute_in_expression(projection, params)?;
982 }
983 LogicalExpression::MapProjection { entries, .. } => {
984 for entry in entries {
985 if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
986 substitute_in_expression(expr, params)?;
987 }
988 }
989 }
990 LogicalExpression::Reduce {
991 initial,
992 list,
993 expression,
994 ..
995 } => {
996 substitute_in_expression(initial, params)?;
997 substitute_in_expression(list, params)?;
998 substitute_in_expression(expression, params)?;
999 }
1000 }
1001 Ok(())
1002}
1003
1004#[cfg(test)]
1005mod tests {
1006 use super::*;
1007
1008 #[test]
1009 fn test_query_language_is_lpg() {
1010 #[cfg(feature = "gql")]
1011 assert!(QueryLanguage::Gql.is_lpg());
1012 #[cfg(feature = "cypher")]
1013 assert!(QueryLanguage::Cypher.is_lpg());
1014 #[cfg(feature = "sparql")]
1015 assert!(!QueryLanguage::Sparql.is_lpg());
1016 }
1017
1018 #[test]
1019 fn test_processor_creation() {
1020 let store = Arc::new(LpgStore::new().unwrap());
1021 let processor = QueryProcessor::for_lpg(store);
1022 assert!(processor.lpg_store().node_count() == 0);
1023 }
1024
1025 #[cfg(feature = "gql")]
1026 #[test]
1027 fn test_process_simple_gql() {
1028 let store = Arc::new(LpgStore::new().unwrap());
1029 store.create_node(&["Person"]);
1030 store.create_node(&["Person"]);
1031
1032 let processor = QueryProcessor::for_lpg(store);
1033 let result = processor
1034 .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
1035 .unwrap();
1036
1037 assert_eq!(result.row_count(), 2);
1038 assert_eq!(result.columns[0], "n");
1039 }
1040
1041 #[cfg(feature = "cypher")]
1042 #[test]
1043 fn test_process_simple_cypher() {
1044 let store = Arc::new(LpgStore::new().unwrap());
1045 store.create_node(&["Person"]);
1046
1047 let processor = QueryProcessor::for_lpg(store);
1048 let result = processor
1049 .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
1050 .unwrap();
1051
1052 assert_eq!(result.row_count(), 1);
1053 }
1054
1055 #[cfg(feature = "gql")]
1056 #[test]
1057 fn test_process_with_params() {
1058 let store = Arc::new(LpgStore::new().unwrap());
1059 store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1060 store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1061 store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1062
1063 let processor = QueryProcessor::for_lpg(store);
1064
1065 let mut params = HashMap::new();
1067 params.insert("min_age".to_string(), Value::Int64(30));
1068
1069 let result = processor
1070 .process(
1071 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1072 QueryLanguage::Gql,
1073 Some(¶ms),
1074 )
1075 .unwrap();
1076
1077 assert_eq!(result.row_count(), 2);
1079 }
1080
1081 #[cfg(feature = "gql")]
1082 #[test]
1083 fn test_missing_param_error() {
1084 let store = Arc::new(LpgStore::new().unwrap());
1085 store.create_node(&["Person"]);
1086
1087 let processor = QueryProcessor::for_lpg(store);
1088
1089 let params: HashMap<String, Value> = HashMap::new();
1091 let result = processor.process(
1092 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1093 QueryLanguage::Gql,
1094 Some(¶ms),
1095 );
1096
1097 assert!(result.is_err());
1099 let err = result.unwrap_err();
1100 assert!(
1101 err.to_string().contains("Missing parameter"),
1102 "Expected 'Missing parameter' error, got: {}",
1103 err
1104 );
1105 }
1106
1107 #[cfg(feature = "gql")]
1108 #[test]
1109 fn test_params_in_filter_with_property() {
1110 let store = Arc::new(LpgStore::new().unwrap());
1112 store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
1113 store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
1114
1115 let processor = QueryProcessor::for_lpg(store);
1116
1117 let mut params = HashMap::new();
1118 params.insert("threshold".to_string(), Value::Int64(15));
1119
1120 let result = processor
1121 .process(
1122 "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
1123 QueryLanguage::Gql,
1124 Some(¶ms),
1125 )
1126 .unwrap();
1127
1128 assert_eq!(result.row_count(), 1);
1130 let row = &result.rows[0];
1131 assert_eq!(row[0], Value::Int64(20));
1132 }
1133
1134 #[cfg(feature = "gql")]
1135 #[test]
1136 fn test_params_in_multiple_where_conditions() {
1137 let store = Arc::new(LpgStore::new().unwrap());
1139 store.create_node_with_props(
1140 &["Person"],
1141 [("age", Value::Int64(25)), ("score", Value::Int64(80))],
1142 );
1143 store.create_node_with_props(
1144 &["Person"],
1145 [("age", Value::Int64(35)), ("score", Value::Int64(90))],
1146 );
1147 store.create_node_with_props(
1148 &["Person"],
1149 [("age", Value::Int64(45)), ("score", Value::Int64(70))],
1150 );
1151
1152 let processor = QueryProcessor::for_lpg(store);
1153
1154 let mut params = HashMap::new();
1155 params.insert("min_age".to_string(), Value::Int64(30));
1156 params.insert("min_score".to_string(), Value::Int64(75));
1157
1158 let result = processor
1159 .process(
1160 "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
1161 QueryLanguage::Gql,
1162 Some(¶ms),
1163 )
1164 .unwrap();
1165
1166 assert_eq!(result.row_count(), 1);
1168 }
1169
1170 #[cfg(feature = "gql")]
1171 #[test]
1172 fn test_params_with_in_list() {
1173 let store = Arc::new(LpgStore::new().unwrap());
1175 store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
1176 store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
1177 store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
1178
1179 let processor = QueryProcessor::for_lpg(store);
1180
1181 let mut params = HashMap::new();
1183 params.insert("target".to_string(), Value::String("active".into()));
1184
1185 let result = processor
1186 .process(
1187 "MATCH (n:Item) WHERE n.status = $target RETURN n",
1188 QueryLanguage::Gql,
1189 Some(¶ms),
1190 )
1191 .unwrap();
1192
1193 assert_eq!(result.row_count(), 1);
1194 }
1195
1196 #[cfg(feature = "gql")]
1197 #[test]
1198 fn test_params_same_type_comparison() {
1199 let store = Arc::new(LpgStore::new().unwrap());
1201 store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
1202 store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
1203
1204 let processor = QueryProcessor::for_lpg(store);
1205
1206 let mut params = HashMap::new();
1208 params.insert("threshold".to_string(), Value::Int64(75));
1209
1210 let result = processor
1211 .process(
1212 "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
1213 QueryLanguage::Gql,
1214 Some(¶ms),
1215 )
1216 .unwrap();
1217
1218 assert_eq!(result.row_count(), 1);
1220 }
1221
1222 #[cfg(feature = "gql")]
1223 #[test]
1224 fn test_process_empty_result_has_columns() {
1225 let store = Arc::new(LpgStore::new().unwrap());
1227 let processor = QueryProcessor::for_lpg(store);
1230 let result = processor
1231 .process(
1232 "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
1233 QueryLanguage::Gql,
1234 None,
1235 )
1236 .unwrap();
1237
1238 assert_eq!(result.row_count(), 0);
1239 assert_eq!(result.columns.len(), 2);
1240 assert_eq!(result.columns[0], "name");
1241 assert_eq!(result.columns[1], "age");
1242 }
1243
1244 #[cfg(feature = "gql")]
1245 #[test]
1246 fn test_params_string_equality() {
1247 let store = Arc::new(LpgStore::new().unwrap());
1249 store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
1250 store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
1251 store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
1252
1253 let processor = QueryProcessor::for_lpg(store);
1254
1255 let mut params = HashMap::new();
1256 params.insert("target".to_string(), Value::String("beta".into()));
1257
1258 let result = processor
1259 .process(
1260 "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
1261 QueryLanguage::Gql,
1262 Some(¶ms),
1263 )
1264 .unwrap();
1265
1266 assert_eq!(result.row_count(), 1);
1267 assert_eq!(result.rows[0][0], Value::String("beta".into()));
1268 }
1269}