1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::grafeo_debug_span;
13use grafeo_common::types::{EpochId, TransactionId, Value};
14use grafeo_common::utils::error::{Error, Result};
15use grafeo_core::graph::lpg::LpgStore;
16use grafeo_core::graph::{GraphStore, GraphStoreMut};
17
18use crate::catalog::Catalog;
19use crate::database::QueryResult;
20use crate::query::binder::Binder;
21use crate::query::executor::Executor;
22use crate::query::optimizer::Optimizer;
23use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
24use crate::query::planner::Planner;
25use crate::transaction::TransactionManager;
26
/// Query languages that a [`QueryProcessor`] can be asked to run.
///
/// Every variant is gated behind a Cargo feature, so the set of variants that
/// exists depends on how the crate was compiled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QueryLanguage {
    /// GQL, handled by the LPG pipeline (requires the `gql` feature).
    #[cfg(feature = "gql")]
    Gql,
    /// Cypher, handled by the LPG pipeline (requires the `cypher` feature).
    #[cfg(feature = "cypher")]
    Cypher,
    /// Gremlin, handled by the LPG pipeline (requires the `gremlin` feature).
    #[cfg(feature = "gremlin")]
    Gremlin,
    /// GraphQL, handled by the LPG pipeline (requires the `graphql` feature).
    #[cfg(feature = "graphql")]
    GraphQL,
    /// SQL/PGQ, handled by the LPG pipeline (requires the `sql-pgq` feature).
    #[cfg(feature = "sql-pgq")]
    SqlPgq,
    /// SPARQL, handled by the RDF pipeline (requires the `sparql` feature).
    #[cfg(feature = "sparql")]
    Sparql,
    /// GraphQL translated for the RDF pipeline (requires both the `graphql`
    /// and `rdf` features).
    #[cfg(all(feature = "graphql", feature = "rdf"))]
    GraphQLRdf,
}
52
53impl QueryLanguage {
54 #[must_use]
56 pub const fn is_lpg(&self) -> bool {
57 match self {
58 #[cfg(feature = "gql")]
59 Self::Gql => true,
60 #[cfg(feature = "cypher")]
61 Self::Cypher => true,
62 #[cfg(feature = "gremlin")]
63 Self::Gremlin => true,
64 #[cfg(feature = "graphql")]
65 Self::GraphQL => true,
66 #[cfg(feature = "sql-pgq")]
67 Self::SqlPgq => true,
68 #[cfg(feature = "sparql")]
69 Self::Sparql => false,
70 #[cfg(all(feature = "graphql", feature = "rdf"))]
71 Self::GraphQLRdf => false,
72 }
73 }
74}
75
/// Named query parameters: maps a parameter name to the [`Value`] bound to it.
///
/// Keys are written without the leading `$` (the tests in this file bind
/// `"min_age"` for a query that references `$min_age`).
pub type QueryParams = HashMap<String, Value>;
78
/// Runs query text through the translate, bind, optimize, plan and execute
/// pipeline, for LPG languages and (with the `rdf` feature) RDF languages.
pub struct QueryProcessor {
    /// LPG store handle returned by `lpg_store()`. The constructors that take
    /// a generic graph store fill this with a freshly created `LpgStore`.
    lpg_store: Arc<LpgStore>,
    /// Read-side store handed to the planner and used for EXPLAIN pushdown hints.
    graph_store: Arc<dyn GraphStore>,
    /// Write-side store handed to the planner; `None` when no writable store
    /// was supplied.
    write_store: Option<Arc<dyn GraphStoreMut>>,
    /// Transaction manager handed to the planner; also supplies the current
    /// epoch when no transaction context is set.
    transaction_manager: Arc<TransactionManager>,
    /// Catalog exposed through `catalog()`; not otherwise read in this file.
    catalog: Arc<Catalog>,
    /// Optimizer applied to every bound logical plan.
    optimizer: Optimizer,
    /// Optional `(viewing epoch, transaction id)` pair used when planning.
    transaction_context: Option<(EpochId, TransactionId)>,
    /// RDF store used by the RDF pipeline; `None` unless built via `with_rdf`.
    #[cfg(feature = "rdf")]
    rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
}
117
118impl QueryProcessor {
119 #[must_use]
121 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
122 let optimizer = Optimizer::from_store(&store);
123 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
124 let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
125 Self {
126 lpg_store: store,
127 graph_store,
128 write_store,
129 transaction_manager: Arc::new(TransactionManager::new()),
130 catalog: Arc::new(Catalog::new()),
131 optimizer,
132 transaction_context: None,
133 #[cfg(feature = "rdf")]
134 rdf_store: None,
135 }
136 }
137
138 #[must_use]
140 pub fn for_lpg_with_transaction(
141 store: Arc<LpgStore>,
142 transaction_manager: Arc<TransactionManager>,
143 ) -> Self {
144 let optimizer = Optimizer::from_store(&store);
145 let graph_store = Arc::clone(&store) as Arc<dyn GraphStore>;
146 let write_store = Some(Arc::clone(&store) as Arc<dyn GraphStoreMut>);
147 Self {
148 lpg_store: store,
149 graph_store,
150 write_store,
151 transaction_manager,
152 catalog: Arc::new(Catalog::new()),
153 optimizer,
154 transaction_context: None,
155 #[cfg(feature = "rdf")]
156 rdf_store: None,
157 }
158 }
159
160 pub fn for_graph_store_with_transaction(
166 store: Arc<dyn GraphStoreMut>,
167 transaction_manager: Arc<TransactionManager>,
168 ) -> Result<Self> {
169 let optimizer = Optimizer::from_graph_store(&*store);
170 let read_store = Arc::clone(&store) as Arc<dyn GraphStore>;
171 Ok(Self {
172 lpg_store: Arc::new(LpgStore::new()?),
173 graph_store: read_store,
174 write_store: Some(store),
175 transaction_manager,
176 catalog: Arc::new(Catalog::new()),
177 optimizer,
178 transaction_context: None,
179 #[cfg(feature = "rdf")]
180 rdf_store: None,
181 })
182 }
183
184 pub fn for_stores_with_transaction(
190 read_store: Arc<dyn GraphStore>,
191 write_store: Option<Arc<dyn GraphStoreMut>>,
192 transaction_manager: Arc<TransactionManager>,
193 ) -> Result<Self> {
194 let optimizer = Optimizer::from_graph_store(&*read_store);
195 Ok(Self {
196 lpg_store: Arc::new(LpgStore::new()?),
197 graph_store: read_store,
198 write_store,
199 transaction_manager,
200 catalog: Arc::new(Catalog::new()),
201 optimizer,
202 transaction_context: None,
203 #[cfg(feature = "rdf")]
204 rdf_store: None,
205 })
206 }
207
208 #[must_use]
212 pub fn with_transaction_context(
213 mut self,
214 viewing_epoch: EpochId,
215 transaction_id: TransactionId,
216 ) -> Self {
217 self.transaction_context = Some((viewing_epoch, transaction_id));
218 self
219 }
220
221 #[must_use]
223 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
224 self.catalog = catalog;
225 self
226 }
227
228 #[must_use]
230 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
231 self.optimizer = optimizer;
232 self
233 }
234
235 pub fn process(
255 &self,
256 query: &str,
257 language: QueryLanguage,
258 params: Option<&QueryParams>,
259 ) -> Result<QueryResult> {
260 if language.is_lpg() {
261 self.process_lpg(query, language, params)
262 } else {
263 #[cfg(feature = "rdf")]
264 {
265 self.process_rdf(query, language, params)
266 }
267 #[cfg(not(feature = "rdf"))]
268 {
269 Err(Error::Internal(
270 "RDF support not enabled. Compile with --features rdf".to_string(),
271 ))
272 }
273 }
274 }
275
276 fn process_lpg(
278 &self,
279 query: &str,
280 language: QueryLanguage,
281 params: Option<&QueryParams>,
282 ) -> Result<QueryResult> {
283 #[cfg(not(target_arch = "wasm32"))]
284 let start_time = std::time::Instant::now();
285
286 let mut logical_plan = self.translate_lpg(query, language)?;
288
289 let has_defaults = !logical_plan.default_params.is_empty();
291 if params.is_some() || has_defaults {
292 let merged = if has_defaults {
293 let mut merged = logical_plan.default_params.clone();
294 if let Some(params) = params {
295 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
296 }
297 merged
298 } else {
299 params.cloned().unwrap_or_default()
300 };
301 substitute_params(&mut logical_plan, &merged)?;
302 }
303
304 let mut binder = Binder::new();
306 let _binding_context = binder.bind(&logical_plan)?;
307
308 let optimized_plan = self.optimizer.optimize(logical_plan)?;
310
311 if optimized_plan.explain {
313 let mut plan = optimized_plan;
314 annotate_pushdown_hints(&mut plan.root, self.graph_store.as_ref());
315 return Ok(explain_result(&plan));
316 }
317
318 let is_read_only =
322 !optimized_plan.root.has_mutations() && self.transaction_context.is_none();
323 let planner = if let Some((epoch, transaction_id)) = self.transaction_context {
324 Planner::with_context(
325 Arc::clone(&self.graph_store),
326 self.write_store.as_ref().map(Arc::clone),
327 Arc::clone(&self.transaction_manager),
328 Some(transaction_id),
329 epoch,
330 )
331 } else {
332 Planner::with_context(
333 Arc::clone(&self.graph_store),
334 self.write_store.as_ref().map(Arc::clone),
335 Arc::clone(&self.transaction_manager),
336 None,
337 self.transaction_manager.current_epoch(),
338 )
339 }
340 .with_read_only(is_read_only);
341 let mut physical_plan = planner.plan(&optimized_plan)?;
342
343 let executor = Executor::with_columns(physical_plan.columns.clone());
345 let mut result = executor.execute(physical_plan.operator.as_mut())?;
346
347 let rows_scanned = result.rows.len() as u64; #[cfg(not(target_arch = "wasm32"))]
350 {
351 let elapsed_ms = start_time.elapsed().as_secs_f64() * 1000.0;
352 result.execution_time_ms = Some(elapsed_ms);
353 }
354 result.rows_scanned = Some(rows_scanned);
355
356 Ok(result)
357 }
358
359 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
361 let _span = grafeo_debug_span!("grafeo::query::parse", ?language);
362 match language {
363 #[cfg(feature = "gql")]
364 QueryLanguage::Gql => {
365 use crate::query::translators::gql;
366 gql::translate(query)
367 }
368 #[cfg(feature = "cypher")]
369 QueryLanguage::Cypher => {
370 use crate::query::translators::cypher;
371 cypher::translate(query)
372 }
373 #[cfg(feature = "gremlin")]
374 QueryLanguage::Gremlin => {
375 use crate::query::translators::gremlin;
376 gremlin::translate(query)
377 }
378 #[cfg(feature = "graphql")]
379 QueryLanguage::GraphQL => {
380 use crate::query::translators::graphql;
381 graphql::translate(query)
382 }
383 #[cfg(feature = "sql-pgq")]
384 QueryLanguage::SqlPgq => {
385 use crate::query::translators::sql_pgq;
386 sql_pgq::translate(query)
387 }
388 #[allow(unreachable_patterns)]
389 _ => Err(Error::Internal(format!(
390 "Language {:?} is not an LPG language",
391 language
392 ))),
393 }
394 }
395
    /// Returns the [`LpgStore`] handle held by this processor.
    #[must_use]
    pub fn lpg_store(&self) -> &Arc<LpgStore> {
        &self.lpg_store
    }
401
    /// Returns the [`Catalog`] handle held by this processor.
    #[must_use]
    pub fn catalog(&self) -> &Arc<Catalog> {
        &self.catalog
    }
407
    /// Returns the [`Optimizer`] applied to logical plans by this processor.
    #[must_use]
    pub fn optimizer(&self) -> &Optimizer {
        &self.optimizer
    }
413}
414
impl QueryProcessor {
    /// Returns the [`TransactionManager`] handle held by this processor.
    #[must_use]
    pub fn transaction_manager(&self) -> &Arc<TransactionManager> {
        &self.transaction_manager
    }
}
422
423#[cfg(feature = "rdf")]
428impl QueryProcessor {
429 #[must_use]
431 pub fn with_rdf(
432 lpg_store: Arc<LpgStore>,
433 rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
434 ) -> Self {
435 let optimizer = Optimizer::from_store(&lpg_store);
436 let graph_store = Arc::clone(&lpg_store) as Arc<dyn GraphStore>;
437 let write_store = Some(Arc::clone(&lpg_store) as Arc<dyn GraphStoreMut>);
438 Self {
439 lpg_store,
440 graph_store,
441 write_store,
442 transaction_manager: Arc::new(TransactionManager::new()),
443 catalog: Arc::new(Catalog::new()),
444 optimizer,
445 transaction_context: None,
446 rdf_store: Some(rdf_store),
447 }
448 }
449
    /// Returns the RDF store handle, or `None` when the processor was built
    /// without one.
    #[must_use]
    pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
        self.rdf_store.as_ref()
    }
455
456 fn process_rdf(
458 &self,
459 query: &str,
460 language: QueryLanguage,
461 params: Option<&QueryParams>,
462 ) -> Result<QueryResult> {
463 use crate::query::planner::rdf::RdfPlanner;
464
465 let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
466 Error::Internal("RDF store not configured for this processor".to_string())
467 })?;
468
469 let mut logical_plan = self.translate_rdf(query, language)?;
471
472 let has_defaults = !logical_plan.default_params.is_empty();
474 if params.is_some() || has_defaults {
475 let merged = if has_defaults {
476 let mut merged = logical_plan.default_params.clone();
477 if let Some(params) = params {
478 merged.extend(params.iter().map(|(k, v)| (k.clone(), v.clone())));
479 }
480 merged
481 } else {
482 params.cloned().unwrap_or_default()
483 };
484 substitute_params(&mut logical_plan, &merged)?;
485 }
486
487 let mut binder = Binder::new();
489 let _binding_context = binder.bind(&logical_plan)?;
490
491 let optimized_plan = self.optimizer.optimize(logical_plan)?;
493
494 if optimized_plan.explain {
496 return Ok(explain_result(&optimized_plan));
497 }
498
499 let planner = RdfPlanner::new(Arc::clone(rdf_store));
501 let mut physical_plan = planner.plan(&optimized_plan)?;
502
503 let executor = Executor::with_columns(physical_plan.columns.clone());
505 executor.execute(physical_plan.operator.as_mut())
506 }
507
508 fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
510 match language {
511 #[cfg(feature = "sparql")]
512 QueryLanguage::Sparql => {
513 use crate::query::translators::sparql;
514 sparql::translate(query)
515 }
516 #[cfg(all(feature = "graphql", feature = "rdf"))]
517 QueryLanguage::GraphQLRdf => {
518 use crate::query::translators::graphql_rdf;
519 graphql_rdf::translate(query, "http://example.org/")
521 }
522 _ => Err(Error::Internal(format!(
523 "Language {:?} is not an RDF language",
524 language
525 ))),
526 }
527 }
528}
529
530pub(crate) fn annotate_pushdown_hints(
535 op: &mut LogicalOperator,
536 store: &dyn grafeo_core::graph::GraphStore,
537) {
538 #[allow(clippy::wildcard_imports)]
539 use crate::query::plan::*;
540
541 match op {
542 LogicalOperator::Filter(filter) => {
543 annotate_pushdown_hints(&mut filter.input, store);
545
546 if let LogicalOperator::NodeScan(scan) = filter.input.as_ref() {
548 filter.pushdown_hint = infer_pushdown(&filter.predicate, scan, store);
549 }
550 }
551 LogicalOperator::NodeScan(op) => {
552 if let Some(input) = &mut op.input {
553 annotate_pushdown_hints(input, store);
554 }
555 }
556 LogicalOperator::EdgeScan(op) => {
557 if let Some(input) = &mut op.input {
558 annotate_pushdown_hints(input, store);
559 }
560 }
561 LogicalOperator::Expand(op) => annotate_pushdown_hints(&mut op.input, store),
562 LogicalOperator::Project(op) => annotate_pushdown_hints(&mut op.input, store),
563 LogicalOperator::Join(op) => {
564 annotate_pushdown_hints(&mut op.left, store);
565 annotate_pushdown_hints(&mut op.right, store);
566 }
567 LogicalOperator::Aggregate(op) => annotate_pushdown_hints(&mut op.input, store),
568 LogicalOperator::Limit(op) => annotate_pushdown_hints(&mut op.input, store),
569 LogicalOperator::Skip(op) => annotate_pushdown_hints(&mut op.input, store),
570 LogicalOperator::Sort(op) => annotate_pushdown_hints(&mut op.input, store),
571 LogicalOperator::Distinct(op) => annotate_pushdown_hints(&mut op.input, store),
572 LogicalOperator::Return(op) => annotate_pushdown_hints(&mut op.input, store),
573 LogicalOperator::Union(op) => {
574 for input in &mut op.inputs {
575 annotate_pushdown_hints(input, store);
576 }
577 }
578 LogicalOperator::Apply(op) => {
579 annotate_pushdown_hints(&mut op.input, store);
580 annotate_pushdown_hints(&mut op.subplan, store);
581 }
582 LogicalOperator::Otherwise(op) => {
583 annotate_pushdown_hints(&mut op.left, store);
584 annotate_pushdown_hints(&mut op.right, store);
585 }
586 _ => {}
587 }
588}
589
590fn infer_pushdown(
592 predicate: &LogicalExpression,
593 scan: &crate::query::plan::NodeScanOp,
594 store: &dyn grafeo_core::graph::GraphStore,
595) -> Option<crate::query::plan::PushdownHint> {
596 #[allow(clippy::wildcard_imports)]
597 use crate::query::plan::*;
598
599 match predicate {
600 LogicalExpression::Binary { left, op, right } if *op == BinaryOp::Eq => {
602 if let Some(prop) = extract_property_name(left, &scan.variable)
603 .or_else(|| extract_property_name(right, &scan.variable))
604 {
605 if store.has_property_index(&prop) {
606 return Some(PushdownHint::IndexLookup { property: prop });
607 }
608 if scan.label.is_some() {
609 return Some(PushdownHint::LabelFirst);
610 }
611 }
612 None
613 }
614 LogicalExpression::Binary {
616 left,
617 op: BinaryOp::Gt | BinaryOp::Ge | BinaryOp::Lt | BinaryOp::Le,
618 right,
619 } => {
620 if let Some(prop) = extract_property_name(left, &scan.variable)
621 .or_else(|| extract_property_name(right, &scan.variable))
622 {
623 if store.has_property_index(&prop) {
624 return Some(PushdownHint::RangeScan { property: prop });
625 }
626 if scan.label.is_some() {
627 return Some(PushdownHint::LabelFirst);
628 }
629 }
630 None
631 }
632 LogicalExpression::Binary {
634 left,
635 op: BinaryOp::And,
636 ..
637 } => infer_pushdown(left, scan, store),
638 _ => {
639 if scan.label.is_some() {
641 Some(PushdownHint::LabelFirst)
642 } else {
643 None
644 }
645 }
646 }
647}
648
649fn extract_property_name(expr: &LogicalExpression, scan_var: &str) -> Option<String> {
652 if let LogicalExpression::Property { variable, property } = expr
653 && variable == scan_var
654 {
655 Some(property.clone())
656 } else {
657 None
658 }
659}
660
661pub(crate) fn explain_result(plan: &LogicalPlan) -> QueryResult {
663 let tree_text = plan.root.explain_tree();
664 QueryResult {
665 columns: vec!["plan".to_string()],
666 column_types: vec![grafeo_common::types::LogicalType::String],
667 rows: vec![vec![Value::String(tree_text.into())]],
668 execution_time_ms: None,
669 rows_scanned: None,
670 status_message: None,
671 gql_status: grafeo_common::utils::GqlStatus::SUCCESS,
672 }
673}
674
/// Replaces parameter references in `plan` with the values bound in `params`,
/// starting from the plan's root operator.
///
/// # Errors
///
/// Returns the error raised while substituting in the operator tree, for
/// example when a referenced parameter has no binding.
pub fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
    substitute_in_operator(&mut plan.root, params)
}
679
680fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
682 #[allow(clippy::wildcard_imports)]
683 use crate::query::plan::*;
684
685 match op {
686 LogicalOperator::Filter(filter) => {
687 substitute_in_expression(&mut filter.predicate, params)?;
688 substitute_in_operator(&mut filter.input, params)?;
689 }
690 LogicalOperator::Return(ret) => {
691 for item in &mut ret.items {
692 substitute_in_expression(&mut item.expression, params)?;
693 }
694 substitute_in_operator(&mut ret.input, params)?;
695 }
696 LogicalOperator::Project(proj) => {
697 for p in &mut proj.projections {
698 substitute_in_expression(&mut p.expression, params)?;
699 }
700 substitute_in_operator(&mut proj.input, params)?;
701 }
702 LogicalOperator::NodeScan(scan) => {
703 if let Some(input) = &mut scan.input {
704 substitute_in_operator(input, params)?;
705 }
706 }
707 LogicalOperator::EdgeScan(scan) => {
708 if let Some(input) = &mut scan.input {
709 substitute_in_operator(input, params)?;
710 }
711 }
712 LogicalOperator::Expand(expand) => {
713 substitute_in_operator(&mut expand.input, params)?;
714 }
715 LogicalOperator::Join(join) => {
716 substitute_in_operator(&mut join.left, params)?;
717 substitute_in_operator(&mut join.right, params)?;
718 for cond in &mut join.conditions {
719 substitute_in_expression(&mut cond.left, params)?;
720 substitute_in_expression(&mut cond.right, params)?;
721 }
722 }
723 LogicalOperator::LeftJoin(join) => {
724 substitute_in_operator(&mut join.left, params)?;
725 substitute_in_operator(&mut join.right, params)?;
726 if let Some(cond) = &mut join.condition {
727 substitute_in_expression(cond, params)?;
728 }
729 }
730 LogicalOperator::Aggregate(agg) => {
731 for expr in &mut agg.group_by {
732 substitute_in_expression(expr, params)?;
733 }
734 for agg_expr in &mut agg.aggregates {
735 if let Some(expr) = &mut agg_expr.expression {
736 substitute_in_expression(expr, params)?;
737 }
738 }
739 substitute_in_operator(&mut agg.input, params)?;
740 }
741 LogicalOperator::Sort(sort) => {
742 for key in &mut sort.keys {
743 substitute_in_expression(&mut key.expression, params)?;
744 }
745 substitute_in_operator(&mut sort.input, params)?;
746 }
747 LogicalOperator::Limit(limit) => {
748 resolve_count_param(&mut limit.count, params)?;
749 substitute_in_operator(&mut limit.input, params)?;
750 }
751 LogicalOperator::Skip(skip) => {
752 resolve_count_param(&mut skip.count, params)?;
753 substitute_in_operator(&mut skip.input, params)?;
754 }
755 LogicalOperator::Distinct(distinct) => {
756 substitute_in_operator(&mut distinct.input, params)?;
757 }
758 LogicalOperator::CreateNode(create) => {
759 for (_, expr) in &mut create.properties {
760 substitute_in_expression(expr, params)?;
761 }
762 if let Some(input) = &mut create.input {
763 substitute_in_operator(input, params)?;
764 }
765 }
766 LogicalOperator::CreateEdge(create) => {
767 for (_, expr) in &mut create.properties {
768 substitute_in_expression(expr, params)?;
769 }
770 substitute_in_operator(&mut create.input, params)?;
771 }
772 LogicalOperator::DeleteNode(delete) => {
773 substitute_in_operator(&mut delete.input, params)?;
774 }
775 LogicalOperator::DeleteEdge(delete) => {
776 substitute_in_operator(&mut delete.input, params)?;
777 }
778 LogicalOperator::SetProperty(set) => {
779 for (_, expr) in &mut set.properties {
780 substitute_in_expression(expr, params)?;
781 }
782 substitute_in_operator(&mut set.input, params)?;
783 }
784 LogicalOperator::Union(union) => {
785 for input in &mut union.inputs {
786 substitute_in_operator(input, params)?;
787 }
788 }
789 LogicalOperator::AntiJoin(anti) => {
790 substitute_in_operator(&mut anti.left, params)?;
791 substitute_in_operator(&mut anti.right, params)?;
792 }
793 LogicalOperator::Bind(bind) => {
794 substitute_in_expression(&mut bind.expression, params)?;
795 substitute_in_operator(&mut bind.input, params)?;
796 }
797 LogicalOperator::TripleScan(scan) => {
798 if let Some(input) = &mut scan.input {
799 substitute_in_operator(input, params)?;
800 }
801 }
802 LogicalOperator::Unwind(unwind) => {
803 substitute_in_expression(&mut unwind.expression, params)?;
804 substitute_in_operator(&mut unwind.input, params)?;
805 }
806 LogicalOperator::MapCollect(mc) => {
807 substitute_in_operator(&mut mc.input, params)?;
808 }
809 LogicalOperator::Merge(merge) => {
810 for (_, expr) in &mut merge.match_properties {
811 substitute_in_expression(expr, params)?;
812 }
813 for (_, expr) in &mut merge.on_create {
814 substitute_in_expression(expr, params)?;
815 }
816 for (_, expr) in &mut merge.on_match {
817 substitute_in_expression(expr, params)?;
818 }
819 substitute_in_operator(&mut merge.input, params)?;
820 }
821 LogicalOperator::MergeRelationship(merge_rel) => {
822 for (_, expr) in &mut merge_rel.match_properties {
823 substitute_in_expression(expr, params)?;
824 }
825 for (_, expr) in &mut merge_rel.on_create {
826 substitute_in_expression(expr, params)?;
827 }
828 for (_, expr) in &mut merge_rel.on_match {
829 substitute_in_expression(expr, params)?;
830 }
831 substitute_in_operator(&mut merge_rel.input, params)?;
832 }
833 LogicalOperator::AddLabel(add_label) => {
834 substitute_in_operator(&mut add_label.input, params)?;
835 }
836 LogicalOperator::RemoveLabel(remove_label) => {
837 substitute_in_operator(&mut remove_label.input, params)?;
838 }
839 LogicalOperator::ShortestPath(sp) => {
840 substitute_in_operator(&mut sp.input, params)?;
841 }
842 LogicalOperator::InsertTriple(insert) => {
844 if let Some(ref mut input) = insert.input {
845 substitute_in_operator(input, params)?;
846 }
847 }
848 LogicalOperator::DeleteTriple(delete) => {
849 if let Some(ref mut input) = delete.input {
850 substitute_in_operator(input, params)?;
851 }
852 }
853 LogicalOperator::Modify(modify) => {
854 substitute_in_operator(&mut modify.where_clause, params)?;
855 }
856 LogicalOperator::ClearGraph(_)
857 | LogicalOperator::CreateGraph(_)
858 | LogicalOperator::DropGraph(_)
859 | LogicalOperator::LoadGraph(_)
860 | LogicalOperator::CopyGraph(_)
861 | LogicalOperator::MoveGraph(_)
862 | LogicalOperator::AddGraph(_) => {}
863 LogicalOperator::HorizontalAggregate(op) => {
864 substitute_in_operator(&mut op.input, params)?;
865 }
866 LogicalOperator::Empty => {}
867 LogicalOperator::VectorScan(scan) => {
868 substitute_in_expression(&mut scan.query_vector, params)?;
869 if let Some(ref mut input) = scan.input {
870 substitute_in_operator(input, params)?;
871 }
872 }
873 LogicalOperator::VectorJoin(join) => {
874 substitute_in_expression(&mut join.query_vector, params)?;
875 substitute_in_operator(&mut join.input, params)?;
876 }
877 LogicalOperator::Except(except) => {
878 substitute_in_operator(&mut except.left, params)?;
879 substitute_in_operator(&mut except.right, params)?;
880 }
881 LogicalOperator::Intersect(intersect) => {
882 substitute_in_operator(&mut intersect.left, params)?;
883 substitute_in_operator(&mut intersect.right, params)?;
884 }
885 LogicalOperator::Otherwise(otherwise) => {
886 substitute_in_operator(&mut otherwise.left, params)?;
887 substitute_in_operator(&mut otherwise.right, params)?;
888 }
889 LogicalOperator::Apply(apply) => {
890 substitute_in_operator(&mut apply.input, params)?;
891 substitute_in_operator(&mut apply.subplan, params)?;
892 }
893 LogicalOperator::ParameterScan(_) => {}
895 LogicalOperator::MultiWayJoin(mwj) => {
896 for input in &mut mwj.inputs {
897 substitute_in_operator(input, params)?;
898 }
899 for cond in &mut mwj.conditions {
900 substitute_in_expression(&mut cond.left, params)?;
901 substitute_in_expression(&mut cond.right, params)?;
902 }
903 }
904 LogicalOperator::CreatePropertyGraph(_) => {}
906 LogicalOperator::CallProcedure(_) => {}
908 LogicalOperator::LoadData(_) => {}
910 }
911 Ok(())
912}
913
914fn resolve_count_param(
916 count: &mut crate::query::plan::CountExpr,
917 params: &QueryParams,
918) -> Result<()> {
919 use crate::query::plan::CountExpr;
920 use grafeo_common::utils::error::{QueryError, QueryErrorKind};
921
922 if let CountExpr::Parameter(name) = count {
923 let value = params.get(name.as_str()).ok_or_else(|| {
924 Error::Query(QueryError::new(
925 QueryErrorKind::Semantic,
926 format!("Missing parameter for SKIP/LIMIT: ${name}"),
927 ))
928 })?;
929 let n = match value {
930 Value::Int64(i) if *i >= 0 => *i as usize,
931 Value::Int64(i) => {
932 return Err(Error::Query(QueryError::new(
933 QueryErrorKind::Semantic,
934 format!("SKIP/LIMIT parameter ${name} must be non-negative, got {i}"),
935 )));
936 }
937 other => {
938 return Err(Error::Query(QueryError::new(
939 QueryErrorKind::Semantic,
940 format!("SKIP/LIMIT parameter ${name} must be an integer, got {other:?}"),
941 )));
942 }
943 };
944 *count = CountExpr::Literal(n);
945 }
946 Ok(())
947}
948
949fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
951 use crate::query::plan::LogicalExpression;
952
953 match expr {
954 LogicalExpression::Parameter(name) => {
955 if let Some(value) = params.get(name) {
956 *expr = LogicalExpression::Literal(value.clone());
957 } else {
958 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
959 }
960 }
961 LogicalExpression::Binary { left, right, .. } => {
962 substitute_in_expression(left, params)?;
963 substitute_in_expression(right, params)?;
964 }
965 LogicalExpression::Unary { operand, .. } => {
966 substitute_in_expression(operand, params)?;
967 }
968 LogicalExpression::FunctionCall { args, .. } => {
969 for arg in args {
970 substitute_in_expression(arg, params)?;
971 }
972 }
973 LogicalExpression::List(items) => {
974 for item in items {
975 substitute_in_expression(item, params)?;
976 }
977 }
978 LogicalExpression::Map(pairs) => {
979 for (_, value) in pairs {
980 substitute_in_expression(value, params)?;
981 }
982 }
983 LogicalExpression::IndexAccess { base, index } => {
984 substitute_in_expression(base, params)?;
985 substitute_in_expression(index, params)?;
986 }
987 LogicalExpression::SliceAccess { base, start, end } => {
988 substitute_in_expression(base, params)?;
989 if let Some(s) = start {
990 substitute_in_expression(s, params)?;
991 }
992 if let Some(e) = end {
993 substitute_in_expression(e, params)?;
994 }
995 }
996 LogicalExpression::Case {
997 operand,
998 when_clauses,
999 else_clause,
1000 } => {
1001 if let Some(op) = operand {
1002 substitute_in_expression(op, params)?;
1003 }
1004 for (cond, result) in when_clauses {
1005 substitute_in_expression(cond, params)?;
1006 substitute_in_expression(result, params)?;
1007 }
1008 if let Some(el) = else_clause {
1009 substitute_in_expression(el, params)?;
1010 }
1011 }
1012 LogicalExpression::Property { .. }
1013 | LogicalExpression::Variable(_)
1014 | LogicalExpression::Literal(_)
1015 | LogicalExpression::Labels(_)
1016 | LogicalExpression::Type(_)
1017 | LogicalExpression::Id(_) => {}
1018 LogicalExpression::ListComprehension {
1019 list_expr,
1020 filter_expr,
1021 map_expr,
1022 ..
1023 } => {
1024 substitute_in_expression(list_expr, params)?;
1025 if let Some(filter) = filter_expr {
1026 substitute_in_expression(filter, params)?;
1027 }
1028 substitute_in_expression(map_expr, params)?;
1029 }
1030 LogicalExpression::ListPredicate {
1031 list_expr,
1032 predicate,
1033 ..
1034 } => {
1035 substitute_in_expression(list_expr, params)?;
1036 substitute_in_expression(predicate, params)?;
1037 }
1038 LogicalExpression::ExistsSubquery(_)
1039 | LogicalExpression::CountSubquery(_)
1040 | LogicalExpression::ValueSubquery(_) => {
1041 }
1043 LogicalExpression::PatternComprehension { projection, .. } => {
1044 substitute_in_expression(projection, params)?;
1045 }
1046 LogicalExpression::MapProjection { entries, .. } => {
1047 for entry in entries {
1048 if let crate::query::plan::MapProjectionEntry::LiteralEntry(_, expr) = entry {
1049 substitute_in_expression(expr, params)?;
1050 }
1051 }
1052 }
1053 LogicalExpression::Reduce {
1054 initial,
1055 list,
1056 expression,
1057 ..
1058 } => {
1059 substitute_in_expression(initial, params)?;
1060 substitute_in_expression(list, params)?;
1061 substitute_in_expression(expression, params)?;
1062 }
1063 }
1064 Ok(())
1065}
1066
#[cfg(test)]
mod tests {
    use super::*;

    /// `is_lpg` is true for the LPG languages and false for SPARQL.
    #[test]
    fn test_query_language_is_lpg() {
        #[cfg(feature = "gql")]
        assert!(QueryLanguage::Gql.is_lpg());
        #[cfg(feature = "cypher")]
        assert!(QueryLanguage::Cypher.is_lpg());
        #[cfg(feature = "sparql")]
        assert!(!QueryLanguage::Sparql.is_lpg());
    }

    /// A processor built with `for_lpg` exposes the store it was given.
    #[test]
    fn test_processor_creation() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        assert_eq!(processor.lpg_store().node_count(), 0);
    }

    /// A plain GQL `MATCH ... RETURN` yields one row per matching node.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_simple_gql() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
            .unwrap();

        assert_eq!(result.row_count(), 2);
        assert_eq!(result.columns[0], "n");
    }

    /// The same simple query works through the Cypher translator.
    #[cfg(feature = "cypher")]
    #[test]
    fn test_process_simple_cypher() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// A bound parameter is substituted into a `WHERE` comparison.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_with_params() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
        store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut bindings = HashMap::new();
        bindings.insert("min_age".to_string(), Value::Int64(30));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
                QueryLanguage::Gql,
                Some(&bindings),
            )
            .unwrap();

        // Ages 35 and 45 pass the filter.
        assert_eq!(result.row_count(), 2);
    }

    /// Referencing a parameter that has no binding is an error.
    #[cfg(feature = "gql")]
    #[test]
    fn test_missing_param_error() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node(&["Person"]);

        let processor = QueryProcessor::for_lpg(store);

        // An empty map is supplied on purpose: `$min_age` stays unbound.
        let bindings: HashMap<String, Value> = HashMap::new();
        let result = processor.process(
            "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
            QueryLanguage::Gql,
            Some(&bindings),
        );

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Missing parameter"),
            "Expected 'Missing parameter' error, got: {}",
            err
        );
    }

    /// A parameterised filter combined with a property projection.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_filter_with_property() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Num"], [("value", Value::Int64(10))]);
        store.create_node_with_props(&["Num"], [("value", Value::Int64(20))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut bindings = HashMap::new();
        bindings.insert("threshold".to_string(), Value::Int64(15));

        let result = processor
            .process(
                "MATCH (n:Num) WHERE n.value > $threshold RETURN n.value",
                QueryLanguage::Gql,
                Some(&bindings),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        let row = &result.rows[0];
        assert_eq!(row[0], Value::Int64(20));
    }

    /// Two different parameters used in one `AND` condition.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_in_multiple_where_conditions() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(25)), ("score", Value::Int64(80))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(35)), ("score", Value::Int64(90))],
        );
        store.create_node_with_props(
            &["Person"],
            [("age", Value::Int64(45)), ("score", Value::Int64(70))],
        );

        let processor = QueryProcessor::for_lpg(store);

        let mut bindings = HashMap::new();
        bindings.insert("min_age".to_string(), Value::Int64(30));
        bindings.insert("min_score".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Person) WHERE n.age > $min_age AND n.score > $min_score RETURN n",
                QueryLanguage::Gql,
                Some(&bindings),
            )
            .unwrap();

        // Only the node with age 35 and score 90 satisfies both conditions.
        assert_eq!(result.row_count(), 1);
    }

    /// A string parameter compared for equality against a property.
    // NOTE(review): despite the name, the query uses `=` rather than `IN`.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_with_in_list() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("status", Value::String("active".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("pending".into()))]);
        store.create_node_with_props(&["Item"], [("status", Value::String("deleted".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut bindings = HashMap::new();
        bindings.insert("target".to_string(), Value::String("active".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.status = $target RETURN n",
                QueryLanguage::Gql,
                Some(&bindings),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
    }

    /// An integer parameter compared against an integer property.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_same_type_comparison() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Data"], [("value", Value::Int64(100))]);
        store.create_node_with_props(&["Data"], [("value", Value::Int64(50))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut bindings = HashMap::new();
        bindings.insert("threshold".to_string(), Value::Int64(75));

        let result = processor
            .process(
                "MATCH (n:Data) WHERE n.value > $threshold RETURN n",
                QueryLanguage::Gql,
                Some(&bindings),
            )
            .unwrap();

        // Only the node with value 100 exceeds the threshold.
        assert_eq!(result.row_count(), 1);
    }

    /// Column names are reported even when the query matches nothing.
    #[cfg(feature = "gql")]
    #[test]
    fn test_process_empty_result_has_columns() {
        let store = Arc::new(LpgStore::new().unwrap());
        let processor = QueryProcessor::for_lpg(store);
        let result = processor
            .process(
                "MATCH (n:Person) RETURN n.name AS name, n.age AS age",
                QueryLanguage::Gql,
                None,
            )
            .unwrap();

        assert_eq!(result.row_count(), 0);
        assert_eq!(result.columns.len(), 2);
        assert_eq!(result.columns[0], "name");
        assert_eq!(result.columns[1], "age");
    }

    /// String equality against a parameter returns the matching property.
    #[cfg(feature = "gql")]
    #[test]
    fn test_params_string_equality() {
        let store = Arc::new(LpgStore::new().unwrap());
        store.create_node_with_props(&["Item"], [("name", Value::String("alpha".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("beta".into()))]);
        store.create_node_with_props(&["Item"], [("name", Value::String("gamma".into()))]);

        let processor = QueryProcessor::for_lpg(store);

        let mut bindings = HashMap::new();
        bindings.insert("target".to_string(), Value::String("beta".into()));

        let result = processor
            .process(
                "MATCH (n:Item) WHERE n.name = $target RETURN n.name",
                QueryLanguage::Gql,
                Some(&bindings),
            )
            .unwrap();

        assert_eq!(result.row_count(), 1);
        assert_eq!(result.rows[0][0], Value::String("beta".into()));
    }
}