// datafusion_physical_plan/coalesce_batches.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`CoalesceBatchesExec`] combines small batches into larger batches.
19
20use std::any::Any;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
26use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
27use crate::projection::ProjectionExec;
28use crate::{
29    DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
30};
31
32use arrow::datatypes::SchemaRef;
33use arrow::record_batch::RecordBatch;
34use datafusion_common::Result;
35use datafusion_execution::TaskContext;
36use datafusion_physical_expr::PhysicalExpr;
37
38use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
39use crate::execution_plan::CardinalityEffect;
40use crate::filter_pushdown::{
41    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
42    FilterPushdownPropagation,
43};
44use crate::sort_pushdown::SortOrderPushdownResult;
45use datafusion_common::config::ConfigOptions;
46use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
47use futures::ready;
48use futures::stream::{Stream, StreamExt};
49
50/// `CoalesceBatchesExec` combines small batches into larger batches for more
51/// efficient vectorized processing by later operators.
52///
53/// The operator buffers batches until it collects `target_batch_size` rows and
54/// then emits a single concatenated batch. When only a limited number of rows
55/// are necessary (specified by the `fetch` parameter), the operator will stop
56/// buffering and returns the final batch once the number of collected rows
57/// reaches the `fetch` value.
58///
59/// See [`LimitedBatchCoalescer`] for more information
60#[derive(Debug, Clone)]
61pub struct CoalesceBatchesExec {
62    /// The input plan
63    input: Arc<dyn ExecutionPlan>,
64    /// Minimum number of rows for coalescing batches
65    target_batch_size: usize,
66    /// Maximum number of rows to fetch, `None` means fetching all rows
67    fetch: Option<usize>,
68    /// Execution metrics
69    metrics: ExecutionPlanMetricsSet,
70    cache: PlanProperties,
71}
72
73impl CoalesceBatchesExec {
74    /// Create a new CoalesceBatchesExec
75    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
76        let cache = Self::compute_properties(&input);
77        Self {
78            input,
79            target_batch_size,
80            fetch: None,
81            metrics: ExecutionPlanMetricsSet::new(),
82            cache,
83        }
84    }
85
86    /// Update fetch with the argument
87    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
88        self.fetch = fetch;
89        self
90    }
91
92    /// The input plan
93    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
94        &self.input
95    }
96
97    /// Minimum number of rows for coalesces batches
98    pub fn target_batch_size(&self) -> usize {
99        self.target_batch_size
100    }
101
102    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
103    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
104        // The coalesce batches operator does not make any changes to the
105        // partitioning of its input.
106        PlanProperties::new(
107            input.equivalence_properties().clone(), // Equivalence Properties
108            input.output_partitioning().clone(),    // Output Partitioning
109            input.pipeline_behavior(),
110            input.boundedness(),
111        )
112    }
113}
114
115impl DisplayAs for CoalesceBatchesExec {
116    fn fmt_as(
117        &self,
118        t: DisplayFormatType,
119        f: &mut std::fmt::Formatter,
120    ) -> std::fmt::Result {
121        match t {
122            DisplayFormatType::Default | DisplayFormatType::Verbose => {
123                write!(
124                    f,
125                    "CoalesceBatchesExec: target_batch_size={}",
126                    self.target_batch_size,
127                )?;
128                if let Some(fetch) = self.fetch {
129                    write!(f, ", fetch={fetch}")?;
130                };
131
132                Ok(())
133            }
134            DisplayFormatType::TreeRender => {
135                writeln!(f, "target_batch_size={}", self.target_batch_size)?;
136                if let Some(fetch) = self.fetch {
137                    write!(f, "limit={fetch}")?;
138                };
139                Ok(())
140            }
141        }
142    }
143}
144
145impl ExecutionPlan for CoalesceBatchesExec {
146    fn name(&self) -> &'static str {
147        "CoalesceBatchesExec"
148    }
149
150    /// Return a reference to Any that can be used for downcasting
151    fn as_any(&self) -> &dyn Any {
152        self
153    }
154
155    fn properties(&self) -> &PlanProperties {
156        &self.cache
157    }
158
159    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
160        vec![&self.input]
161    }
162
163    fn maintains_input_order(&self) -> Vec<bool> {
164        vec![true]
165    }
166
167    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
168        vec![false]
169    }
170
171    fn with_new_children(
172        self: Arc<Self>,
173        children: Vec<Arc<dyn ExecutionPlan>>,
174    ) -> Result<Arc<dyn ExecutionPlan>> {
175        Ok(Arc::new(
176            CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size)
177                .with_fetch(self.fetch),
178        ))
179    }
180
181    fn execute(
182        &self,
183        partition: usize,
184        context: Arc<TaskContext>,
185    ) -> Result<SendableRecordBatchStream> {
186        Ok(Box::pin(CoalesceBatchesStream {
187            input: self.input.execute(partition, context)?,
188            coalescer: LimitedBatchCoalescer::new(
189                self.input.schema(),
190                self.target_batch_size,
191                self.fetch,
192            ),
193            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
194            completed: false,
195        }))
196    }
197
198    fn metrics(&self) -> Option<MetricsSet> {
199        Some(self.metrics.clone_inner())
200    }
201
202    fn statistics(&self) -> Result<Statistics> {
203        self.partition_statistics(None)
204    }
205
206    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
207        self.input
208            .partition_statistics(partition)?
209            .with_fetch(self.fetch, 0, 1)
210    }
211
212    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
213        Some(Arc::new(CoalesceBatchesExec {
214            input: Arc::clone(&self.input),
215            target_batch_size: self.target_batch_size,
216            fetch: limit,
217            metrics: self.metrics.clone(),
218            cache: self.cache.clone(),
219        }))
220    }
221
222    fn fetch(&self) -> Option<usize> {
223        self.fetch
224    }
225
226    fn cardinality_effect(&self) -> CardinalityEffect {
227        CardinalityEffect::Equal
228    }
229
230    fn try_swapping_with_projection(
231        &self,
232        projection: &ProjectionExec,
233    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
234        match self.input.try_swapping_with_projection(projection)? {
235            Some(new_input) => Ok(Some(
236                Arc::new(self.clone()).with_new_children(vec![new_input])?,
237            )),
238            None => Ok(None),
239        }
240    }
241
242    fn gather_filters_for_pushdown(
243        &self,
244        _phase: FilterPushdownPhase,
245        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
246        _config: &ConfigOptions,
247    ) -> Result<FilterDescription> {
248        FilterDescription::from_children(parent_filters, &self.children())
249    }
250
251    fn handle_child_pushdown_result(
252        &self,
253        _phase: FilterPushdownPhase,
254        child_pushdown_result: ChildPushdownResult,
255        _config: &ConfigOptions,
256    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
257        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
258    }
259
260    fn try_pushdown_sort(
261        &self,
262        order: &[PhysicalSortExpr],
263    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
264        // CoalesceBatchesExec is transparent for sort ordering - it preserves order
265        // Delegate to the child and wrap with a new CoalesceBatchesExec
266        self.input.try_pushdown_sort(order)?.try_map(|new_input| {
267            Ok(Arc::new(
268                CoalesceBatchesExec::new(new_input, self.target_batch_size)
269                    .with_fetch(self.fetch),
270            ) as Arc<dyn ExecutionPlan>)
271        })
272    }
273}
274
275/// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details.
276struct CoalesceBatchesStream {
277    /// The input plan
278    input: SendableRecordBatchStream,
279    /// Buffer for combining batches
280    coalescer: LimitedBatchCoalescer,
281    /// Execution metrics
282    baseline_metrics: BaselineMetrics,
283    /// is the input stream exhausted or limit reached?
284    completed: bool,
285}
286
287impl Stream for CoalesceBatchesStream {
288    type Item = Result<RecordBatch>;
289
290    fn poll_next(
291        mut self: Pin<&mut Self>,
292        cx: &mut Context<'_>,
293    ) -> Poll<Option<Self::Item>> {
294        let poll = self.poll_next_inner(cx);
295        self.baseline_metrics.record_poll(poll)
296    }
297
298    fn size_hint(&self) -> (usize, Option<usize>) {
299        // we can't predict the size of incoming batches so re-use the size hint from the input
300        self.input.size_hint()
301    }
302}
303
304impl CoalesceBatchesStream {
305    fn poll_next_inner(
306        self: &mut Pin<&mut Self>,
307        cx: &mut Context<'_>,
308    ) -> Poll<Option<Result<RecordBatch>>> {
309        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
310        loop {
311            // If there is any completed batch ready, return it
312            if let Some(batch) = self.coalescer.next_completed_batch() {
313                return Poll::Ready(Some(Ok(batch)));
314            }
315            if self.completed {
316                // If input is done and no batches are ready, return None to signal end of stream.
317                return Poll::Ready(None);
318            }
319            // Attempt to pull the next batch from the input stream.
320            let input_batch = ready!(self.input.poll_next_unpin(cx));
321            // Start timing the operation. The timer records time upon being dropped.
322            let _timer = cloned_time.timer();
323
324            match input_batch {
325                None => {
326                    // Input stream is exhausted, finalize any remaining batches
327                    self.completed = true;
328                    self.coalescer.finish()?;
329                }
330                Some(Ok(batch)) => {
331                    match self.coalescer.push_batch(batch)? {
332                        PushBatchStatus::Continue => {
333                            // Keep pushing more batches
334                        }
335                        PushBatchStatus::LimitReached => {
336                            // limit was reached, so stop early
337                            self.completed = true;
338                            self.coalescer.finish()?;
339                        }
340                    }
341                }
342                // Error case
343                other => return Poll::Ready(other),
344            }
345        }
346    }
347}
348
349impl RecordBatchStream for CoalesceBatchesStream {
350    fn schema(&self) -> SchemaRef {
351        self.coalescer.schema()
352    }
353}