1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15#[cfg(feature = "lpg")]
16use grafeo_core::graph::lpg::LpgStore;
17use grafeo_core::graph::{GraphStore, GraphStoreMut};
18
19use crate::catalog::Catalog;
20use crate::database::QueryResult;
21use crate::query::binder::Binder;
22use crate::query::executor::Executor;
23use crate::query::optimizer::Optimizer;
24use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
25use crate::query::planner::Planner;
26use crate::transaction::TransactionManager;
27
/// Query languages the processor can be asked to run.
///
/// Every variant is compiled in only when its Cargo feature is enabled, so
/// the set of available variants depends on the build configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum QueryLanguage {
    /// GQL query text.
    #[cfg(feature = "gql")]
    Gql,
    /// Cypher query text.
    #[cfg(feature = "cypher")]
    Cypher,
    /// Gremlin traversal text.
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL evaluated against the property graph.
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ query text.
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL query text (RDF).
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL evaluated against the RDF triple store.
    #[cfg(all(feature = "graphql", feature = "triple-store"))]
    GraphQLRdf,
}
54
55impl QueryLanguage {
56 #[must_use]
58 pub const fn is_lpg(&self) -> bool {
59 match self {
60 #[cfg(feature = "gql")]
61 Self::Gql => true,
62 #[cfg(feature = "cypher")]
63 Self::Cypher => true,
64 #[cfg(feature = "gremlin")]
65 Self::Gremlin => true,
66 #[cfg(feature = "graphql")]
67 Self::GraphQL => true,
68 #[cfg(feature = "sql-pgq")]
69 Self::SqlPgq => true,
70 #[cfg(feature = "sparql")]
71 Self::Sparql => false,
72 #[cfg(all(feature = "graphql", feature = "triple-store"))]
73 Self::GraphQLRdf => false,
74 }
75 }
76}
77
78pub type QueryParams = HashMap<String, Value>;
80
81pub struct QueryProcessor {
101 #[cfg(feature = "lpg")]
103 lpg_store: Arc<LpgStore>,
104 graph_store: Arc<dyn GraphStore>,
106 write_store: Option<Arc<dyn GraphStoreMut>>,
108 transaction_manager: Arc<TransactionManager>,
110 catalog: Arc<Catalog>,
112 optimizer: Optimizer,
114 transaction_context: Option<(EpochId, TransactionId)>,
116 #[cfg(feature = "triple-store")]
118 rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
119}
120
121impl QueryProcessor {
122 #[cfg(feature = "lpg")]
124 #[must_use]
125 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
126 let optimizer = Optimizer::from_store(&store);
127 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
128 let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
129 Self {
130 lpg_store: store,
131 graph_store,
132 write_store,
133 transaction_manager: Arc::new(TransactionManager::new()),
134 catalog: Arc::new(Catalog::new()),
135 optimizer,
136 transaction_context: None,
137 #[cfg(feature = "triple-store")]
138 rdf_store: None,
139 }
140 }
141
142 #[cfg(feature = "lpg")]
144 #[must_use]
145 pub fn for_lpg_with_transaction(
146 store: Arc<LpgStore>,
147 transaction_manager: Arc<TransactionManager>,
148 ) -> Self {
149 let optimizer = Optimizer::from_store(&store);
150 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
151 let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
152 Self {
153 lpg_store: store,
154 graph_store,
155 write_store,
156 transaction_manager,
157 catalog: Arc::new(Catalog::new()),
158 optimizer,
159 transaction_context: None,
160 #[cfg(feature = "triple-store")]
161 rdf_store: None,
162 }
163 }
164
165 pub fn for_graph_store_with_transaction(
171 store: Arc<dyn GraphStoreMut>,
172 transaction_manager: Arc<TransactionManager>,
173 ) -> Result<Self> {
174 let optimizer = Optimizer::from_graph_store(&*store);
175 let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
176 Ok(Self {
177 #[cfg(feature = "lpg")]
178 lpg_store: Arc::new(LpgStore::new()?),
179 graph_store: read_store,
180 write_store: Some(store),
181 transaction_manager,
182 catalog: Arc::new(Catalog::new()),
183 optimizer,
184 transaction_context: None,
185 #[cfg(feature = "triple-store")]
186 rdf_store: None,
187 })
188 }
189
190 pub fn for_stores_with_transaction(
196 read_store: Arc<dyn GraphStore>,
197 write_store: Option<Arc<dyn GraphStoreMut>>,
198 transaction_manager: Arc<TransactionManager>,
199 ) -> Result<Self> {
200 let optimizer = Optimizer::from_graph_store(&*read_store);
201 Ok(Self {
202 #[cfg(feature = "lpg")]
203 lpg_store: Arc::new(LpgStore::new()?),
204 graph_store: read_store,
205 write_store,
206 transaction_manager,
207 catalog: Arc::new(Catalog::new()),
208 optimizer,
209 transaction_context: None,
210 #[cfg(feature = "triple-store")]
211 rdf_store: None,
212 })
213 }
214
215 #[must_use]
219 pub fn with_transaction_context(
220 mut self,
221 viewing_epoch: EpochId,
222 transaction_id: TransactionId,
223 ) -> Self {
224 self.transaction_context = Some((viewing_epoch, transaction_id));
225 self
226 }
227
228 #[must_use]
230 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
231 self.catalog = catalog;
232 self
233 }
234
235 #[must_use]
237 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
238 self.optimizer = optimizer;
239 self
240 }
241
242 pub fn process(
262 &self,
263 query: &str,
264 language: QueryLanguage,
265 params: Option<&QueryParams>,
266 ) -> Result<QueryResult> {
267 if language.is_lpg() {
268 self.process_lpg(query, language, params)
269 } else {
270 #[cfg(feature = "triple-store")]
271 {
272 self.process_rdf(query, language, params)
273 }
274 #[cfg(not(feature = "triple-store"))]
275 {
276 Err(Error::Internal(
277 "RDF support not enabled. Compile with --features rdf".to_string(),
278 ))
279 }
280 }
281 }
282
283 fn process_lpg(
285 &self,
286 query: &str,
287 language: QueryLanguage,
288 params: Option<&QueryParams>,
289 ) -> Result<QueryResult> {
290 #[cfg(not(target_arch = "wasm32"))]
291 let start_time = std::time::Instant::now();
292
293 let mut logical_plan = self.translate_lpg(query, language)?;
295
296 let has_defaults = !logical_plan.default_params.is_empty();
298 if params.is_some() || has_defaults {
299 let merged = if has_defaults {
300 let mut merged = logical_plan.default_params.clone();
301 if let Some(params) = params {
302 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
303 }
304 merged
305 } else {
306 params.cloned().unwrap_or_default()
307 };
308 substitute_params(&mut logical_plan, &merged)?;
309 }
310
311 let mut binder = Binder::new();
313 let _binding_context = binder.bind(&logical_plan)?;
314
315 let optimized_plan = self.optimizer.optimize(logical_plan)?;
317
318 if optimized_plan.explain {
320 let mut plan = optimized_plan;
321 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
322 return Ok(explain_result(&plan));
323 }
324
325 let is_read_only =
329 !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
330 let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
331 Planner::with_context(
332 Arc::clone(&self.graph_store),
333 self.write_store.as_ref().map(Arc::clone),
334 Arc::clone(&self.transaction_manager),
335 Some(transaction_id),
336 epoch,
337 )
338 } else {
339 Planner::with_context(
340 Arc::clone(&self.graph_store),
341 self.write_store.as_ref().map(Arc::clone),
342 Arc::clone(&self.transaction_manager),
343 None,
344 self.transaction_manager.current_epoch(),
345 )
346 }
347 .with_read_only(is_read_only);
348 let mut physical_plan = planner.plan(&optimized_plan)?;
349
350 let executor = Executor::with_columns(physical_plan.columns.clone());
352 let mut result = executor.execute(physical_plan.operator.as_mut())?;
353
354 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
357 {
358 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
359 result.execution_time_ms = Some(elapsed_ms);
360 }
361 result.rows_scanned = Some(rows_scanned);
362
363 Ok(result)
364 }
365
366 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
368 let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
369 match language {
370 #[cfg(feature = "gql")]
371 QueryLanguage::Gql => {
372 use crate::query::translators::gql;
373 gql::translate(query)
374 }
375 #[cfg(feature = "cypher")]
376 QueryLanguage::Cypher => {
377 use crate::query::translators::cypher;
378 cypher::translate(query)
379 }
380 #[cfg(feature = "gremlin")]
381 QueryLanguage::Gremlin => {
382 use crate::query::translators::gremlin;
383 gremlin::translate(query)
384 }
385 #[cfg(feature = "graphql")]
386 QueryLanguage::GraphQL => {
387 use crate::query::translators::graphql;
388 graphql::translate(query)
389 }
390 #[cfg(feature = "sql-pgq")]
391 QueryLanguage::SqlPgq => {
392 use crate::query::translators::sql_pgq;
393 sql_pgq::translate(query)
394 }
395 #[allow(unreachable_patterns)]
396 _ => Err(Error::Internal(format!(
397 "Language {:?} is not an LPG language",
398 language
399 ))),
400 }
401 }
402
403 #[cfg(feature = "lpg")]
405 #[must_use]
406 pub fn lpg_store(&self) -> &Arc<LpgStore> {
407 &self.lpg_store
408 }
409
410 #[must_use]
412 pub fn catalog(&self) -> &Arc<Catalog> {
413 &self.catalog
414 }
415
416 #[must_use]
418 pub fn optimizer(&self) -> &Optimizer {
419 &self.optimizer
420 }
421}
422
423impl QueryProcessor {
424 #[must_use]
426 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
427 &self.transaction_manager
428 }
429}
430
#[cfg(feature = "triple-store")]
impl QueryProcessor {
    /// Creates a processor over an LPG store that can also run RDF languages
    /// against `rdf_store`. A new transaction manager and an empty catalog
    /// are created.
    #[cfg(feature = "lpg")]
    #[must_use]
    pub fn with_rdf(
        lpg_store: Arc<LpgStore>,
        rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
    ) -> Self {
        let optimizer = Optimizer::from_store(&lpg_store);
        let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStore>;
        let write_store = Some(Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>);
        Self {
            lpg_store,
            graph_store,
            write_store,
            transaction_manager: Arc::new(TransactionManager::new()),
            catalog: Arc::new(Catalog::new()),
            optimizer,
            transaction_context: None,
            rdf_store: Some(rdf_store),
        }
    }

    /// The RDF store, if one was configured.
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }

    /// RDF pipeline: translate, substitute parameters, bind, optimize, then
    /// either render the EXPLAIN tree or plan with `RdfPlanner` and execute.
    ///
    /// Fails with an internal error when no RDF store is configured.
    fn process_rdf(
        &self,
        query: &str,
        language: QueryLanguage,
        params: Option<&QueryParams>,
    ) -> Result<QueryResult> {
        use crate::query::planner::rdf::RdfPlanner;

        let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
            Error::Internal("RDF store not configured for this processor".to_string())
        })?;

        let mut logical_plan = self.translate_rdf(query, language)?;

        // Plan defaults first, caller-supplied values override them.
        let has_defaults = !logical_plan.default_params.is_empty();
        if params.is_some() || has_defaults {
            let merged = if has_defaults {
                let mut merged = logical_plan.default_params.clone();
                if let Some(params) = params {
                    merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
                }
                merged
            } else {
                params.cloned().unwrap_or_default()
            };
            substitute_params(&mut logical_plan, &merged)?;
        }

        let mut binder = Binder::new();
        let _binding_context = binder.bind(&logical_plan)?;

        let optimized_plan = self.optimizer.optimize(logical_plan)?;

        // EXPLAIN returns the plan tree without executing.
        if optimized_plan.explain {
            return Ok(explain_result(&optimized_plan));
        }

        let planner = RdfPlanner::new(Arc::clone(rdf_store));
        let mut physical_plan = planner.plan(&optimized_plan)?;

        let executor = Executor::with_columns(physical_plan.columns.clone());
        executor.execute(physical_plan.operator.as_mut())
    }

    /// Translates RDF query text into a logical plan using the translator for
    /// `language`; any other language yields an internal error.
    fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
        let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
        match language {
            #[cfg(feature = "sparql")]
            QueryLanguage::Sparql => {
                use crate::query::translators::sparql;
                sparql::translate(query)
            }
            #[cfg(all(feature = "graphql", feature = "triple-store"))]
            QueryLanguage::GraphQLRdf => {
                use crate::query::translators::graphql_rdf;
                // NOTE(review): the namespace passed to the translator is
                // hard-coded; confirm whether it should be configurable.
                graphql_rdf::translate(query, "http://example.org/")
            }
            // When only RDF languages are compiled in, the arms above are
            // exhaustive and this fallback would otherwise trigger an
            // `unreachable_patterns` warning.
            #[allow(unreachable_patterns)]
            _ => Err(Error::Internal(format!(
                "Language {:?} is not an RDF language",
                language
            ))),
        }
    }
}
538
539pub(crate) fn annotate_pushdown_hints(
544 op: &mut LogicalOperator,
545 store: &dyn grafeo_core::graph::GraphStore,
546) {
547 #[allow(clippy::wildcard_imports)]
548 use crate::query::plan::*;
549
550 match op {
551 LogicalOperator::Filter(filter) => {
552 annotate_pushdown_hints(&mut filter.input, store);
554
555 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
557 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
558 }
559 }
560 LogicalOperator::NodeScan(op) => {
561 if let Some(input) = &mut op.input {
562 annotate_pushdown_hints(input, store);
563 }
564 }
565 LogicalOperator::EdgeScan(op) => {
566 if let Some(input) = &mut op.input {
567 annotate_pushdown_hints(input, store);
568 }
569 }
570 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
571 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
572 LogicalOperator::Join(op) => {
573 annotate_pushdown_hints(&mut op.left, store);
574 annotate_pushdown_hints(&mut op.right, store);
575 }
576 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
577 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
578 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
579 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
580 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
581 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
582 LogicalOperator::Union(op) => {
583 for input in &mut op.inputs {
584 annotate_pushdown_hints(input, store);
585 }
586 }
587 LogicalOperator::Apply(op) => {
588 annotate_pushdown_hints(&mut op.input, store);
589 annotate_pushdown_hints(&mut op.subplan, store);
590 }
591 LogicalOperator::Otherwise(op) => {
592 annotate_pushdown_hints(&mut op.left, store);
593 annotate_pushdown_hints(&mut op.right, store);
594 }
595 _ => {}
596 }
597}
598
599fn infer_pushdown(
601 predicate: &LogicalExpression,
602 scan: &crate::query::plan::NodeScanOp,
603 store: &dyn grafeo_core::graph::GraphStore,
604) -> Option<crate::query::plan::PushdownHint> {
605 #[allow(clippy::wildcard_imports)]
606 use crate::query::plan::*;
607
608 match predicate {
609 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
611 if let Some(prop) = extract_property_name(left, &scan.variable)
612 .or_else(|| extract_property_name(right, &scan.variable))
613 {
614 if store.has_property_index(&prop) {
615 return Some(PushdownHint::IndexLookup { property: prop });
616 }
617 if scan.label.is_some() {
618 return Some(PushdownHint::LabelFirst);
619 }
620 }
621 None
622 }
623 LogicalExpression::Binary {
625 left,
626 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
627 right,
628 } => {
629 if let Some(prop) = extract_property_name(left, &scan.variable)
630 .or_else(|| extract_property_name(right, &scan.variable))
631 {
632 if store.has_property_index(&prop) {
633 return Some(PushdownHint::RangeScan { property: prop });
634 }
635 if scan.label.is_some() {
636 return Some(PushdownHint::LabelFirst);
637 }
638 }
639 None
640 }
641 LogicalExpression::Binary {
643 left,
644 op: BinaryOp::And,
645 ..
646 } => infer_pushdown(left, scan, store),
647 _ => {
648 if scan.label.is_some() {
650 Some(PushdownHint::LabelFirst)
651 } else {
652 None
653 }
654 }
655 }
656}
657
658fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
661 if let LogicalExpression::Property { variable, property } = expr
662 && variable == scan_var
663 {
664 Some(property.clone())
665 } else {
666 None
667 }
668}
669
670pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
672 let tree_text = plan.root.explain_tree();
673 QueryResult {
674 columns: vec!["plan".to_string()],
675 column_types: vec![grafeo_common::types::LogicalType::String],
676 rows: vec![vec![Value::String(tree_text.into())]],
677 execution_time_ms: None,
678 rows_scanned: None,
679 status_message: None,
680 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
681 }
682}
683
684pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
690 substitute_in_operator(&mut plan.root, params)
691}
692
693fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
695 #[allow(clippy::wildcard_imports)]
696 use crate::query::plan::*;
697
698 match op {
699 LogicalOperator::Filter(filter) => {
700 substitute_in_expression(&mut filter.predicate, params)?;
701 substitute_in_operator(&mut filter.input, params)?;
702 }
703 LogicalOperator::Return(ret) => {
704 for item in &mut ret.items {
705 substitute_in_expression(&mut item.expression, params)?;
706 }
707 substitute_in_operator(&mut ret.input, params)?;
708 }
709 LogicalOperator::Project(proj) => {
710 for p in &mut proj.projections {
711 substitute_in_expression(&mut p.expression, params)?;
712 }
713 substitute_in_operator(&mut proj.input, params)?;
714 }
715 LogicalOperator::NodeScan(scan) => {
716 if let Some(input) = &mut scan.input {
717 substitute_in_operator(input, params)?;
718 }
719 }
720 LogicalOperator::EdgeScan(scan) => {
721 if let Some(input) = &mut scan.input {
722 substitute_in_operator(input, params)?;
723 }
724 }
725 LogicalOperator::Expand(expand) => {
726 substitute_in_operator(&mut expand.input, params)?;
727 }
728 LogicalOperator::Join(join) => {
729 substitute_in_operator(&mut join.left, params)?;
730 substitute_in_operator(&mut join.right, params)?;
731 for cond in &mut join.conditions {
732 substitute_in_expression(&mut cond.left, params)?;
733 substitute_in_expression(&mut cond.right, params)?;
734 }
735 }
736 LogicalOperator::LeftJoin(join) => {
737 substitute_in_operator(&mut join.left, params)?;
738 substitute_in_operator(&mut join.right, params)?;
739 if let Some(cond) = &mut join.condition {
740 substitute_in_expression(cond, params)?;
741 }
742 }
743 LogicalOperator::Aggregate(agg) => {
744 for expr in &mut agg.group_by {
745 substitute_in_expression(expr, params)?;
746 }
747 for agg_expr in &mut agg.aggregates {
748 if let Some(expr) = &mut agg_expr.expression {
749 substitute_in_expression(expr, params)?;
750 }
751 }
752 substitute_in_operator(&mut agg.input, params)?;
753 }
754 LogicalOperator::Sort(sort) => {
755 for key in &mut sort.keys {
756 substitute_in_expression(&mut key.expression, params)?;
757 }
758 substitute_in_operator(&mut sort.input, params)?;
759 }
760 LogicalOperator::Limit(limit) => {
761 resolve_count_param(&mut limit.count, params)?;
762 substitute_in_operator(&mut limit.input, params)?;
763 }
764 LogicalOperator::Skip(skip) => {
765 resolve_count_param(&mut skip.count, params)?;
766 substitute_in_operator(&mut skip.input, params)?;
767 }
768 LogicalOperator::Distinct(distinct) => {
769 substitute_in_operator(&mut distinct.input, params)?;
770 }
771 LogicalOperator::CreateNode(create) => {
772 for (_, expr) in &mut create.properties {
773 substitute_in_expression(expr, params)?;
774 }
775 if let Some(input) = &mut create.input {
776 substitute_in_operator(input, params)?;
777 }
778 }
779 LogicalOperator::CreateEdge(create) => {
780 for (_, expr) in &mut create.properties {
781 substitute_in_expression(expr, params)?;
782 }
783 substitute_in_operator(&mut create.input, params)?;
784 }
785 LogicalOperator::DeleteNode(delete) => {
786 substitute_in_operator(&mut delete.input, params)?;
787 }
788 LogicalOperator::DeleteEdge(delete) => {
789 substitute_in_operator(&mut delete.input, params)?;
790 }
791 LogicalOperator::SetProperty(set) => {
792 for (_, expr) in &mut set.properties {
793 substitute_in_expression(expr, params)?;
794 }
795 substitute_in_operator(&mut set.input, params)?;
796 }
797 LogicalOperator::Union(union) => {
798 for input in &mut union.inputs {
799 substitute_in_operator(input, params)?;
800 }
801 }
802 LogicalOperator::AntiJoin(anti) => {
803 substitute_in_operator(&mut anti.left, params)?;
804 substitute_in_operator(&mut anti.right, params)?;
805 }
806 LogicalOperator::Bind(bind) => {
807 substitute_in_expression(&mut bind.expression, params)?;
808 substitute_in_operator(&mut bind.input, params)?;
809 }
810 LogicalOperator::TripleScan(scan) => {
811 if let Some(input) = &mut scan.input {
812 substitute_in_operator(input, params)?;
813 }
814 }
815 LogicalOperator::Unwind(unwind) => {
816 substitute_in_expression(&mut unwind.expression, params)?;
817 substitute_in_operator(&mut unwind.input, params)?;
818 }
819 LogicalOperator::MapCollect(mc) => {
820 substitute_in_operator(&mut mc.input, params)?;
821 }
822 LogicalOperator::Merge(merge) => {
823 for (_, expr) in &mut merge.match_properties {
824 substitute_in_expression(expr, params)?;
825 }
826 for (_, expr) in &mut merge.on_create {
827 substitute_in_expression(expr, params)?;
828 }
829 for (_, expr) in &mut merge.on_match {
830 substitute_in_expression(expr, params)?;
831 }
832 substitute_in_operator(&mut merge.input, params)?;
833 }
834 LogicalOperator::MergeRelationship(merge_rel) => {
835 for (_, expr) in &mut merge_rel.match_properties {
836 substitute_in_expression(expr, params)?;
837 }
838 for (_, expr) in &mut merge_rel.on_create {
839 substitute_in_expression(expr, params)?;
840 }
841 for (_, expr) in &mut merge_rel.on_match {
842 substitute_in_expression(expr, params)?;
843 }
844 substitute_in_operator(&mut merge_rel.input, params)?;
845 }
846 LogicalOperator::AddLabel(add_label) => {
847 substitute_in_operator(&mut add_label.input, params)?;
848 }
849 LogicalOperator::RemoveLabel(remove_label) => {
850 substitute_in_operator(&mut remove_label.input, params)?;
851 }
852 LogicalOperator::ShortestPath(sp) => {
853 substitute_in_operator(&mut sp.input, params)?;
854 }
855 LogicalOperator::InsertTriple(insert) => {
857 if let Some(ref mut input) = insert.input {
858 substitute_in_operator(input, params)?;
859 }
860 }
861 LogicalOperator::DeleteTriple(delete) => {
862 if let Some(ref mut input) = delete.input {
863 substitute_in_operator(input, params)?;
864 }
865 }
866 LogicalOperator::Modify(modify) => {
867 substitute_in_operator(&mut modify.where_clause, params)?;
868 }
869 LogicalOperator::ClearGraph(_)
870 | LogicalOperator::CreateGraph(_)
871 | LogicalOperator::DropGraph(_)
872 | LogicalOperator::LoadGraph(_)
873 | LogicalOperator::CopyGraph(_)
874 | LogicalOperator::MoveGraph(_)
875 | LogicalOperator::AddGraph(_) => {}
876 LogicalOperator::HorizontalAggregate(op) => {
877 substitute_in_operator(&mut op.input, params)?;
878 }
879 LogicalOperator::Empty => {}
880 LogicalOperator::VectorScan(scan) => {
881 substitute_in_expression(&mut scan.query_vector, params)?;
882 if let Some(ref mut input) = scan.input {
883 substitute_in_operator(input, params)?;
884 }
885 }
886 LogicalOperator::VectorJoin(join) => {
887 substitute_in_expression(&mut join.query_vector, params)?;
888 substitute_in_operator(&mut join.input, params)?;
889 }
890 LogicalOperator::Except(except) => {
891 substitute_in_operator(&mut except.left, params)?;
892 substitute_in_operator(&mut except.right, params)?;
893 }
894 LogicalOperator::Intersect(intersect) => {
895 substitute_in_operator(&mut intersect.left, params)?;
896 substitute_in_operator(&mut intersect.right, params)?;
897 }
898 LogicalOperator::Otherwise(otherwise) => {
899 substitute_in_operator(&mut otherwise.left, params)?;
900 substitute_in_operator(&mut otherwise.right, params)?;
901 }
902 LogicalOperator::Apply(apply) => {
903 substitute_in_operator(&mut apply.input, params)?;
904 substitute_in_operator(&mut apply.subplan, params)?;
905 }
906 LogicalOperator::ParameterScan(_) => {}
908 LogicalOperator::MultiWayJoin(mwj) => {
909 for input in &mut mwj.inputs {
910 substitute_in_operator(input, params)?;
911 }
912 for cond in &mut mwj.conditions {
913 substitute_in_expression(&mut cond.left, params)?;
914 substitute_in_expression(&mut cond.right, params)?;
915 }
916 }
917 LogicalOperator::CreatePropertyGraph(_) => {}
919 LogicalOperator::CallProcedure(_) => {}
921 LogicalOperator::LoadData(_) => {}
923 }
924 Ok(())
925}
926
927fn resolve_count_param(
929 count: &mut crate::query::plan::CountExpr,
930 params: &QueryParams,
931) -> Result<()> {
932 use crate::query::plan::CountExpr;
933 use grafeo_common::utils::error::{QueryError, QueryErrorKind};
934
935 if let CountExpr::Parameter(name) = count {
936 let value = params.get(name.as_str()).ok_or_else(|| {
937 Error::Query(QueryError::new(
938 QueryErrorKind::Semantic,
939 format!("Missing parameter for SKIP/LIMIT: ${name}"),
940 ))
941 })?;
942 let n = match value {
943 Value::Int64(i) if *i >= 0 => *i as usize,
944 Value::Int64(i) => {
945 return Err(Error::Query(QueryError::new(
946 QueryErrorKind::Semantic,
947 format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
948 )));
949 }
950 other => {
951 return Err(Error::Query(QueryError::new(
952 QueryErrorKind::Semantic,
953 format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
954 )));
955 }
956 };
957 *count = CountExpr::Literal(n);
958 }
959 Ok(())
960}
961
962fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
964 use crate::query::plan::LogicalExpression;
965
966 match expr {
967 LogicalExpression::Parameter(name) => {
968 if let Some(value) = params.get(name) {
969 *expr = LogicalExpression::Literal(value.clone());
970 } else {
971 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
972 }
973 }
974 LogicalExpression::Binary { left, right, .. } => {
975 substitute_in_expression(left, params)?;
976 substitute_in_expression(right, params)?;
977 }
978 LogicalExpression::Unary { operand, .. } => {
979 substitute_in_expression(operand, params)?;
980 }
981 LogicalExpression::FunctionCall { args, .. } => {
982 for arg in args {
983 substitute_in_expression(arg, params)?;
984 }
985 }
986 LogicalExpression::List(items) => {
987 for item in items {
988 substitute_in_expression(item, params)?;
989 }
990 }
991 LogicalExpression::Map(pairs) => {
992 for (_, value) in pairs {
993 substitute_in_expression(value, params)?;
994 }
995 }
996 LogicalExpression::IndexAccess { base, index } => {
997 substitute_in_expression(base, params)?;
998 substitute_in_expression(index, params)?;
999 }
1000 LogicalExpression::SliceAccess { base, start, end } => {
1001 substitute_in_expression(base, params)?;
1002 if let Some(s) = start {
1003 substitute_in_expression(s, params)?;
1004 }
1005 if let Some(e) = end {
1006 substitute_in_expression(e, params)?;
1007 }
1008 }
1009 LogicalExpression::Case {
1010 operand,
1011 when_clauses,
1012 else_clause,
1013 } => {
1014 if let Some(op) = operand {
1015 substitute_in_expression(op, params)?;
1016 }
1017 for (cond, result) in when_clauses {
1018 substitute_in_expression(cond, params)?;
1019 substitute_in_expression(result, params)?;
1020 }
1021 if let Some(el) = else_clause {
1022 substitute_in_expression(el, params)?;
1023 }
1024 }
1025 LogicalExpression::Property { .. }
1026 | LogicalExpression::Variable(_)
1027 | LogicalExpression::Literal(_)
1028 | LogicalExpression::Labels(_)
1029 | LogicalExpression::Type(_)
1030 | LogicalExpression::Id(_) => {}
1031 LogicalExpression::ListComprehension {
1032 list_expr,
1033 filter_expr,
1034 map_expr,
1035 ..
1036 } => {
1037 substitute_in_expression(list_expr, params)?;
1038 if let Some(filter) = filter_expr {
1039 substitute_in_expression(filter, params)?;
1040 }
1041 substitute_in_expression(map_expr, params)?;
1042 }
1043 LogicalExpression::ListPredicate {
1044 list_expr,
1045 predicate,
1046 ..
1047 } => {
1048 substitute_in_expression(list_expr, params)?;
1049 substitute_in_expression(predicate, params)?;
1050 }
1051 LogicalExpression::ExistsSubquery(_)
1052 | LogicalExpression::CountSubquery(_)
1053 | LogicalExpression::ValueSubquery(_) => {
1054 }
1056 LogicalExpression::PatternComprehension { projection, .. } => {
1057 substitute_in_expression(projection, params)?;
1058 }
1059 LogicalExpression::MapProjection { entries, .. } => {
1060 for entry in entries {
1061 if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
1062 substitute_in_expression(expr, params)?;
1063 }
1064 }
1065 }
1066 LogicalExpression::Reduce {
1067 initial,
1068 list,
1069 expression,
1070 ..
1071 } => {
1072 substitute_in_expression(initial, params)?;
1073 substitute_in_expression(list, params)?;
1074 substitute_in_expression(expression, params)?;
1075 }
1076 }
1077 Ok(())
1078}
1079
1080#[cfg(test)]
1081mod tests {
1082 use super::*;
1083
1084 #[test]
1085 fn test_query_language_is_lpg() {
1086 #[cfg(feature = "gql")]
1087 assert!(QueryLanguage::Gql.is_lpg());
1088 #[cfg(feature = "cypher")]
1089 assert!(QueryLanguage::Cypher.is_lpg());
1090 #[cfg(feature = "sparql")]
1091 assert!(!QueryLanguage::Sparql.is_lpg());
1092 }
1093
1094 #[test]
1095 fn test_processor_creation() {
1096 let store = Arc::new(LpgStore::new().unwrap());
1097 let processor = QueryProcessor::for_lpg(store);
1098 assert!(processor.lpg_store().node_count() == 0);
1099 }
1100
1101 #[cfg(feature = "gql")]
1102 #[test]
1103 fn test_process_simple_gql() {
1104 let store = Arc::new(LpgStore::new().unwrap());
1105 store.create_node(&["Person"]);
1106 store.create_node(&["Person"]);
1107
1108 let processor = QueryProcessor::for_lpg(store);
1109 let result = processor
1110 .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
1111 .unwrap();
1112
1113 assert_eq!(result.row_count(), 2);
1114 assert_eq!(result.columns[0], "n");
1115 }
1116
1117 #[cfg(feature = "cypher")]
1118 #[test]
1119 fn test_process_simple_cypher() {
1120 let store = Arc::new(LpgStore::new().unwrap());
1121 store.create_node(&["Person"]);
1122
1123 let processor = QueryProcessor::for_lpg(store);
1124 let result = processor
1125 .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
1126 .unwrap();
1127
1128 assert_eq!(result.row_count(), 1);
1129 }
1130
1131 #[cfg(feature = "gql")]
1132 #[test]
1133 fn test_process_with_params() {
1134 let store = Arc::new(LpgStore::new().unwrap());
1135 store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1136 store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1137 store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1138
1139 let processor = QueryProcessor::for_lpg(store);
1140
1141 let mut params = HashMap::new();
1143 params.insert("min_age".to_string(), Value::Int64(30));
1144
1145 let result = processor
1146 .process(
1147 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1148 QueryLanguage::Gql,
1149 Some(¶ms),
1150 )
1151 .unwrap();
1152
1153 assert_eq!(result.row_count(), 2);
1155 }
1156
1157 #[cfg(feature = "gql")]
1158 #[test]
1159 fn test_missing_param_error() {
1160 let store = Arc::new(LpgStore::new().unwrap());
1161 store.create_node(&["Person"]);
1162
1163 let processor = QueryProcessor::for_lpg(store);
1164
1165 let params: HashMap<String, Value> = HashMap::new();
1167 let result = processor.process(
1168 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1169 QueryLanguage::Gql,
1170 Some(¶ms),
1171 );
1172
1173 assert!(result.is_err());
1175 let err = result.unwrap_err();
1176 assert!(
1177 err.to_string().contains("Missing parameter"),
1178 "Expected 'Missing parameter' error, got: {}",
1179 err
1180 );
1181 }
1182
1183 #[cfg(feature = "gql")]
1184 #[test]
1185 fn test_params_in_filter_with_property() {
1186 let store = Arc::new(LpgStore::new().unwrap());
1188 store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
1189 store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
1190
1191 let processor = QueryProcessor::for_lpg(store);
1192
1193 let mut params = HashMap::new();
1194 params.insert("threshold".to_string(), Value::Int64(15));
1195
1196 let result = processor
1197 .process(
1198 "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
1199 QueryLanguage::Gql,
1200 Some(¶ms),
1201 )
1202 .unwrap();
1203
1204 assert_eq!(result.row_count(), 1);
1206 let row = &result.rows[0];
1207 assert_eq!(row[0], Value::Int64(20));
1208 }
1209
1210 #[cfg(feature = "gql")]
1211 #[test]
1212 fn test_params_in_multiple_where_conditions() {
1213 let store = Arc::new(LpgStore::new().unwrap());
1215 store.create_node_with_props(
1216 &["Person"],
1217 [("age", Value::Int64(25)), ("score", Value::Int64(80))],
1218 );
1219 store.create_node_with_props(
1220 &["Person"],
1221 [("age", Value::Int64(35)), ("score", Value::Int64(90))],
1222 );
1223 store.create_node_with_props(
1224 &["Person"],
1225 [("age", Value::Int64(45)), ("score", Value::Int64(70))],
1226 );
1227
1228 let processor = QueryProcessor::for_lpg(store);
1229
1230 let mut params = HashMap::new();
1231 params.insert("min_age".to_string(), Value::Int64(30));
1232 params.insert("min_score".to_string(), Value::Int64(75));
1233
1234 let result = processor
1235 .process(
1236 "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
1237 QueryLanguage::Gql,
1238 Some(¶ms),
1239 )
1240 .unwrap();
1241
1242 assert_eq!(result.row_count(), 1);
1244 }
1245
1246 #[cfg(feature = "gql")]
1247 #[test]
1248 fn test_params_with_in_list() {
1249 let store = Arc::new(LpgStore::new().unwrap());
1251 store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
1252 store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
1253 store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
1254
1255 let processor = QueryProcessor::for_lpg(store);
1256
1257 let mut params = HashMap::new();
1259 params.insert("target".to_string(), Value::String("active".into()));
1260
1261 let result = processor
1262 .process(
1263 "MATCH (n:Item) WHERE n.status = $target RETURN n",
1264 QueryLanguage::Gql,
1265 Some(¶ms),
1266 )
1267 .unwrap();
1268
1269 assert_eq!(result.row_count(), 1);
1270 }
1271
1272 #[cfg(feature = "gql")]
1273 #[test]
1274 fn test_params_same_type_comparison() {
1275 let store = Arc::new(LpgStore::new().unwrap());
1277 store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
1278 store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
1279
1280 let processor = QueryProcessor::for_lpg(store);
1281
1282 let mut params = HashMap::new();
1284 params.insert("threshold".to_string(), Value::Int64(75));
1285
1286 let result = processor
1287 .process(
1288 "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
1289 QueryLanguage::Gql,
1290 Some(¶ms),
1291 )
1292 .unwrap();
1293
1294 assert_eq!(result.row_count(), 1);
1296 }
1297
1298 #[cfg(feature = "gql")]
1299 #[test]
1300 fn test_process_empty_result_has_columns() {
1301 let store = Arc::new(LpgStore::new().unwrap());
1303 let processor = QueryProcessor::for_lpg(store);
1306 let result = processor
1307 .process(
1308 "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
1309 QueryLanguage::Gql,
1310 None,
1311 )
1312 .unwrap();
1313
1314 assert_eq!(result.row_count(), 0);
1315 assert_eq!(result.columns.len(), 2);
1316 assert_eq!(result.columns[0], "name");
1317 assert_eq!(result.columns[1], "age");
1318 }
1319
1320 #[cfg(feature = "gql")]
1321 #[test]
1322 fn test_params_string_equality() {
1323 let store = Arc::new(LpgStore::new().unwrap());
1325 store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
1326 store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
1327 store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
1328
1329 let processor = QueryProcessor::for_lpg(store);
1330
1331 let mut params = HashMap::new();
1332 params.insert("target".to_string(), Value::String("beta".into()));
1333
1334 let result = processor
1335 .process(
1336 "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
1337 QueryLanguage::Gql,
1338 Some(¶ms),
1339 )
1340 .unwrap();
1341
1342 assert_eq!(result.row_count(), 1);
1343 assert_eq!(result.rows[0][0], Value::String("beta".into()));
1344 }
1345}