1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! CoalesceBatches optimizer that groups rows from small batches together
//! into bigger batches to avoid the overhead of processing many small batches

use super::optimizer::PhysicalOptimizerRule;
use crate::{
    error::Result,
    physical_plan::{
        coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
        hash_join::HashJoinExec, repartition::RepartitionExec,
    },
};
use std::sync::Arc;

/// Optimizer that introduces CoalesceBatchesExec to avoid overhead with small batches
///
/// The rule carries no state, so it can be created with either
/// [`CoalesceBatches::new`] or [`Default::default`].
#[derive(Default)]
pub struct CoalesceBatches {}

impl CoalesceBatches {
    /// Creates a new instance of the `CoalesceBatches` optimizer rule.
    pub fn new() -> Self {
        Self {}
    }
}
impl PhysicalOptimizerRule for CoalesceBatches {
    /// Rewrites `plan` bottom-up, wrapping selected operators in a
    /// `CoalesceBatchesExec` to avoid lots of tiny batches when we have
    /// highly selective filters.
    ///
    /// Every child of `plan` is optimized recursively first and the plan is
    /// rebuilt from the optimized children. If the plan node is a `FilterExec`,
    /// `HashJoinExec` or `RepartitionExec`, the rebuilt node is then wrapped in
    /// a `CoalesceBatchesExec` whose target batch size is half of
    /// `config.batch_size`. A plan without children is returned unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while optimizing a child or by
    /// `with_new_children` when rebuilding the plan.
    fn optimize(
        &self,
        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
        config: &crate::execution::context::ExecutionConfig,
    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
        // Fetch the children once; they are needed both for the leaf check
        // and for the recursive optimization below.
        let children = plan.children();
        if children.is_empty() {
            // leaf node, children cannot be replaced
            return Ok(plan);
        }

        // Optimize the inputs first so the rewrite is applied bottom-up.
        let optimized_children = children
            .iter()
            .map(|child| self.optimize(Arc::clone(child), config))
            .collect::<Result<Vec<_>>>()?;

        let plan_any = plan.as_any();
        //TODO we should do this in a more generic way either by wrapping all operators
        // or having an API so that operators can declare when their inputs or outputs
        // need to be wrapped in a coalesce batches operator.
        // See https://issues.apache.org/jira/browse/ARROW-11068
        let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
            || plan_any.downcast_ref::<HashJoinExec>().is_some()
            || plan_any.downcast_ref::<RepartitionExec>().is_some();

        //TODO we should also do this for HashAggregateExec but we need to update tests
        // as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068
        // || plan_any.downcast_ref::<HashAggregateExec>().is_some();

        let plan = plan.with_new_children(optimized_children)?;
        Ok(if wrap_in_coalesce {
            //TODO we should add specific configuration settings for coalescing batches and
            // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
            // implemented. For now, we choose half the configured batch size to avoid copies
            // when a small number of rows are removed from a batch
            let target_batch_size = config.batch_size / 2;
            Arc::new(CoalesceBatchesExec::new(plan, target_batch_size))
        } else {
            plan
        })
    }

    /// Returns the name of this optimizer rule, `"coalesce_batches"`.
    fn name(&self) -> &str {
        "coalesce_batches"
    }
}