Skip to main content

datafusion_physical_plan/
coalesce_batches.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`CoalesceBatchesExec`] combines small batches into larger batches.
19
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
26use crate::projection::ProjectionExec;
27use crate::stream::EmptyRecordBatchStream;
28use crate::{
29    DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
30    check_if_same_properties,
31};
32
33use arrow::datatypes::SchemaRef;
34use arrow::record_batch::RecordBatch;
35use datafusion_common::Result;
36use datafusion_execution::TaskContext;
37use datafusion_physical_expr::PhysicalExpr;
38
39use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
40use crate::execution_plan::CardinalityEffect;
41use crate::filter_pushdown::{
42    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
43    FilterPushdownPropagation,
44};
45use crate::sort_pushdown::SortOrderPushdownResult;
46use datafusion_common::config::ConfigOptions;
47use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
48use futures::ready;
49use futures::stream::{Stream, StreamExt};
50
/// `CoalesceBatchesExec` combines small batches into larger batches for more
/// efficient vectorized processing by later operators.
///
/// The operator buffers batches until it collects `target_batch_size` rows and
/// then emits a single concatenated batch. When only a limited number of rows
/// are necessary (specified by the `fetch` parameter), the operator will stop
/// buffering and return the final batch once the number of collected rows
/// reaches the `fetch` value.
///
/// See [`LimitedBatchCoalescer`] for more information.
#[deprecated(
    since = "52.0.0",
    note = "We now use BatchCoalescer from arrow-rs instead of a dedicated operator"
)]
#[derive(Debug, Clone)]
pub struct CoalesceBatchesExec {
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Minimum number of rows for coalescing batches
    target_batch_size: usize,
    /// Maximum number of rows to fetch, `None` means fetching all rows
    fetch: Option<usize>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalence properties, output partitioning,
    /// pipeline behavior and boundedness), all taken from the input plan
    cache: Arc<PlanProperties>,
}
77
78#[expect(deprecated)]
79impl CoalesceBatchesExec {
80    /// Create a new CoalesceBatchesExec
81    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
82        let cache = Self::compute_properties(&input);
83        Self {
84            input,
85            target_batch_size,
86            fetch: None,
87            metrics: ExecutionPlanMetricsSet::new(),
88            cache: Arc::new(cache),
89        }
90    }
91
92    /// Update fetch with the argument
93    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
94        self.fetch = fetch;
95        self
96    }
97
98    /// The input plan
99    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
100        &self.input
101    }
102
103    /// Minimum number of rows for coalesces batches
104    pub fn target_batch_size(&self) -> usize {
105        self.target_batch_size
106    }
107
108    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
109    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
110        // The coalesce batches operator does not make any changes to the
111        // partitioning of its input.
112        PlanProperties::new(
113            input.equivalence_properties().clone(), // Equivalence Properties
114            input.output_partitioning().clone(),    // Output Partitioning
115            input.pipeline_behavior(),
116            input.boundedness(),
117        )
118    }
119
120    fn with_new_children_and_same_properties(
121        &self,
122        mut children: Vec<Arc<dyn ExecutionPlan>>,
123    ) -> Self {
124        Self {
125            input: children.swap_remove(0),
126            metrics: ExecutionPlanMetricsSet::new(),
127            ..Self::clone(self)
128        }
129    }
130}
131
132#[expect(deprecated)]
133impl DisplayAs for CoalesceBatchesExec {
134    fn fmt_as(
135        &self,
136        t: DisplayFormatType,
137        f: &mut std::fmt::Formatter,
138    ) -> std::fmt::Result {
139        match t {
140            DisplayFormatType::Default | DisplayFormatType::Verbose => {
141                write!(
142                    f,
143                    "CoalesceBatchesExec: target_batch_size={}",
144                    self.target_batch_size,
145                )?;
146                if let Some(fetch) = self.fetch {
147                    write!(f, ", fetch={fetch}")?;
148                };
149
150                Ok(())
151            }
152            DisplayFormatType::TreeRender => {
153                writeln!(f, "target_batch_size={}", self.target_batch_size)?;
154                if let Some(fetch) = self.fetch {
155                    write!(f, "limit={fetch}")?;
156                };
157                Ok(())
158            }
159        }
160    }
161}
162
163#[expect(deprecated)]
164impl ExecutionPlan for CoalesceBatchesExec {
165    fn name(&self) -> &'static str {
166        "CoalesceBatchesExec"
167    }
168
169    /// Return a reference to Any that can be used for downcasting
170    fn properties(&self) -> &Arc<PlanProperties> {
171        &self.cache
172    }
173
174    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
175        vec![&self.input]
176    }
177
178    fn maintains_input_order(&self) -> Vec<bool> {
179        vec![true]
180    }
181
182    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
183        vec![false]
184    }
185
186    fn with_new_children(
187        self: Arc<Self>,
188        mut children: Vec<Arc<dyn ExecutionPlan>>,
189    ) -> Result<Arc<dyn ExecutionPlan>> {
190        check_if_same_properties!(self, children);
191        Ok(Arc::new(
192            CoalesceBatchesExec::new(children.swap_remove(0), self.target_batch_size)
193                .with_fetch(self.fetch),
194        ))
195    }
196
197    fn execute(
198        &self,
199        partition: usize,
200        context: Arc<TaskContext>,
201    ) -> Result<SendableRecordBatchStream> {
202        Ok(Box::pin(CoalesceBatchesStream {
203            input: self.input.execute(partition, context)?,
204            coalescer: LimitedBatchCoalescer::new(
205                self.input.schema(),
206                self.target_batch_size,
207                self.fetch,
208            ),
209            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
210            completed: false,
211        }))
212    }
213
214    fn metrics(&self) -> Option<MetricsSet> {
215        Some(self.metrics.clone_inner())
216    }
217
218    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
219        let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
220        Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
221    }
222
223    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
224        Some(Arc::new(CoalesceBatchesExec {
225            input: Arc::clone(&self.input),
226            target_batch_size: self.target_batch_size,
227            fetch: limit,
228            metrics: self.metrics.clone(),
229            cache: Arc::clone(&self.cache),
230        }))
231    }
232
233    fn fetch(&self) -> Option<usize> {
234        self.fetch
235    }
236
237    fn cardinality_effect(&self) -> CardinalityEffect {
238        CardinalityEffect::Equal
239    }
240
241    fn try_swapping_with_projection(
242        &self,
243        projection: &ProjectionExec,
244    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
245        match self.input.try_swapping_with_projection(projection)? {
246            Some(new_input) => Ok(Some(
247                Arc::new(self.clone()).with_new_children(vec![new_input])?,
248            )),
249            None => Ok(None),
250        }
251    }
252
253    fn gather_filters_for_pushdown(
254        &self,
255        _phase: FilterPushdownPhase,
256        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
257        _config: &ConfigOptions,
258    ) -> Result<FilterDescription> {
259        FilterDescription::from_children(parent_filters, &self.children())
260    }
261
262    fn handle_child_pushdown_result(
263        &self,
264        _phase: FilterPushdownPhase,
265        child_pushdown_result: ChildPushdownResult,
266        _config: &ConfigOptions,
267    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
268        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
269    }
270
271    fn try_pushdown_sort(
272        &self,
273        order: &[PhysicalSortExpr],
274    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
275        // CoalesceBatchesExec is transparent for sort ordering - it preserves order
276        // Delegate to the child and wrap with a new CoalesceBatchesExec
277        self.input.try_pushdown_sort(order)?.try_map(|new_input| {
278            Ok(Arc::new(
279                CoalesceBatchesExec::new(new_input, self.target_batch_size)
280                    .with_fetch(self.fetch),
281            ) as Arc<dyn ExecutionPlan>)
282        })
283    }
284}
285
/// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details.
struct CoalesceBatchesStream {
    /// The input stream. It is replaced with an empty stream once the input
    /// is exhausted or the limit is reached.
    input: SendableRecordBatchStream,
    /// Buffer for combining batches
    coalescer: LimitedBatchCoalescer,
    /// Execution metrics
    baseline_metrics: BaselineMetrics,
    /// Is the input stream exhausted or the limit reached?
    completed: bool,
}
297
298impl Stream for CoalesceBatchesStream {
299    type Item = Result<RecordBatch>;
300
301    fn poll_next(
302        mut self: Pin<&mut Self>,
303        cx: &mut Context<'_>,
304    ) -> Poll<Option<Self::Item>> {
305        let poll = self.poll_next_inner(cx);
306        self.baseline_metrics.record_poll(poll)
307    }
308
309    fn size_hint(&self) -> (usize, Option<usize>) {
310        // we can't predict the size of incoming batches so re-use the size hint from the input
311        self.input.size_hint()
312    }
313}
314
315impl CoalesceBatchesStream {
316    fn poll_next_inner(
317        self: &mut Pin<&mut Self>,
318        cx: &mut Context<'_>,
319    ) -> Poll<Option<Result<RecordBatch>>> {
320        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
321        loop {
322            // If there is any completed batch ready, return it
323            if let Some(batch) = self.coalescer.next_completed_batch() {
324                return Poll::Ready(Some(Ok(batch)));
325            }
326            if self.completed {
327                // If input is done and no batches are ready, return None to signal end of stream.
328                return Poll::Ready(None);
329            }
330            // Attempt to pull the next batch from the input stream.
331            let input_batch = ready!(self.input.poll_next_unpin(cx));
332            // Start timing the operation. The timer records time upon being dropped.
333            let _timer = cloned_time.timer();
334
335            match input_batch {
336                None => {
337                    // Input stream is exhausted, finalize any remaining batches
338                    self.completed = true;
339                    self.input =
340                        Box::pin(EmptyRecordBatchStream::new(self.coalescer.schema()));
341                    self.coalescer.finish()?;
342                }
343                Some(Ok(batch)) => {
344                    match self.coalescer.push_batch(batch)? {
345                        PushBatchStatus::Continue => {
346                            // Keep pushing more batches
347                        }
348                        PushBatchStatus::LimitReached => {
349                            // limit was reached, so stop early
350                            self.completed = true;
351                            self.input = Box::pin(EmptyRecordBatchStream::new(
352                                self.coalescer.schema(),
353                            ));
354                            self.coalescer.finish()?;
355                        }
356                    }
357                }
358                // Error case
359                other => return Poll::Ready(other),
360            }
361        }
362    }
363}
364
impl RecordBatchStream for CoalesceBatchesStream {
    /// Returns the schema reported by the coalescer, which was constructed
    /// with the input plan's schema.
    fn schema(&self) -> SchemaRef {
        self.coalescer.schema()
    }
}