rdf_fusion_execution/sparql/optimizer.rs

1use crate::sparql::OptimizationLevel;
2use datafusion::optimizer::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
3use datafusion::optimizer::eliminate_limit::EliminateLimit;
4use datafusion::optimizer::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
5use datafusion::optimizer::scalar_subquery_to_join::ScalarSubqueryToJoin;
6use datafusion::optimizer::{Optimizer, OptimizerRule};
7use datafusion::physical_optimizer::PhysicalOptimizerRule;
8use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
9use rdf_fusion_extensions::RdfFusionContextView;
10use rdf_fusion_logical::expr::SimplifySparqlExpressionsRule;
11use rdf_fusion_logical::extend::ExtendLoweringRule;
12use rdf_fusion_logical::join::SparqlJoinLoweringRule;
13use rdf_fusion_logical::minus::MinusLoweringRule;
14use rdf_fusion_logical::paths::PropertyPathLoweringRule;
15use rdf_fusion_logical::patterns::PatternLoweringRule;
16use rdf_fusion_physical::join::NestedLoopJoinProjectionPushDown;
17use std::sync::Arc;
18
19/// Creates a list of optimizer rules based on the given `optimization_level`.
20pub fn create_optimizer_rules(
21    context: RdfFusionContextView,
22    optimization_level: OptimizationLevel,
23) -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
24    let lowering_rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = vec![
25        Arc::new(MinusLoweringRule::new(context.clone())),
26        Arc::new(ExtendLoweringRule::new()),
27        Arc::new(PropertyPathLoweringRule::new(context.clone())),
28        Arc::new(SparqlJoinLoweringRule::new(context.clone())),
29        Arc::new(PatternLoweringRule::new(context.clone())),
30    ];
31
32    match optimization_level {
33        OptimizationLevel::None => {
34            let mut rules = Vec::new();
35            rules.extend(lowering_rules);
36            rules.extend(create_essential_datafusion_optimizers());
37            rules
38        }
39        OptimizationLevel::Default => {
40            let mut rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = Vec::new();
41            // rules.push(Arc::new(SparqlJoinReorderingRule::new(
42            //     context.encodings().clone(),
43            // )));
44            rules.extend(lowering_rules);
45            rules.push(Arc::new(SimplifySparqlExpressionsRule::new()));
46
47            // DataFusion Optimizers
48            // TODO: Replace with a good subset
49            rules.extend(create_essential_datafusion_optimizers());
50
51            rules.push(Arc::new(SimplifySparqlExpressionsRule::new()));
52            rules
53        }
54        OptimizationLevel::Full => {
55            let mut rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = Vec::new();
56            // rules.push(Arc::new(SparqlJoinReorderingRule::new(
57            //     context.encodings().clone(),
58            // )));
59            rules.extend(lowering_rules);
60            rules.push(Arc::new(SimplifySparqlExpressionsRule::new()));
61
62            rules.extend(Optimizer::default().rules);
63
64            rules.push(Arc::new(SimplifySparqlExpressionsRule::new()));
65            rules
66        }
67    }
68}
69
70fn create_essential_datafusion_optimizers() -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
71    vec![
72        Arc::new(ReplaceDistinctWithAggregate::new()),
73        Arc::new(DecorrelatePredicateSubquery::new()),
74        Arc::new(EliminateLimit::new()),
75        Arc::new(ScalarSubqueryToJoin::new()),
76    ]
77}
78
79/// Creates a list of optimizer rules based on the given `optimization_level`.
80pub fn create_pyhsical_optimizer_rules(
81    _optimization_level: OptimizationLevel,
82) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
83    // TODO: build based on optimization level
84    let mut rules = PhysicalOptimizer::default().rules;
85    rules.push(Arc::new(NestedLoopJoinProjectionPushDown::new()));
86    rules
87}