datafusion_physical_optimizer/
coalesce_batches.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! CoalesceBatches optimizer rule that groups rows from small batches together
//! into bigger batches to avoid the overhead of processing many small batches
20
21use crate::PhysicalOptimizerRule;
22
23use std::sync::Arc;
24
25use datafusion_common::assert_eq_or_internal_err;
26use datafusion_common::config::ConfigOptions;
27use datafusion_common::error::Result;
28use datafusion_physical_plan::{
29    ExecutionPlan, async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec,
30};
31
32use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
33
/// Physical optimizer rule that places a `CoalesceBatchesExec` underneath
/// every `AsyncFuncExec` in a plan, so that the async function operates on
/// batches coalesced towards the configured batch size instead of on many
/// small ones.
///
/// The rule carries no state of its own; all tuning comes from the
/// `ConfigOptions` handed to `PhysicalOptimizerRule::optimize`.
#[derive(Debug, Default)]
pub struct CoalesceBatches {}
38
39impl CoalesceBatches {
40    #[expect(missing_docs)]
41    pub fn new() -> Self {
42        Self::default()
43    }
44}
45impl PhysicalOptimizerRule for CoalesceBatches {
46    fn optimize(
47        &self,
48        plan: Arc<dyn ExecutionPlan>,
49        config: &ConfigOptions,
50    ) -> Result<Arc<dyn ExecutionPlan>> {
51        if !config.execution.coalesce_batches {
52            return Ok(plan);
53        }
54
55        let target_batch_size = config.execution.batch_size;
56        plan.transform_up(|plan| {
57            let plan_any = plan.as_any();
58            if let Some(async_exec) = plan_any.downcast_ref::<AsyncFuncExec>() {
59                // Coalesce inputs to async functions to reduce number of async function invocations
60                let children = async_exec.children();
61                assert_eq_or_internal_err!(
62                    children.len(),
63                    1,
64                    "Expected AsyncFuncExec to have exactly one child"
65                );
66
67                let coalesce_exec = Arc::new(CoalesceBatchesExec::new(
68                    Arc::clone(children[0]),
69                    target_batch_size,
70                ));
71                let new_plan = plan.with_new_children(vec![coalesce_exec])?;
72                Ok(Transformed::yes(new_plan))
73            } else {
74                Ok(Transformed::no(plan))
75            }
76        })
77        .data()
78    }
79
80    fn name(&self) -> &str {
81        "coalesce_batches"
82    }
83
84    fn schema_check(&self) -> bool {
85        true
86    }
87}