1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15#[cfg(feature = "lpg")]
16use grafeo_core::graph::lpg::LpgStore;
17use grafeo_core::graph::{GraphStore, GraphStoreMut};
18
19use crate::catalog::Catalog;
20use crate::database::QueryResult;
21use crate::query::binder::Binder;
22use crate::query::executor::Executor;
23use crate::query::optimizer::Optimizer;
24use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
25use crate::query::planner::Planner;
26use crate::transaction::TransactionManager;
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
30#[non_exhaustive]
31pub enum QueryLanguage {
32 #[cfg(feature = "gql")]
34 Gql,
35 #[cfg(feature = "cypher")]
37 Cypher,
38 #[cfg(feature = "gremlin")]
40 Gremlin,
41 #[cfg(feature = "graphql")]
43 GraphQL,
44 #[cfg(feature = "sql-pgq")]
46 SqlPgq,
47 #[cfg(feature = "sparql")]
49 Sparql,
50 #[cfg(all(feature = "graphql", feature = "triple-store"))]
52 GraphQLRdf,
53}
54
55impl QueryLanguage {
56 #[must_use]
58 pub const fn is_lpg(&self) -> bool {
59 match self {
60 #[cfg(feature = "gql")]
61 Self::Gql => true,
62 #[cfg(feature = "cypher")]
63 Self::Cypher => true,
64 #[cfg(feature = "gremlin")]
65 Self::Gremlin => true,
66 #[cfg(feature = "graphql")]
67 Self::GraphQL => true,
68 #[cfg(feature = "sql-pgq")]
69 Self::SqlPgq => true,
70 #[cfg(feature = "sparql")]
71 Self::Sparql => false,
72 #[cfg(all(feature = "graphql", feature = "triple-store"))]
73 Self::GraphQLRdf => false,
74 #[allow(unreachable_patterns)]
75 _ => false,
76 }
77 }
78}
79
80pub type QueryParams = HashMap<String, Value>;
82
83pub struct QueryProcessor {
103 #[cfg(feature = "lpg")]
105 lpg_store: Arc<LpgStore>,
106 graph_store: Arc<dyn GraphStore>,
108 write_store: Option<Arc<dyn GraphStoreMut>>,
110 transaction_manager: Arc<TransactionManager>,
112 catalog: Arc<Catalog>,
114 optimizer: Optimizer,
116 transaction_context: Option<(EpochId, TransactionId)>,
118 #[cfg(feature = "triple-store")]
120 rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
121}
122
123impl QueryProcessor {
124 #[cfg(feature = "lpg")]
126 #[must_use]
127 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
128 let optimizer = Optimizer::from_store(&store);
129 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
130 let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
131 Self {
132 lpg_store: store,
133 graph_store,
134 write_store,
135 transaction_manager: Arc::new(TransactionManager::new()),
136 catalog: Arc::new(Catalog::new()),
137 optimizer,
138 transaction_context: None,
139 #[cfg(feature = "triple-store")]
140 rdf_store: None,
141 }
142 }
143
144 #[cfg(feature = "lpg")]
146 #[must_use]
147 pub fn for_lpg_with_transaction(
148 store: Arc<LpgStore>,
149 transaction_manager: Arc<TransactionManager>,
150 ) -> Self {
151 let optimizer = Optimizer::from_store(&store);
152 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
153 let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
154 Self {
155 lpg_store: store,
156 graph_store,
157 write_store,
158 transaction_manager,
159 catalog: Arc::new(Catalog::new()),
160 optimizer,
161 transaction_context: None,
162 #[cfg(feature = "triple-store")]
163 rdf_store: None,
164 }
165 }
166
167 pub fn for_graph_store_with_transaction(
173 store: Arc<dyn GraphStoreMut>,
174 transaction_manager: Arc<TransactionManager>,
175 ) -> Result<Self> {
176 let optimizer = Optimizer::from_graph_store(&*store);
177 let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
178 Ok(Self {
179 #[cfg(feature = "lpg")]
180 lpg_store: Arc::new(LpgStore::new()?),
181 graph_store: read_store,
182 write_store: Some(store),
183 transaction_manager,
184 catalog: Arc::new(Catalog::new()),
185 optimizer,
186 transaction_context: None,
187 #[cfg(feature = "triple-store")]
188 rdf_store: None,
189 })
190 }
191
192 pub fn for_stores_with_transaction(
198 read_store: Arc<dyn GraphStore>,
199 write_store: Option<Arc<dyn GraphStoreMut>>,
200 transaction_manager: Arc<TransactionManager>,
201 ) -> Result<Self> {
202 let optimizer = Optimizer::from_graph_store(&*read_store);
203 Ok(Self {
204 #[cfg(feature = "lpg")]
205 lpg_store: Arc::new(LpgStore::new()?),
206 graph_store: read_store,
207 write_store,
208 transaction_manager,
209 catalog: Arc::new(Catalog::new()),
210 optimizer,
211 transaction_context: None,
212 #[cfg(feature = "triple-store")]
213 rdf_store: None,
214 })
215 }
216
217 #[must_use]
221 pub fn with_transaction_context(
222 mut self,
223 viewing_epoch: EpochId,
224 transaction_id: TransactionId,
225 ) -> Self {
226 self.transaction_context = Some((viewing_epoch, transaction_id));
227 self
228 }
229
230 #[must_use]
232 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
233 self.catalog = catalog;
234 self
235 }
236
237 #[must_use]
239 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
240 self.optimizer = optimizer;
241 self
242 }
243
244 pub fn process(
264 &self,
265 query: &str,
266 language: QueryLanguage,
267 params: Option<&QueryParams>,
268 ) -> Result<QueryResult> {
269 if language.is_lpg() {
270 self.process_lpg(query, language, params)
271 } else {
272 #[cfg(feature = "triple-store")]
273 {
274 self.process_rdf(query, language, params)
275 }
276 #[cfg(not(feature = "triple-store"))]
277 {
278 Err(Error::Internal(
279 "RDF support not enabled. Compile with --features rdf".to_string(),
280 ))
281 }
282 }
283 }
284
285 fn process_lpg(
287 &self,
288 query: &str,
289 language: QueryLanguage,
290 params: Option<&QueryParams>,
291 ) -> Result<QueryResult> {
292 #[cfg(not(target_arch = "wasm32"))]
293 let start_time = std::time::Instant::now();
294
295 let mut logical_plan = self.translate_lpg(query, language)?;
297
298 let has_defaults = !logical_plan.default_params.is_empty();
300 if params.is_some() || has_defaults {
301 let merged = if has_defaults {
302 let mut merged = logical_plan.default_params.clone();
303 if let Some(params) = params {
304 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
305 }
306 merged
307 } else {
308 params.cloned().unwrap_or_default()
309 };
310 substitute_params(&mut logical_plan, &merged)?;
311 }
312
313 let mut binder = Binder::new();
315 let _binding_context = binder.bind(&logical_plan)?;
316
317 let optimized_plan = self.optimizer.optimize(logical_plan)?;
319
320 if optimized_plan.explain {
322 let mut plan = optimized_plan;
323 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
324 return Ok(explain_result(&plan));
325 }
326
327 let is_read_only =
331 !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
332 let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
333 Planner::with_context(
334 Arc::clone(&self.graph_store),
335 self.write_store.as_ref().map(Arc::clone),
336 Arc::clone(&self.transaction_manager),
337 Some(transaction_id),
338 epoch,
339 )
340 } else {
341 Planner::with_context(
342 Arc::clone(&self.graph_store),
343 self.write_store.as_ref().map(Arc::clone),
344 Arc::clone(&self.transaction_manager),
345 None,
346 self.transaction_manager.current_epoch(),
347 )
348 }
349 .with_read_only(is_read_only);
350 let mut physical_plan = planner.plan(&optimized_plan)?;
351
352 let executor = Executor::with_columns(physical_plan.columns.clone());
354 let mut result = executor.execute(physical_plan.operator.as_mut())?;
355
356 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
359 {
360 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
361 result.execution_time_ms = Some(elapsed_ms);
362 }
363 result.rows_scanned = Some(rows_scanned);
364
365 Ok(result)
366 }
367
368 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
370 let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
371 match language {
372 #[cfg(feature = "gql")]
373 QueryLanguage::Gql => {
374 use crate::query::translators::gql;
375 gql::translate(query)
376 }
377 #[cfg(feature = "cypher")]
378 QueryLanguage::Cypher => {
379 use crate::query::translators::cypher;
380 cypher::translate(query)
381 }
382 #[cfg(feature = "gremlin")]
383 QueryLanguage::Gremlin => {
384 use crate::query::translators::gremlin;
385 gremlin::translate(query)
386 }
387 #[cfg(feature = "graphql")]
388 QueryLanguage::GraphQL => {
389 use crate::query::translators::graphql;
390 graphql::translate(query)
391 }
392 #[cfg(feature = "sql-pgq")]
393 QueryLanguage::SqlPgq => {
394 use crate::query::translators::sql_pgq;
395 sql_pgq::translate(query)
396 }
397 #[allow(unreachable_patterns)]
398 _ => Err(Error::Internal(format!(
399 "Language {:?} is not an LPG language",
400 language
401 ))),
402 }
403 }
404
405 #[cfg(feature = "lpg")]
407 #[must_use]
408 pub fn lpg_store(&self) -> &Arc<LpgStore> {
409 &self.lpg_store
410 }
411
412 #[must_use]
414 pub fn catalog(&self) -> &Arc<Catalog> {
415 &self.catalog
416 }
417
418 #[must_use]
420 pub fn optimizer(&self) -> &Optimizer {
421 &self.optimizer
422 }
423}
424
425impl QueryProcessor {
426 #[must_use]
428 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
429 &self.transaction_manager
430 }
431}
432
433#[cfg(feature = "triple-store")]
438impl QueryProcessor {
439 #[cfg(feature = "lpg")]
441 #[must_use]
442 pub fn with_rdf(
443 lpg_store: Arc<LpgStore>,
444 rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
445 ) -> Self {
446 let optimizer = Optimizer::from_store(&lpg_store);
447 let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStore>;
448 let write_store = Some(Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>);
449 Self {
450 lpg_store,
451 graph_store,
452 write_store,
453 transaction_manager: Arc::new(TransactionManager::new()),
454 catalog: Arc::new(Catalog::new()),
455 optimizer,
456 transaction_context: None,
457 rdf_store: Some(rdf_store),
458 }
459 }
460
461 #[must_use]
463 pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
464 self.rdf_store.as_ref()
465 }
466
467 fn process_rdf(
469 &self,
470 query: &str,
471 language: QueryLanguage,
472 params: Option<&QueryParams>,
473 ) -> Result<QueryResult> {
474 use crate::query::planner::rdf::RdfPlanner;
475
476 let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
477 Error::Internal("RDF store not configured for this processor".to_string())
478 })?;
479
480 let mut logical_plan = self.translate_rdf(query, language)?;
482
483 let has_defaults = !logical_plan.default_params.is_empty();
485 if params.is_some() || has_defaults {
486 let merged = if has_defaults {
487 let mut merged = logical_plan.default_params.clone();
488 if let Some(params) = params {
489 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
490 }
491 merged
492 } else {
493 params.cloned().unwrap_or_default()
494 };
495 substitute_params(&mut logical_plan, &merged)?;
496 }
497
498 let mut binder = Binder::new();
500 let _binding_context = binder.bind(&logical_plan)?;
501
502 let rdf_optimizer = {
504 let stats = rdf_store.get_or_collect_statistics();
505 Optimizer::from_rdf_statistics((*stats).clone())
506 };
507 let optimized_plan = rdf_optimizer.optimize(logical_plan)?;
508
509 if optimized_plan.explain {
512 let planner = RdfPlanner::new(Arc::clone(rdf_store));
513 let (_, entries) = planner.plan_profiled(&optimized_plan)?;
514 return Ok(physical_explain_result(&optimized_plan, entries));
515 }
516
517 if optimized_plan.profile {
519 let planner = RdfPlanner::new(Arc::clone(rdf_store));
520 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
521
522 let start = std::time::Instant::now();
523 let executor = Executor::with_columns(physical_plan.columns.clone());
524 let _result = executor.execute(physical_plan.operator.as_mut())?;
525 let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
526
527 let tree = crate::query::profile::build_profile_tree(
528 &optimized_plan.root,
529 &mut entries.into_iter(),
530 );
531 return Ok(crate::query::profile::profile_result(&tree, elapsed_ms));
532 }
533
534 let planner = RdfPlanner::new(Arc::clone(rdf_store));
536 let mut physical_plan = planner.plan(&optimized_plan)?;
537
538 let executor = Executor::with_columns(physical_plan.columns.clone());
540 executor.execute(physical_plan.operator.as_mut())
541 }
542
543 fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
545 match language {
546 #[cfg(feature = "sparql")]
547 QueryLanguage::Sparql => {
548 use crate::query::translators::sparql;
549 sparql::translate(query)
550 }
551 #[cfg(all(feature = "graphql", feature = "triple-store"))]
552 QueryLanguage::GraphQLRdf => {
553 use crate::query::translators::graphql_rdf;
554 graphql_rdf::translate(query, "http://example.org/")
556 }
557 _ => Err(Error::Internal(format!(
558 "Language {:?} is not an RDF language",
559 language
560 ))),
561 }
562 }
563}
564
565pub(crate) fn annotate_pushdown_hints(
570 op: &mut LogicalOperator,
571 store: &dyn grafeo_core::graph::GraphStore,
572) {
573 #[allow(clippy::wildcard_imports)]
574 use crate::query::plan::*;
575
576 match op {
577 LogicalOperator::Filter(filter) => {
578 annotate_pushdown_hints(&mut filter.input, store);
580
581 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
583 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
584 }
585 }
586 LogicalOperator::NodeScan(op) => {
587 if let Some(input) = &mut op.input {
588 annotate_pushdown_hints(input, store);
589 }
590 }
591 LogicalOperator::EdgeScan(op) => {
592 if let Some(input) = &mut op.input {
593 annotate_pushdown_hints(input, store);
594 }
595 }
596 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
597 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
598 LogicalOperator::Join(op) => {
599 annotate_pushdown_hints(&mut op.left, store);
600 annotate_pushdown_hints(&mut op.right, store);
601 }
602 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
603 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
604 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
605 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
606 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
607 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
608 LogicalOperator::Union(op) => {
609 for input in &mut op.inputs {
610 annotate_pushdown_hints(input, store);
611 }
612 }
613 LogicalOperator::Apply(op) => {
614 annotate_pushdown_hints(&mut op.input, store);
615 annotate_pushdown_hints(&mut op.subplan, store);
616 }
617 LogicalOperator::Otherwise(op) => {
618 annotate_pushdown_hints(&mut op.left, store);
619 annotate_pushdown_hints(&mut op.right, store);
620 }
621 _ => {}
622 }
623}
624
625fn infer_pushdown(
627 predicate: &LogicalExpression,
628 scan: &crate::query::plan::NodeScanOp,
629 store: &dyn grafeo_core::graph::GraphStore,
630) -> Option<crate::query::plan::PushdownHint> {
631 #[allow(clippy::wildcard_imports)]
632 use crate::query::plan::*;
633
634 match predicate {
635 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
637 if let Some(prop) = extract_property_name(left, &scan.variable)
638 .or_else(|| extract_property_name(right, &scan.variable))
639 {
640 if store.has_property_index(&prop) {
641 return Some(PushdownHint::IndexLookup { property: prop });
642 }
643 if scan.label.is_some() {
644 return Some(PushdownHint::LabelFirst);
645 }
646 }
647 None
648 }
649 LogicalExpression::Binary {
651 left,
652 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
653 right,
654 } => {
655 if let Some(prop) = extract_property_name(left, &scan.variable)
656 .or_else(|| extract_property_name(right, &scan.variable))
657 {
658 if store.has_property_index(&prop) {
659 return Some(PushdownHint::RangeScan { property: prop });
660 }
661 if scan.label.is_some() {
662 return Some(PushdownHint::LabelFirst);
663 }
664 }
665 None
666 }
667 LogicalExpression::Binary {
669 left,
670 op: BinaryOp::And,
671 ..
672 } => infer_pushdown(left, scan, store),
673 _ => {
674 if scan.label.is_some() {
676 Some(PushdownHint::LabelFirst)
677 } else {
678 None
679 }
680 }
681 }
682}
683
684fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
687 if let LogicalExpression::Property { variable, property } = expr
688 && variable == scan_var
689 {
690 Some(property.clone())
691 } else {
692 None
693 }
694}
695
696pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
698 let tree_text = plan.root.explain_tree();
699 QueryResult {
700 columns: vec!["plan".to_string()],
701 column_types: vec![grafeo_common::types::LogicalType::String],
702 rows: vec![vec![Value::String(tree_text.into())]],
703 execution_time_ms: None,
704 rows_scanned: None,
705 status_message: None,
706 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
707 }
708}
709
710pub(crate) fn physical_explain_result(
713 plan: &LogicalPlan,
714 entries: Vec<crate::query::profile::ProfileEntry>,
715) -> QueryResult {
716 let tree = crate::query::profile::build_profile_tree(&plan.root, &mut entries.into_iter());
717
718 let mut output = String::new();
719 format_physical_node(&mut output, &tree, 0);
720
721 QueryResult {
722 columns: vec!["plan".to_string()],
723 column_types: vec![grafeo_common::types::LogicalType::String],
724 rows: vec![vec![Value::String(output.into())]],
725 execution_time_ms: None,
726 rows_scanned: None,
727 status_message: None,
728 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
729 }
730}
731
732fn format_physical_node(out: &mut String, node: &crate::query::profile::ProfileNode, depth: usize) {
734 use std::fmt::Write;
735 let indent = " ".repeat(depth);
736 let _ = writeln!(out, "{indent}{} {}", node.name, node.label);
737 for child in &node.children {
738 format_physical_node(out, child, depth + 1);
739 }
740}
741
742pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
748 substitute_in_operator(&mut plan.root, params)
749}
750
751fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
753 #[allow(clippy::wildcard_imports)]
754 use crate::query::plan::*;
755
756 match op {
757 LogicalOperator::Filter(filter) => {
758 substitute_in_expression(&mut filter.predicate, params)?;
759 substitute_in_operator(&mut filter.input, params)?;
760 }
761 LogicalOperator::Return(ret) => {
762 for item in &mut ret.items {
763 substitute_in_expression(&mut item.expression, params)?;
764 }
765 substitute_in_operator(&mut ret.input, params)?;
766 }
767 LogicalOperator::Project(proj) => {
768 for p in &mut proj.projections {
769 substitute_in_expression(&mut p.expression, params)?;
770 }
771 substitute_in_operator(&mut proj.input, params)?;
772 }
773 LogicalOperator::NodeScan(scan) => {
774 if let Some(input) = &mut scan.input {
775 substitute_in_operator(input, params)?;
776 }
777 }
778 LogicalOperator::EdgeScan(scan) => {
779 if let Some(input) = &mut scan.input {
780 substitute_in_operator(input, params)?;
781 }
782 }
783 LogicalOperator::Expand(expand) => {
784 substitute_in_operator(&mut expand.input, params)?;
785 }
786 LogicalOperator::Join(join) => {
787 substitute_in_operator(&mut join.left, params)?;
788 substitute_in_operator(&mut join.right, params)?;
789 for cond in &mut join.conditions {
790 substitute_in_expression(&mut cond.left, params)?;
791 substitute_in_expression(&mut cond.right, params)?;
792 }
793 }
794 LogicalOperator::LeftJoin(join) => {
795 substitute_in_operator(&mut join.left, params)?;
796 substitute_in_operator(&mut join.right, params)?;
797 if let Some(cond) = &mut join.condition {
798 substitute_in_expression(cond, params)?;
799 }
800 }
801 LogicalOperator::Aggregate(agg) => {
802 for expr in &mut agg.group_by {
803 substitute_in_expression(expr, params)?;
804 }
805 for agg_expr in &mut agg.aggregates {
806 if let Some(expr) = &mut agg_expr.expression {
807 substitute_in_expression(expr, params)?;
808 }
809 }
810 substitute_in_operator(&mut agg.input, params)?;
811 }
812 LogicalOperator::Sort(sort) => {
813 for key in &mut sort.keys {
814 substitute_in_expression(&mut key.expression, params)?;
815 }
816 substitute_in_operator(&mut sort.input, params)?;
817 }
818 LogicalOperator::Limit(limit) => {
819 resolve_count_param(&mut limit.count, params)?;
820 substitute_in_operator(&mut limit.input, params)?;
821 }
822 LogicalOperator::Skip(skip) => {
823 resolve_count_param(&mut skip.count, params)?;
824 substitute_in_operator(&mut skip.input, params)?;
825 }
826 LogicalOperator::Distinct(distinct) => {
827 substitute_in_operator(&mut distinct.input, params)?;
828 }
829 LogicalOperator::CreateNode(create) => {
830 for (_, expr) in &mut create.properties {
831 substitute_in_expression(expr, params)?;
832 }
833 if let Some(input) = &mut create.input {
834 substitute_in_operator(input, params)?;
835 }
836 }
837 LogicalOperator::CreateEdge(create) => {
838 for (_, expr) in &mut create.properties {
839 substitute_in_expression(expr, params)?;
840 }
841 substitute_in_operator(&mut create.input, params)?;
842 }
843 LogicalOperator::DeleteNode(delete) => {
844 substitute_in_operator(&mut delete.input, params)?;
845 }
846 LogicalOperator::DeleteEdge(delete) => {
847 substitute_in_operator(&mut delete.input, params)?;
848 }
849 LogicalOperator::SetProperty(set) => {
850 for (_, expr) in &mut set.properties {
851 substitute_in_expression(expr, params)?;
852 }
853 substitute_in_operator(&mut set.input, params)?;
854 }
855 LogicalOperator::Union(union) => {
856 for input in &mut union.inputs {
857 substitute_in_operator(input, params)?;
858 }
859 }
860 LogicalOperator::AntiJoin(anti) => {
861 substitute_in_operator(&mut anti.left, params)?;
862 substitute_in_operator(&mut anti.right, params)?;
863 }
864 LogicalOperator::Bind(bind) => {
865 substitute_in_expression(&mut bind.expression, params)?;
866 substitute_in_operator(&mut bind.input, params)?;
867 }
868 LogicalOperator::TripleScan(scan) => {
869 if let Some(input) = &mut scan.input {
870 substitute_in_operator(input, params)?;
871 }
872 }
873 LogicalOperator::Unwind(unwind) => {
874 substitute_in_expression(&mut unwind.expression, params)?;
875 substitute_in_operator(&mut unwind.input, params)?;
876 }
877 LogicalOperator::MapCollect(mc) => {
878 substitute_in_operator(&mut mc.input, params)?;
879 }
880 LogicalOperator::Merge(merge) => {
881 for (_, expr) in &mut merge.match_properties {
882 substitute_in_expression(expr, params)?;
883 }
884 for (_, expr) in &mut merge.on_create {
885 substitute_in_expression(expr, params)?;
886 }
887 for (_, expr) in &mut merge.on_match {
888 substitute_in_expression(expr, params)?;
889 }
890 substitute_in_operator(&mut merge.input, params)?;
891 }
892 LogicalOperator::MergeRelationship(merge_rel) => {
893 for (_, expr) in &mut merge_rel.match_properties {
894 substitute_in_expression(expr, params)?;
895 }
896 for (_, expr) in &mut merge_rel.on_create {
897 substitute_in_expression(expr, params)?;
898 }
899 for (_, expr) in &mut merge_rel.on_match {
900 substitute_in_expression(expr, params)?;
901 }
902 substitute_in_operator(&mut merge_rel.input, params)?;
903 }
904 LogicalOperator::AddLabel(add_label) => {
905 substitute_in_operator(&mut add_label.input, params)?;
906 }
907 LogicalOperator::RemoveLabel(remove_label) => {
908 substitute_in_operator(&mut remove_label.input, params)?;
909 }
910 LogicalOperator::ShortestPath(sp) => {
911 substitute_in_operator(&mut sp.input, params)?;
912 }
913 LogicalOperator::InsertTriple(insert) => {
915 if let Some(ref mut input) = insert.input {
916 substitute_in_operator(input, params)?;
917 }
918 }
919 LogicalOperator::DeleteTriple(delete) => {
920 if let Some(ref mut input) = delete.input {
921 substitute_in_operator(input, params)?;
922 }
923 }
924 LogicalOperator::Modify(modify) => {
925 substitute_in_operator(&mut modify.where_clause, params)?;
926 }
927 LogicalOperator::ClearGraph(_)
928 | LogicalOperator::CreateGraph(_)
929 | LogicalOperator::DropGraph(_)
930 | LogicalOperator::LoadGraph(_)
931 | LogicalOperator::CopyGraph(_)
932 | LogicalOperator::MoveGraph(_)
933 | LogicalOperator::AddGraph(_) => {}
934 LogicalOperator::HorizontalAggregate(op) => {
935 substitute_in_operator(&mut op.input, params)?;
936 }
937 LogicalOperator::Empty => {}
938 LogicalOperator::VectorScan(scan) => {
939 substitute_in_expression(&mut scan.query_vector, params)?;
940 if let Some(ref mut input) = scan.input {
941 substitute_in_operator(input, params)?;
942 }
943 }
944 LogicalOperator::VectorJoin(join) => {
945 substitute_in_expression(&mut join.query_vector, params)?;
946 substitute_in_operator(&mut join.input, params)?;
947 }
948 LogicalOperator::Except(except) => {
949 substitute_in_operator(&mut except.left, params)?;
950 substitute_in_operator(&mut except.right, params)?;
951 }
952 LogicalOperator::Intersect(intersect) => {
953 substitute_in_operator(&mut intersect.left, params)?;
954 substitute_in_operator(&mut intersect.right, params)?;
955 }
956 LogicalOperator::Otherwise(otherwise) => {
957 substitute_in_operator(&mut otherwise.left, params)?;
958 substitute_in_operator(&mut otherwise.right, params)?;
959 }
960 LogicalOperator::Apply(apply) => {
961 substitute_in_operator(&mut apply.input, params)?;
962 substitute_in_operator(&mut apply.subplan, params)?;
963 }
964 LogicalOperator::ParameterScan(_) => {}
966 LogicalOperator::MultiWayJoin(mwj) => {
967 for input in &mut mwj.inputs {
968 substitute_in_operator(input, params)?;
969 }
970 for cond in &mut mwj.conditions {
971 substitute_in_expression(&mut cond.left, params)?;
972 substitute_in_expression(&mut cond.right, params)?;
973 }
974 }
975 LogicalOperator::CreatePropertyGraph(_) => {}
977 LogicalOperator::CallProcedure(_) => {}
979 LogicalOperator::LoadData(_) => {}
981 LogicalOperator::Construct(construct) => {
983 substitute_in_operator(&mut construct.input, params)?;
984 }
985 }
986 Ok(())
987}
988
989fn resolve_count_param(
991 count: &mut crate::query::plan::CountExpr,
992 params: &QueryParams,
993) -> Result<()> {
994 use crate::query::plan::CountExpr;
995 use grafeo_common::utils::error::{QueryError, QueryErrorKind};
996
997 if let CountExpr::Parameter(name) = count {
998 let value = params.get(name.as_str()).ok_or_else(|| {
999 Error::Query(QueryError::new(
1000 QueryErrorKind::Semantic,
1001 format!("Missing parameter for SKIP/LIMIT: ${name}"),
1002 ))
1003 })?;
1004 let n = match value {
1005 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1007 Value::Int64(i) if *i >= 0 => *i as usize,
1008 Value::Int64(i) => {
1009 return Err(Error::Query(QueryError::new(
1010 QueryErrorKind::Semantic,
1011 format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
1012 )));
1013 }
1014 other => {
1015 return Err(Error::Query(QueryError::new(
1016 QueryErrorKind::Semantic,
1017 format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
1018 )));
1019 }
1020 };
1021 *count = CountExpr::Literal(n);
1022 }
1023 Ok(())
1024}
1025
1026fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
1028 use crate::query::plan::LogicalExpression;
1029
1030 match expr {
1031 LogicalExpression::Parameter(name) => {
1032 if let Some(value) = params.get(name) {
1033 *expr = LogicalExpression::Literal(value.clone());
1034 } else {
1035 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
1036 }
1037 }
1038 LogicalExpression::Binary { left, right, .. } => {
1039 substitute_in_expression(left, params)?;
1040 substitute_in_expression(right, params)?;
1041 }
1042 LogicalExpression::Unary { operand, .. } => {
1043 substitute_in_expression(operand, params)?;
1044 }
1045 LogicalExpression::FunctionCall { args, .. } => {
1046 for arg in args {
1047 substitute_in_expression(arg, params)?;
1048 }
1049 }
1050 LogicalExpression::List(items) => {
1051 for item in items {
1052 substitute_in_expression(item, params)?;
1053 }
1054 }
1055 LogicalExpression::Map(pairs) => {
1056 for (_, value) in pairs {
1057 substitute_in_expression(value, params)?;
1058 }
1059 }
1060 LogicalExpression::IndexAccess { base, index } => {
1061 substitute_in_expression(base, params)?;
1062 substitute_in_expression(index, params)?;
1063 }
1064 LogicalExpression::SliceAccess { base, start, end } => {
1065 substitute_in_expression(base, params)?;
1066 if let Some(s) = start {
1067 substitute_in_expression(s, params)?;
1068 }
1069 if let Some(e) = end {
1070 substitute_in_expression(e, params)?;
1071 }
1072 }
1073 LogicalExpression::Case {
1074 operand,
1075 when_clauses,
1076 else_clause,
1077 } => {
1078 if let Some(op) = operand {
1079 substitute_in_expression(op, params)?;
1080 }
1081 for (cond, result) in when_clauses {
1082 substitute_in_expression(cond, params)?;
1083 substitute_in_expression(result, params)?;
1084 }
1085 if let Some(el) = else_clause {
1086 substitute_in_expression(el, params)?;
1087 }
1088 }
1089 LogicalExpression::Property { .. }
1090 | LogicalExpression::Variable(_)
1091 | LogicalExpression::Literal(_)
1092 | LogicalExpression::Labels(_)
1093 | LogicalExpression::Type(_)
1094 | LogicalExpression::Id(_) => {}
1095 LogicalExpression::ListComprehension {
1096 list_expr,
1097 filter_expr,
1098 map_expr,
1099 ..
1100 } => {
1101 substitute_in_expression(list_expr, params)?;
1102 if let Some(filter) = filter_expr {
1103 substitute_in_expression(filter, params)?;
1104 }
1105 substitute_in_expression(map_expr, params)?;
1106 }
1107 LogicalExpression::ListPredicate {
1108 list_expr,
1109 predicate,
1110 ..
1111 } => {
1112 substitute_in_expression(list_expr, params)?;
1113 substitute_in_expression(predicate, params)?;
1114 }
1115 LogicalExpression::ExistsSubquery(subplan)
1116 | LogicalExpression::CountSubquery(subplan)
1117 | LogicalExpression::ValueSubquery(subplan) => {
1118 substitute_in_operator(subplan, params)?;
1119 }
1120 LogicalExpression::PatternComprehension { projection, .. } => {
1121 substitute_in_expression(projection, params)?;
1122 }
1123 LogicalExpression::MapProjection { entries, .. } => {
1124 for entry in entries {
1125 if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
1126 substitute_in_expression(expr, params)?;
1127 }
1128 }
1129 }
1130 LogicalExpression::Reduce {
1131 initial,
1132 list,
1133 expression,
1134 ..
1135 } => {
1136 substitute_in_expression(initial, params)?;
1137 substitute_in_expression(list, params)?;
1138 substitute_in_expression(expression, params)?;
1139 }
1140 }
1141 Ok(())
1142}
1143
1144#[cfg(test)]
1145mod tests {
1146 use super::*;
1147
1148 #[test]
1149 fn test_query_language_is_lpg() {
1150 #[cfg(feature = "gql")]
1151 assert!(QueryLanguage::Gql.is_lpg());
1152 #[cfg(feature = "cypher")]
1153 assert!(QueryLanguage::Cypher.is_lpg());
1154 #[cfg(feature = "sparql")]
1155 assert!(!QueryLanguage::Sparql.is_lpg());
1156 }
1157
1158 #[test]
1159 fn test_processor_creation() {
1160 let store = Arc::new(LpgStore::new().unwrap());
1161 let processor = QueryProcessor::for_lpg(store);
1162 assert!(processor.lpg_store().node_count() == 0);
1163 }
1164
1165 #[cfg(feature = "gql")]
1166 #[test]
1167 fn test_process_simple_gql() {
1168 let store = Arc::new(LpgStore::new().unwrap());
1169 store.create_node(&["Person"]);
1170 store.create_node(&["Person"]);
1171
1172 let processor = QueryProcessor::for_lpg(store);
1173 let result = processor
1174 .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
1175 .unwrap();
1176
1177 assert_eq!(result.row_count(), 2);
1178 assert_eq!(result.columns[0], "n");
1179 }
1180
1181 #[cfg(feature = "cypher")]
1182 #[test]
1183 fn test_process_simple_cypher() {
1184 let store = Arc::new(LpgStore::new().unwrap());
1185 store.create_node(&["Person"]);
1186
1187 let processor = QueryProcessor::for_lpg(store);
1188 let result = processor
1189 .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
1190 .unwrap();
1191
1192 assert_eq!(result.row_count(), 1);
1193 }
1194
1195 #[cfg(feature = "gql")]
1196 #[test]
1197 fn test_process_with_params() {
1198 let store = Arc::new(LpgStore::new().unwrap());
1199 store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1200 store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1201 store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1202
1203 let processor = QueryProcessor::for_lpg(store);
1204
1205 let mut params = HashMap::new();
1207 params.insert("min_age".to_string(), Value::Int64(30));
1208
1209 let result = processor
1210 .process(
1211 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1212 QueryLanguage::Gql,
1213 Some(¶ms),
1214 )
1215 .unwrap();
1216
1217 assert_eq!(result.row_count(), 2);
1219 }
1220
1221 #[cfg(feature = "gql")]
1222 #[test]
1223 fn test_missing_param_error() {
1224 let store = Arc::new(LpgStore::new().unwrap());
1225 store.create_node(&["Person"]);
1226
1227 let processor = QueryProcessor::for_lpg(store);
1228
1229 let params: HashMap<String, Value> = HashMap::new();
1231 let result = processor.process(
1232 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1233 QueryLanguage::Gql,
1234 Some(¶ms),
1235 );
1236
1237 assert!(result.is_err());
1239 let err = result.unwrap_err();
1240 assert!(
1241 err.to_string().contains("Missing parameter"),
1242 "Expected 'Missing parameter' error, got: {}",
1243 err
1244 );
1245 }
1246
1247 #[cfg(feature = "gql")]
1248 #[test]
1249 fn test_params_in_filter_with_property() {
1250 let store = Arc::new(LpgStore::new().unwrap());
1252 store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
1253 store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
1254
1255 let processor = QueryProcessor::for_lpg(store);
1256
1257 let mut params = HashMap::new();
1258 params.insert("threshold".to_string(), Value::Int64(15));
1259
1260 let result = processor
1261 .process(
1262 "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
1263 QueryLanguage::Gql,
1264 Some(¶ms),
1265 )
1266 .unwrap();
1267
1268 assert_eq!(result.row_count(), 1);
1270 let row = &result.rows[0];
1271 assert_eq!(row[0], Value::Int64(20));
1272 }
1273
1274 #[cfg(feature = "gql")]
1275 #[test]
1276 fn test_params_in_multiple_where_conditions() {
1277 let store = Arc::new(LpgStore::new().unwrap());
1279 store.create_node_with_props(
1280 &["Person"],
1281 [("age", Value::Int64(25)), ("score", Value::Int64(80))],
1282 );
1283 store.create_node_with_props(
1284 &["Person"],
1285 [("age", Value::Int64(35)), ("score", Value::Int64(90))],
1286 );
1287 store.create_node_with_props(
1288 &["Person"],
1289 [("age", Value::Int64(45)), ("score", Value::Int64(70))],
1290 );
1291
1292 let processor = QueryProcessor::for_lpg(store);
1293
1294 let mut params = HashMap::new();
1295 params.insert("min_age".to_string(), Value::Int64(30));
1296 params.insert("min_score".to_string(), Value::Int64(75));
1297
1298 let result = processor
1299 .process(
1300 "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
1301 QueryLanguage::Gql,
1302 Some(¶ms),
1303 )
1304 .unwrap();
1305
1306 assert_eq!(result.row_count(), 1);
1308 }
1309
1310 #[cfg(feature = "gql")]
1311 #[test]
1312 fn test_params_with_in_list() {
1313 let store = Arc::new(LpgStore::new().unwrap());
1315 store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
1316 store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
1317 store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
1318
1319 let processor = QueryProcessor::for_lpg(store);
1320
1321 let mut params = HashMap::new();
1323 params.insert("target".to_string(), Value::String("active".into()));
1324
1325 let result = processor
1326 .process(
1327 "MATCH (n:Item) WHERE n.status = $target RETURN n",
1328 QueryLanguage::Gql,
1329 Some(¶ms),
1330 )
1331 .unwrap();
1332
1333 assert_eq!(result.row_count(), 1);
1334 }
1335
1336 #[cfg(feature = "gql")]
1337 #[test]
1338 fn test_params_same_type_comparison() {
1339 let store = Arc::new(LpgStore::new().unwrap());
1341 store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
1342 store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
1343
1344 let processor = QueryProcessor::for_lpg(store);
1345
1346 let mut params = HashMap::new();
1348 params.insert("threshold".to_string(), Value::Int64(75));
1349
1350 let result = processor
1351 .process(
1352 "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
1353 QueryLanguage::Gql,
1354 Some(¶ms),
1355 )
1356 .unwrap();
1357
1358 assert_eq!(result.row_count(), 1);
1360 }
1361
1362 #[cfg(feature = "gql")]
1363 #[test]
1364 fn test_process_empty_result_has_columns() {
1365 let store = Arc::new(LpgStore::new().unwrap());
1367 let processor = QueryProcessor::for_lpg(store);
1370 let result = processor
1371 .process(
1372 "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
1373 QueryLanguage::Gql,
1374 None,
1375 )
1376 .unwrap();
1377
1378 assert_eq!(result.row_count(), 0);
1379 assert_eq!(result.columns.len(), 2);
1380 assert_eq!(result.columns[0], "name");
1381 assert_eq!(result.columns[1], "age");
1382 }
1383
1384 #[cfg(feature = "gql")]
1385 #[test]
1386 fn test_params_string_equality() {
1387 let store = Arc::new(LpgStore::new().unwrap());
1389 store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
1390 store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
1391 store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
1392
1393 let processor = QueryProcessor::for_lpg(store);
1394
1395 let mut params = HashMap::new();
1396 params.insert("target".to_string(), Value::String("beta".into()));
1397
1398 let result = processor
1399 .process(
1400 "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
1401 QueryLanguage::Gql,
1402 Some(¶ms),
1403 )
1404 .unwrap();
1405
1406 assert_eq!(result.row_count(), 1);
1407 assert_eq!(result.rows[0][0], Value::String("beta".into()));
1408 }
1409
1410 #[cfg(feature = "cypher")]
1411 #[test]
1412 fn test_params_in_exists_subquery() {
1413 let store = Arc::new(LpgStore::new().unwrap());
1416 let alix =
1417 store.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
1418 let gus =
1419 store.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
1420 let _jules =
1421 store.create_node_with_props(&["Person"], [("name", Value::String("Jules".into()))]);
1422
1423 store.create_edge(alix, gus, "FOLLOWS");
1425
1426 let processor = QueryProcessor::for_lpg(store);
1427
1428 let mut params = HashMap::new();
1430 params.insert("viewer".to_string(), Value::String("Alix".into()));
1431
1432 let result = processor
1433 .process(
1434 "MATCH (p:Person) \
1435 WHERE p.name <> $viewer \
1436 AND NOT EXISTS { MATCH (v:Person)-[:FOLLOWS]->(p) WHERE v.name = $viewer } \
1437 RETURN p.name ORDER BY p.name",
1438 QueryLanguage::Cypher,
1439 Some(¶ms),
1440 )
1441 .unwrap();
1442
1443 assert_eq!(result.row_count(), 1);
1445 assert_eq!(result.rows[0][0], Value::String("Jules".into()));
1446 }
1447}