1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TransactionId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub enum QueryLanguage {
29 #[cfg(feature = "gql")]
31 Gql,
32 #[cfg(feature = "cypher")]
34 Cypher,
35 #[cfg(feature = "gremlin")]
37 Gremlin,
38 #[cfg(feature = "graphql")]
40 GraphQL,
41 #[cfg(feature = "sql-pgq")]
43 SqlPgq,
44 #[cfg(feature = "sparql")]
46 Sparql,
47 #[cfg(all(feature = "graphql", feature = "rdf"))]
49 GraphQLRdf,
50}
51
52impl QueryLanguage {
53 #[must_use]
55 pub const fn is_lpg(&self) -> bool {
56 match self {
57 #[cfg(feature = "gql")]
58 Self::Gql => true,
59 #[cfg(feature = "cypher")]
60 Self::Cypher => true,
61 #[cfg(feature = "gremlin")]
62 Self::Gremlin => true,
63 #[cfg(feature = "graphql")]
64 Self::GraphQL => true,
65 #[cfg(feature = "sql-pgq")]
66 Self::SqlPgq => true,
67 #[cfg(feature = "sparql")]
68 Self::Sparql => false,
69 #[cfg(all(feature = "graphql", feature = "rdf"))]
70 Self::GraphQLRdf => false,
71 }
72 }
73}
74
75pub type QueryParams = HashMap<String, Value>;
77
78pub struct QueryProcessor {
98 lpg_store: Arc<LpgStore>,
100 graph_store: Arc<dyn GraphStoreMut>,
102 transaction_manager: Arc<TransactionManager>,
104 catalog: Arc<Catalog>,
106 optimizer: Optimizer,
108 transaction_context: Option<(EpochId, TransactionId)>,
110 #[cfg(feature = "rdf")]
112 rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
113}
114
115impl QueryProcessor {
116 #[must_use]
118 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119 let optimizer = Optimizer::from_store(&store);
120 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121 Self {
122 lpg_store: store,
123 graph_store,
124 transaction_manager: Arc::new(TransactionManager::new()),
125 catalog: Arc::new(Catalog::new()),
126 optimizer,
127 transaction_context: None,
128 #[cfg(feature = "rdf")]
129 rdf_store: None,
130 }
131 }
132
133 #[must_use]
135 pub fn for_lpg_with_transaction(
136 store: Arc<LpgStore>,
137 transaction_manager: Arc<TransactionManager>,
138 ) -> Self {
139 let optimizer = Optimizer::from_store(&store);
140 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
141 Self {
142 lpg_store: store,
143 graph_store,
144 transaction_manager,
145 catalog: Arc::new(Catalog::new()),
146 optimizer,
147 transaction_context: None,
148 #[cfg(feature = "rdf")]
149 rdf_store: None,
150 }
151 }
152
153 #[must_use]
155 pub fn for_graph_store_with_transaction(
156 store: Arc<dyn GraphStoreMut>,
157 transaction_manager: Arc<TransactionManager>,
158 ) -> Self {
159 let optimizer = Optimizer::from_graph_store(&*store);
160 Self {
161 lpg_store: Arc::new(LpgStore::new().expect("arena allocation for dummy LpgStore")), graph_store: store,
163 transaction_manager,
164 catalog: Arc::new(Catalog::new()),
165 optimizer,
166 transaction_context: None,
167 #[cfg(feature = "rdf")]
168 rdf_store: None,
169 }
170 }
171
172 #[cfg(feature = "rdf")]
174 #[must_use]
175 pub fn with_rdf(
176 lpg_store: Arc<LpgStore>,
177 rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
178 ) -> Self {
179 let optimizer = Optimizer::from_store(&lpg_store);
180 let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
181 Self {
182 lpg_store,
183 graph_store,
184 transaction_manager: Arc::new(TransactionManager::new()),
185 catalog: Arc::new(Catalog::new()),
186 optimizer,
187 transaction_context: None,
188 rdf_store: Some(rdf_store),
189 }
190 }
191
192 #[must_use]
196 pub fn with_transaction_context(
197 mut self,
198 viewing_epoch: EpochId,
199 transaction_id: TransactionId,
200 ) -> Self {
201 self.transaction_context = Some((viewing_epoch, transaction_id));
202 self
203 }
204
205 #[must_use]
207 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
208 self.catalog = catalog;
209 self
210 }
211
212 #[must_use]
214 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
215 self.optimizer = optimizer;
216 self
217 }
218
219 pub fn process(
239 &self,
240 query: &str,
241 language: QueryLanguage,
242 params: Option<&QueryParams>,
243 ) -> Result<QueryResult> {
244 if language.is_lpg() {
245 self.process_lpg(query, language, params)
246 } else {
247 #[cfg(feature = "rdf")]
248 {
249 self.process_rdf(query, language, params)
250 }
251 #[cfg(not(feature = "rdf"))]
252 {
253 Err(Error::Internal(
254 "RDF support not enabled. Compile with --features rdf".to_string(),
255 ))
256 }
257 }
258 }
259
260 fn process_lpg(
262 &self,
263 query: &str,
264 language: QueryLanguage,
265 params: Option<&QueryParams>,
266 ) -> Result<QueryResult> {
267 #[cfg(not(target_arch = "wasm32"))]
268 let start_time = std::time::Instant::now();
269
270 let mut logical_plan = self.translate_lpg(query, language)?;
272
273 if let Some(params) = params {
275 substitute_params(&mut logical_plan, params)?;
276 }
277
278 let mut binder = Binder::new();
280 let _binding_context = binder.bind(&logical_plan)?;
281
282 let optimized_plan = self.optimizer.optimize(logical_plan)?;
284
285 if optimized_plan.explain {
287 let mut plan = optimized_plan;
288 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
289 return Ok(explain_result(&plan));
290 }
291
292 let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
294 Planner::with_context(
295 Arc::clone(&self.graph_store),
296 Arc::clone(&self.transaction_manager),
297 Some(transaction_id),
298 epoch,
299 )
300 } else {
301 Planner::with_context(
302 Arc::clone(&self.graph_store),
303 Arc::clone(&self.transaction_manager),
304 None,
305 self.transaction_manager.current_epoch(),
306 )
307 };
308 let mut physical_plan = planner.plan(&optimized_plan)?;
309
310 let executor = Executor::with_columns(physical_plan.columns.clone());
312 let mut result = executor.execute(physical_plan.operator.as_mut())?;
313
314 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
317 {
318 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
319 result.execution_time_ms = Some(elapsed_ms);
320 }
321 result.rows_scanned = Some(rows_scanned);
322
323 Ok(result)
324 }
325
326 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
328 match language {
329 #[cfg(feature = "gql")]
330 QueryLanguage::Gql => {
331 use crate::query::translators::gql;
332 gql::translate(query)
333 }
334 #[cfg(feature = "cypher")]
335 QueryLanguage::Cypher => {
336 use crate::query::translators::cypher;
337 cypher::translate(query)
338 }
339 #[cfg(feature = "gremlin")]
340 QueryLanguage::Gremlin => {
341 use crate::query::translators::gremlin;
342 gremlin::translate(query)
343 }
344 #[cfg(feature = "graphql")]
345 QueryLanguage::GraphQL => {
346 use crate::query::translators::graphql;
347 graphql::translate(query)
348 }
349 #[cfg(feature = "sql-pgq")]
350 QueryLanguage::SqlPgq => {
351 use crate::query::translators::sql_pgq;
352 sql_pgq::translate(query)
353 }
354 #[allow(unreachable_patterns)]
355 _ => Err(Error::Internal(format!(
356 "Language {:?} is not an LPG language",
357 language
358 ))),
359 }
360 }
361
362 #[cfg(feature = "rdf")]
364 fn process_rdf(
365 &self,
366 query: &str,
367 language: QueryLanguage,
368 params: Option<&QueryParams>,
369 ) -> Result<QueryResult> {
370 use crate::query::planner::rdf::RdfPlanner;
371
372 let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
373 Error::Internal("RDF store not configured for this processor".to_string())
374 })?;
375
376 let mut logical_plan = self.translate_rdf(query, language)?;
378
379 if let Some(params) = params {
381 substitute_params(&mut logical_plan, params)?;
382 }
383
384 let mut binder = Binder::new();
386 let _binding_context = binder.bind(&logical_plan)?;
387
388 let optimized_plan = self.optimizer.optimize(logical_plan)?;
390
391 if optimized_plan.explain {
393 return Ok(explain_result(&optimized_plan));
394 }
395
396 let planner = RdfPlanner::new(Arc::clone(rdf_store));
398 let mut physical_plan = planner.plan(&optimized_plan)?;
399
400 let executor = Executor::with_columns(physical_plan.columns.clone());
402 executor.execute(physical_plan.operator.as_mut())
403 }
404
405 #[cfg(feature = "rdf")]
407 fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
408 match language {
409 #[cfg(feature = "sparql")]
410 QueryLanguage::Sparql => {
411 use crate::query::translators::sparql;
412 sparql::translate(query)
413 }
414 #[cfg(all(feature = "graphql", feature = "rdf"))]
415 QueryLanguage::GraphQLRdf => {
416 use crate::query::translators::graphql_rdf;
417 graphql_rdf::translate(query, "http://example.org/")
419 }
420 _ => Err(Error::Internal(format!(
421 "Language {:?} is not an RDF language",
422 language
423 ))),
424 }
425 }
426
427 #[must_use]
429 pub fn lpg_store(&self) -> &Arc<LpgStore> {
430 &self.lpg_store
431 }
432
433 #[must_use]
435 pub fn catalog(&self) -> &Arc<Catalog> {
436 &self.catalog
437 }
438
439 #[must_use]
441 pub fn optimizer(&self) -> &Optimizer {
442 &self.optimizer
443 }
444
445 #[cfg(feature = "rdf")]
447 #[must_use]
448 pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
449 self.rdf_store.as_ref()
450 }
451}
452
453impl QueryProcessor {
454 #[must_use]
456 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
457 &self.transaction_manager
458 }
459}
460
461pub(crate) fn annotate_pushdown_hints(
466 op: &mut LogicalOperator,
467 store: &dyn grafeo_core::graph::GraphStore,
468) {
469 use crate::query::plan::*;
470
471 match op {
472 LogicalOperator::Filter(filter) => {
473 annotate_pushdown_hints(&mut filter.input, store);
475
476 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
478 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
479 }
480 }
481 LogicalOperator::NodeScan(op) => {
482 if let Some(input) = &mut op.input {
483 annotate_pushdown_hints(input, store);
484 }
485 }
486 LogicalOperator::EdgeScan(op) => {
487 if let Some(input) = &mut op.input {
488 annotate_pushdown_hints(input, store);
489 }
490 }
491 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
492 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
493 LogicalOperator::Join(op) => {
494 annotate_pushdown_hints(&mut op.left, store);
495 annotate_pushdown_hints(&mut op.right, store);
496 }
497 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
498 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
499 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
500 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
501 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
502 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
503 LogicalOperator::Union(op) => {
504 for input in &mut op.inputs {
505 annotate_pushdown_hints(input, store);
506 }
507 }
508 LogicalOperator::Apply(op) => {
509 annotate_pushdown_hints(&mut op.input, store);
510 annotate_pushdown_hints(&mut op.subplan, store);
511 }
512 LogicalOperator::Otherwise(op) => {
513 annotate_pushdown_hints(&mut op.left, store);
514 annotate_pushdown_hints(&mut op.right, store);
515 }
516 _ => {}
517 }
518}
519
520fn infer_pushdown(
522 predicate: &LogicalExpression,
523 scan: &crate::query::plan::NodeScanOp,
524 store: &dyn grafeo_core::graph::GraphStore,
525) -> Option<crate::query::plan::PushdownHint> {
526 use crate::query::plan::*;
527
528 match predicate {
529 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
531 if let Some(prop) = extract_property_name(left, &scan.variable)
532 .or_else(|| extract_property_name(right, &scan.variable))
533 {
534 if store.has_property_index(&prop) {
535 return Some(PushdownHint::IndexLookup { property: prop });
536 }
537 if scan.label.is_some() {
538 return Some(PushdownHint::LabelFirst);
539 }
540 }
541 None
542 }
543 LogicalExpression::Binary {
545 left,
546 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
547 right,
548 } => {
549 if let Some(prop) = extract_property_name(left, &scan.variable)
550 .or_else(|| extract_property_name(right, &scan.variable))
551 {
552 if store.has_property_index(&prop) {
553 return Some(PushdownHint::RangeScan { property: prop });
554 }
555 if scan.label.is_some() {
556 return Some(PushdownHint::LabelFirst);
557 }
558 }
559 None
560 }
561 LogicalExpression::Binary {
563 left,
564 op: BinaryOp::And,
565 ..
566 } => infer_pushdown(left, scan, store),
567 _ => {
568 if scan.label.is_some() {
570 Some(PushdownHint::LabelFirst)
571 } else {
572 None
573 }
574 }
575 }
576}
577
578fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
581 if let LogicalExpression::Property { variable, property } = expr
582 && variable == scan_var
583 {
584 Some(property.clone())
585 } else {
586 None
587 }
588}
589
590pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
592 let tree_text = plan.root.explain_tree();
593 QueryResult {
594 columns: vec!["plan".to_string()],
595 column_types: vec![grafeo_common::types::LogicalType::String],
596 rows: vec![vec![Value::String(tree_text.into())]],
597 execution_time_ms: None,
598 rows_scanned: None,
599 status_message: None,
600 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
601 }
602}
603
604pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
606 substitute_in_operator(&mut plan.root, params)
607}
608
609fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
611 use crate::query::plan::*;
612
613 match op {
614 LogicalOperator::Filter(filter) => {
615 substitute_in_expression(&mut filter.predicate, params)?;
616 substitute_in_operator(&mut filter.input, params)?;
617 }
618 LogicalOperator::Return(ret) => {
619 for item in &mut ret.items {
620 substitute_in_expression(&mut item.expression, params)?;
621 }
622 substitute_in_operator(&mut ret.input, params)?;
623 }
624 LogicalOperator::Project(proj) => {
625 for p in &mut proj.projections {
626 substitute_in_expression(&mut p.expression, params)?;
627 }
628 substitute_in_operator(&mut proj.input, params)?;
629 }
630 LogicalOperator::NodeScan(scan) => {
631 if let Some(input) = &mut scan.input {
632 substitute_in_operator(input, params)?;
633 }
634 }
635 LogicalOperator::EdgeScan(scan) => {
636 if let Some(input) = &mut scan.input {
637 substitute_in_operator(input, params)?;
638 }
639 }
640 LogicalOperator::Expand(expand) => {
641 substitute_in_operator(&mut expand.input, params)?;
642 }
643 LogicalOperator::Join(join) => {
644 substitute_in_operator(&mut join.left, params)?;
645 substitute_in_operator(&mut join.right, params)?;
646 for cond in &mut join.conditions {
647 substitute_in_expression(&mut cond.left, params)?;
648 substitute_in_expression(&mut cond.right, params)?;
649 }
650 }
651 LogicalOperator::LeftJoin(join) => {
652 substitute_in_operator(&mut join.left, params)?;
653 substitute_in_operator(&mut join.right, params)?;
654 if let Some(cond) = &mut join.condition {
655 substitute_in_expression(cond, params)?;
656 }
657 }
658 LogicalOperator::Aggregate(agg) => {
659 for expr in &mut agg.group_by {
660 substitute_in_expression(expr, params)?;
661 }
662 for agg_expr in &mut agg.aggregates {
663 if let Some(expr) = &mut agg_expr.expression {
664 substitute_in_expression(expr, params)?;
665 }
666 }
667 substitute_in_operator(&mut agg.input, params)?;
668 }
669 LogicalOperator::Sort(sort) => {
670 for key in &mut sort.keys {
671 substitute_in_expression(&mut key.expression, params)?;
672 }
673 substitute_in_operator(&mut sort.input, params)?;
674 }
675 LogicalOperator::Limit(limit) => {
676 resolve_count_param(&mut limit.count, params)?;
677 substitute_in_operator(&mut limit.input, params)?;
678 }
679 LogicalOperator::Skip(skip) => {
680 resolve_count_param(&mut skip.count, params)?;
681 substitute_in_operator(&mut skip.input, params)?;
682 }
683 LogicalOperator::Distinct(distinct) => {
684 substitute_in_operator(&mut distinct.input, params)?;
685 }
686 LogicalOperator::CreateNode(create) => {
687 for (_, expr) in &mut create.properties {
688 substitute_in_expression(expr, params)?;
689 }
690 if let Some(input) = &mut create.input {
691 substitute_in_operator(input, params)?;
692 }
693 }
694 LogicalOperator::CreateEdge(create) => {
695 for (_, expr) in &mut create.properties {
696 substitute_in_expression(expr, params)?;
697 }
698 substitute_in_operator(&mut create.input, params)?;
699 }
700 LogicalOperator::DeleteNode(delete) => {
701 substitute_in_operator(&mut delete.input, params)?;
702 }
703 LogicalOperator::DeleteEdge(delete) => {
704 substitute_in_operator(&mut delete.input, params)?;
705 }
706 LogicalOperator::SetProperty(set) => {
707 for (_, expr) in &mut set.properties {
708 substitute_in_expression(expr, params)?;
709 }
710 substitute_in_operator(&mut set.input, params)?;
711 }
712 LogicalOperator::Union(union) => {
713 for input in &mut union.inputs {
714 substitute_in_operator(input, params)?;
715 }
716 }
717 LogicalOperator::AntiJoin(anti) => {
718 substitute_in_operator(&mut anti.left, params)?;
719 substitute_in_operator(&mut anti.right, params)?;
720 }
721 LogicalOperator::Bind(bind) => {
722 substitute_in_expression(&mut bind.expression, params)?;
723 substitute_in_operator(&mut bind.input, params)?;
724 }
725 LogicalOperator::TripleScan(scan) => {
726 if let Some(input) = &mut scan.input {
727 substitute_in_operator(input, params)?;
728 }
729 }
730 LogicalOperator::Unwind(unwind) => {
731 substitute_in_expression(&mut unwind.expression, params)?;
732 substitute_in_operator(&mut unwind.input, params)?;
733 }
734 LogicalOperator::MapCollect(mc) => {
735 substitute_in_operator(&mut mc.input, params)?;
736 }
737 LogicalOperator::Merge(merge) => {
738 for (_, expr) in &mut merge.match_properties {
739 substitute_in_expression(expr, params)?;
740 }
741 for (_, expr) in &mut merge.on_create {
742 substitute_in_expression(expr, params)?;
743 }
744 for (_, expr) in &mut merge.on_match {
745 substitute_in_expression(expr, params)?;
746 }
747 substitute_in_operator(&mut merge.input, params)?;
748 }
749 LogicalOperator::MergeRelationship(merge_rel) => {
750 for (_, expr) in &mut merge_rel.match_properties {
751 substitute_in_expression(expr, params)?;
752 }
753 for (_, expr) in &mut merge_rel.on_create {
754 substitute_in_expression(expr, params)?;
755 }
756 for (_, expr) in &mut merge_rel.on_match {
757 substitute_in_expression(expr, params)?;
758 }
759 substitute_in_operator(&mut merge_rel.input, params)?;
760 }
761 LogicalOperator::AddLabel(add_label) => {
762 substitute_in_operator(&mut add_label.input, params)?;
763 }
764 LogicalOperator::RemoveLabel(remove_label) => {
765 substitute_in_operator(&mut remove_label.input, params)?;
766 }
767 LogicalOperator::ShortestPath(sp) => {
768 substitute_in_operator(&mut sp.input, params)?;
769 }
770 LogicalOperator::InsertTriple(insert) => {
772 if let Some(ref mut input) = insert.input {
773 substitute_in_operator(input, params)?;
774 }
775 }
776 LogicalOperator::DeleteTriple(delete) => {
777 if let Some(ref mut input) = delete.input {
778 substitute_in_operator(input, params)?;
779 }
780 }
781 LogicalOperator::Modify(modify) => {
782 substitute_in_operator(&mut modify.where_clause, params)?;
783 }
784 LogicalOperator::ClearGraph(_)
785 | LogicalOperator::CreateGraph(_)
786 | LogicalOperator::DropGraph(_)
787 | LogicalOperator::LoadGraph(_)
788 | LogicalOperator::CopyGraph(_)
789 | LogicalOperator::MoveGraph(_)
790 | LogicalOperator::AddGraph(_) => {}
791 LogicalOperator::HorizontalAggregate(op) => {
792 substitute_in_operator(&mut op.input, params)?;
793 }
794 LogicalOperator::Empty => {}
795 LogicalOperator::VectorScan(scan) => {
796 substitute_in_expression(&mut scan.query_vector, params)?;
797 if let Some(ref mut input) = scan.input {
798 substitute_in_operator(input, params)?;
799 }
800 }
801 LogicalOperator::VectorJoin(join) => {
802 substitute_in_expression(&mut join.query_vector, params)?;
803 substitute_in_operator(&mut join.input, params)?;
804 }
805 LogicalOperator::Except(except) => {
806 substitute_in_operator(&mut except.left, params)?;
807 substitute_in_operator(&mut except.right, params)?;
808 }
809 LogicalOperator::Intersect(intersect) => {
810 substitute_in_operator(&mut intersect.left, params)?;
811 substitute_in_operator(&mut intersect.right, params)?;
812 }
813 LogicalOperator::Otherwise(otherwise) => {
814 substitute_in_operator(&mut otherwise.left, params)?;
815 substitute_in_operator(&mut otherwise.right, params)?;
816 }
817 LogicalOperator::Apply(apply) => {
818 substitute_in_operator(&mut apply.input, params)?;
819 substitute_in_operator(&mut apply.subplan, params)?;
820 }
821 LogicalOperator::ParameterScan(_) => {}
823 LogicalOperator::MultiWayJoin(mwj) => {
824 for input in &mut mwj.inputs {
825 substitute_in_operator(input, params)?;
826 }
827 for cond in &mut mwj.conditions {
828 substitute_in_expression(&mut cond.left, params)?;
829 substitute_in_expression(&mut cond.right, params)?;
830 }
831 }
832 LogicalOperator::CreatePropertyGraph(_) => {}
834 LogicalOperator::CallProcedure(_) => {}
836 LogicalOperator::LoadCsv(_) => {}
838 }
839 Ok(())
840}
841
842fn resolve_count_param(
844 count: &mut crate::query::plan::CountExpr,
845 params: &QueryParams,
846) -> Result<()> {
847 use crate::query::plan::CountExpr;
848 use grafeo_common::utils::error::{QueryError, QueryErrorKind};
849
850 if let CountExpr::Parameter(name) = count {
851 let value = params.get(name.as_str()).ok_or_else(|| {
852 Error::Query(QueryError::new(
853 QueryErrorKind::Semantic,
854 format!("Missing parameter for SKIP/LIMIT: ${name}"),
855 ))
856 })?;
857 let n = match value {
858 Value::Int64(i) if *i >= 0 => *i as usize,
859 Value::Int64(i) => {
860 return Err(Error::Query(QueryError::new(
861 QueryErrorKind::Semantic,
862 format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
863 )));
864 }
865 other => {
866 return Err(Error::Query(QueryError::new(
867 QueryErrorKind::Semantic,
868 format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
869 )));
870 }
871 };
872 *count = CountExpr::Literal(n);
873 }
874 Ok(())
875}
876
877fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
879 use crate::query::plan::LogicalExpression;
880
881 match expr {
882 LogicalExpression::Parameter(name) => {
883 if let Some(value) = params.get(name) {
884 *expr = LogicalExpression::Literal(value.clone());
885 } else {
886 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
887 }
888 }
889 LogicalExpression::Binary { left, right, .. } => {
890 substitute_in_expression(left, params)?;
891 substitute_in_expression(right, params)?;
892 }
893 LogicalExpression::Unary { operand, .. } => {
894 substitute_in_expression(operand, params)?;
895 }
896 LogicalExpression::FunctionCall { args, .. } => {
897 for arg in args {
898 substitute_in_expression(arg, params)?;
899 }
900 }
901 LogicalExpression::List(items) => {
902 for item in items {
903 substitute_in_expression(item, params)?;
904 }
905 }
906 LogicalExpression::Map(pairs) => {
907 for (_, value) in pairs {
908 substitute_in_expression(value, params)?;
909 }
910 }
911 LogicalExpression::IndexAccess { base, index } => {
912 substitute_in_expression(base, params)?;
913 substitute_in_expression(index, params)?;
914 }
915 LogicalExpression::SliceAccess { base, start, end } => {
916 substitute_in_expression(base, params)?;
917 if let Some(s) = start {
918 substitute_in_expression(s, params)?;
919 }
920 if let Some(e) = end {
921 substitute_in_expression(e, params)?;
922 }
923 }
924 LogicalExpression::Case {
925 operand,
926 when_clauses,
927 else_clause,
928 } => {
929 if let Some(op) = operand {
930 substitute_in_expression(op, params)?;
931 }
932 for (cond, result) in when_clauses {
933 substitute_in_expression(cond, params)?;
934 substitute_in_expression(result, params)?;
935 }
936 if let Some(el) = else_clause {
937 substitute_in_expression(el, params)?;
938 }
939 }
940 LogicalExpression::Property { .. }
941 | LogicalExpression::Variable(_)
942 | LogicalExpression::Literal(_)
943 | LogicalExpression::Labels(_)
944 | LogicalExpression::Type(_)
945 | LogicalExpression::Id(_) => {}
946 LogicalExpression::ListComprehension {
947 list_expr,
948 filter_expr,
949 map_expr,
950 ..
951 } => {
952 substitute_in_expression(list_expr, params)?;
953 if let Some(filter) = filter_expr {
954 substitute_in_expression(filter, params)?;
955 }
956 substitute_in_expression(map_expr, params)?;
957 }
958 LogicalExpression::ListPredicate {
959 list_expr,
960 predicate,
961 ..
962 } => {
963 substitute_in_expression(list_expr, params)?;
964 substitute_in_expression(predicate, params)?;
965 }
966 LogicalExpression::ExistsSubquery(_)
967 | LogicalExpression::CountSubquery(_)
968 | LogicalExpression::ValueSubquery(_) => {
969 }
971 LogicalExpression::PatternComprehension { projection, .. } => {
972 substitute_in_expression(projection, params)?;
973 }
974 LogicalExpression::MapProjection { entries, .. } => {
975 for entry in entries {
976 if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
977 substitute_in_expression(expr, params)?;
978 }
979 }
980 }
981 LogicalExpression::Reduce {
982 initial,
983 list,
984 expression,
985 ..
986 } => {
987 substitute_in_expression(initial, params)?;
988 substitute_in_expression(list, params)?;
989 substitute_in_expression(expression, params)?;
990 }
991 }
992 Ok(())
993}
994
995#[cfg(test)]
996mod tests {
997 use super::*;
998
999 #[test]
1000 fn test_query_language_is_lpg() {
1001 #[cfg(feature = "gql")]
1002 assert!(QueryLanguage::Gql.is_lpg());
1003 #[cfg(feature = "cypher")]
1004 assert!(QueryLanguage::Cypher.is_lpg());
1005 #[cfg(feature = "sparql")]
1006 assert!(!QueryLanguage::Sparql.is_lpg());
1007 }
1008
1009 #[test]
1010 fn test_processor_creation() {
1011 let store = Arc::new(LpgStore::new().unwrap());
1012 let processor = QueryProcessor::for_lpg(store);
1013 assert!(processor.lpg_store().node_count() == 0);
1014 }
1015
1016 #[cfg(feature = "gql")]
1017 #[test]
1018 fn test_process_simple_gql() {
1019 let store = Arc::new(LpgStore::new().unwrap());
1020 store.create_node(&["Person"]);
1021 store.create_node(&["Person"]);
1022
1023 let processor = QueryProcessor::for_lpg(store);
1024 let result = processor
1025 .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
1026 .unwrap();
1027
1028 assert_eq!(result.row_count(), 2);
1029 assert_eq!(result.columns[0], "n");
1030 }
1031
1032 #[cfg(feature = "cypher")]
1033 #[test]
1034 fn test_process_simple_cypher() {
1035 let store = Arc::new(LpgStore::new().unwrap());
1036 store.create_node(&["Person"]);
1037
1038 let processor = QueryProcessor::for_lpg(store);
1039 let result = processor
1040 .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
1041 .unwrap();
1042
1043 assert_eq!(result.row_count(), 1);
1044 }
1045
1046 #[cfg(feature = "gql")]
1047 #[test]
1048 fn test_process_with_params() {
1049 let store = Arc::new(LpgStore::new().unwrap());
1050 store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1051 store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1052 store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1053
1054 let processor = QueryProcessor::for_lpg(store);
1055
1056 let mut params = HashMap::new();
1058 params.insert("min_age".to_string(), Value::Int64(30));
1059
1060 let result = processor
1061 .process(
1062 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1063 QueryLanguage::Gql,
1064 Some(¶ms),
1065 )
1066 .unwrap();
1067
1068 assert_eq!(result.row_count(), 2);
1070 }
1071
1072 #[cfg(feature = "gql")]
1073 #[test]
1074 fn test_missing_param_error() {
1075 let store = Arc::new(LpgStore::new().unwrap());
1076 store.create_node(&["Person"]);
1077
1078 let processor = QueryProcessor::for_lpg(store);
1079
1080 let params: HashMap<String, Value> = HashMap::new();
1082 let result = processor.process(
1083 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1084 QueryLanguage::Gql,
1085 Some(¶ms),
1086 );
1087
1088 assert!(result.is_err());
1090 let err = result.unwrap_err();
1091 assert!(
1092 err.to_string().contains("Missing parameter"),
1093 "Expected 'Missing parameter' error, got: {}",
1094 err
1095 );
1096 }
1097
1098 #[cfg(feature = "gql")]
1099 #[test]
1100 fn test_params_in_filter_with_property() {
1101 let store = Arc::new(LpgStore::new().unwrap());
1103 store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
1104 store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
1105
1106 let processor = QueryProcessor::for_lpg(store);
1107
1108 let mut params = HashMap::new();
1109 params.insert("threshold".to_string(), Value::Int64(15));
1110
1111 let result = processor
1112 .process(
1113 "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
1114 QueryLanguage::Gql,
1115 Some(¶ms),
1116 )
1117 .unwrap();
1118
1119 assert_eq!(result.row_count(), 1);
1121 let row = &result.rows[0];
1122 assert_eq!(row[0], Value::Int64(20));
1123 }
1124
1125 #[cfg(feature = "gql")]
1126 #[test]
1127 fn test_params_in_multiple_where_conditions() {
1128 let store = Arc::new(LpgStore::new().unwrap());
1130 store.create_node_with_props(
1131 &["Person"],
1132 [("age", Value::Int64(25)), ("score", Value::Int64(80))],
1133 );
1134 store.create_node_with_props(
1135 &["Person"],
1136 [("age", Value::Int64(35)), ("score", Value::Int64(90))],
1137 );
1138 store.create_node_with_props(
1139 &["Person"],
1140 [("age", Value::Int64(45)), ("score", Value::Int64(70))],
1141 );
1142
1143 let processor = QueryProcessor::for_lpg(store);
1144
1145 let mut params = HashMap::new();
1146 params.insert("min_age".to_string(), Value::Int64(30));
1147 params.insert("min_score".to_string(), Value::Int64(75));
1148
1149 let result = processor
1150 .process(
1151 "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
1152 QueryLanguage::Gql,
1153 Some(¶ms),
1154 )
1155 .unwrap();
1156
1157 assert_eq!(result.row_count(), 1);
1159 }
1160
1161 #[cfg(feature = "gql")]
1162 #[test]
1163 fn test_params_with_in_list() {
1164 let store = Arc::new(LpgStore::new().unwrap());
1166 store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
1167 store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
1168 store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
1169
1170 let processor = QueryProcessor::for_lpg(store);
1171
1172 let mut params = HashMap::new();
1174 params.insert("target".to_string(), Value::String("active".into()));
1175
1176 let result = processor
1177 .process(
1178 "MATCH (n:Item) WHERE n.status = $target RETURN n",
1179 QueryLanguage::Gql,
1180 Some(¶ms),
1181 )
1182 .unwrap();
1183
1184 assert_eq!(result.row_count(), 1);
1185 }
1186
1187 #[cfg(feature = "gql")]
1188 #[test]
1189 fn test_params_same_type_comparison() {
1190 let store = Arc::new(LpgStore::new().unwrap());
1192 store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
1193 store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
1194
1195 let processor = QueryProcessor::for_lpg(store);
1196
1197 let mut params = HashMap::new();
1199 params.insert("threshold".to_string(), Value::Int64(75));
1200
1201 let result = processor
1202 .process(
1203 "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
1204 QueryLanguage::Gql,
1205 Some(¶ms),
1206 )
1207 .unwrap();
1208
1209 assert_eq!(result.row_count(), 1);
1211 }
1212
1213 #[cfg(feature = "gql")]
1214 #[test]
1215 fn test_process_empty_result_has_columns() {
1216 let store = Arc::new(LpgStore::new().unwrap());
1218 let processor = QueryProcessor::for_lpg(store);
1221 let result = processor
1222 .process(
1223 "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
1224 QueryLanguage::Gql,
1225 None,
1226 )
1227 .unwrap();
1228
1229 assert_eq!(result.row_count(), 0);
1230 assert_eq!(result.columns.len(), 2);
1231 assert_eq!(result.columns[0], "name");
1232 assert_eq!(result.columns[1], "age");
1233 }
1234
1235 #[cfg(feature = "gql")]
1236 #[test]
1237 fn test_params_string_equality() {
1238 let store = Arc::new(LpgStore::new().unwrap());
1240 store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
1241 store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
1242 store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
1243
1244 let processor = QueryProcessor::for_lpg(store);
1245
1246 let mut params = HashMap::new();
1247 params.insert("target".to_string(), Value::String("beta".into()));
1248
1249 let result = processor
1250 .process(
1251 "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
1252 QueryLanguage::Gql,
1253 Some(¶ms),
1254 )
1255 .unwrap();
1256
1257 assert_eq!(result.row_count(), 1);
1258 assert_eq!(result.rows[0][0], Value::String("beta".into()));
1259 }
1260}