1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TransactionId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::GraphStoreMut;
15use grafeo_core::graph::lpg::LpgStore;
16
17use crate::catalog::Catalog;
18use crate::database::QueryResult;
19use crate::query::binder::Binder;
20use crate::query::executor::Executor;
21use crate::query::optimizer::Optimizer;
22use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
23use crate::query::planner::Planner;
24use crate::transaction::TransactionManager;
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
28pub enum QueryLanguage {
29 #[cfg(feature = "gql")]
31 Gql,
32 #[cfg(feature = "cypher")]
34 Cypher,
35 #[cfg(feature = "gremlin")]
37 Gremlin,
38 #[cfg(feature = "graphql")]
40 GraphQL,
41 #[cfg(feature = "sql-pgq")]
43 SqlPgq,
44 #[cfg(feature = "sparql")]
46 Sparql,
47 #[cfg(all(feature = "graphql", feature = "rdf"))]
49 GraphQLRdf,
50}
51
52impl QueryLanguage {
53 #[must_use]
55 pub const fn is_lpg(&self) -> bool {
56 match self {
57 #[cfg(feature = "gql")]
58 Self::Gql => true,
59 #[cfg(feature = "cypher")]
60 Self::Cypher => true,
61 #[cfg(feature = "gremlin")]
62 Self::Gremlin => true,
63 #[cfg(feature = "graphql")]
64 Self::GraphQL => true,
65 #[cfg(feature = "sql-pgq")]
66 Self::SqlPgq => true,
67 #[cfg(feature = "sparql")]
68 Self::Sparql => false,
69 #[cfg(all(feature = "graphql", feature = "rdf"))]
70 Self::GraphQLRdf => false,
71 }
72 }
73}
74
75pub type QueryParams = HashMap<String, Value>;
77
78pub struct QueryProcessor {
98 lpg_store: Arc<LpgStore>,
100 graph_store: Arc<dyn GraphStoreMut>,
102 transaction_manager: Arc<TransactionManager>,
104 catalog: Arc<Catalog>,
106 optimizer: Optimizer,
108 transaction_context: Option<(EpochId, TransactionId)>,
110 #[cfg(feature = "rdf")]
112 rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
113}
114
115impl QueryProcessor {
116 #[must_use]
118 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
119 let optimizer = Optimizer::from_store(&store);
120 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
121 Self {
122 lpg_store: store,
123 graph_store,
124 transaction_manager: Arc::new(TransactionManager::new()),
125 catalog: Arc::new(Catalog::new()),
126 optimizer,
127 transaction_context: None,
128 #[cfg(feature = "rdf")]
129 rdf_store: None,
130 }
131 }
132
133 #[must_use]
135 pub fn for_lpg_with_transaction(
136 store: Arc<LpgStore>,
137 transaction_manager: Arc<TransactionManager>,
138 ) -> Self {
139 let optimizer = Optimizer::from_store(&store);
140 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreMut>;
141 Self {
142 lpg_store: store,
143 graph_store,
144 transaction_manager,
145 catalog: Arc::new(Catalog::new()),
146 optimizer,
147 transaction_context: None,
148 #[cfg(feature = "rdf")]
149 rdf_store: None,
150 }
151 }
152
153 #[must_use]
155 pub fn for_graph_store_with_transaction(
156 store: Arc<dyn GraphStoreMut>,
157 transaction_manager: Arc<TransactionManager>,
158 ) -> Self {
159 let optimizer = Optimizer::from_graph_store(&*store);
160 Self {
161 lpg_store: Arc::new(LpgStore::new().expect("arena allocation for dummy LpgStore")), graph_store: store,
163 transaction_manager,
164 catalog: Arc::new(Catalog::new()),
165 optimizer,
166 transaction_context: None,
167 #[cfg(feature = "rdf")]
168 rdf_store: None,
169 }
170 }
171
172 #[cfg(feature = "rdf")]
174 #[must_use]
175 pub fn with_rdf(
176 lpg_store: Arc<LpgStore>,
177 rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
178 ) -> Self {
179 let optimizer = Optimizer::from_store(&lpg_store);
180 let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>;
181 Self {
182 lpg_store,
183 graph_store,
184 transaction_manager: Arc::new(TransactionManager::new()),
185 catalog: Arc::new(Catalog::new()),
186 optimizer,
187 transaction_context: None,
188 rdf_store: Some(rdf_store),
189 }
190 }
191
192 #[must_use]
196 pub fn with_transaction_context(
197 mut self,
198 viewing_epoch: EpochId,
199 transaction_id: TransactionId,
200 ) -> Self {
201 self.transaction_context = Some((viewing_epoch, transaction_id));
202 self
203 }
204
205 #[must_use]
207 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
208 self.catalog = catalog;
209 self
210 }
211
212 #[must_use]
214 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
215 self.optimizer = optimizer;
216 self
217 }
218
219 pub fn process(
239 &self,
240 query: &str,
241 language: QueryLanguage,
242 params: Option<&QueryParams>,
243 ) -> Result<QueryResult> {
244 if language.is_lpg() {
245 self.process_lpg(query, language, params)
246 } else {
247 #[cfg(feature = "rdf")]
248 {
249 self.process_rdf(query, language, params)
250 }
251 #[cfg(not(feature = "rdf"))]
252 {
253 Err(Error::Internal(
254 "RDF support not enabled. Compile with --features rdf".to_string(),
255 ))
256 }
257 }
258 }
259
260 fn process_lpg(
262 &self,
263 query: &str,
264 language: QueryLanguage,
265 params: Option<&QueryParams>,
266 ) -> Result<QueryResult> {
267 #[cfg(not(target_arch = "wasm32"))]
268 let start_time = std::time::Instant::now();
269
270 let mut logical_plan = self.translate_lpg(query, language)?;
272
273 if let Some(params) = params {
275 substitute_params(&mut logical_plan, params)?;
276 }
277
278 let mut binder = Binder::new();
280 let _binding_context = binder.bind(&logical_plan)?;
281
282 let optimized_plan = self.optimizer.optimize(logical_plan)?;
284
285 if optimized_plan.explain {
287 let mut plan = optimized_plan;
288 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
289 return Ok(explain_result(&plan));
290 }
291
292 let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
294 Planner::with_context(
295 Arc::clone(&self.graph_store),
296 Arc::clone(&self.transaction_manager),
297 Some(transaction_id),
298 epoch,
299 )
300 } else {
301 Planner::with_context(
302 Arc::clone(&self.graph_store),
303 Arc::clone(&self.transaction_manager),
304 None,
305 self.transaction_manager.current_epoch(),
306 )
307 };
308 let mut physical_plan = planner.plan(&optimized_plan)?;
309
310 let executor = Executor::with_columns(physical_plan.columns.clone());
312 let mut result = executor.execute(physical_plan.operator.as_mut())?;
313
314 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
317 {
318 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
319 result.execution_time_ms = Some(elapsed_ms);
320 }
321 result.rows_scanned = Some(rows_scanned);
322
323 Ok(result)
324 }
325
326 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
328 match language {
329 #[cfg(feature = "gql")]
330 QueryLanguage::Gql => {
331 use crate::query::translators::gql;
332 gql::translate(query)
333 }
334 #[cfg(feature = "cypher")]
335 QueryLanguage::Cypher => {
336 use crate::query::translators::cypher;
337 cypher::translate(query)
338 }
339 #[cfg(feature = "gremlin")]
340 QueryLanguage::Gremlin => {
341 use crate::query::translators::gremlin;
342 gremlin::translate(query)
343 }
344 #[cfg(feature = "graphql")]
345 QueryLanguage::GraphQL => {
346 use crate::query::translators::graphql;
347 graphql::translate(query)
348 }
349 #[cfg(feature = "sql-pgq")]
350 QueryLanguage::SqlPgq => {
351 use crate::query::translators::sql_pgq;
352 sql_pgq::translate(query)
353 }
354 #[allow(unreachable_patterns)]
355 _ => Err(Error::Internal(format!(
356 "Language {:?} is not an LPG language",
357 language
358 ))),
359 }
360 }
361
362 #[cfg(feature = "rdf")]
364 fn process_rdf(
365 &self,
366 query: &str,
367 language: QueryLanguage,
368 _params: Option<&QueryParams>,
369 ) -> Result<QueryResult> {
370 use crate::query::planner::rdf::RdfPlanner;
371
372 let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
373 Error::Internal("RDF store not configured for this processor".to_string())
374 })?;
375
376 let logical_plan = self.translate_rdf(query, language)?;
378
379 let mut binder = Binder::new();
381 let _binding_context = binder.bind(&logical_plan)?;
382
383 let optimized_plan = self.optimizer.optimize(logical_plan)?;
385
386 if optimized_plan.explain {
388 return Ok(explain_result(&optimized_plan));
389 }
390
391 let planner = RdfPlanner::new(Arc::clone(rdf_store));
393 let mut physical_plan = planner.plan(&optimized_plan)?;
394
395 let executor = Executor::with_columns(physical_plan.columns.clone());
397 executor.execute(physical_plan.operator.as_mut())
398 }
399
400 #[cfg(feature = "rdf")]
402 fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
403 match language {
404 #[cfg(feature = "sparql")]
405 QueryLanguage::Sparql => {
406 use crate::query::translators::sparql;
407 sparql::translate(query)
408 }
409 #[cfg(all(feature = "graphql", feature = "rdf"))]
410 QueryLanguage::GraphQLRdf => {
411 use crate::query::translators::graphql_rdf;
412 graphql_rdf::translate(query, "http://example.org/")
414 }
415 _ => Err(Error::Internal(format!(
416 "Language {:?} is not an RDF language",
417 language
418 ))),
419 }
420 }
421
422 #[must_use]
424 pub fn lpg_store(&self) -> &Arc<LpgStore> {
425 &self.lpg_store
426 }
427
428 #[must_use]
430 pub fn catalog(&self) -> &Arc<Catalog> {
431 &self.catalog
432 }
433
434 #[must_use]
436 pub fn optimizer(&self) -> &Optimizer {
437 &self.optimizer
438 }
439
440 #[cfg(feature = "rdf")]
442 #[must_use]
443 pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
444 self.rdf_store.as_ref()
445 }
446}
447
448impl QueryProcessor {
449 #[must_use]
451 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
452 &self.transaction_manager
453 }
454}
455
456pub(crate) fn annotate_pushdown_hints(
461 op: &mut LogicalOperator,
462 store: &dyn grafeo_core::graph::GraphStore,
463) {
464 use crate::query::plan::*;
465
466 match op {
467 LogicalOperator::Filter(filter) => {
468 annotate_pushdown_hints(&mut filter.input, store);
470
471 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
473 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
474 }
475 }
476 LogicalOperator::NodeScan(op) => {
477 if let Some(input) = &mut op.input {
478 annotate_pushdown_hints(input, store);
479 }
480 }
481 LogicalOperator::EdgeScan(op) => {
482 if let Some(input) = &mut op.input {
483 annotate_pushdown_hints(input, store);
484 }
485 }
486 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
487 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
488 LogicalOperator::Join(op) => {
489 annotate_pushdown_hints(&mut op.left, store);
490 annotate_pushdown_hints(&mut op.right, store);
491 }
492 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
493 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
494 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
495 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
496 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
497 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
498 LogicalOperator::Union(op) => {
499 for input in &mut op.inputs {
500 annotate_pushdown_hints(input, store);
501 }
502 }
503 LogicalOperator::Apply(op) => {
504 annotate_pushdown_hints(&mut op.input, store);
505 annotate_pushdown_hints(&mut op.subplan, store);
506 }
507 LogicalOperator::Otherwise(op) => {
508 annotate_pushdown_hints(&mut op.left, store);
509 annotate_pushdown_hints(&mut op.right, store);
510 }
511 _ => {}
512 }
513}
514
515fn infer_pushdown(
517 predicate: &LogicalExpression,
518 scan: &crate::query::plan::NodeScanOp,
519 store: &dyn grafeo_core::graph::GraphStore,
520) -> Option<crate::query::plan::PushdownHint> {
521 use crate::query::plan::*;
522
523 match predicate {
524 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
526 if let Some(prop) = extract_property_name(left, &scan.variable)
527 .or_else(|| extract_property_name(right, &scan.variable))
528 {
529 if store.has_property_index(&prop) {
530 return Some(PushdownHint::IndexLookup { property: prop });
531 }
532 if scan.label.is_some() {
533 return Some(PushdownHint::LabelFirst);
534 }
535 }
536 None
537 }
538 LogicalExpression::Binary {
540 left,
541 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
542 right,
543 } => {
544 if let Some(prop) = extract_property_name(left, &scan.variable)
545 .or_else(|| extract_property_name(right, &scan.variable))
546 {
547 if store.has_property_index(&prop) {
548 return Some(PushdownHint::RangeScan { property: prop });
549 }
550 if scan.label.is_some() {
551 return Some(PushdownHint::LabelFirst);
552 }
553 }
554 None
555 }
556 LogicalExpression::Binary {
558 left,
559 op: BinaryOp::And,
560 ..
561 } => infer_pushdown(left, scan, store),
562 _ => {
563 if scan.label.is_some() {
565 Some(PushdownHint::LabelFirst)
566 } else {
567 None
568 }
569 }
570 }
571}
572
573fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
576 if let LogicalExpression::Property { variable, property } = expr
577 && variable == scan_var
578 {
579 Some(property.clone())
580 } else {
581 None
582 }
583}
584
585pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
587 let tree_text = plan.root.explain_tree();
588 QueryResult {
589 columns: vec!["plan".to_string()],
590 column_types: vec![grafeo_common::types::LogicalType::String],
591 rows: vec![vec![Value::String(tree_text.into())]],
592 execution_time_ms: None,
593 rows_scanned: None,
594 status_message: None,
595 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
596 }
597}
598
599fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
601 substitute_in_operator(&mut plan.root, params)
602}
603
604fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
606 use crate::query::plan::*;
607
608 match op {
609 LogicalOperator::Filter(filter) => {
610 substitute_in_expression(&mut filter.predicate, params)?;
611 substitute_in_operator(&mut filter.input, params)?;
612 }
613 LogicalOperator::Return(ret) => {
614 for item in &mut ret.items {
615 substitute_in_expression(&mut item.expression, params)?;
616 }
617 substitute_in_operator(&mut ret.input, params)?;
618 }
619 LogicalOperator::Project(proj) => {
620 for p in &mut proj.projections {
621 substitute_in_expression(&mut p.expression, params)?;
622 }
623 substitute_in_operator(&mut proj.input, params)?;
624 }
625 LogicalOperator::NodeScan(scan) => {
626 if let Some(input) = &mut scan.input {
627 substitute_in_operator(input, params)?;
628 }
629 }
630 LogicalOperator::EdgeScan(scan) => {
631 if let Some(input) = &mut scan.input {
632 substitute_in_operator(input, params)?;
633 }
634 }
635 LogicalOperator::Expand(expand) => {
636 substitute_in_operator(&mut expand.input, params)?;
637 }
638 LogicalOperator::Join(join) => {
639 substitute_in_operator(&mut join.left, params)?;
640 substitute_in_operator(&mut join.right, params)?;
641 for cond in &mut join.conditions {
642 substitute_in_expression(&mut cond.left, params)?;
643 substitute_in_expression(&mut cond.right, params)?;
644 }
645 }
646 LogicalOperator::LeftJoin(join) => {
647 substitute_in_operator(&mut join.left, params)?;
648 substitute_in_operator(&mut join.right, params)?;
649 if let Some(cond) = &mut join.condition {
650 substitute_in_expression(cond, params)?;
651 }
652 }
653 LogicalOperator::Aggregate(agg) => {
654 for expr in &mut agg.group_by {
655 substitute_in_expression(expr, params)?;
656 }
657 for agg_expr in &mut agg.aggregates {
658 if let Some(expr) = &mut agg_expr.expression {
659 substitute_in_expression(expr, params)?;
660 }
661 }
662 substitute_in_operator(&mut agg.input, params)?;
663 }
664 LogicalOperator::Sort(sort) => {
665 for key in &mut sort.keys {
666 substitute_in_expression(&mut key.expression, params)?;
667 }
668 substitute_in_operator(&mut sort.input, params)?;
669 }
670 LogicalOperator::Limit(limit) => {
671 resolve_count_param(&mut limit.count, params)?;
672 substitute_in_operator(&mut limit.input, params)?;
673 }
674 LogicalOperator::Skip(skip) => {
675 resolve_count_param(&mut skip.count, params)?;
676 substitute_in_operator(&mut skip.input, params)?;
677 }
678 LogicalOperator::Distinct(distinct) => {
679 substitute_in_operator(&mut distinct.input, params)?;
680 }
681 LogicalOperator::CreateNode(create) => {
682 for (_, expr) in &mut create.properties {
683 substitute_in_expression(expr, params)?;
684 }
685 if let Some(input) = &mut create.input {
686 substitute_in_operator(input, params)?;
687 }
688 }
689 LogicalOperator::CreateEdge(create) => {
690 for (_, expr) in &mut create.properties {
691 substitute_in_expression(expr, params)?;
692 }
693 substitute_in_operator(&mut create.input, params)?;
694 }
695 LogicalOperator::DeleteNode(delete) => {
696 substitute_in_operator(&mut delete.input, params)?;
697 }
698 LogicalOperator::DeleteEdge(delete) => {
699 substitute_in_operator(&mut delete.input, params)?;
700 }
701 LogicalOperator::SetProperty(set) => {
702 for (_, expr) in &mut set.properties {
703 substitute_in_expression(expr, params)?;
704 }
705 substitute_in_operator(&mut set.input, params)?;
706 }
707 LogicalOperator::Union(union) => {
708 for input in &mut union.inputs {
709 substitute_in_operator(input, params)?;
710 }
711 }
712 LogicalOperator::AntiJoin(anti) => {
713 substitute_in_operator(&mut anti.left, params)?;
714 substitute_in_operator(&mut anti.right, params)?;
715 }
716 LogicalOperator::Bind(bind) => {
717 substitute_in_expression(&mut bind.expression, params)?;
718 substitute_in_operator(&mut bind.input, params)?;
719 }
720 LogicalOperator::TripleScan(scan) => {
721 if let Some(input) = &mut scan.input {
722 substitute_in_operator(input, params)?;
723 }
724 }
725 LogicalOperator::Unwind(unwind) => {
726 substitute_in_expression(&mut unwind.expression, params)?;
727 substitute_in_operator(&mut unwind.input, params)?;
728 }
729 LogicalOperator::MapCollect(mc) => {
730 substitute_in_operator(&mut mc.input, params)?;
731 }
732 LogicalOperator::Merge(merge) => {
733 for (_, expr) in &mut merge.match_properties {
734 substitute_in_expression(expr, params)?;
735 }
736 for (_, expr) in &mut merge.on_create {
737 substitute_in_expression(expr, params)?;
738 }
739 for (_, expr) in &mut merge.on_match {
740 substitute_in_expression(expr, params)?;
741 }
742 substitute_in_operator(&mut merge.input, params)?;
743 }
744 LogicalOperator::MergeRelationship(merge_rel) => {
745 for (_, expr) in &mut merge_rel.match_properties {
746 substitute_in_expression(expr, params)?;
747 }
748 for (_, expr) in &mut merge_rel.on_create {
749 substitute_in_expression(expr, params)?;
750 }
751 for (_, expr) in &mut merge_rel.on_match {
752 substitute_in_expression(expr, params)?;
753 }
754 substitute_in_operator(&mut merge_rel.input, params)?;
755 }
756 LogicalOperator::AddLabel(add_label) => {
757 substitute_in_operator(&mut add_label.input, params)?;
758 }
759 LogicalOperator::RemoveLabel(remove_label) => {
760 substitute_in_operator(&mut remove_label.input, params)?;
761 }
762 LogicalOperator::ShortestPath(sp) => {
763 substitute_in_operator(&mut sp.input, params)?;
764 }
765 LogicalOperator::InsertTriple(insert) => {
767 if let Some(ref mut input) = insert.input {
768 substitute_in_operator(input, params)?;
769 }
770 }
771 LogicalOperator::DeleteTriple(delete) => {
772 if let Some(ref mut input) = delete.input {
773 substitute_in_operator(input, params)?;
774 }
775 }
776 LogicalOperator::Modify(modify) => {
777 substitute_in_operator(&mut modify.where_clause, params)?;
778 }
779 LogicalOperator::ClearGraph(_)
780 | LogicalOperator::CreateGraph(_)
781 | LogicalOperator::DropGraph(_)
782 | LogicalOperator::LoadGraph(_)
783 | LogicalOperator::CopyGraph(_)
784 | LogicalOperator::MoveGraph(_)
785 | LogicalOperator::AddGraph(_) => {}
786 LogicalOperator::HorizontalAggregate(op) => {
787 substitute_in_operator(&mut op.input, params)?;
788 }
789 LogicalOperator::Empty => {}
790 LogicalOperator::VectorScan(scan) => {
791 substitute_in_expression(&mut scan.query_vector, params)?;
792 if let Some(ref mut input) = scan.input {
793 substitute_in_operator(input, params)?;
794 }
795 }
796 LogicalOperator::VectorJoin(join) => {
797 substitute_in_expression(&mut join.query_vector, params)?;
798 substitute_in_operator(&mut join.input, params)?;
799 }
800 LogicalOperator::Except(except) => {
801 substitute_in_operator(&mut except.left, params)?;
802 substitute_in_operator(&mut except.right, params)?;
803 }
804 LogicalOperator::Intersect(intersect) => {
805 substitute_in_operator(&mut intersect.left, params)?;
806 substitute_in_operator(&mut intersect.right, params)?;
807 }
808 LogicalOperator::Otherwise(otherwise) => {
809 substitute_in_operator(&mut otherwise.left, params)?;
810 substitute_in_operator(&mut otherwise.right, params)?;
811 }
812 LogicalOperator::Apply(apply) => {
813 substitute_in_operator(&mut apply.input, params)?;
814 substitute_in_operator(&mut apply.subplan, params)?;
815 }
816 LogicalOperator::ParameterScan(_) => {}
818 LogicalOperator::MultiWayJoin(mwj) => {
819 for input in &mut mwj.inputs {
820 substitute_in_operator(input, params)?;
821 }
822 for cond in &mut mwj.conditions {
823 substitute_in_expression(&mut cond.left, params)?;
824 substitute_in_expression(&mut cond.right, params)?;
825 }
826 }
827 LogicalOperator::CreatePropertyGraph(_) => {}
829 LogicalOperator::CallProcedure(_) => {}
831 }
832 Ok(())
833}
834
835fn resolve_count_param(
837 count: &mut crate::query::plan::CountExpr,
838 params: &QueryParams,
839) -> Result<()> {
840 use crate::query::plan::CountExpr;
841 use grafeo_common::utils::error::{QueryError, QueryErrorKind};
842
843 if let CountExpr::Parameter(name) = count {
844 let value = params.get(name.as_str()).ok_or_else(|| {
845 Error::Query(QueryError::new(
846 QueryErrorKind::Semantic,
847 format!("Missing parameter for SKIP/LIMIT: ${name}"),
848 ))
849 })?;
850 let n = match value {
851 Value::Int64(i) if *i >= 0 => *i as usize,
852 Value::Int64(i) => {
853 return Err(Error::Query(QueryError::new(
854 QueryErrorKind::Semantic,
855 format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
856 )));
857 }
858 other => {
859 return Err(Error::Query(QueryError::new(
860 QueryErrorKind::Semantic,
861 format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
862 )));
863 }
864 };
865 *count = CountExpr::Literal(n);
866 }
867 Ok(())
868}
869
870fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
872 use crate::query::plan::LogicalExpression;
873
874 match expr {
875 LogicalExpression::Parameter(name) => {
876 if let Some(value) = params.get(name) {
877 *expr = LogicalExpression::Literal(value.clone());
878 } else {
879 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
880 }
881 }
882 LogicalExpression::Binary { left, right, .. } => {
883 substitute_in_expression(left, params)?;
884 substitute_in_expression(right, params)?;
885 }
886 LogicalExpression::Unary { operand, .. } => {
887 substitute_in_expression(operand, params)?;
888 }
889 LogicalExpression::FunctionCall { args, .. } => {
890 for arg in args {
891 substitute_in_expression(arg, params)?;
892 }
893 }
894 LogicalExpression::List(items) => {
895 for item in items {
896 substitute_in_expression(item, params)?;
897 }
898 }
899 LogicalExpression::Map(pairs) => {
900 for (_, value) in pairs {
901 substitute_in_expression(value, params)?;
902 }
903 }
904 LogicalExpression::IndexAccess { base, index } => {
905 substitute_in_expression(base, params)?;
906 substitute_in_expression(index, params)?;
907 }
908 LogicalExpression::SliceAccess { base, start, end } => {
909 substitute_in_expression(base, params)?;
910 if let Some(s) = start {
911 substitute_in_expression(s, params)?;
912 }
913 if let Some(e) = end {
914 substitute_in_expression(e, params)?;
915 }
916 }
917 LogicalExpression::Case {
918 operand,
919 when_clauses,
920 else_clause,
921 } => {
922 if let Some(op) = operand {
923 substitute_in_expression(op, params)?;
924 }
925 for (cond, result) in when_clauses {
926 substitute_in_expression(cond, params)?;
927 substitute_in_expression(result, params)?;
928 }
929 if let Some(el) = else_clause {
930 substitute_in_expression(el, params)?;
931 }
932 }
933 LogicalExpression::Property { .. }
934 | LogicalExpression::Variable(_)
935 | LogicalExpression::Literal(_)
936 | LogicalExpression::Labels(_)
937 | LogicalExpression::Type(_)
938 | LogicalExpression::Id(_) => {}
939 LogicalExpression::ListComprehension {
940 list_expr,
941 filter_expr,
942 map_expr,
943 ..
944 } => {
945 substitute_in_expression(list_expr, params)?;
946 if let Some(filter) = filter_expr {
947 substitute_in_expression(filter, params)?;
948 }
949 substitute_in_expression(map_expr, params)?;
950 }
951 LogicalExpression::ListPredicate {
952 list_expr,
953 predicate,
954 ..
955 } => {
956 substitute_in_expression(list_expr, params)?;
957 substitute_in_expression(predicate, params)?;
958 }
959 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
960 }
962 LogicalExpression::PatternComprehension { projection, .. } => {
963 substitute_in_expression(projection, params)?;
964 }
965 LogicalExpression::MapProjection { entries, .. } => {
966 for entry in entries {
967 if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
968 substitute_in_expression(expr, params)?;
969 }
970 }
971 }
972 LogicalExpression::Reduce {
973 initial,
974 list,
975 expression,
976 ..
977 } => {
978 substitute_in_expression(initial, params)?;
979 substitute_in_expression(list, params)?;
980 substitute_in_expression(expression, params)?;
981 }
982 }
983 Ok(())
984}
985
986#[cfg(test)]
987mod tests {
988 use super::*;
989
990 #[test]
991 fn test_query_language_is_lpg() {
992 #[cfg(feature = "gql")]
993 assert!(QueryLanguage::Gql.is_lpg());
994 #[cfg(feature = "cypher")]
995 assert!(QueryLanguage::Cypher.is_lpg());
996 #[cfg(feature = "sparql")]
997 assert!(!QueryLanguage::Sparql.is_lpg());
998 }
999
1000 #[test]
1001 fn test_processor_creation() {
1002 let store = Arc::new(LpgStore::new().unwrap());
1003 let processor = QueryProcessor::for_lpg(store);
1004 assert!(processor.lpg_store().node_count() == 0);
1005 }
1006
1007 #[cfg(feature = "gql")]
1008 #[test]
1009 fn test_process_simple_gql() {
1010 let store = Arc::new(LpgStore::new().unwrap());
1011 store.create_node(&["Person"]);
1012 store.create_node(&["Person"]);
1013
1014 let processor = QueryProcessor::for_lpg(store);
1015 let result = processor
1016 .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
1017 .unwrap();
1018
1019 assert_eq!(result.row_count(), 2);
1020 assert_eq!(result.columns[0], "n");
1021 }
1022
1023 #[cfg(feature = "cypher")]
1024 #[test]
1025 fn test_process_simple_cypher() {
1026 let store = Arc::new(LpgStore::new().unwrap());
1027 store.create_node(&["Person"]);
1028
1029 let processor = QueryProcessor::for_lpg(store);
1030 let result = processor
1031 .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
1032 .unwrap();
1033
1034 assert_eq!(result.row_count(), 1);
1035 }
1036
1037 #[cfg(feature = "gql")]
1038 #[test]
1039 fn test_process_with_params() {
1040 let store = Arc::new(LpgStore::new().unwrap());
1041 store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1042 store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1043 store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1044
1045 let processor = QueryProcessor::for_lpg(store);
1046
1047 let mut params = HashMap::new();
1049 params.insert("min_age".to_string(), Value::Int64(30));
1050
1051 let result = processor
1052 .process(
1053 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1054 QueryLanguage::Gql,
1055 Some(¶ms),
1056 )
1057 .unwrap();
1058
1059 assert_eq!(result.row_count(), 2);
1061 }
1062
1063 #[cfg(feature = "gql")]
1064 #[test]
1065 fn test_missing_param_error() {
1066 let store = Arc::new(LpgStore::new().unwrap());
1067 store.create_node(&["Person"]);
1068
1069 let processor = QueryProcessor::for_lpg(store);
1070
1071 let params: HashMap<String, Value> = HashMap::new();
1073 let result = processor.process(
1074 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1075 QueryLanguage::Gql,
1076 Some(¶ms),
1077 );
1078
1079 assert!(result.is_err());
1081 let err = result.unwrap_err();
1082 assert!(
1083 err.to_string().contains("Missing parameter"),
1084 "Expected 'Missing parameter' error, got: {}",
1085 err
1086 );
1087 }
1088
1089 #[cfg(feature = "gql")]
1090 #[test]
1091 fn test_params_in_filter_with_property() {
1092 let store = Arc::new(LpgStore::new().unwrap());
1094 store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
1095 store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
1096
1097 let processor = QueryProcessor::for_lpg(store);
1098
1099 let mut params = HashMap::new();
1100 params.insert("threshold".to_string(), Value::Int64(15));
1101
1102 let result = processor
1103 .process(
1104 "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
1105 QueryLanguage::Gql,
1106 Some(¶ms),
1107 )
1108 .unwrap();
1109
1110 assert_eq!(result.row_count(), 1);
1112 let row = &result.rows[0];
1113 assert_eq!(row[0], Value::Int64(20));
1114 }
1115
1116 #[cfg(feature = "gql")]
1117 #[test]
1118 fn test_params_in_multiple_where_conditions() {
1119 let store = Arc::new(LpgStore::new().unwrap());
1121 store.create_node_with_props(
1122 &["Person"],
1123 [("age", Value::Int64(25)), ("score", Value::Int64(80))],
1124 );
1125 store.create_node_with_props(
1126 &["Person"],
1127 [("age", Value::Int64(35)), ("score", Value::Int64(90))],
1128 );
1129 store.create_node_with_props(
1130 &["Person"],
1131 [("age", Value::Int64(45)), ("score", Value::Int64(70))],
1132 );
1133
1134 let processor = QueryProcessor::for_lpg(store);
1135
1136 let mut params = HashMap::new();
1137 params.insert("min_age".to_string(), Value::Int64(30));
1138 params.insert("min_score".to_string(), Value::Int64(75));
1139
1140 let result = processor
1141 .process(
1142 "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
1143 QueryLanguage::Gql,
1144 Some(¶ms),
1145 )
1146 .unwrap();
1147
1148 assert_eq!(result.row_count(), 1);
1150 }
1151
1152 #[cfg(feature = "gql")]
1153 #[test]
1154 fn test_params_with_in_list() {
1155 let store = Arc::new(LpgStore::new().unwrap());
1157 store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
1158 store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
1159 store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
1160
1161 let processor = QueryProcessor::for_lpg(store);
1162
1163 let mut params = HashMap::new();
1165 params.insert("target".to_string(), Value::String("active".into()));
1166
1167 let result = processor
1168 .process(
1169 "MATCH (n:Item) WHERE n.status = $target RETURN n",
1170 QueryLanguage::Gql,
1171 Some(¶ms),
1172 )
1173 .unwrap();
1174
1175 assert_eq!(result.row_count(), 1);
1176 }
1177
1178 #[cfg(feature = "gql")]
1179 #[test]
1180 fn test_params_same_type_comparison() {
1181 let store = Arc::new(LpgStore::new().unwrap());
1183 store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
1184 store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
1185
1186 let processor = QueryProcessor::for_lpg(store);
1187
1188 let mut params = HashMap::new();
1190 params.insert("threshold".to_string(), Value::Int64(75));
1191
1192 let result = processor
1193 .process(
1194 "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
1195 QueryLanguage::Gql,
1196 Some(¶ms),
1197 )
1198 .unwrap();
1199
1200 assert_eq!(result.row_count(), 1);
1202 }
1203
1204 #[cfg(feature = "gql")]
1205 #[test]
1206 fn test_process_empty_result_has_columns() {
1207 let store = Arc::new(LpgStore::new().unwrap());
1209 let processor = QueryProcessor::for_lpg(store);
1212 let result = processor
1213 .process(
1214 "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
1215 QueryLanguage::Gql,
1216 None,
1217 )
1218 .unwrap();
1219
1220 assert_eq!(result.row_count(), 0);
1221 assert_eq!(result.columns.len(), 2);
1222 assert_eq!(result.columns[0], "name");
1223 assert_eq!(result.columns[1], "age");
1224 }
1225
1226 #[cfg(feature = "gql")]
1227 #[test]
1228 fn test_params_string_equality() {
1229 let store = Arc::new(LpgStore::new().unwrap());
1231 store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
1232 store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
1233 store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
1234
1235 let processor = QueryProcessor::for_lpg(store);
1236
1237 let mut params = HashMap::new();
1238 params.insert("target".to_string(), Value::String("beta".into()));
1239
1240 let result = processor
1241 .process(
1242 "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
1243 QueryLanguage::Gql,
1244 Some(¶ms),
1245 )
1246 .unwrap();
1247
1248 assert_eq!(result.row_count(), 1);
1249 assert_eq!(result.rows[0][0], Value::String("beta".into()));
1250 }
1251}