1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TransactionId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
/// Query languages the processor can be asked to run.
///
/// Every variant is gated behind a Cargo feature, so the set of variants
/// (and therefore the set of accepted languages) differs between builds.
/// [`QueryLanguage::is_lpg`] decides which pipeline a variant is routed to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL; available with the `gql` feature.
    #[cfg(feature = "gql")]
    Gql,
    /// Cypher; available with the `cypher` feature.
    #[cfg(feature = "cypher")]
    Cypher,
    /// Gremlin; available with the `gremlin` feature.
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL routed to the LPG pipeline; available with the `graphql` feature.
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ; available with the `sql-pgq` feature.
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL, routed to the RDF pipeline; available with the `sparql` feature.
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL routed to the RDF pipeline; needs both `graphql` and `rdf`.
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
51
52impl QueryLanguage {
53 #[must_use]
55 pub const fn is_lpg(&self) -> bool {
56 match self {
57 #[cfg(feature = "gql")]
58 Self::Gql => true,
59 #[cfg(feature = "cypher")]
60 Self::Cypher => true,
61 #[cfg(feature = "gremlin")]
62 Self::Gremlin => true,
63 #[cfg(feature = "graphql")]
64 Self::GraphQL => true,
65 #[cfg(feature = "sql-pgq")]
66 Self::SqlPgq => true,
67 #[cfg(feature = "sparql")]
68 Self::Sparql => false,
69 #[cfg(all(feature = "graphql", feature = "rdf"))]
70 Self::GraphQLRdf => false,
71 }
72 }
73}
74
/// Named query parameters: maps a parameter name (written `$name` in the
/// query text, stored here without the `$`) to the value substituted for it.
pub type QueryParams = HashMap<String, Value>;
77
/// Runs a query end to end: translate, substitute parameters, bind,
/// optimize, plan and execute.
pub struct QueryProcessor {
    /// Concrete LPG store handed back by `lpg_store()`. When the processor is
    /// built from a generic graph store this is a freshly created store that
    /// is separate from `graph_store`.
    lpg_store: Arc<LpgStore>,
    /// Store the LPG planner runs against and that EXPLAIN hints consult.
    graph_store: Arc<dyn GraphStoreMut>,
    /// Passed to the planner; also supplies the current epoch when no
    /// transaction context is set.
    transaction_manager: Arc<TransactionManager>,
    /// Catalog exposed through `catalog()`; replaceable via `with_catalog`.
    catalog: Arc<Catalog>,
    /// Optimizer applied to every logical plan.
    optimizer: Optimizer,
    /// `(viewing epoch, transaction id)` to plan under, if any.
    transaction_context: Option<(EpochId, TransactionId)>,
    /// RDF store required by the RDF pipeline; `None` unless built via `with_rdf`.
    #[cfg(feature = "rdf")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
114
115impl QueryProcessor {
116 #[must_use]
118 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119 let optimizer = Optimizer::from_store(&store);
120 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121 Self {
122 lpg_store: store,
123 graph_store,
124 transaction_manager: Arc::new(TransactionManager::new()),
125 catalog: Arc::new(Catalog::new()),
126 optimizer,
127 transaction_context: None,
128 #[cfg(feature = "rdf")]
129 rdf_store: None,
130 }
131 }
132
133 #[must_use]
135 pub fn for_lpg_with_transaction(
136 store: Arc<LpgStore>,
137 transaction_manager: Arc<TransactionManager>,
138 ) -> Self {
139 let optimizer = Optimizer::from_store(&store);
140 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
141 Self {
142 lpg_store: store,
143 graph_store,
144 transaction_manager,
145 catalog: Arc::new(Catalog::new()),
146 optimizer,
147 transaction_context: None,
148 #[cfg(feature = "rdf")]
149 rdf_store: None,
150 }
151 }
152
153 pub fn for_graph_store_with_transaction(
159 store: Arc<dyn GraphStoreMut>,
160 transaction_manager: Arc<TransactionManager>,
161 ) -> Result<Self> {
162 let optimizer = Optimizer::from_graph_store(&*store);
163 Ok(Self {
164 lpg_store: Arc::new(LpgStore::new()?),
165 graph_store: store,
166 transaction_manager,
167 catalog: Arc::new(Catalog::new()),
168 optimizer,
169 transaction_context: None,
170 #[cfg(feature = "rdf")]
171 rdf_store: None,
172 })
173 }
174
175 #[cfg(feature = "rdf")]
177 #[must_use]
178 pub fn with_rdf(
179 lpg_store: Arc<LpgStore>,
180 rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
181 ) -> Self {
182 let optimizer = Optimizer::from_store(&lpg_store);
183 let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
184 Self {
185 lpg_store,
186 graph_store,
187 transaction_manager: Arc::new(TransactionManager::new()),
188 catalog: Arc::new(Catalog::new()),
189 optimizer,
190 transaction_context: None,
191 rdf_store: Some(rdf_store),
192 }
193 }
194
195 #[must_use]
199 pub fn with_transaction_context(
200 mut self,
201 viewing_epoch: EpochId,
202 transaction_id: TransactionId,
203 ) -> Self {
204 self.transaction_context = Some((viewing_epoch, transaction_id));
205 self
206 }
207
    /// Returns the processor with its catalog replaced by `catalog`.
    #[must_use]
    pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
        self.catalog = catalog;
        self
    }
214
    /// Returns the processor with its optimizer replaced by `optimizer`
    /// (overriding the one derived from the store at construction).
    #[must_use]
    pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
        self.optimizer = optimizer;
        self
    }
221
222 pub fn process(
242 &self,
243 query: &str,
244 language: QueryLanguage,
245 params: Option<&QueryParams>,
246 ) -> Result<QueryResult> {
247 if language.is_lpg() {
248 self.process_lpg(query, language, params)
249 } else {
250 #[cfg(feature = "rdf")]
251 {
252 self.process_rdf(query, language, params)
253 }
254 #[cfg(not(feature = "rdf"))]
255 {
256 Err(Error::Internal(
257 "RDF support not enabled. Compile with --features rdf".to_string(),
258 ))
259 }
260 }
261 }
262
263 fn process_lpg(
265 &self,
266 query: &str,
267 language: QueryLanguage,
268 params: Option<&QueryParams>,
269 ) -> Result<QueryResult> {
270 #[cfg(not(target_arch = "wasm32"))]
271 let start_time = std::time::Instant::now();
272
273 let mut logical_plan = self.translate_lpg(query, language)?;
275
276 if let Some(params) = params {
278 substitute_params(&mut logical_plan, params)?;
279 }
280
281 let mut binder = Binder::new();
283 let _binding_context = binder.bind(&logical_plan)?;
284
285 let optimized_plan = self.optimizer.optimize(logical_plan)?;
287
288 if optimized_plan.explain {
290 let mut plan = optimized_plan;
291 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
292 return Ok(explain_result(&plan));
293 }
294
295 let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
297 Planner::with_context(
298 Arc::clone(&self.graph_store),
299 Arc::clone(&self.transaction_manager),
300 Some(transaction_id),
301 epoch,
302 )
303 } else {
304 Planner::with_context(
305 Arc::clone(&self.graph_store),
306 Arc::clone(&self.transaction_manager),
307 None,
308 self.transaction_manager.current_epoch(),
309 )
310 };
311 let mut physical_plan = planner.plan(&optimized_plan)?;
312
313 let executor = Executor::with_columns(physical_plan.columns.clone());
315 let mut result = executor.execute(physical_plan.operator.as_mut())?;
316
317 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
320 {
321 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
322 result.execution_time_ms = Some(elapsed_ms);
323 }
324 result.rows_scanned = Some(rows_scanned);
325
326 Ok(result)
327 }
328
329 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
331 match language {
332 #[cfg(feature = "gql")]
333 QueryLanguage::Gql => {
334 use crate::query::translators::gql;
335 gql::translate(query)
336 }
337 #[cfg(feature = "cypher")]
338 QueryLanguage::Cypher => {
339 use crate::query::translators::cypher;
340 cypher::translate(query)
341 }
342 #[cfg(feature = "gremlin")]
343 QueryLanguage::Gremlin => {
344 use crate::query::translators::gremlin;
345 gremlin::translate(query)
346 }
347 #[cfg(feature = "graphql")]
348 QueryLanguage::GraphQL => {
349 use crate::query::translators::graphql;
350 graphql::translate(query)
351 }
352 #[cfg(feature = "sql-pgq")]
353 QueryLanguage::SqlPgq => {
354 use crate::query::translators::sql_pgq;
355 sql_pgq::translate(query)
356 }
357 #[allow(unreachable_patterns)]
358 _ => Err(Error::Internal(format!(
359 "Language {:?} is not an LPG language",
360 language
361 ))),
362 }
363 }
364
365 #[cfg(feature = "rdf")]
367 fn process_rdf(
368 &self,
369 query: &str,
370 language: QueryLanguage,
371 params: Option<&QueryParams>,
372 ) -> Result<QueryResult> {
373 use crate::query::planner::rdf::RdfPlanner;
374
375 let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
376 Error::Internal("RDF store not configured for this processor".to_string())
377 })?;
378
379 let mut logical_plan = self.translate_rdf(query, language)?;
381
382 if let Some(params) = params {
384 substitute_params(&mut logical_plan, params)?;
385 }
386
387 let mut binder = Binder::new();
389 let _binding_context = binder.bind(&logical_plan)?;
390
391 let optimized_plan = self.optimizer.optimize(logical_plan)?;
393
394 if optimized_plan.explain {
396 return Ok(explain_result(&optimized_plan));
397 }
398
399 let planner = RdfPlanner::new(Arc::clone(rdf_store));
401 let mut physical_plan = planner.plan(&optimized_plan)?;
402
403 let executor = Executor::with_columns(physical_plan.columns.clone());
405 executor.execute(physical_plan.operator.as_mut())
406 }
407
408 #[cfg(feature = "rdf")]
410 fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
411 match language {
412 #[cfg(feature = "sparql")]
413 QueryLanguage::Sparql => {
414 use crate::query::translators::sparql;
415 sparql::translate(query)
416 }
417 #[cfg(all(feature = "graphql", feature = "rdf"))]
418 QueryLanguage::GraphQLRdf => {
419 use crate::query::translators::graphql_rdf;
420 graphql_rdf::translate(query, "http://example.org/")
422 }
423 _ => Err(Error::Internal(format!(
424 "Language {:?} is not an RDF language",
425 language
426 ))),
427 }
428 }
429
    /// Returns the LPG store held by this processor.
    ///
    /// For a processor built with `for_graph_store_with_transaction` this is
    /// the separately created store, not the graph store queries run against.
    #[must_use]
    pub fn lpg_store(&self) -> &Arc<LpgStore> {
        &self.lpg_store
    }
435
    /// Returns the catalog held by this processor.
    #[must_use]
    pub fn catalog(&self) -> &Arc<Catalog> {
        &self.catalog
    }
441
    /// Returns the optimizer applied to logical plans.
    #[must_use]
    pub fn optimizer(&self) -> &Optimizer {
        &self.optimizer
    }
447
    /// Returns the RDF store, or `None` when the processor was built without one.
    #[cfg(feature = "rdf")]
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }
454}
455
impl QueryProcessor {
    /// Returns the transaction manager handed to the planner.
    #[must_use]
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
}
463
464pub(crate) fn annotate_pushdown_hints(
469 op: &mut LogicalOperator,
470 store: &dyn grafeo_core::graph::GraphStore,
471) {
472 #[allow(clippy::wildcard_imports)]
473 use crate::query::plan::*;
474
475 match op {
476 LogicalOperator::Filter(filter) => {
477 annotate_pushdown_hints(&mut filter.input, store);
479
480 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
482 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
483 }
484 }
485 LogicalOperator::NodeScan(op) => {
486 if let Some(input) = &mut op.input {
487 annotate_pushdown_hints(input, store);
488 }
489 }
490 LogicalOperator::EdgeScan(op) => {
491 if let Some(input) = &mut op.input {
492 annotate_pushdown_hints(input, store);
493 }
494 }
495 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
496 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
497 LogicalOperator::Join(op) => {
498 annotate_pushdown_hints(&mut op.left, store);
499 annotate_pushdown_hints(&mut op.right, store);
500 }
501 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
502 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
503 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
504 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
505 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
506 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
507 LogicalOperator::Union(op) => {
508 for input in &mut op.inputs {
509 annotate_pushdown_hints(input, store);
510 }
511 }
512 LogicalOperator::Apply(op) => {
513 annotate_pushdown_hints(&mut op.input, store);
514 annotate_pushdown_hints(&mut op.subplan, store);
515 }
516 LogicalOperator::Otherwise(op) => {
517 annotate_pushdown_hints(&mut op.left, store);
518 annotate_pushdown_hints(&mut op.right, store);
519 }
520 _ => {}
521 }
522}
523
524fn infer_pushdown(
526 predicate: &LogicalExpression,
527 scan: &crate::query::plan::NodeScanOp,
528 store: &dyn grafeo_core::graph::GraphStore,
529) -> Option<crate::query::plan::PushdownHint> {
530 #[allow(clippy::wildcard_imports)]
531 use crate::query::plan::*;
532
533 match predicate {
534 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
536 if let Some(prop) = extract_property_name(left, &scan.variable)
537 .or_else(|| extract_property_name(right, &scan.variable))
538 {
539 if store.has_property_index(&prop) {
540 return Some(PushdownHint::IndexLookup { property: prop });
541 }
542 if scan.label.is_some() {
543 return Some(PushdownHint::LabelFirst);
544 }
545 }
546 None
547 }
548 LogicalExpression::Binary {
550 left,
551 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
552 right,
553 } => {
554 if let Some(prop) = extract_property_name(left, &scan.variable)
555 .or_else(|| extract_property_name(right, &scan.variable))
556 {
557 if store.has_property_index(&prop) {
558 return Some(PushdownHint::RangeScan { property: prop });
559 }
560 if scan.label.is_some() {
561 return Some(PushdownHint::LabelFirst);
562 }
563 }
564 None
565 }
566 LogicalExpression::Binary {
568 left,
569 op: BinaryOp::And,
570 ..
571 } => infer_pushdown(left, scan, store),
572 _ => {
573 if scan.label.is_some() {
575 Some(PushdownHint::LabelFirst)
576 } else {
577 None
578 }
579 }
580 }
581}
582
583fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
586 if let LogicalExpression::Property { variable, property } = expr
587 && variable == scan_var
588 {
589 Some(property.clone())
590 } else {
591 None
592 }
593}
594
595pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
597 let tree_text = plan.root.explain_tree();
598 QueryResult {
599 columns: vec!["plan".to_string()],
600 column_types: vec![grafeo_common::types::LogicalType::String],
601 rows: vec![vec![Value::String(tree_text.into())]],
602 execution_time_ms: None,
603 rows_scanned: None,
604 status_message: None,
605 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
606 }
607}
608
/// Replaces the parameter placeholders in `plan` with the values in `params`,
/// in place, starting from the plan's root operator.
///
/// # Errors
///
/// Returns an error when a referenced parameter is missing from `params`, or
/// when a SKIP/LIMIT parameter is not a non-negative integer.
pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
    substitute_in_operator(&mut plan.root, params)
}
613
/// Recursively substitutes parameters in `op` and all of its child operators.
///
/// For each operator this rewrites the expressions it owns (predicates,
/// projections, property values, join conditions, ...) and then descends into
/// its input operator(s). SKIP/LIMIT counts are resolved through
/// `resolve_count_param`. The match has no wildcard arm, so a new operator
/// variant must be handled here explicitly.
///
/// # Errors
///
/// Stops at, and returns, the first substitution error.
fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
    #[allow(clippy::wildcard_imports)]
    use crate::query::plan::*;

    match op {
        LogicalOperator::Filter(filter) => {
            substitute_in_expression(&mut filter.predicate, params)?;
            substitute_in_operator(&mut filter.input, params)?;
        }
        LogicalOperator::Return(ret) => {
            for item in &mut ret.items {
                substitute_in_expression(&mut item.expression, params)?;
            }
            substitute_in_operator(&mut ret.input, params)?;
        }
        LogicalOperator::Project(proj) => {
            for p in &mut proj.projections {
                substitute_in_expression(&mut p.expression, params)?;
            }
            substitute_in_operator(&mut proj.input, params)?;
        }
        // Scans own no expressions here; only an optional input is visited.
        LogicalOperator::NodeScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::EdgeScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Expand(expand) => {
            substitute_in_operator(&mut expand.input, params)?;
        }
        LogicalOperator::Join(join) => {
            substitute_in_operator(&mut join.left, params)?;
            substitute_in_operator(&mut join.right, params)?;
            for cond in &mut join.conditions {
                substitute_in_expression(&mut cond.left, params)?;
                substitute_in_expression(&mut cond.right, params)?;
            }
        }
        LogicalOperator::LeftJoin(join) => {
            substitute_in_operator(&mut join.left, params)?;
            substitute_in_operator(&mut join.right, params)?;
            if let Some(cond) = &mut join.condition {
                substitute_in_expression(cond, params)?;
            }
        }
        LogicalOperator::Aggregate(agg) => {
            for expr in &mut agg.group_by {
                substitute_in_expression(expr, params)?;
            }
            for agg_expr in &mut agg.aggregates {
                // An aggregate may have no argument expression.
                if let Some(expr) = &mut agg_expr.expression {
                    substitute_in_expression(expr, params)?;
                }
            }
            substitute_in_operator(&mut agg.input, params)?;
        }
        LogicalOperator::Sort(sort) => {
            for key in &mut sort.keys {
                substitute_in_expression(&mut key.expression, params)?;
            }
            substitute_in_operator(&mut sort.input, params)?;
        }
        // LIMIT / SKIP counts are not general expressions; a parameterized
        // count is turned into a literal count.
        LogicalOperator::Limit(limit) => {
            resolve_count_param(&mut limit.count, params)?;
            substitute_in_operator(&mut limit.input, params)?;
        }
        LogicalOperator::Skip(skip) => {
            resolve_count_param(&mut skip.count, params)?;
            substitute_in_operator(&mut skip.input, params)?;
        }
        LogicalOperator::Distinct(distinct) => {
            substitute_in_operator(&mut distinct.input, params)?;
        }
        LogicalOperator::CreateNode(create) => {
            for (_, expr) in &mut create.properties {
                substitute_in_expression(expr, params)?;
            }
            if let Some(input) = &mut create.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::CreateEdge(create) => {
            for (_, expr) in &mut create.properties {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut create.input, params)?;
        }
        LogicalOperator::DeleteNode(delete) => {
            substitute_in_operator(&mut delete.input, params)?;
        }
        LogicalOperator::DeleteEdge(delete) => {
            substitute_in_operator(&mut delete.input, params)?;
        }
        LogicalOperator::SetProperty(set) => {
            for (_, expr) in &mut set.properties {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut set.input, params)?;
        }
        LogicalOperator::Union(union) => {
            for input in &mut union.inputs {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::AntiJoin(anti) => {
            substitute_in_operator(&mut anti.left, params)?;
            substitute_in_operator(&mut anti.right, params)?;
        }
        LogicalOperator::Bind(bind) => {
            substitute_in_expression(&mut bind.expression, params)?;
            substitute_in_operator(&mut bind.input, params)?;
        }
        LogicalOperator::TripleScan(scan) => {
            if let Some(input) = &mut scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Unwind(unwind) => {
            substitute_in_expression(&mut unwind.expression, params)?;
            substitute_in_operator(&mut unwind.input, params)?;
        }
        LogicalOperator::MapCollect(mc) => {
            substitute_in_operator(&mut mc.input, params)?;
        }
        LogicalOperator::Merge(merge) => {
            for (_, expr) in &mut merge.match_properties {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge.on_create {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge.on_match {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut merge.input, params)?;
        }
        LogicalOperator::MergeRelationship(merge_rel) => {
            for (_, expr) in &mut merge_rel.match_properties {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge_rel.on_create {
                substitute_in_expression(expr, params)?;
            }
            for (_, expr) in &mut merge_rel.on_match {
                substitute_in_expression(expr, params)?;
            }
            substitute_in_operator(&mut merge_rel.input, params)?;
        }
        LogicalOperator::AddLabel(add_label) => {
            substitute_in_operator(&mut add_label.input, params)?;
        }
        LogicalOperator::RemoveLabel(remove_label) => {
            substitute_in_operator(&mut remove_label.input, params)?;
        }
        LogicalOperator::ShortestPath(sp) => {
            substitute_in_operator(&mut sp.input, params)?;
        }
        // Triple updates: only the optional input is visited.
        LogicalOperator::InsertTriple(insert) => {
            if let Some(ref mut input) = insert.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::DeleteTriple(delete) => {
            if let Some(ref mut input) = delete.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::Modify(modify) => {
            substitute_in_operator(&mut modify.where_clause, params)?;
        }
        // Graph-management statements: nothing is substituted.
        LogicalOperator::ClearGraph(_)
        | LogicalOperator::CreateGraph(_)
        | LogicalOperator::DropGraph(_)
        | LogicalOperator::LoadGraph(_)
        | LogicalOperator::CopyGraph(_)
        | LogicalOperator::MoveGraph(_)
        | LogicalOperator::AddGraph(_) => {}
        LogicalOperator::HorizontalAggregate(op) => {
            substitute_in_operator(&mut op.input, params)?;
        }
        LogicalOperator::Empty => {}
        LogicalOperator::VectorScan(scan) => {
            substitute_in_expression(&mut scan.query_vector, params)?;
            if let Some(ref mut input) = scan.input {
                substitute_in_operator(input, params)?;
            }
        }
        LogicalOperator::VectorJoin(join) => {
            substitute_in_expression(&mut join.query_vector, params)?;
            substitute_in_operator(&mut join.input, params)?;
        }
        LogicalOperator::Except(except) => {
            substitute_in_operator(&mut except.left, params)?;
            substitute_in_operator(&mut except.right, params)?;
        }
        LogicalOperator::Intersect(intersect) => {
            substitute_in_operator(&mut intersect.left, params)?;
            substitute_in_operator(&mut intersect.right, params)?;
        }
        LogicalOperator::Otherwise(otherwise) => {
            substitute_in_operator(&mut otherwise.left, params)?;
            substitute_in_operator(&mut otherwise.right, params)?;
        }
        LogicalOperator::Apply(apply) => {
            substitute_in_operator(&mut apply.input, params)?;
            substitute_in_operator(&mut apply.subplan, params)?;
        }
        LogicalOperator::ParameterScan(_) => {}
        LogicalOperator::MultiWayJoin(mwj) => {
            for input in &mut mwj.inputs {
                substitute_in_operator(input, params)?;
            }
            for cond in &mut mwj.conditions {
                substitute_in_expression(&mut cond.left, params)?;
                substitute_in_expression(&mut cond.right, params)?;
            }
        }
        // NOTE(review): the payloads of the three operators below are not
        // inspected; if any can carry `$param` expressions (e.g. procedure
        // arguments), those are left unsubstituted — confirm.
        LogicalOperator::CreatePropertyGraph(_) => {}
        LogicalOperator::CallProcedure(_) => {}
        LogicalOperator::LoadData(_) => {}
    }
    Ok(())
}
847
848fn resolve_count_param(
850 count: &mut crate::query::plan::CountExpr,
851 params: &QueryParams,
852) -> Result<()> {
853 use crate::query::plan::CountExpr;
854 use grafeo_common::utils::error::{QueryError, QueryErrorKind};
855
856 if let CountExpr::Parameter(name) = count {
857 let value = params.get(name.as_str()).ok_or_else(|| {
858 Error::Query(QueryError::new(
859 QueryErrorKind::Semantic,
860 format!("Missing parameter for SKIP/LIMIT: ${name}"),
861 ))
862 })?;
863 let n = match value {
864 Value::Int64(i) if *i >= 0 => *i as usize,
865 Value::Int64(i) => {
866 return Err(Error::Query(QueryError::new(
867 QueryErrorKind::Semantic,
868 format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
869 )));
870 }
871 other => {
872 return Err(Error::Query(QueryError::new(
873 QueryErrorKind::Semantic,
874 format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
875 )));
876 }
877 };
878 *count = CountExpr::Literal(n);
879 }
880 Ok(())
881}
882
/// Recursively replaces every `Parameter` node inside `expr` with a `Literal`
/// holding a clone of the matching value from `params`.
///
/// Composite expressions are traversed into all of their sub-expressions;
/// leaf expressions are left untouched. The match has no wildcard arm, so a
/// new expression variant must be handled here explicitly.
///
/// # Errors
///
/// Returns `Error::Internal("Missing parameter: $name")` for a parameter that
/// is not present in `params`.
fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
    use crate::query::plan::LogicalExpression;

    match expr {
        LogicalExpression::Parameter(name) => {
            if let Some(value) = params.get(name) {
                // Overwrite the placeholder node itself with the literal.
                *expr = LogicalExpression::Literal(value.clone());
            } else {
                return Err(Error::Internal(format!("Missing parameter: ${}", name)));
            }
        }
        LogicalExpression::Binary { left, right, .. } => {
            substitute_in_expression(left, params)?;
            substitute_in_expression(right, params)?;
        }
        LogicalExpression::Unary { operand, .. } => {
            substitute_in_expression(operand, params)?;
        }
        LogicalExpression::FunctionCall { args, .. } => {
            for arg in args {
                substitute_in_expression(arg, params)?;
            }
        }
        LogicalExpression::List(items) => {
            for item in items {
                substitute_in_expression(item, params)?;
            }
        }
        LogicalExpression::Map(pairs) => {
            // Only the values are expressions; keys are left as they are.
            for (_, value) in pairs {
                substitute_in_expression(value, params)?;
            }
        }
        LogicalExpression::IndexAccess { base, index } => {
            substitute_in_expression(base, params)?;
            substitute_in_expression(index, params)?;
        }
        LogicalExpression::SliceAccess { base, start, end } => {
            substitute_in_expression(base, params)?;
            if let Some(s) = start {
                substitute_in_expression(s, params)?;
            }
            if let Some(e) = end {
                substitute_in_expression(e, params)?;
            }
        }
        LogicalExpression::Case {
            operand,
            when_clauses,
            else_clause,
        } => {
            if let Some(op) = operand {
                substitute_in_expression(op, params)?;
            }
            for (cond, result) in when_clauses {
                substitute_in_expression(cond, params)?;
                substitute_in_expression(result, params)?;
            }
            if let Some(el) = else_clause {
                substitute_in_expression(el, params)?;
            }
        }
        // Leaves: nothing to substitute.
        LogicalExpression::Property { .. }
        | LogicalExpression::Variable(_)
        | LogicalExpression::Literal(_)
        | LogicalExpression::Labels(_)
        | LogicalExpression::Type(_)
        | LogicalExpression::Id(_) => {}
        LogicalExpression::ListComprehension {
            list_expr,
            filter_expr,
            map_expr,
            ..
        } => {
            substitute_in_expression(list_expr, params)?;
            if let Some(filter) = filter_expr {
                substitute_in_expression(filter, params)?;
            }
            substitute_in_expression(map_expr, params)?;
        }
        LogicalExpression::ListPredicate {
            list_expr,
            predicate,
            ..
        } => {
            substitute_in_expression(list_expr, params)?;
            substitute_in_expression(predicate, params)?;
        }
        // NOTE(review): subquery payloads are not traversed, so a `$param`
        // used inside an EXISTS / COUNT / value subquery is left in place by
        // this function — confirm whether that is handled elsewhere.
        LogicalExpression::ExistsSubquery(_)
        | LogicalExpression::CountSubquery(_)
        | LogicalExpression::ValueSubquery(_) => {
        }
        // Only the projection is visited; the remaining fields are skipped.
        LogicalExpression::PatternComprehension { projection, .. } => {
            substitute_in_expression(projection, params)?;
        }
        LogicalExpression::MapProjection { entries, .. } => {
            for entry in entries {
                // Only literal entries carry an expression to rewrite.
                if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
                    substitute_in_expression(expr, params)?;
                }
            }
        }
        LogicalExpression::Reduce {
            initial,
            list,
            expression,
            ..
        } => {
            substitute_in_expression(initial, params)?;
            substitute_in_expression(list, params)?;
            substitute_in_expression(expression, params)?;
        }
    }
    Ok(())
}
1000
#[cfg(test)]
mod tests {
    use super::*;

    /// LPG languages report `true`, RDF languages `false`.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built over an empty store exposes that empty store.
    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A label scan in GQL returns every node carrying the label.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    /// The same label scan works through the Cypher translator.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// A `$param` in a WHERE clause is substituted before execution.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Ages 35 and 45 pass the filter.
        assert_eq!(result.row_count(), 2);
    }

    /// Referencing a parameter that was not supplied is an error.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // Parameter map provided, but `min_age` is absent.
        let params: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&params),
        );

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    /// A parameterized filter combined with a property projection.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    /// Two parameters joined by AND are both substituted.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("min_age".to_string(), Value::Int64(30));
        params.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the node with age 35 / score 90 satisfies both conditions.
        assert_eq!(result.row_count(), 1);
    }

    /// String parameter compared with `=`.
    ///
    /// Despite the name, the query uses a plain equality, not an `IN` list.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// Integer parameter compared against an integer property.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        // Only the node with value 100 exceeds the threshold.
        assert_eq!(result.row_count(), 1);
    }

    /// Column metadata is reported even when no rows match.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    /// String equality against a parameter returns the matching value.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut params = HashMap::new();
        params.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&params),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}