1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TxId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::lpg::LpgStore;
15
16use crate::catalog::Catalog;
17use crate::database::QueryResult;
18use crate::query::binder::Binder;
19use crate::query::executor::Executor;
20use crate::query::optimizer::Optimizer;
21use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
22use crate::query::planner::Planner;
23use crate::transaction::TransactionManager;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27pub enum QueryLanguage {
28 #[cfg(feature = "gql")]
30 Gql,
31 #[cfg(feature = "cypher")]
33 Cypher,
34 #[cfg(feature = "gremlin")]
36 Gremlin,
37 #[cfg(feature = "graphql")]
39 GraphQL,
40 #[cfg(feature = "sparql")]
42 Sparql,
43 #[cfg(all(feature = "graphql", feature = "rdf"))]
45 GraphQLRdf,
46}
47
48impl QueryLanguage {
49 #[must_use]
51 pub const fn is_lpg(&self) -> bool {
52 match self {
53 #[cfg(feature = "gql")]
54 Self::Gql => true,
55 #[cfg(feature = "cypher")]
56 Self::Cypher => true,
57 #[cfg(feature = "gremlin")]
58 Self::Gremlin => true,
59 #[cfg(feature = "graphql")]
60 Self::GraphQL => true,
61 #[cfg(feature = "sparql")]
62 Self::Sparql => false,
63 #[cfg(all(feature = "graphql", feature = "rdf"))]
64 Self::GraphQLRdf => false,
65 }
66 }
67}
68
69pub type QueryParams = HashMap<String, Value>;
71
72pub struct QueryProcessor {
86 lpg_store: Arc<LpgStore>,
88 tx_manager: Arc<TransactionManager>,
90 catalog: Arc<Catalog>,
92 optimizer: Optimizer,
94 tx_context: Option<(EpochId, TxId)>,
96 #[cfg(feature = "rdf")]
98 rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
99}
100
101impl QueryProcessor {
102 #[must_use]
104 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
105 Self {
106 lpg_store: store,
107 tx_manager: Arc::new(TransactionManager::new()),
108 catalog: Arc::new(Catalog::new()),
109 optimizer: Optimizer::new(),
110 tx_context: None,
111 #[cfg(feature = "rdf")]
112 rdf_store: None,
113 }
114 }
115
116 #[must_use]
118 pub fn for_lpg_with_tx(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
119 Self {
120 lpg_store: store,
121 tx_manager,
122 catalog: Arc::new(Catalog::new()),
123 optimizer: Optimizer::new(),
124 tx_context: None,
125 #[cfg(feature = "rdf")]
126 rdf_store: None,
127 }
128 }
129
130 #[cfg(feature = "rdf")]
132 #[must_use]
133 pub fn with_rdf(
134 lpg_store: Arc<LpgStore>,
135 rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
136 ) -> Self {
137 Self {
138 lpg_store,
139 tx_manager: Arc::new(TransactionManager::new()),
140 catalog: Arc::new(Catalog::new()),
141 optimizer: Optimizer::new(),
142 tx_context: None,
143 rdf_store: Some(rdf_store),
144 }
145 }
146
147 #[must_use]
151 pub fn with_tx_context(mut self, viewing_epoch: EpochId, tx_id: TxId) -> Self {
152 self.tx_context = Some((viewing_epoch, tx_id));
153 self
154 }
155
156 #[must_use]
158 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
159 self.catalog = catalog;
160 self
161 }
162
163 #[must_use]
165 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
166 self.optimizer = optimizer;
167 self
168 }
169
170 pub fn process(
190 &self,
191 query: &str,
192 language: QueryLanguage,
193 params: Option<&QueryParams>,
194 ) -> Result<QueryResult> {
195 if language.is_lpg() {
196 self.process_lpg(query, language, params)
197 } else {
198 #[cfg(feature = "rdf")]
199 {
200 self.process_rdf(query, language, params)
201 }
202 #[cfg(not(feature = "rdf"))]
203 {
204 Err(Error::Internal(
205 "RDF support not enabled. Compile with --features rdf".to_string(),
206 ))
207 }
208 }
209 }
210
211 fn process_lpg(
213 &self,
214 query: &str,
215 language: QueryLanguage,
216 params: Option<&QueryParams>,
217 ) -> Result<QueryResult> {
218 let mut logical_plan = self.translate_lpg(query, language)?;
220
221 if let Some(params) = params {
223 substitute_params(&mut logical_plan, params)?;
224 }
225
226 let mut binder = Binder::new();
228 let _binding_context = binder.bind(&logical_plan)?;
229
230 let optimized_plan = self.optimizer.optimize(logical_plan)?;
232
233 let planner = if let Some((epoch, tx_id)) = self.tx_context {
235 Planner::with_context(
236 Arc::clone(&self.lpg_store),
237 Arc::clone(&self.tx_manager),
238 Some(tx_id),
239 epoch,
240 )
241 } else {
242 Planner::with_context(
243 Arc::clone(&self.lpg_store),
244 Arc::clone(&self.tx_manager),
245 None,
246 self.tx_manager.current_epoch(),
247 )
248 };
249 let mut physical_plan = planner.plan(&optimized_plan)?;
250
251 let executor = Executor::with_columns(physical_plan.columns.clone());
253 executor.execute(physical_plan.operator.as_mut())
254 }
255
256 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
258 match language {
259 #[cfg(feature = "gql")]
260 QueryLanguage::Gql => {
261 use crate::query::gql_translator;
262 gql_translator::translate(query)
263 }
264 #[cfg(feature = "cypher")]
265 QueryLanguage::Cypher => {
266 use crate::query::cypher_translator;
267 cypher_translator::translate(query)
268 }
269 #[cfg(feature = "gremlin")]
270 QueryLanguage::Gremlin => {
271 use crate::query::gremlin_translator;
272 gremlin_translator::translate(query)
273 }
274 #[cfg(feature = "graphql")]
275 QueryLanguage::GraphQL => {
276 use crate::query::graphql_translator;
277 graphql_translator::translate(query)
278 }
279 #[allow(unreachable_patterns)]
280 _ => Err(Error::Internal(format!(
281 "Language {:?} is not an LPG language",
282 language
283 ))),
284 }
285 }
286
287 #[cfg(feature = "rdf")]
289 fn process_rdf(
290 &self,
291 query: &str,
292 language: QueryLanguage,
293 _params: Option<&QueryParams>,
294 ) -> Result<QueryResult> {
295 use crate::query::planner_rdf::RdfPlanner;
296
297 let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
298 Error::Internal("RDF store not configured for this processor".to_string())
299 })?;
300
301 let logical_plan = self.translate_rdf(query, language)?;
303
304 let mut binder = Binder::new();
306 let _binding_context = binder.bind(&logical_plan)?;
307
308 let optimized_plan = self.optimizer.optimize(logical_plan)?;
310
311 let planner = RdfPlanner::new(Arc::clone(rdf_store));
313 let mut physical_plan = planner.plan(&optimized_plan)?;
314
315 let executor = Executor::with_columns(physical_plan.columns.clone());
317 executor.execute(physical_plan.operator.as_mut())
318 }
319
320 #[cfg(feature = "rdf")]
322 fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
323 match language {
324 #[cfg(feature = "sparql")]
325 QueryLanguage::Sparql => {
326 use crate::query::sparql_translator;
327 sparql_translator::translate(query)
328 }
329 #[cfg(all(feature = "graphql", feature = "rdf"))]
330 QueryLanguage::GraphQLRdf => {
331 use crate::query::graphql_rdf_translator;
332 graphql_rdf_translator::translate(query, "http://example.org/")
334 }
335 _ => Err(Error::Internal(format!(
336 "Language {:?} is not an RDF language",
337 language
338 ))),
339 }
340 }
341
342 #[must_use]
344 pub fn lpg_store(&self) -> &Arc<LpgStore> {
345 &self.lpg_store
346 }
347
348 #[must_use]
350 pub fn catalog(&self) -> &Arc<Catalog> {
351 &self.catalog
352 }
353
354 #[must_use]
356 pub fn optimizer(&self) -> &Optimizer {
357 &self.optimizer
358 }
359
360 #[cfg(feature = "rdf")]
362 #[must_use]
363 pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
364 self.rdf_store.as_ref()
365 }
366}
367
368impl QueryProcessor {
370 #[must_use]
375 #[deprecated(since = "0.1.0", note = "Use QueryProcessor::for_lpg() instead")]
376 pub fn new() -> Self {
377 Self::for_lpg(Arc::new(LpgStore::new()))
378 }
379
380 #[cfg(feature = "gql")]
386 #[deprecated(since = "0.1.0", note = "Use process() with explicit language")]
387 pub fn process_legacy(&self, query: &str) -> Result<QueryResult> {
388 self.process(query, QueryLanguage::Gql, None)
389 }
390
391 #[must_use]
393 pub fn tx_manager(&self) -> &Arc<TransactionManager> {
394 &self.tx_manager
395 }
396}
397
398impl Default for QueryProcessor {
399 fn default() -> Self {
400 Self::for_lpg(Arc::new(LpgStore::new()))
401 }
402}
403
404fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
406 substitute_in_operator(&mut plan.root, params)
407}
408
409fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
411 use crate::query::plan::*;
412
413 match op {
414 LogicalOperator::Filter(filter) => {
415 substitute_in_expression(&mut filter.predicate, params)?;
416 substitute_in_operator(&mut filter.input, params)?;
417 }
418 LogicalOperator::Return(ret) => {
419 for item in &mut ret.items {
420 substitute_in_expression(&mut item.expression, params)?;
421 }
422 substitute_in_operator(&mut ret.input, params)?;
423 }
424 LogicalOperator::Project(proj) => {
425 for p in &mut proj.projections {
426 substitute_in_expression(&mut p.expression, params)?;
427 }
428 substitute_in_operator(&mut proj.input, params)?;
429 }
430 LogicalOperator::NodeScan(scan) => {
431 if let Some(input) = &mut scan.input {
432 substitute_in_operator(input, params)?;
433 }
434 }
435 LogicalOperator::EdgeScan(scan) => {
436 if let Some(input) = &mut scan.input {
437 substitute_in_operator(input, params)?;
438 }
439 }
440 LogicalOperator::Expand(expand) => {
441 substitute_in_operator(&mut expand.input, params)?;
442 }
443 LogicalOperator::Join(join) => {
444 substitute_in_operator(&mut join.left, params)?;
445 substitute_in_operator(&mut join.right, params)?;
446 for cond in &mut join.conditions {
447 substitute_in_expression(&mut cond.left, params)?;
448 substitute_in_expression(&mut cond.right, params)?;
449 }
450 }
451 LogicalOperator::LeftJoin(join) => {
452 substitute_in_operator(&mut join.left, params)?;
453 substitute_in_operator(&mut join.right, params)?;
454 if let Some(cond) = &mut join.condition {
455 substitute_in_expression(cond, params)?;
456 }
457 }
458 LogicalOperator::Aggregate(agg) => {
459 for expr in &mut agg.group_by {
460 substitute_in_expression(expr, params)?;
461 }
462 for agg_expr in &mut agg.aggregates {
463 if let Some(expr) = &mut agg_expr.expression {
464 substitute_in_expression(expr, params)?;
465 }
466 }
467 substitute_in_operator(&mut agg.input, params)?;
468 }
469 LogicalOperator::Sort(sort) => {
470 for key in &mut sort.keys {
471 substitute_in_expression(&mut key.expression, params)?;
472 }
473 substitute_in_operator(&mut sort.input, params)?;
474 }
475 LogicalOperator::Limit(limit) => {
476 substitute_in_operator(&mut limit.input, params)?;
477 }
478 LogicalOperator::Skip(skip) => {
479 substitute_in_operator(&mut skip.input, params)?;
480 }
481 LogicalOperator::Distinct(distinct) => {
482 substitute_in_operator(&mut distinct.input, params)?;
483 }
484 LogicalOperator::CreateNode(create) => {
485 for (_, expr) in &mut create.properties {
486 substitute_in_expression(expr, params)?;
487 }
488 if let Some(input) = &mut create.input {
489 substitute_in_operator(input, params)?;
490 }
491 }
492 LogicalOperator::CreateEdge(create) => {
493 for (_, expr) in &mut create.properties {
494 substitute_in_expression(expr, params)?;
495 }
496 substitute_in_operator(&mut create.input, params)?;
497 }
498 LogicalOperator::DeleteNode(delete) => {
499 substitute_in_operator(&mut delete.input, params)?;
500 }
501 LogicalOperator::DeleteEdge(delete) => {
502 substitute_in_operator(&mut delete.input, params)?;
503 }
504 LogicalOperator::SetProperty(set) => {
505 for (_, expr) in &mut set.properties {
506 substitute_in_expression(expr, params)?;
507 }
508 substitute_in_operator(&mut set.input, params)?;
509 }
510 LogicalOperator::Union(union) => {
511 for input in &mut union.inputs {
512 substitute_in_operator(input, params)?;
513 }
514 }
515 LogicalOperator::AntiJoin(anti) => {
516 substitute_in_operator(&mut anti.left, params)?;
517 substitute_in_operator(&mut anti.right, params)?;
518 }
519 LogicalOperator::Bind(bind) => {
520 substitute_in_expression(&mut bind.expression, params)?;
521 substitute_in_operator(&mut bind.input, params)?;
522 }
523 LogicalOperator::TripleScan(scan) => {
524 if let Some(input) = &mut scan.input {
525 substitute_in_operator(input, params)?;
526 }
527 }
528 LogicalOperator::Unwind(unwind) => {
529 substitute_in_expression(&mut unwind.expression, params)?;
530 substitute_in_operator(&mut unwind.input, params)?;
531 }
532 LogicalOperator::Merge(merge) => {
533 for (_, expr) in &mut merge.match_properties {
534 substitute_in_expression(expr, params)?;
535 }
536 for (_, expr) in &mut merge.on_create {
537 substitute_in_expression(expr, params)?;
538 }
539 for (_, expr) in &mut merge.on_match {
540 substitute_in_expression(expr, params)?;
541 }
542 substitute_in_operator(&mut merge.input, params)?;
543 }
544 LogicalOperator::AddLabel(add_label) => {
545 substitute_in_operator(&mut add_label.input, params)?;
546 }
547 LogicalOperator::RemoveLabel(remove_label) => {
548 substitute_in_operator(&mut remove_label.input, params)?;
549 }
550 LogicalOperator::ShortestPath(sp) => {
551 substitute_in_operator(&mut sp.input, params)?;
552 }
553 LogicalOperator::InsertTriple(insert) => {
555 if let Some(ref mut input) = insert.input {
556 substitute_in_operator(input, params)?;
557 }
558 }
559 LogicalOperator::DeleteTriple(delete) => {
560 if let Some(ref mut input) = delete.input {
561 substitute_in_operator(input, params)?;
562 }
563 }
564 LogicalOperator::Modify(modify) => {
565 substitute_in_operator(&mut modify.where_clause, params)?;
566 }
567 LogicalOperator::ClearGraph(_)
568 | LogicalOperator::CreateGraph(_)
569 | LogicalOperator::DropGraph(_)
570 | LogicalOperator::LoadGraph(_)
571 | LogicalOperator::CopyGraph(_)
572 | LogicalOperator::MoveGraph(_)
573 | LogicalOperator::AddGraph(_) => {}
574 LogicalOperator::Empty => {}
575 }
576 Ok(())
577}
578
579fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
581 use crate::query::plan::LogicalExpression;
582
583 match expr {
584 LogicalExpression::Parameter(name) => {
585 if let Some(value) = params.get(name) {
586 *expr = LogicalExpression::Literal(value.clone());
587 } else {
588 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
589 }
590 }
591 LogicalExpression::Binary { left, right, .. } => {
592 substitute_in_expression(left, params)?;
593 substitute_in_expression(right, params)?;
594 }
595 LogicalExpression::Unary { operand, .. } => {
596 substitute_in_expression(operand, params)?;
597 }
598 LogicalExpression::FunctionCall { args, .. } => {
599 for arg in args {
600 substitute_in_expression(arg, params)?;
601 }
602 }
603 LogicalExpression::List(items) => {
604 for item in items {
605 substitute_in_expression(item, params)?;
606 }
607 }
608 LogicalExpression::Map(pairs) => {
609 for (_, value) in pairs {
610 substitute_in_expression(value, params)?;
611 }
612 }
613 LogicalExpression::IndexAccess { base, index } => {
614 substitute_in_expression(base, params)?;
615 substitute_in_expression(index, params)?;
616 }
617 LogicalExpression::SliceAccess { base, start, end } => {
618 substitute_in_expression(base, params)?;
619 if let Some(s) = start {
620 substitute_in_expression(s, params)?;
621 }
622 if let Some(e) = end {
623 substitute_in_expression(e, params)?;
624 }
625 }
626 LogicalExpression::Case {
627 operand,
628 when_clauses,
629 else_clause,
630 } => {
631 if let Some(op) = operand {
632 substitute_in_expression(op, params)?;
633 }
634 for (cond, result) in when_clauses {
635 substitute_in_expression(cond, params)?;
636 substitute_in_expression(result, params)?;
637 }
638 if let Some(el) = else_clause {
639 substitute_in_expression(el, params)?;
640 }
641 }
642 LogicalExpression::Property { .. }
643 | LogicalExpression::Variable(_)
644 | LogicalExpression::Literal(_)
645 | LogicalExpression::Labels(_)
646 | LogicalExpression::Type(_)
647 | LogicalExpression::Id(_) => {}
648 LogicalExpression::ListComprehension {
649 list_expr,
650 filter_expr,
651 map_expr,
652 ..
653 } => {
654 substitute_in_expression(list_expr, params)?;
655 if let Some(filter) = filter_expr {
656 substitute_in_expression(filter, params)?;
657 }
658 substitute_in_expression(map_expr, params)?;
659 }
660 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
661 }
663 }
664 Ok(())
665}
666
667#[cfg(test)]
668mod tests {
669 use super::*;
670
671 #[test]
672 fn test_query_language_is_lpg() {
673 #[cfg(feature = "gql")]
674 assert!(QueryLanguage::Gql.is_lpg());
675 #[cfg(feature = "cypher")]
676 assert!(QueryLanguage::Cypher.is_lpg());
677 #[cfg(feature = "sparql")]
678 assert!(!QueryLanguage::Sparql.is_lpg());
679 }
680
681 #[test]
682 fn test_processor_creation() {
683 let store = Arc::new(LpgStore::new());
684 let processor = QueryProcessor::for_lpg(store);
685 assert!(processor.lpg_store().node_count() == 0);
686 }
687
688 #[cfg(feature = "gql")]
689 #[test]
690 fn test_process_simple_gql() {
691 let store = Arc::new(LpgStore::new());
692 store.create_node(&["Person"]);
693 store.create_node(&["Person"]);
694
695 let processor = QueryProcessor::for_lpg(store);
696 let result = processor
697 .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
698 .unwrap();
699
700 assert_eq!(result.row_count(), 2);
701 assert_eq!(result.columns[0], "n");
702 }
703
704 #[cfg(feature = "cypher")]
705 #[test]
706 fn test_process_simple_cypher() {
707 let store = Arc::new(LpgStore::new());
708 store.create_node(&["Person"]);
709
710 let processor = QueryProcessor::for_lpg(store);
711 let result = processor
712 .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
713 .unwrap();
714
715 assert_eq!(result.row_count(), 1);
716 }
717
718 #[cfg(feature = "gql")]
719 #[test]
720 fn test_process_with_params() {
721 let store = Arc::new(LpgStore::new());
722 store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
723 store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
724 store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
725
726 let processor = QueryProcessor::for_lpg(store);
727
728 let mut params = HashMap::new();
730 params.insert("min_age".to_string(), Value::Int64(30));
731
732 let result = processor
733 .process(
734 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
735 QueryLanguage::Gql,
736 Some(¶ms),
737 )
738 .unwrap();
739
740 assert_eq!(result.row_count(), 2);
742 }
743
744 #[cfg(feature = "gql")]
745 #[test]
746 fn test_missing_param_error() {
747 let store = Arc::new(LpgStore::new());
748 store.create_node(&["Person"]);
749
750 let processor = QueryProcessor::for_lpg(store);
751
752 let params: HashMap<String, Value> = HashMap::new();
754 let result = processor.process(
755 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
756 QueryLanguage::Gql,
757 Some(¶ms),
758 );
759
760 assert!(result.is_err());
762 let err = result.unwrap_err();
763 assert!(
764 err.to_string().contains("Missing parameter"),
765 "Expected 'Missing parameter' error, got: {}",
766 err
767 );
768 }
769}