// datafusion_physical_plan/coalesce_batches.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`CoalesceBatchesExec`] combines small batches into larger batches.
19
20use std::any::Any;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
26use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
27use crate::projection::ProjectionExec;
28use crate::{
29    DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
30    check_if_same_properties,
31};
32
33use arrow::datatypes::SchemaRef;
34use arrow::record_batch::RecordBatch;
35use datafusion_common::Result;
36use datafusion_execution::TaskContext;
37use datafusion_physical_expr::PhysicalExpr;
38
39use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
40use crate::execution_plan::CardinalityEffect;
41use crate::filter_pushdown::{
42    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
43    FilterPushdownPropagation,
44};
45use crate::sort_pushdown::SortOrderPushdownResult;
46use datafusion_common::config::ConfigOptions;
47use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
48use futures::ready;
49use futures::stream::{Stream, StreamExt};
50
/// `CoalesceBatchesExec` combines small batches into larger batches for more
/// efficient vectorized processing by later operators.
///
/// The operator buffers batches until it collects `target_batch_size` rows and
/// then emits a single concatenated batch. When only a limited number of rows
/// are necessary (specified by the `fetch` parameter), the operator will stop
/// buffering and returns the final batch once the number of collected rows
/// reaches the `fetch` value.
///
/// See [`LimitedBatchCoalescer`] for more information
#[deprecated(
    since = "52.0.0",
    note = "We now use BatchCoalescer from arrow-rs instead of a dedicated operator"
)]
#[derive(Debug, Clone)]
pub struct CoalesceBatchesExec {
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Minimum number of rows for coalescing batches
    target_batch_size: usize,
    /// Maximum number of rows to fetch, `None` means fetching all rows
    fetch: Option<usize>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalences, partitioning, pipeline
    /// behavior, boundedness) computed once from the input in `new`
    cache: Arc<PlanProperties>,
}
77
78#[expect(deprecated)]
79impl CoalesceBatchesExec {
80    /// Create a new CoalesceBatchesExec
81    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
82        let cache = Self::compute_properties(&input);
83        Self {
84            input,
85            target_batch_size,
86            fetch: None,
87            metrics: ExecutionPlanMetricsSet::new(),
88            cache: Arc::new(cache),
89        }
90    }
91
92    /// Update fetch with the argument
93    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
94        self.fetch = fetch;
95        self
96    }
97
98    /// The input plan
99    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
100        &self.input
101    }
102
103    /// Minimum number of rows for coalesces batches
104    pub fn target_batch_size(&self) -> usize {
105        self.target_batch_size
106    }
107
108    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
109    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
110        // The coalesce batches operator does not make any changes to the
111        // partitioning of its input.
112        PlanProperties::new(
113            input.equivalence_properties().clone(), // Equivalence Properties
114            input.output_partitioning().clone(),    // Output Partitioning
115            input.pipeline_behavior(),
116            input.boundedness(),
117        )
118    }
119
120    fn with_new_children_and_same_properties(
121        &self,
122        mut children: Vec<Arc<dyn ExecutionPlan>>,
123    ) -> Self {
124        Self {
125            input: children.swap_remove(0),
126            metrics: ExecutionPlanMetricsSet::new(),
127            ..Self::clone(self)
128        }
129    }
130}
131
132#[expect(deprecated)]
133impl DisplayAs for CoalesceBatchesExec {
134    fn fmt_as(
135        &self,
136        t: DisplayFormatType,
137        f: &mut std::fmt::Formatter,
138    ) -> std::fmt::Result {
139        match t {
140            DisplayFormatType::Default | DisplayFormatType::Verbose => {
141                write!(
142                    f,
143                    "CoalesceBatchesExec: target_batch_size={}",
144                    self.target_batch_size,
145                )?;
146                if let Some(fetch) = self.fetch {
147                    write!(f, ", fetch={fetch}")?;
148                };
149
150                Ok(())
151            }
152            DisplayFormatType::TreeRender => {
153                writeln!(f, "target_batch_size={}", self.target_batch_size)?;
154                if let Some(fetch) = self.fetch {
155                    write!(f, "limit={fetch}")?;
156                };
157                Ok(())
158            }
159        }
160    }
161}
162
163#[expect(deprecated)]
164impl ExecutionPlan for CoalesceBatchesExec {
165    fn name(&self) -> &'static str {
166        "CoalesceBatchesExec"
167    }
168
169    /// Return a reference to Any that can be used for downcasting
170    fn as_any(&self) -> &dyn Any {
171        self
172    }
173
174    fn properties(&self) -> &Arc<PlanProperties> {
175        &self.cache
176    }
177
178    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
179        vec![&self.input]
180    }
181
182    fn maintains_input_order(&self) -> Vec<bool> {
183        vec![true]
184    }
185
186    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
187        vec![false]
188    }
189
190    fn with_new_children(
191        self: Arc<Self>,
192        mut children: Vec<Arc<dyn ExecutionPlan>>,
193    ) -> Result<Arc<dyn ExecutionPlan>> {
194        check_if_same_properties!(self, children);
195        Ok(Arc::new(
196            CoalesceBatchesExec::new(children.swap_remove(0), self.target_batch_size)
197                .with_fetch(self.fetch),
198        ))
199    }
200
201    fn execute(
202        &self,
203        partition: usize,
204        context: Arc<TaskContext>,
205    ) -> Result<SendableRecordBatchStream> {
206        Ok(Box::pin(CoalesceBatchesStream {
207            input: self.input.execute(partition, context)?,
208            coalescer: LimitedBatchCoalescer::new(
209                self.input.schema(),
210                self.target_batch_size,
211                self.fetch,
212            ),
213            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
214            completed: false,
215        }))
216    }
217
218    fn metrics(&self) -> Option<MetricsSet> {
219        Some(self.metrics.clone_inner())
220    }
221
222    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
223        self.input
224            .partition_statistics(partition)?
225            .with_fetch(self.fetch, 0, 1)
226    }
227
228    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
229        Some(Arc::new(CoalesceBatchesExec {
230            input: Arc::clone(&self.input),
231            target_batch_size: self.target_batch_size,
232            fetch: limit,
233            metrics: self.metrics.clone(),
234            cache: Arc::clone(&self.cache),
235        }))
236    }
237
238    fn fetch(&self) -> Option<usize> {
239        self.fetch
240    }
241
242    fn cardinality_effect(&self) -> CardinalityEffect {
243        CardinalityEffect::Equal
244    }
245
246    fn try_swapping_with_projection(
247        &self,
248        projection: &ProjectionExec,
249    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
250        match self.input.try_swapping_with_projection(projection)? {
251            Some(new_input) => Ok(Some(
252                Arc::new(self.clone()).with_new_children(vec![new_input])?,
253            )),
254            None => Ok(None),
255        }
256    }
257
258    fn gather_filters_for_pushdown(
259        &self,
260        _phase: FilterPushdownPhase,
261        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
262        _config: &ConfigOptions,
263    ) -> Result<FilterDescription> {
264        FilterDescription::from_children(parent_filters, &self.children())
265    }
266
267    fn handle_child_pushdown_result(
268        &self,
269        _phase: FilterPushdownPhase,
270        child_pushdown_result: ChildPushdownResult,
271        _config: &ConfigOptions,
272    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
273        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
274    }
275
276    fn try_pushdown_sort(
277        &self,
278        order: &[PhysicalSortExpr],
279    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
280        // CoalesceBatchesExec is transparent for sort ordering - it preserves order
281        // Delegate to the child and wrap with a new CoalesceBatchesExec
282        self.input.try_pushdown_sort(order)?.try_map(|new_input| {
283            Ok(Arc::new(
284                CoalesceBatchesExec::new(new_input, self.target_batch_size)
285                    .with_fetch(self.fetch),
286            ) as Arc<dyn ExecutionPlan>)
287        })
288    }
289}
290
/// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details.
struct CoalesceBatchesStream {
    /// The input plan
    input: SendableRecordBatchStream,
    /// Buffer for combining batches; also enforces the optional fetch limit
    coalescer: LimitedBatchCoalescer,
    /// Execution metrics (elapsed compute time, output rows, poll results)
    baseline_metrics: BaselineMetrics,
    /// is the input stream exhausted or limit reached?
    /// Once true, only already-completed batches remain to be drained.
    completed: bool,
}
302
303impl Stream for CoalesceBatchesStream {
304    type Item = Result<RecordBatch>;
305
306    fn poll_next(
307        mut self: Pin<&mut Self>,
308        cx: &mut Context<'_>,
309    ) -> Poll<Option<Self::Item>> {
310        let poll = self.poll_next_inner(cx);
311        self.baseline_metrics.record_poll(poll)
312    }
313
314    fn size_hint(&self) -> (usize, Option<usize>) {
315        // we can't predict the size of incoming batches so re-use the size hint from the input
316        self.input.size_hint()
317    }
318}
319
impl CoalesceBatchesStream {
    /// Inner poll loop: drains completed batches from the coalescer, and
    /// otherwise pulls from the input, pushing batches into the coalescer
    /// until it completes one, the limit is hit, or the input ends.
    fn poll_next_inner(
        self: &mut Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        // Clone the metric handle so the per-iteration timer below can
        // borrow it without borrowing `self`.
        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
        loop {
            // If there is any completed batch ready, return it
            if let Some(batch) = self.coalescer.next_completed_batch() {
                return Poll::Ready(Some(Ok(batch)));
            }
            if self.completed {
                // If input is done and no batches are ready, return None to signal end of stream.
                return Poll::Ready(None);
            }
            // Attempt to pull the next batch from the input stream.
            // NOTE: the timer is deliberately started only after `ready!`
            // returns, so time spent waiting on the input (Pending) is not
            // counted as this operator's compute time.
            let input_batch = ready!(self.input.poll_next_unpin(cx));
            // Start timing the operation. The timer records time upon being dropped.
            let _timer = cloned_time.timer();

            match input_batch {
                None => {
                    // Input stream is exhausted, finalize any remaining batches
                    self.completed = true;
                    self.coalescer.finish()?;
                }
                Some(Ok(batch)) => {
                    match self.coalescer.push_batch(batch)? {
                        PushBatchStatus::Continue => {
                            // Keep pushing more batches
                        }
                        PushBatchStatus::LimitReached => {
                            // limit was reached, so stop early
                            self.completed = true;
                            self.coalescer.finish()?;
                        }
                    }
                }
                // Error case: forward `Some(Err(_))` to the caller unchanged
                other => return Poll::Ready(other),
            }
        }
    }
}
364
impl RecordBatchStream for CoalesceBatchesStream {
    /// The output schema is the coalescer's schema, which was constructed
    /// from the input plan's schema in `CoalesceBatchesExec::execute`.
    fn schema(&self) -> SchemaRef {
        self.coalescer.schema()
    }
}