1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15#[cfg(feature = "lpg")]
16use grafeo_core::graph::lpg::LpgStore;
17use grafeo_core::graph::{GraphStoreMut, GraphStoreSearch};
18
19use crate::catalog::Catalog;
20use crate::database::QueryResult;
21use crate::query::binder::Binder;
22use crate::query::executor::Executor;
23use crate::query::optimizer::Optimizer;
24use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
25use crate::query::planner::Planner;
26use crate::transaction::TransactionManager;
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
30#[non_exhaustive]
31pub enum QueryLanguage {
32 #[cfg(feature = "gql")]
34 Gql,
35 #[cfg(feature = "cypher")]
37 Cypher,
38 #[cfg(feature = "gremlin")]
40 Gremlin,
41 #[cfg(feature = "graphql")]
43 GraphQL,
44 #[cfg(feature = "sql-pgq")]
46 SqlPgq,
47 #[cfg(feature = "sparql")]
49 Sparql,
50 #[cfg(all(feature = "graphql", feature = "triple-store"))]
52 GraphQLRdf,
53}
54
55impl QueryLanguage {
56 #[must_use]
58 pub const fn is_lpg(&self) -> bool {
59 match self {
60 #[cfg(feature = "gql")]
61 Self::Gql => true,
62 #[cfg(feature = "cypher")]
63 Self::Cypher => true,
64 #[cfg(feature = "gremlin")]
65 Self::Gremlin => true,
66 #[cfg(feature = "graphql")]
67 Self::GraphQL => true,
68 #[cfg(feature = "sql-pgq")]
69 Self::SqlPgq => true,
70 #[cfg(feature = "sparql")]
71 Self::Sparql => false,
72 #[cfg(all(feature = "graphql", feature = "triple-store"))]
73 Self::GraphQLRdf => false,
74 #[allow(unreachable_patterns)]
75 _ => false,
76 }
77 }
78}
79
80pub type QueryParams = HashMap<String, Value>;
82
83pub struct QueryProcessor {
103 #[cfg(feature = "lpg")]
105 lpg_store: Arc<LpgStore>,
106 graph_store: Arc<dyn GraphStoreSearch>,
108 write_store: Option<Arc<dyn GraphStoreMut>>,
110 transaction_manager: Arc<TransactionManager>,
112 catalog: Arc<Catalog>,
114 optimizer: Optimizer,
116 transaction_context: Option<(EpochId, TransactionId)>,
118 #[cfg(feature = "triple-store")]
120 rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
121}
122
123impl QueryProcessor {
124 #[cfg(feature = "lpg")]
126 #[must_use]
127 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
128 let optimizer = Optimizer::from_store(&store);
129 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
130 let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
131 Self {
132 lpg_store: store,
133 graph_store,
134 write_store,
135 transaction_manager: Arc::new(TransactionManager::new()),
136 catalog: Arc::new(Catalog::new()),
137 optimizer,
138 transaction_context: None,
139 #[cfg(feature = "triple-store")]
140 rdf_store: None,
141 }
142 }
143
144 #[cfg(feature = "lpg")]
146 #[must_use]
147 pub fn for_lpg_with_transaction(
148 store: Arc<LpgStore>,
149 transaction_manager: Arc<TransactionManager>,
150 ) -> Self {
151 let optimizer = Optimizer::from_store(&store);
152 let graph_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
153 let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
154 Self {
155 lpg_store: store,
156 graph_store,
157 write_store,
158 transaction_manager,
159 catalog: Arc::new(Catalog::new()),
160 optimizer,
161 transaction_context: None,
162 #[cfg(feature = "triple-store")]
163 rdf_store: None,
164 }
165 }
166
167 pub fn for_graph_store_with_transaction(
173 store: Arc<dyn GraphStoreMut>,
174 transaction_manager: Arc<TransactionManager>,
175 ) -> Result<Self> {
176 let optimizer = Optimizer::from_graph_store(&*store);
177 let read_store = Arc::clone(&store) as Arc<dyn GraphStoreSearch>;
178 Ok(Self {
179 #[cfg(feature = "lpg")]
180 lpg_store: Arc::new(LpgStore::new()?),
181 graph_store: read_store,
182 write_store: Some(store),
183 transaction_manager,
184 catalog: Arc::new(Catalog::new()),
185 optimizer,
186 transaction_context: None,
187 #[cfg(feature = "triple-store")]
188 rdf_store: None,
189 })
190 }
191
192 pub fn for_stores_with_transaction(
198 read_store: Arc<dyn GraphStoreSearch>,
199 write_store: Option<Arc<dyn GraphStoreMut>>,
200 transaction_manager: Arc<TransactionManager>,
201 ) -> Result<Self> {
202 let optimizer = Optimizer::from_graph_store(&*read_store);
203 Ok(Self {
204 #[cfg(feature = "lpg")]
205 lpg_store: Arc::new(LpgStore::new()?),
206 graph_store: read_store,
207 write_store,
208 transaction_manager,
209 catalog: Arc::new(Catalog::new()),
210 optimizer,
211 transaction_context: None,
212 #[cfg(feature = "triple-store")]
213 rdf_store: None,
214 })
215 }
216
217 #[must_use]
221 pub fn with_transaction_context(
222 mut self,
223 viewing_epoch: EpochId,
224 transaction_id: TransactionId,
225 ) -> Self {
226 self.transaction_context = Some((viewing_epoch, transaction_id));
227 self
228 }
229
230 #[must_use]
232 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
233 self.catalog = catalog;
234 self
235 }
236
237 #[must_use]
239 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
240 self.optimizer = optimizer;
241 self
242 }
243
244 pub fn process(
264 &self,
265 query: &str,
266 language: QueryLanguage,
267 params: Option<&QueryParams>,
268 ) -> Result<QueryResult> {
269 if language.is_lpg() {
270 self.process_lpg(query, language, params)
271 } else {
272 #[cfg(feature = "triple-store")]
273 {
274 self.process_rdf(query, language, params)
275 }
276 #[cfg(not(feature = "triple-store"))]
277 {
278 Err(Error::Internal(
279 "RDF support not enabled. Compile with --features rdf".to_string(),
280 ))
281 }
282 }
283 }
284
285 fn process_lpg(
287 &self,
288 query: &str,
289 language: QueryLanguage,
290 params: Option<&QueryParams>,
291 ) -> Result<QueryResult> {
292 #[cfg(not(target_arch = "wasm32"))]
293 let start_time = std::time::Instant::now();
294
295 let mut logical_plan = self.translate_lpg(query, language)?;
297
298 let has_defaults = !logical_plan.default_params.is_empty();
300 if params.is_some() || has_defaults {
301 let merged = if has_defaults {
302 let mut merged = logical_plan.default_params.clone();
303 if let Some(params) = params {
304 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
305 }
306 merged
307 } else {
308 params.cloned().unwrap_or_default()
309 };
310 substitute_params(&mut logical_plan, &merged)?;
311 }
312
313 let mut binder = Binder::new();
315 let _binding_context = binder.bind(&logical_plan)?;
316
317 let optimized_plan = self.optimizer.optimize(logical_plan)?;
319
320 if optimized_plan.explain {
322 let mut plan = optimized_plan;
323 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
324 return Ok(explain_result(&plan));
325 }
326
327 let is_read_only =
331 !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
332 let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
333 Planner::with_context(
334 Arc::clone(&self.graph_store),
335 self.write_store.as_ref().map(Arc::clone),
336 Arc::clone(&self.transaction_manager),
337 Some(transaction_id),
338 epoch,
339 )
340 } else {
341 Planner::with_context(
342 Arc::clone(&self.graph_store),
343 self.write_store.as_ref().map(Arc::clone),
344 Arc::clone(&self.transaction_manager),
345 None,
346 self.transaction_manager.current_epoch(),
347 )
348 }
349 .with_read_only(is_read_only);
350 let mut physical_plan = planner.plan(&optimized_plan)?;
351
352 let executor = Executor::with_columns(physical_plan.columns.clone());
354 let mut result = executor.execute(physical_plan.operator.as_mut())?;
355
356 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
359 {
360 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
361 result.execution_time_ms = Some(elapsed_ms);
362 }
363 result.rows_scanned = Some(rows_scanned);
364
365 Ok(result)
366 }
367
368 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
370 let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
371 match language {
372 #[cfg(feature = "gql")]
373 QueryLanguage::Gql => {
374 use crate::query::translators::gql;
375 gql::translate(query)
376 }
377 #[cfg(feature = "cypher")]
378 QueryLanguage::Cypher => {
379 use crate::query::translators::cypher;
380 cypher::translate(query)
381 }
382 #[cfg(feature = "gremlin")]
383 QueryLanguage::Gremlin => {
384 use crate::query::translators::gremlin;
385 gremlin::translate(query)
386 }
387 #[cfg(feature = "graphql")]
388 QueryLanguage::GraphQL => {
389 use crate::query::translators::graphql;
390 graphql::translate(query)
391 }
392 #[cfg(feature = "sql-pgq")]
393 QueryLanguage::SqlPgq => {
394 use crate::query::translators::sql_pgq;
395 sql_pgq::translate(query)
396 }
397 #[allow(unreachable_patterns)]
398 _ => Err(Error::Internal(format!(
399 "Language {:?} is not an LPG language",
400 language
401 ))),
402 }
403 }
404
405 #[cfg(feature = "lpg")]
407 #[must_use]
408 pub fn lpg_store(&self) -> &Arc<LpgStore> {
409 &self.lpg_store
410 }
411
412 #[must_use]
414 pub fn catalog(&self) -> &Arc<Catalog> {
415 &self.catalog
416 }
417
418 #[must_use]
420 pub fn optimizer(&self) -> &Optimizer {
421 &self.optimizer
422 }
423}
424
425impl QueryProcessor {
426 #[must_use]
428 pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
429 &self.transaction_manager
430 }
431}
432
433#[cfg(feature = "triple-store")]
438impl QueryProcessor {
439 #[cfg(feature = "lpg")]
441 #[must_use]
442 pub fn with_rdf(
443 lpg_store: Arc<LpgStore>,
444 rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
445 ) -> Self {
446 let optimizer = Optimizer::from_store(&lpg_store);
447 let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStoreSearch>;
448 let write_store = Some(Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>);
449 Self {
450 lpg_store,
451 graph_store,
452 write_store,
453 transaction_manager: Arc::new(TransactionManager::new()),
454 catalog: Arc::new(Catalog::new()),
455 optimizer,
456 transaction_context: None,
457 rdf_store: Some(rdf_store),
458 }
459 }
460
461 #[must_use]
463 pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
464 self.rdf_store.as_ref()
465 }
466
467 fn process_rdf(
469 &self,
470 query: &str,
471 language: QueryLanguage,
472 params: Option<&QueryParams>,
473 ) -> Result<QueryResult> {
474 use crate::query::planner::rdf::RdfPlanner;
475
476 let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
477 Error::Internal("RDF store not configured for this processor".to_string())
478 })?;
479
480 let mut logical_plan = self.translate_rdf(query, language)?;
482
483 let has_defaults = !logical_plan.default_params.is_empty();
485 if params.is_some() || has_defaults {
486 let merged = if has_defaults {
487 let mut merged = logical_plan.default_params.clone();
488 if let Some(params) = params {
489 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
490 }
491 merged
492 } else {
493 params.cloned().unwrap_or_default()
494 };
495 substitute_params(&mut logical_plan, &merged)?;
496 }
497
498 let mut binder = Binder::new();
500 let _binding_context = binder.bind(&logical_plan)?;
501
502 let rdf_optimizer = {
504 let stats = rdf_store.get_or_collect_statistics();
505 Optimizer::from_rdf_statistics((*stats).clone())
506 };
507 let optimized_plan = rdf_optimizer.optimize(logical_plan)?;
508
509 if optimized_plan.explain {
512 let planner = RdfPlanner::new(Arc::clone(rdf_store));
513 let (_, entries) = planner.plan_profiled(&optimized_plan)?;
514 return Ok(physical_explain_result(&optimized_plan, entries));
515 }
516
517 if optimized_plan.profile {
519 let planner = RdfPlanner::new(Arc::clone(rdf_store));
520 let (mut physical_plan, entries) = planner.plan_profiled(&optimized_plan)?;
521
522 let start = std::time::Instant::now();
523 let executor = Executor::with_columns(physical_plan.columns.clone());
524 let _result = executor.execute(physical_plan.operator.as_mut())?;
525 let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
526
527 let tree = crate::query::profile::build_profile_tree(
528 &optimized_plan.root,
529 &mut entries.into_iter(),
530 );
531 return Ok(crate::query::profile::profile_result(&tree, elapsed_ms));
532 }
533
534 let planner = RdfPlanner::new(Arc::clone(rdf_store));
536 let mut physical_plan = planner.plan(&optimized_plan)?;
537
538 let executor = Executor::with_columns(physical_plan.columns.clone());
540 executor.execute(physical_plan.operator.as_mut())
541 }
542
543 fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
545 match language {
546 #[cfg(feature = "sparql")]
547 QueryLanguage::Sparql => {
548 use crate::query::translators::sparql;
549 sparql::translate(query)
550 }
551 #[cfg(all(feature = "graphql", feature = "triple-store"))]
552 QueryLanguage::GraphQLRdf => {
553 use crate::query::translators::graphql_rdf;
554 graphql_rdf::translate(query, "http://example.org/")
556 }
557 _ => Err(Error::Internal(format!(
558 "Language {:?} is not an RDF language",
559 language
560 ))),
561 }
562 }
563}
564
565pub(crate) fn annotate_pushdown_hints(
570 op: &mut LogicalOperator,
571 store: &dyn grafeo_core::graph::GraphStore,
572) {
573 #[allow(clippy::wildcard_imports)]
574 use crate::query::plan::*;
575
576 match op {
577 LogicalOperator::Filter(filter) => {
578 annotate_pushdown_hints(&mut filter.input, store);
580
581 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
583 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
584 }
585 }
586 LogicalOperator::NodeScan(op) => {
587 if let Some(input) = &mut op.input {
588 annotate_pushdown_hints(input, store);
589 }
590 }
591 LogicalOperator::EdgeScan(op) => {
592 if let Some(input) = &mut op.input {
593 annotate_pushdown_hints(input, store);
594 }
595 }
596 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
597 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
598 LogicalOperator::Join(op) => {
599 annotate_pushdown_hints(&mut op.left, store);
600 annotate_pushdown_hints(&mut op.right, store);
601 }
602 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
603 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
604 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
605 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
606 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
607 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
608 LogicalOperator::Union(op) => {
609 for input in &mut op.inputs {
610 annotate_pushdown_hints(input, store);
611 }
612 }
613 LogicalOperator::Apply(op) => {
614 annotate_pushdown_hints(&mut op.input, store);
615 annotate_pushdown_hints(&mut op.subplan, store);
616 }
617 LogicalOperator::Otherwise(op) => {
618 annotate_pushdown_hints(&mut op.left, store);
619 annotate_pushdown_hints(&mut op.right, store);
620 }
621 _ => {}
622 }
623}
624
625fn infer_pushdown(
627 predicate: &LogicalExpression,
628 scan: &crate::query::plan::NodeScanOp,
629 store: &dyn grafeo_core::graph::GraphStore,
630) -> Option<crate::query::plan::PushdownHint> {
631 #[allow(clippy::wildcard_imports)]
632 use crate::query::plan::*;
633
634 match predicate {
635 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
637 if let Some(prop) = extract_property_name(left, &scan.variable)
638 .or_else(|| extract_property_name(right, &scan.variable))
639 {
640 if store.has_property_index(&prop) {
641 return Some(PushdownHint::IndexLookup { property: prop });
642 }
643 if scan.label.is_some() {
644 return Some(PushdownHint::LabelFirst);
645 }
646 }
647 None
648 }
649 LogicalExpression::Binary {
651 left,
652 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
653 right,
654 } => {
655 if let Some(prop) = extract_property_name(left, &scan.variable)
656 .or_else(|| extract_property_name(right, &scan.variable))
657 {
658 if store.has_property_index(&prop) {
659 return Some(PushdownHint::RangeScan { property: prop });
660 }
661 if scan.label.is_some() {
662 return Some(PushdownHint::LabelFirst);
663 }
664 }
665 None
666 }
667 LogicalExpression::Binary {
669 left,
670 op: BinaryOp::And,
671 ..
672 } => infer_pushdown(left, scan, store),
673 _ => {
674 if scan.label.is_some() {
676 Some(PushdownHint::LabelFirst)
677 } else {
678 None
679 }
680 }
681 }
682}
683
684fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
687 if let LogicalExpression::Property { variable, property } = expr
688 && variable == scan_var
689 {
690 Some(property.clone())
691 } else {
692 None
693 }
694}
695
696pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
698 let tree_text = plan.root.explain_tree();
699 QueryResult {
700 columns: vec!["plan".to_string()],
701 column_types: vec![grafeo_common::types::LogicalType::String],
702 rows: vec![vec![Value::String(tree_text.into())]],
703 execution_time_ms: None,
704 rows_scanned: None,
705 status_message: None,
706 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
707 }
708}
709
710#[cfg(feature = "triple-store")]
713pub(crate) fn physical_explain_result(
714 plan: &LogicalPlan,
715 entries: Vec<crate::query::profile::ProfileEntry>,
716) -> QueryResult {
717 let tree = crate::query::profile::build_profile_tree(&plan.root, &mut entries.into_iter());
718
719 let mut output = String::new();
720 format_physical_node(&mut output, &tree, 0);
721
722 QueryResult {
723 columns: vec!["plan".to_string()],
724 column_types: vec![grafeo_common::types::LogicalType::String],
725 rows: vec![vec![Value::String(output.into())]],
726 execution_time_ms: None,
727 rows_scanned: None,
728 status_message: None,
729 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
730 }
731}
732
733#[cfg(feature = "triple-store")]
735fn format_physical_node(out: &mut String, node: &crate::query::profile::ProfileNode, depth: usize) {
736 use std::fmt::Write;
737 let indent = " ".repeat(depth);
738 let _ = writeln!(out, "{indent}{} {}", node.name, node.label);
739 for child in &node.children {
740 format_physical_node(out, child, depth + 1);
741 }
742}
743
744pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
750 substitute_in_operator(&mut plan.root, params)
751}
752
753fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
755 #[allow(clippy::wildcard_imports)]
756 use crate::query::plan::*;
757
758 match op {
759 LogicalOperator::Filter(filter) => {
760 substitute_in_expression(&mut filter.predicate, params)?;
761 substitute_in_operator(&mut filter.input, params)?;
762 }
763 LogicalOperator::Return(ret) => {
764 for item in &mut ret.items {
765 substitute_in_expression(&mut item.expression, params)?;
766 }
767 substitute_in_operator(&mut ret.input, params)?;
768 }
769 LogicalOperator::Project(proj) => {
770 for p in &mut proj.projections {
771 substitute_in_expression(&mut p.expression, params)?;
772 }
773 substitute_in_operator(&mut proj.input, params)?;
774 }
775 LogicalOperator::NodeScan(scan) => {
776 if let Some(input) = &mut scan.input {
777 substitute_in_operator(input, params)?;
778 }
779 }
780 LogicalOperator::EdgeScan(scan) => {
781 if let Some(input) = &mut scan.input {
782 substitute_in_operator(input, params)?;
783 }
784 }
785 LogicalOperator::Expand(expand) => {
786 substitute_in_operator(&mut expand.input, params)?;
787 }
788 LogicalOperator::Join(join) => {
789 substitute_in_operator(&mut join.left, params)?;
790 substitute_in_operator(&mut join.right, params)?;
791 for cond in &mut join.conditions {
792 substitute_in_expression(&mut cond.left, params)?;
793 substitute_in_expression(&mut cond.right, params)?;
794 }
795 }
796 LogicalOperator::LeftJoin(join) => {
797 substitute_in_operator(&mut join.left, params)?;
798 substitute_in_operator(&mut join.right, params)?;
799 if let Some(cond) = &mut join.condition {
800 substitute_in_expression(cond, params)?;
801 }
802 }
803 LogicalOperator::Aggregate(agg) => {
804 for expr in &mut agg.group_by {
805 substitute_in_expression(expr, params)?;
806 }
807 for agg_expr in &mut agg.aggregates {
808 if let Some(expr) = &mut agg_expr.expression {
809 substitute_in_expression(expr, params)?;
810 }
811 }
812 substitute_in_operator(&mut agg.input, params)?;
813 }
814 LogicalOperator::Sort(sort) => {
815 for key in &mut sort.keys {
816 substitute_in_expression(&mut key.expression, params)?;
817 }
818 substitute_in_operator(&mut sort.input, params)?;
819 }
820 LogicalOperator::Limit(limit) => {
821 resolve_count_param(&mut limit.count, params)?;
822 substitute_in_operator(&mut limit.input, params)?;
823 }
824 LogicalOperator::Skip(skip) => {
825 resolve_count_param(&mut skip.count, params)?;
826 substitute_in_operator(&mut skip.input, params)?;
827 }
828 LogicalOperator::Distinct(distinct) => {
829 substitute_in_operator(&mut distinct.input, params)?;
830 }
831 LogicalOperator::CreateNode(create) => {
832 for (_, expr) in &mut create.properties {
833 substitute_in_expression(expr, params)?;
834 }
835 if let Some(input) = &mut create.input {
836 substitute_in_operator(input, params)?;
837 }
838 }
839 LogicalOperator::CreateEdge(create) => {
840 for (_, expr) in &mut create.properties {
841 substitute_in_expression(expr, params)?;
842 }
843 substitute_in_operator(&mut create.input, params)?;
844 }
845 LogicalOperator::DeleteNode(delete) => {
846 substitute_in_operator(&mut delete.input, params)?;
847 }
848 LogicalOperator::DeleteEdge(delete) => {
849 substitute_in_operator(&mut delete.input, params)?;
850 }
851 LogicalOperator::SetProperty(set) => {
852 for (_, expr) in &mut set.properties {
853 substitute_in_expression(expr, params)?;
854 }
855 substitute_in_operator(&mut set.input, params)?;
856 }
857 LogicalOperator::Union(union) => {
858 for input in &mut union.inputs {
859 substitute_in_operator(input, params)?;
860 }
861 }
862 LogicalOperator::AntiJoin(anti) => {
863 substitute_in_operator(&mut anti.left, params)?;
864 substitute_in_operator(&mut anti.right, params)?;
865 }
866 LogicalOperator::Bind(bind) => {
867 substitute_in_expression(&mut bind.expression, params)?;
868 substitute_in_operator(&mut bind.input, params)?;
869 }
870 LogicalOperator::TripleScan(scan) => {
871 if let Some(input) = &mut scan.input {
872 substitute_in_operator(input, params)?;
873 }
874 }
875 LogicalOperator::Unwind(unwind) => {
876 substitute_in_expression(&mut unwind.expression, params)?;
877 substitute_in_operator(&mut unwind.input, params)?;
878 }
879 LogicalOperator::MapCollect(mc) => {
880 substitute_in_operator(&mut mc.input, params)?;
881 }
882 LogicalOperator::Merge(merge) => {
883 for (_, expr) in &mut merge.match_properties {
884 substitute_in_expression(expr, params)?;
885 }
886 for (_, expr) in &mut merge.on_create {
887 substitute_in_expression(expr, params)?;
888 }
889 for (_, expr) in &mut merge.on_match {
890 substitute_in_expression(expr, params)?;
891 }
892 substitute_in_operator(&mut merge.input, params)?;
893 }
894 LogicalOperator::MergeRelationship(merge_rel) => {
895 for (_, expr) in &mut merge_rel.match_properties {
896 substitute_in_expression(expr, params)?;
897 }
898 for (_, expr) in &mut merge_rel.on_create {
899 substitute_in_expression(expr, params)?;
900 }
901 for (_, expr) in &mut merge_rel.on_match {
902 substitute_in_expression(expr, params)?;
903 }
904 substitute_in_operator(&mut merge_rel.input, params)?;
905 }
906 LogicalOperator::AddLabel(add_label) => {
907 substitute_in_operator(&mut add_label.input, params)?;
908 }
909 LogicalOperator::RemoveLabel(remove_label) => {
910 substitute_in_operator(&mut remove_label.input, params)?;
911 }
912 LogicalOperator::ShortestPath(sp) => {
913 substitute_in_operator(&mut sp.input, params)?;
914 }
915 LogicalOperator::InsertTriple(insert) => {
917 if let Some(ref mut input) = insert.input {
918 substitute_in_operator(input, params)?;
919 }
920 }
921 LogicalOperator::DeleteTriple(delete) => {
922 if let Some(ref mut input) = delete.input {
923 substitute_in_operator(input, params)?;
924 }
925 }
926 LogicalOperator::Modify(modify) => {
927 substitute_in_operator(&mut modify.where_clause, params)?;
928 }
929 LogicalOperator::ClearGraph(_)
930 | LogicalOperator::CreateGraph(_)
931 | LogicalOperator::DropGraph(_)
932 | LogicalOperator::LoadGraph(_)
933 | LogicalOperator::CopyGraph(_)
934 | LogicalOperator::MoveGraph(_)
935 | LogicalOperator::AddGraph(_) => {}
936 LogicalOperator::HorizontalAggregate(op) => {
937 substitute_in_operator(&mut op.input, params)?;
938 }
939 LogicalOperator::Empty => {}
940 LogicalOperator::VectorScan(scan) => {
941 substitute_in_expression(&mut scan.query_vector, params)?;
942 if let Some(ref mut input) = scan.input {
943 substitute_in_operator(input, params)?;
944 }
945 }
946 LogicalOperator::VectorJoin(join) => {
947 substitute_in_expression(&mut join.query_vector, params)?;
948 substitute_in_operator(&mut join.input, params)?;
949 }
950 LogicalOperator::TextScan(scan) => {
951 substitute_in_expression(&mut scan.query, params)?;
952 }
953 LogicalOperator::Except(except) => {
954 substitute_in_operator(&mut except.left, params)?;
955 substitute_in_operator(&mut except.right, params)?;
956 }
957 LogicalOperator::Intersect(intersect) => {
958 substitute_in_operator(&mut intersect.left, params)?;
959 substitute_in_operator(&mut intersect.right, params)?;
960 }
961 LogicalOperator::Otherwise(otherwise) => {
962 substitute_in_operator(&mut otherwise.left, params)?;
963 substitute_in_operator(&mut otherwise.right, params)?;
964 }
965 LogicalOperator::Apply(apply) => {
966 substitute_in_operator(&mut apply.input, params)?;
967 substitute_in_operator(&mut apply.subplan, params)?;
968 }
969 LogicalOperator::ParameterScan(_) => {}
971 LogicalOperator::MultiWayJoin(mwj) => {
972 for input in &mut mwj.inputs {
973 substitute_in_operator(input, params)?;
974 }
975 for cond in &mut mwj.conditions {
976 substitute_in_expression(&mut cond.left, params)?;
977 substitute_in_expression(&mut cond.right, params)?;
978 }
979 }
980 LogicalOperator::CreatePropertyGraph(_) => {}
982 LogicalOperator::CallProcedure(_) => {}
984 LogicalOperator::LoadData(_) => {}
986 LogicalOperator::Construct(construct) => {
988 substitute_in_operator(&mut construct.input, params)?;
989 }
990 }
991 Ok(())
992}
993
994fn resolve_count_param(
996 count: &mut crate::query::plan::CountExpr,
997 params: &QueryParams,
998) -> Result<()> {
999 use crate::query::plan::CountExpr;
1000 use grafeo_common::utils::error::{QueryError, QueryErrorKind};
1001
1002 if let CountExpr::Parameter(name) = count {
1003 let value = params.get(name.as_str()).ok_or_else(|| {
1004 Error::Query(QueryError::new(
1005 QueryErrorKind::Semantic,
1006 format!("Missing parameter for SKIP/LIMIT: ${name}"),
1007 ))
1008 })?;
1009 let n = match value {
1010 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
1012 Value::Int64(i) if *i >= 0 => *i as usize,
1013 Value::Int64(i) => {
1014 return Err(Error::Query(QueryError::new(
1015 QueryErrorKind::Semantic,
1016 format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
1017 )));
1018 }
1019 other => {
1020 return Err(Error::Query(QueryError::new(
1021 QueryErrorKind::Semantic,
1022 format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
1023 )));
1024 }
1025 };
1026 *count = CountExpr::Literal(n);
1027 }
1028 Ok(())
1029}
1030
1031fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
1033 use crate::query::plan::LogicalExpression;
1034
1035 match expr {
1036 LogicalExpression::Parameter(name) => {
1037 if let Some(value) = params.get(name) {
1038 *expr = LogicalExpression::Literal(value.clone());
1039 } else {
1040 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
1041 }
1042 }
1043 LogicalExpression::Binary { left, right, .. } => {
1044 substitute_in_expression(left, params)?;
1045 substitute_in_expression(right, params)?;
1046 }
1047 LogicalExpression::Unary { operand, .. } => {
1048 substitute_in_expression(operand, params)?;
1049 }
1050 LogicalExpression::FunctionCall { args, .. } => {
1051 for arg in args {
1052 substitute_in_expression(arg, params)?;
1053 }
1054 }
1055 LogicalExpression::List(items) => {
1056 for item in items {
1057 substitute_in_expression(item, params)?;
1058 }
1059 }
1060 LogicalExpression::Map(pairs) => {
1061 for (_, value) in pairs {
1062 substitute_in_expression(value, params)?;
1063 }
1064 }
1065 LogicalExpression::IndexAccess { base, index } => {
1066 substitute_in_expression(base, params)?;
1067 substitute_in_expression(index, params)?;
1068 }
1069 LogicalExpression::SliceAccess { base, start, end } => {
1070 substitute_in_expression(base, params)?;
1071 if let Some(s) = start {
1072 substitute_in_expression(s, params)?;
1073 }
1074 if let Some(e) = end {
1075 substitute_in_expression(e, params)?;
1076 }
1077 }
1078 LogicalExpression::Case {
1079 operand,
1080 when_clauses,
1081 else_clause,
1082 } => {
1083 if let Some(op) = operand {
1084 substitute_in_expression(op, params)?;
1085 }
1086 for (cond, result) in when_clauses {
1087 substitute_in_expression(cond, params)?;
1088 substitute_in_expression(result, params)?;
1089 }
1090 if let Some(el) = else_clause {
1091 substitute_in_expression(el, params)?;
1092 }
1093 }
1094 LogicalExpression::Property { .. }
1095 | LogicalExpression::Variable(_)
1096 | LogicalExpression::Literal(_)
1097 | LogicalExpression::Labels(_)
1098 | LogicalExpression::Type(_)
1099 | LogicalExpression::Id(_) => {}
1100 LogicalExpression::ListComprehension {
1101 list_expr,
1102 filter_expr,
1103 map_expr,
1104 ..
1105 } => {
1106 substitute_in_expression(list_expr, params)?;
1107 if let Some(filter) = filter_expr {
1108 substitute_in_expression(filter, params)?;
1109 }
1110 substitute_in_expression(map_expr, params)?;
1111 }
1112 LogicalExpression::ListPredicate {
1113 list_expr,
1114 predicate,
1115 ..
1116 } => {
1117 substitute_in_expression(list_expr, params)?;
1118 substitute_in_expression(predicate, params)?;
1119 }
1120 LogicalExpression::ExistsSubquery(subplan)
1121 | LogicalExpression::CountSubquery(subplan)
1122 | LogicalExpression::ValueSubquery(subplan) => {
1123 substitute_in_operator(subplan, params)?;
1124 }
1125 LogicalExpression::PatternComprehension { projection, .. } => {
1126 substitute_in_expression(projection, params)?;
1127 }
1128 LogicalExpression::MapProjection { entries, .. } => {
1129 for entry in entries {
1130 if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
1131 substitute_in_expression(expr, params)?;
1132 }
1133 }
1134 }
1135 LogicalExpression::Reduce {
1136 initial,
1137 list,
1138 expression,
1139 ..
1140 } => {
1141 substitute_in_expression(initial, params)?;
1142 substitute_in_expression(list, params)?;
1143 substitute_in_expression(expression, params)?;
1144 }
1145 }
1146 Ok(())
1147}
1148
1149#[cfg(test)]
1150mod tests {
1151 use super::*;
1152
1153 #[test]
1154 fn test_query_language_is_lpg() {
1155 #[cfg(feature = "gql")]
1156 assert!(QueryLanguage::Gql.is_lpg());
1157 #[cfg(feature = "cypher")]
1158 assert!(QueryLanguage::Cypher.is_lpg());
1159 #[cfg(feature = "sparql")]
1160 assert!(!QueryLanguage::Sparql.is_lpg());
1161 }
1162
1163 #[test]
1164 fn test_processor_creation() {
1165 let store = Arc::new(LpgStore::new().unwrap());
1166 let processor = QueryProcessor::for_lpg(store);
1167 assert!(processor.lpg_store().node_count() == 0);
1168 }
1169
1170 #[cfg(feature = "gql")]
1171 #[test]
1172 fn test_process_simple_gql() {
1173 let store = Arc::new(LpgStore::new().unwrap());
1174 store.create_node(&["Person"]);
1175 store.create_node(&["Person"]);
1176
1177 let processor = QueryProcessor::for_lpg(store);
1178 let result = processor
1179 .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
1180 .unwrap();
1181
1182 assert_eq!(result.row_count(), 2);
1183 assert_eq!(result.columns[0], "n");
1184 }
1185
1186 #[cfg(feature = "cypher")]
1187 #[test]
1188 fn test_process_simple_cypher() {
1189 let store = Arc::new(LpgStore::new().unwrap());
1190 store.create_node(&["Person"]);
1191
1192 let processor = QueryProcessor::for_lpg(store);
1193 let result = processor
1194 .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
1195 .unwrap();
1196
1197 assert_eq!(result.row_count(), 1);
1198 }
1199
1200 #[cfg(feature = "gql")]
1201 #[test]
1202 fn test_process_with_params() {
1203 let store = Arc::new(LpgStore::new().unwrap());
1204 store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
1205 store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
1206 store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
1207
1208 let processor = QueryProcessor::for_lpg(store);
1209
1210 let mut params = HashMap::new();
1212 params.insert("min_age".to_string(), Value::Int64(30));
1213
1214 let result = processor
1215 .process(
1216 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1217 QueryLanguage::Gql,
1218 Some(¶ms),
1219 )
1220 .unwrap();
1221
1222 assert_eq!(result.row_count(), 2);
1224 }
1225
1226 #[cfg(feature = "gql")]
1227 #[test]
1228 fn test_missing_param_error() {
1229 let store = Arc::new(LpgStore::new().unwrap());
1230 store.create_node(&["Person"]);
1231
1232 let processor = QueryProcessor::for_lpg(store);
1233
1234 let params: HashMap<String, Value> = HashMap::new();
1236 let result = processor.process(
1237 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
1238 QueryLanguage::Gql,
1239 Some(¶ms),
1240 );
1241
1242 assert!(result.is_err());
1244 let err = result.unwrap_err();
1245 assert!(
1246 err.to_string().contains("Missing parameter"),
1247 "Expected 'Missing parameter' error, got: {}",
1248 err
1249 );
1250 }
1251
1252 #[cfg(feature = "gql")]
1253 #[test]
1254 fn test_params_in_filter_with_property() {
1255 let store = Arc::new(LpgStore::new().unwrap());
1257 store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
1258 store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);
1259
1260 let processor = QueryProcessor::for_lpg(store);
1261
1262 let mut params = HashMap::new();
1263 params.insert("threshold".to_string(), Value::Int64(15));
1264
1265 let result = processor
1266 .process(
1267 "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
1268 QueryLanguage::Gql,
1269 Some(¶ms),
1270 )
1271 .unwrap();
1272
1273 assert_eq!(result.row_count(), 1);
1275 let row = &result.rows[0];
1276 assert_eq!(row[0], Value::Int64(20));
1277 }
1278
1279 #[cfg(feature = "gql")]
1280 #[test]
1281 fn test_params_in_multiple_where_conditions() {
1282 let store = Arc::new(LpgStore::new().unwrap());
1284 store.create_node_with_props(
1285 &["Person"],
1286 [("age", Value::Int64(25)), ("score", Value::Int64(80))],
1287 );
1288 store.create_node_with_props(
1289 &["Person"],
1290 [("age", Value::Int64(35)), ("score", Value::Int64(90))],
1291 );
1292 store.create_node_with_props(
1293 &["Person"],
1294 [("age", Value::Int64(45)), ("score", Value::Int64(70))],
1295 );
1296
1297 let processor = QueryProcessor::for_lpg(store);
1298
1299 let mut params = HashMap::new();
1300 params.insert("min_age".to_string(), Value::Int64(30));
1301 params.insert("min_score".to_string(), Value::Int64(75));
1302
1303 let result = processor
1304 .process(
1305 "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
1306 QueryLanguage::Gql,
1307 Some(¶ms),
1308 )
1309 .unwrap();
1310
1311 assert_eq!(result.row_count(), 1);
1313 }
1314
1315 #[cfg(feature = "gql")]
1316 #[test]
1317 fn test_params_with_in_list() {
1318 let store = Arc::new(LpgStore::new().unwrap());
1320 store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
1321 store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
1322 store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);
1323
1324 let processor = QueryProcessor::for_lpg(store);
1325
1326 let mut params = HashMap::new();
1328 params.insert("target".to_string(), Value::String("active".into()));
1329
1330 let result = processor
1331 .process(
1332 "MATCH (n:Item) WHERE n.status = $target RETURN n",
1333 QueryLanguage::Gql,
1334 Some(¶ms),
1335 )
1336 .unwrap();
1337
1338 assert_eq!(result.row_count(), 1);
1339 }
1340
1341 #[cfg(feature = "gql")]
1342 #[test]
1343 fn test_params_same_type_comparison() {
1344 let store = Arc::new(LpgStore::new().unwrap());
1346 store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
1347 store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);
1348
1349 let processor = QueryProcessor::for_lpg(store);
1350
1351 let mut params = HashMap::new();
1353 params.insert("threshold".to_string(), Value::Int64(75));
1354
1355 let result = processor
1356 .process(
1357 "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
1358 QueryLanguage::Gql,
1359 Some(¶ms),
1360 )
1361 .unwrap();
1362
1363 assert_eq!(result.row_count(), 1);
1365 }
1366
1367 #[cfg(feature = "gql")]
1368 #[test]
1369 fn test_process_empty_result_has_columns() {
1370 let store = Arc::new(LpgStore::new().unwrap());
1372 let processor = QueryProcessor::for_lpg(store);
1375 let result = processor
1376 .process(
1377 "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
1378 QueryLanguage::Gql,
1379 None,
1380 )
1381 .unwrap();
1382
1383 assert_eq!(result.row_count(), 0);
1384 assert_eq!(result.columns.len(), 2);
1385 assert_eq!(result.columns[0], "name");
1386 assert_eq!(result.columns[1], "age");
1387 }
1388
1389 #[cfg(feature = "gql")]
1390 #[test]
1391 fn test_params_string_equality() {
1392 let store = Arc::new(LpgStore::new().unwrap());
1394 store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
1395 store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
1396 store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);
1397
1398 let processor = QueryProcessor::for_lpg(store);
1399
1400 let mut params = HashMap::new();
1401 params.insert("target".to_string(), Value::String("beta".into()));
1402
1403 let result = processor
1404 .process(
1405 "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
1406 QueryLanguage::Gql,
1407 Some(¶ms),
1408 )
1409 .unwrap();
1410
1411 assert_eq!(result.row_count(), 1);
1412 assert_eq!(result.rows[0][0], Value::String("beta".into()));
1413 }
1414
1415 #[cfg(feature = "cypher")]
1416 #[test]
1417 fn test_params_in_exists_subquery() {
1418 let store = Arc::new(LpgStore::new().unwrap());
1421 let alix =
1422 store.create_node_with_props(&["Person"], [("name", Value::String("Alix".into()))]);
1423 let gus =
1424 store.create_node_with_props(&["Person"], [("name", Value::String("Gus".into()))]);
1425 let _jules =
1426 store.create_node_with_props(&["Person"], [("name", Value::String("Jules".into()))]);
1427
1428 store.create_edge(alix, gus, "FOLLOWS");
1430
1431 let processor = QueryProcessor::for_lpg(store);
1432
1433 let mut params = HashMap::new();
1435 params.insert("viewer".to_string(), Value::String("Alix".into()));
1436
1437 let result = processor
1438 .process(
1439 "MATCH (p:Person) \
1440 WHERE p.name <> $viewer \
1441 AND NOT EXISTS { MATCH (v:Person)-[:FOLLOWS]->(p) WHERE v.name = $viewer } \
1442 RETURN p.name ORDER BY p.name",
1443 QueryLanguage::Cypher,
1444 Some(¶ms),
1445 )
1446 .unwrap();
1447
1448 assert_eq!(result.row_count(), 1);
1450 assert_eq!(result.rows[0][0], Value::String("Jules".into()));
1451 }
1452}