1use std::collections::HashMap;
10use std::sync::Arc;
11
12use grafeo_common::types::{EpochId, TxId, Value};
13use grafeo_common::utils::error::{Error, Result};
14use grafeo_core::graph::lpg::LpgStore;
15
16use crate::catalog::Catalog;
17use crate::database::QueryResult;
18use crate::query::binder::Binder;
19use crate::query::executor::Executor;
20use crate::query::optimizer::Optimizer;
21use crate::query::plan::{LogicalExpression, LogicalOperator, LogicalPlan};
22use crate::query::planner::Planner;
23use crate::transaction::TransactionManager;
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27pub enum QueryLanguage {
28 #[cfg(feature = "gql")]
30 Gql,
31 #[cfg(feature = "cypher")]
33 Cypher,
34 #[cfg(feature = "gremlin")]
36 Gremlin,
37 #[cfg(feature = "graphql")]
39 GraphQL,
40 #[cfg(feature = "sparql")]
42 Sparql,
43 #[cfg(all(feature = "graphql", feature = "rdf"))]
45 GraphQLRdf,
46}
47
48impl QueryLanguage {
49 #[must_use]
51 pub const fn is_lpg(&self) -> bool {
52 match self {
53 #[cfg(feature = "gql")]
54 Self::Gql => true,
55 #[cfg(feature = "cypher")]
56 Self::Cypher => true,
57 #[cfg(feature = "gremlin")]
58 Self::Gremlin => true,
59 #[cfg(feature = "graphql")]
60 Self::GraphQL => true,
61 #[cfg(feature = "sparql")]
62 Self::Sparql => false,
63 #[cfg(all(feature = "graphql", feature = "rdf"))]
64 Self::GraphQLRdf => false,
65 }
66 }
67}
68
69pub type QueryParams = HashMap<String, Value>;
71
72pub struct QueryProcessor {
86 lpg_store: Arc<LpgStore>,
88 tx_manager: Arc<TransactionManager>,
90 catalog: Arc<Catalog>,
92 optimizer: Optimizer,
94 tx_context: Option<(EpochId, TxId)>,
96 #[cfg(feature = "rdf")]
98 rdf_store: Option<Arc<grafeo_core::graph::rdf::RdfStore>>,
99}
100
101impl QueryProcessor {
102 #[must_use]
104 pub fn for_lpg(store: Arc<LpgStore>) -> Self {
105 Self {
106 lpg_store: store,
107 tx_manager: Arc::new(TransactionManager::new()),
108 catalog: Arc::new(Catalog::new()),
109 optimizer: Optimizer::new(),
110 tx_context: None,
111 #[cfg(feature = "rdf")]
112 rdf_store: None,
113 }
114 }
115
116 #[must_use]
118 pub fn for_lpg_with_tx(store: Arc<LpgStore>, tx_manager: Arc<TransactionManager>) -> Self {
119 Self {
120 lpg_store: store,
121 tx_manager,
122 catalog: Arc::new(Catalog::new()),
123 optimizer: Optimizer::new(),
124 tx_context: None,
125 #[cfg(feature = "rdf")]
126 rdf_store: None,
127 }
128 }
129
130 #[cfg(feature = "rdf")]
132 #[must_use]
133 pub fn with_rdf(
134 lpg_store: Arc<LpgStore>,
135 rdf_store: Arc<grafeo_core::graph::rdf::RdfStore>,
136 ) -> Self {
137 Self {
138 lpg_store,
139 tx_manager: Arc::new(TransactionManager::new()),
140 catalog: Arc::new(Catalog::new()),
141 optimizer: Optimizer::new(),
142 tx_context: None,
143 rdf_store: Some(rdf_store),
144 }
145 }
146
147 #[must_use]
151 pub fn with_tx_context(mut self, viewing_epoch: EpochId, tx_id: TxId) -> Self {
152 self.tx_context = Some((viewing_epoch, tx_id));
153 self
154 }
155
156 #[must_use]
158 pub fn with_catalog(mut self, catalog: Arc<Catalog>) -> Self {
159 self.catalog = catalog;
160 self
161 }
162
163 #[must_use]
165 pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
166 self.optimizer = optimizer;
167 self
168 }
169
170 pub fn process(
190 &self,
191 query: &str,
192 language: QueryLanguage,
193 params: Option<&QueryParams>,
194 ) -> Result<QueryResult> {
195 if language.is_lpg() {
196 self.process_lpg(query, language, params)
197 } else {
198 #[cfg(feature = "rdf")]
199 {
200 self.process_rdf(query, language, params)
201 }
202 #[cfg(not(feature = "rdf"))]
203 {
204 Err(Error::Internal(
205 "RDF support not enabled. Compile with --features rdf".to_string(),
206 ))
207 }
208 }
209 }
210
211 fn process_lpg(
213 &self,
214 query: &str,
215 language: QueryLanguage,
216 params: Option<&QueryParams>,
217 ) -> Result<QueryResult> {
218 let mut logical_plan = self.translate_lpg(query, language)?;
220
221 if let Some(params) = params {
223 substitute_params(&mut logical_plan, params)?;
224 }
225
226 let mut binder = Binder::new();
228 let _binding_context = binder.bind(&logical_plan)?;
229
230 let optimized_plan = self.optimizer.optimize(logical_plan)?;
232
233 let planner = if let Some((epoch, tx_id)) = self.tx_context {
235 Planner::with_context(
236 Arc::clone(&self.lpg_store),
237 Arc::clone(&self.tx_manager),
238 Some(tx_id),
239 epoch,
240 )
241 } else {
242 Planner::with_context(
243 Arc::clone(&self.lpg_store),
244 Arc::clone(&self.tx_manager),
245 None,
246 self.tx_manager.current_epoch(),
247 )
248 };
249 let mut physical_plan = planner.plan(&optimized_plan)?;
250
251 let executor = Executor::with_columns(physical_plan.columns.clone());
253 executor.execute(physical_plan.operator.as_mut())
254 }
255
256 fn translate_lpg(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
258 match language {
259 #[cfg(feature = "gql")]
260 QueryLanguage::Gql => {
261 use crate::query::gql_translator;
262 gql_translator::translate(query)
263 }
264 #[cfg(feature = "cypher")]
265 QueryLanguage::Cypher => {
266 use crate::query::cypher_translator;
267 cypher_translator::translate(query)
268 }
269 #[cfg(feature = "gremlin")]
270 QueryLanguage::Gremlin => {
271 use crate::query::gremlin_translator;
272 gremlin_translator::translate(query)
273 }
274 #[cfg(feature = "graphql")]
275 QueryLanguage::GraphQL => {
276 use crate::query::graphql_translator;
277 graphql_translator::translate(query)
278 }
279 #[allow(unreachable_patterns)]
280 _ => Err(Error::Internal(format!(
281 "Language {:?} is not an LPG language",
282 language
283 ))),
284 }
285 }
286
287 #[cfg(feature = "rdf")]
289 fn process_rdf(
290 &self,
291 query: &str,
292 language: QueryLanguage,
293 _params: Option<&QueryParams>,
294 ) -> Result<QueryResult> {
295 use crate::query::planner_rdf::RdfPlanner;
296
297 let rdf_store = self.rdf_store.as_ref().ok_or_else(|| {
298 Error::Internal("RDF store not configured for this processor".to_string())
299 })?;
300
301 let logical_plan = self.translate_rdf(query, language)?;
303
304 let mut binder = Binder::new();
306 let _binding_context = binder.bind(&logical_plan)?;
307
308 let optimized_plan = self.optimizer.optimize(logical_plan)?;
310
311 let planner = RdfPlanner::new(Arc::clone(rdf_store));
313 let mut physical_plan = planner.plan(&optimized_plan)?;
314
315 let executor = Executor::with_columns(physical_plan.columns.clone());
317 executor.execute(physical_plan.operator.as_mut())
318 }
319
320 #[cfg(feature = "rdf")]
322 fn translate_rdf(&self, query: &str, language: QueryLanguage) -> Result<LogicalPlan> {
323 match language {
324 #[cfg(feature = "sparql")]
325 QueryLanguage::Sparql => {
326 use crate::query::sparql_translator;
327 sparql_translator::translate(query)
328 }
329 #[cfg(all(feature = "graphql", feature = "rdf"))]
330 QueryLanguage::GraphQLRdf => {
331 use crate::query::graphql_rdf_translator;
332 graphql_rdf_translator::translate(query, "http://example.org/")
334 }
335 _ => Err(Error::Internal(format!(
336 "Language {:?} is not an RDF language",
337 language
338 ))),
339 }
340 }
341
342 #[must_use]
344 pub fn lpg_store(&self) -> &Arc<LpgStore> {
345 &self.lpg_store
346 }
347
348 #[must_use]
350 pub fn catalog(&self) -> &Arc<Catalog> {
351 &self.catalog
352 }
353
354 #[must_use]
356 pub fn optimizer(&self) -> &Optimizer {
357 &self.optimizer
358 }
359
360 #[cfg(feature = "rdf")]
362 #[must_use]
363 pub fn rdf_store(&self) -> Option<&Arc<grafeo_core::graph::rdf::RdfStore>> {
364 self.rdf_store.as_ref()
365 }
366}
367
368impl QueryProcessor {
370 #[must_use]
375 #[deprecated(since = "0.1.0", note = "Use QueryProcessor::for_lpg() instead")]
376 pub fn new() -> Self {
377 Self::for_lpg(Arc::new(LpgStore::new()))
378 }
379
380 #[cfg(feature = "gql")]
386 #[deprecated(since = "0.1.0", note = "Use process() with explicit language")]
387 pub fn process_legacy(&self, query: &str) -> Result<QueryResult> {
388 self.process(query, QueryLanguage::Gql, None)
389 }
390
391 #[must_use]
393 pub fn tx_manager(&self) -> &Arc<TransactionManager> {
394 &self.tx_manager
395 }
396}
397
398impl Default for QueryProcessor {
399 fn default() -> Self {
400 Self::for_lpg(Arc::new(LpgStore::new()))
401 }
402}
403
404fn substitute_params(plan: &mut LogicalPlan, params: &QueryParams) -> Result<()> {
406 substitute_in_operator(&mut plan.root, params)
407}
408
409fn substitute_in_operator(op: &mut LogicalOperator, params: &QueryParams) -> Result<()> {
411 use crate::query::plan::*;
412
413 match op {
414 LogicalOperator::Filter(filter) => {
415 substitute_in_expression(&mut filter.predicate, params)?;
416 substitute_in_operator(&mut filter.input, params)?;
417 }
418 LogicalOperator::Return(ret) => {
419 for item in &mut ret.items {
420 substitute_in_expression(&mut item.expression, params)?;
421 }
422 substitute_in_operator(&mut ret.input, params)?;
423 }
424 LogicalOperator::Project(proj) => {
425 for p in &mut proj.projections {
426 substitute_in_expression(&mut p.expression, params)?;
427 }
428 substitute_in_operator(&mut proj.input, params)?;
429 }
430 LogicalOperator::NodeScan(scan) => {
431 if let Some(input) = &mut scan.input {
432 substitute_in_operator(input, params)?;
433 }
434 }
435 LogicalOperator::EdgeScan(scan) => {
436 if let Some(input) = &mut scan.input {
437 substitute_in_operator(input, params)?;
438 }
439 }
440 LogicalOperator::Expand(expand) => {
441 substitute_in_operator(&mut expand.input, params)?;
442 }
443 LogicalOperator::Join(join) => {
444 substitute_in_operator(&mut join.left, params)?;
445 substitute_in_operator(&mut join.right, params)?;
446 for cond in &mut join.conditions {
447 substitute_in_expression(&mut cond.left, params)?;
448 substitute_in_expression(&mut cond.right, params)?;
449 }
450 }
451 LogicalOperator::LeftJoin(join) => {
452 substitute_in_operator(&mut join.left, params)?;
453 substitute_in_operator(&mut join.right, params)?;
454 if let Some(cond) = &mut join.condition {
455 substitute_in_expression(cond, params)?;
456 }
457 }
458 LogicalOperator::Aggregate(agg) => {
459 for expr in &mut agg.group_by {
460 substitute_in_expression(expr, params)?;
461 }
462 for agg_expr in &mut agg.aggregates {
463 if let Some(expr) = &mut agg_expr.expression {
464 substitute_in_expression(expr, params)?;
465 }
466 }
467 substitute_in_operator(&mut agg.input, params)?;
468 }
469 LogicalOperator::Sort(sort) => {
470 for key in &mut sort.keys {
471 substitute_in_expression(&mut key.expression, params)?;
472 }
473 substitute_in_operator(&mut sort.input, params)?;
474 }
475 LogicalOperator::Limit(limit) => {
476 substitute_in_operator(&mut limit.input, params)?;
477 }
478 LogicalOperator::Skip(skip) => {
479 substitute_in_operator(&mut skip.input, params)?;
480 }
481 LogicalOperator::Distinct(distinct) => {
482 substitute_in_operator(&mut distinct.input, params)?;
483 }
484 LogicalOperator::CreateNode(create) => {
485 for (_, expr) in &mut create.properties {
486 substitute_in_expression(expr, params)?;
487 }
488 if let Some(input) = &mut create.input {
489 substitute_in_operator(input, params)?;
490 }
491 }
492 LogicalOperator::CreateEdge(create) => {
493 for (_, expr) in &mut create.properties {
494 substitute_in_expression(expr, params)?;
495 }
496 substitute_in_operator(&mut create.input, params)?;
497 }
498 LogicalOperator::DeleteNode(delete) => {
499 substitute_in_operator(&mut delete.input, params)?;
500 }
501 LogicalOperator::DeleteEdge(delete) => {
502 substitute_in_operator(&mut delete.input, params)?;
503 }
504 LogicalOperator::SetProperty(set) => {
505 for (_, expr) in &mut set.properties {
506 substitute_in_expression(expr, params)?;
507 }
508 substitute_in_operator(&mut set.input, params)?;
509 }
510 LogicalOperator::Union(union) => {
511 for input in &mut union.inputs {
512 substitute_in_operator(input, params)?;
513 }
514 }
515 LogicalOperator::AntiJoin(anti) => {
516 substitute_in_operator(&mut anti.left, params)?;
517 substitute_in_operator(&mut anti.right, params)?;
518 }
519 LogicalOperator::Bind(bind) => {
520 substitute_in_expression(&mut bind.expression, params)?;
521 substitute_in_operator(&mut bind.input, params)?;
522 }
523 LogicalOperator::TripleScan(scan) => {
524 if let Some(input) = &mut scan.input {
525 substitute_in_operator(input, params)?;
526 }
527 }
528 LogicalOperator::Unwind(unwind) => {
529 substitute_in_expression(&mut unwind.expression, params)?;
530 substitute_in_operator(&mut unwind.input, params)?;
531 }
532 LogicalOperator::Merge(merge) => {
533 for (_, expr) in &mut merge.match_properties {
534 substitute_in_expression(expr, params)?;
535 }
536 for (_, expr) in &mut merge.on_create {
537 substitute_in_expression(expr, params)?;
538 }
539 for (_, expr) in &mut merge.on_match {
540 substitute_in_expression(expr, params)?;
541 }
542 substitute_in_operator(&mut merge.input, params)?;
543 }
544 LogicalOperator::AddLabel(add_label) => {
545 substitute_in_operator(&mut add_label.input, params)?;
546 }
547 LogicalOperator::RemoveLabel(remove_label) => {
548 substitute_in_operator(&mut remove_label.input, params)?;
549 }
550 LogicalOperator::Empty => {}
551 }
552 Ok(())
553}
554
555fn substitute_in_expression(expr: &mut LogicalExpression, params: &QueryParams) -> Result<()> {
557 use crate::query::plan::LogicalExpression;
558
559 match expr {
560 LogicalExpression::Parameter(name) => {
561 if let Some(value) = params.get(name) {
562 *expr = LogicalExpression::Literal(value.clone());
563 } else {
564 return Err(Error::Internal(format!("Missing parameter: ${}", name)));
565 }
566 }
567 LogicalExpression::Binary { left, right, .. } => {
568 substitute_in_expression(left, params)?;
569 substitute_in_expression(right, params)?;
570 }
571 LogicalExpression::Unary { operand, .. } => {
572 substitute_in_expression(operand, params)?;
573 }
574 LogicalExpression::FunctionCall { args, .. } => {
575 for arg in args {
576 substitute_in_expression(arg, params)?;
577 }
578 }
579 LogicalExpression::List(items) => {
580 for item in items {
581 substitute_in_expression(item, params)?;
582 }
583 }
584 LogicalExpression::Map(pairs) => {
585 for (_, value) in pairs {
586 substitute_in_expression(value, params)?;
587 }
588 }
589 LogicalExpression::IndexAccess { base, index } => {
590 substitute_in_expression(base, params)?;
591 substitute_in_expression(index, params)?;
592 }
593 LogicalExpression::SliceAccess { base, start, end } => {
594 substitute_in_expression(base, params)?;
595 if let Some(s) = start {
596 substitute_in_expression(s, params)?;
597 }
598 if let Some(e) = end {
599 substitute_in_expression(e, params)?;
600 }
601 }
602 LogicalExpression::Case {
603 operand,
604 when_clauses,
605 else_clause,
606 } => {
607 if let Some(op) = operand {
608 substitute_in_expression(op, params)?;
609 }
610 for (cond, result) in when_clauses {
611 substitute_in_expression(cond, params)?;
612 substitute_in_expression(result, params)?;
613 }
614 if let Some(el) = else_clause {
615 substitute_in_expression(el, params)?;
616 }
617 }
618 LogicalExpression::Property { .. }
619 | LogicalExpression::Variable(_)
620 | LogicalExpression::Literal(_)
621 | LogicalExpression::Labels(_)
622 | LogicalExpression::Type(_)
623 | LogicalExpression::Id(_) => {}
624 LogicalExpression::ListComprehension {
625 list_expr,
626 filter_expr,
627 map_expr,
628 ..
629 } => {
630 substitute_in_expression(list_expr, params)?;
631 if let Some(filter) = filter_expr {
632 substitute_in_expression(filter, params)?;
633 }
634 substitute_in_expression(map_expr, params)?;
635 }
636 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
637 }
639 }
640 Ok(())
641}
642
643#[cfg(test)]
644mod tests {
645 use super::*;
646
647 #[test]
648 fn test_query_language_is_lpg() {
649 #[cfg(feature = "gql")]
650 assert!(QueryLanguage::Gql.is_lpg());
651 #[cfg(feature = "cypher")]
652 assert!(QueryLanguage::Cypher.is_lpg());
653 #[cfg(feature = "sparql")]
654 assert!(!QueryLanguage::Sparql.is_lpg());
655 }
656
657 #[test]
658 fn test_processor_creation() {
659 let store = Arc::new(LpgStore::new());
660 let processor = QueryProcessor::for_lpg(store);
661 assert!(processor.lpg_store().node_count() == 0);
662 }
663
664 #[cfg(feature = "gql")]
665 #[test]
666 fn test_process_simple_gql() {
667 let store = Arc::new(LpgStore::new());
668 store.create_node(&["Person"]);
669 store.create_node(&["Person"]);
670
671 let processor = QueryProcessor::for_lpg(store);
672 let result = processor
673 .process("MATCH (n:Person) RETURN n", QueryLanguage::Gql, None)
674 .unwrap();
675
676 assert_eq!(result.row_count(), 2);
677 assert_eq!(result.columns[0], "n");
678 }
679
680 #[cfg(feature = "cypher")]
681 #[test]
682 fn test_process_simple_cypher() {
683 let store = Arc::new(LpgStore::new());
684 store.create_node(&["Person"]);
685
686 let processor = QueryProcessor::for_lpg(store);
687 let result = processor
688 .process("MATCH (n:Person) RETURN n", QueryLanguage::Cypher, None)
689 .unwrap();
690
691 assert_eq!(result.row_count(), 1);
692 }
693
694 #[cfg(feature = "gql")]
695 #[test]
696 fn test_process_with_params() {
697 let store = Arc::new(LpgStore::new());
698 store.create_node_with_props(&["Person"], [("age", Value::Int64(25))]);
699 store.create_node_with_props(&["Person"], [("age", Value::Int64(35))]);
700 store.create_node_with_props(&["Person"], [("age", Value::Int64(45))]);
701
702 let processor = QueryProcessor::for_lpg(store);
703
704 let mut params = HashMap::new();
706 params.insert("min_age".to_string(), Value::Int64(30));
707
708 let result = processor
709 .process(
710 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
711 QueryLanguage::Gql,
712 Some(¶ms),
713 )
714 .unwrap();
715
716 assert_eq!(result.row_count(), 2);
718 }
719
720 #[cfg(feature = "gql")]
721 #[test]
722 fn test_missing_param_error() {
723 let store = Arc::new(LpgStore::new());
724 store.create_node(&["Person"]);
725
726 let processor = QueryProcessor::for_lpg(store);
727
728 let params: HashMap<String, Value> = HashMap::new();
730 let result = processor.process(
731 "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
732 QueryLanguage::Gql,
733 Some(¶ms),
734 );
735
736 assert!(result.is_err());
738 let err = result.unwrap_err();
739 assert!(
740 err.to_string().contains("Missing parameter"),
741 "Expected 'Missing parameter' error, got: {}",
742 err
743 );
744 }
745}