rdf_fusion_execution/sparql/
optimizer.rs1use crate::sparql::OptimizationLevel;
2use datafusion::optimizer::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
3use datafusion::optimizer::eliminate_limit::EliminateLimit;
4use datafusion::optimizer::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
5use datafusion::optimizer::scalar_subquery_to_join::ScalarSubqueryToJoin;
6use datafusion::optimizer::{Optimizer, OptimizerRule};
7use datafusion::physical_optimizer::PhysicalOptimizerRule;
8use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
9use rdf_fusion_extensions::RdfFusionContextView;
10use rdf_fusion_logical::expr::SimplifySparqlExpressionsRule;
11use rdf_fusion_logical::extend::ExtendLoweringRule;
12use rdf_fusion_logical::join::SparqlJoinLoweringRule;
13use rdf_fusion_logical::minus::MinusLoweringRule;
14use rdf_fusion_logical::paths::PropertyPathLoweringRule;
15use rdf_fusion_logical::patterns::PatternLoweringRule;
16use rdf_fusion_physical::join::NestedLoopJoinProjectionPushDown;
17use std::sync::Arc;
18
19pub fn create_optimizer_rules(
21 context: RdfFusionContextView,
22 optimization_level: OptimizationLevel,
23) -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
24 let lowering_rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![
25 Arc::new(MinusLoweringRule::new(context.clone())),
26 Arc::new(ExtendLoweringRule::new()),
27 Arc::new(PropertyPathLoweringRule::new(context.clone())),
28 Arc::new(SparqlJoinLoweringRule::new(context.clone())),
29 Arc::new(PatternLoweringRule::new(context.clone())),
30 ];
31
32 match optimization_level {
33 OptimizationLevel::None => {
34 let mut rules = Vec::new();
35 rules.extend(lowering_rules);
36 rules.extend(create_essential_datafusion_optimizers());
37 rules
38 }
39 OptimizationLevel::Default => {
40 let mut rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = Vec::new();
41 rules.extend(lowering_rules);
45 rules.push(Arc::new(SimplifySparqlExpressionsRule::new()));
46
47 rules.extend(create_essential_datafusion_optimizers());
50
51 rules.push(Arc::new(SimplifySparqlExpressionsRule::new()));
52 rules
53 }
54 OptimizationLevel::Full => {
55 let mut rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = Vec::new();
56 rules.extend(lowering_rules);
60 rules.push(Arc::new(SimplifySparqlExpressionsRule::new()));
61
62 rules.extend(Optimizer::default().rules);
63
64 rules.push(Arc::new(SimplifySparqlExpressionsRule::new()));
65 rules
66 }
67 }
68}
69
70fn create_essential_datafusion_optimizers() -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
71 vec![
72 Arc::new(ReplaceDistinctWithAggregate::new()),
73 Arc::new(DecorrelatePredicateSubquery::new()),
74 Arc::new(EliminateLimit::new()),
75 Arc::new(ScalarSubqueryToJoin::new()),
76 ]
77}
78
79pub fn create_pyhsical_optimizer_rules(
81 _optimization_level: OptimizationLevel,
82) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
83 let mut rules = PhysicalOptimizer::default().rules;
85 rules.push(Arc::new(NestedLoopJoinProjectionPushDown::new()));
86 rules
87}