datafusion_physical_plan/coalesce_batches.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`CoalesceBatchesExec`] combines small batches into larger batches.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
use crate::{
    DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;

use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
    FilterPushdownPropagation,
};
use datafusion_common::config::ConfigOptions;
use futures::ready;
use futures::stream::{Stream, StreamExt};

/// `CoalesceBatchesExec` combines small batches into larger batches for more
/// efficient vectorized processing by later operators.
///
/// The operator buffers batches until it collects `target_batch_size` rows and
/// then emits a single concatenated batch. When only a limited number of rows
/// are necessary (specified by the `fetch` parameter), the operator stops
/// buffering and returns the final batch once the number of collected rows
/// reaches the `fetch` value.
///
/// See [`LimitedBatchCoalescer`] for more information.
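///
/// # Example
///
/// A minimal sketch of wiring up the operator (not compiled as a doc test;
/// `input` stands for any upstream `Arc<dyn ExecutionPlan>` assumed to be in
/// scope):
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
///
/// // Coalesce the input's small batches into batches of roughly 4096 rows,
/// // and stop producing output once 100 rows have been emitted.
/// let plan = Arc::new(
///     CoalesceBatchesExec::new(Arc::clone(&input), 4096).with_fetch(Some(100)),
/// );
/// ```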
#[derive(Debug, Clone)]
pub struct CoalesceBatchesExec {
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Minimum number of rows for coalescing batches
    target_batch_size: usize,
    /// Maximum number of rows to fetch, `None` means fetching all rows
    fetch: Option<usize>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalence properties, partitioning, etc.)
    cache: PlanProperties,
}

impl CoalesceBatchesExec {
    /// Create a new CoalesceBatchesExec
    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
        let cache = Self::compute_properties(&input);
        Self {
            input,
            target_batch_size,
            fetch: None,
            metrics: ExecutionPlanMetricsSet::new(),
            cache,
        }
    }

    /// Update fetch with the argument
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Minimum number of rows for coalescing batches
    pub fn target_batch_size(&self) -> usize {
        self.target_batch_size
    }

    /// This function creates the cache object that stores the plan properties
    /// such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
        // The coalesce batches operator does not make any changes to the
        // partitioning of its input.
        PlanProperties::new(
            input.equivalence_properties().clone(), // Equivalence Properties
            input.output_partitioning().clone(),    // Output Partitioning
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }
}

impl DisplayAs for CoalesceBatchesExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "CoalesceBatchesExec: target_batch_size={}",
                    self.target_batch_size,
                )?;
                if let Some(fetch) = self.fetch {
                    write!(f, ", fetch={fetch}")?;
                };

                Ok(())
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "target_batch_size={}", self.target_batch_size)?;
                if let Some(fetch) = self.fetch {
                    write!(f, "limit={fetch}")?;
                };
                Ok(())
            }
        }
    }
}

impl ExecutionPlan for CoalesceBatchesExec {
    fn name(&self) -> &'static str {
        "CoalesceBatchesExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(
            CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size)
                .with_fetch(self.fetch),
        ))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(CoalesceBatchesStream {
            input: self.input.execute(partition, context)?,
            coalescer: LimitedBatchCoalescer::new(
                self.input.schema(),
                self.target_batch_size,
                self.fetch,
            ),
            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
            completed: false,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.input
            .partition_statistics(partition)?
            .with_fetch(self.fetch, 0, 1)
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(CoalesceBatchesExec {
            input: Arc::clone(&self.input),
            target_batch_size: self.target_batch_size,
            fetch: limit,
            metrics: self.metrics.clone(),
            cache: self.cache.clone(),
        }))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }

    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        FilterDescription::from_children(parent_filters, &self.children())
    }

    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
    }
}

/// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details.
struct CoalesceBatchesStream {
    /// The input stream of record batches
    input: SendableRecordBatchStream,
    /// Buffer for combining batches
    coalescer: LimitedBatchCoalescer,
    /// Execution metrics
    baseline_metrics: BaselineMetrics,
    /// Is the input stream exhausted or the limit reached?
    completed: bool,
}

impl Stream for CoalesceBatchesStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll = self.poll_next_inner(cx);
        self.baseline_metrics.record_poll(poll)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // we can't predict the size of incoming batches so re-use the size hint from the input
        self.input.size_hint()
    }
}

impl CoalesceBatchesStream {
    fn poll_next_inner(
        self: &mut Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
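        // Loop until a completed batch is ready, the stream ends, or the
        // input returns Pending:
        //   1. Drain any batch the coalescer has already completed.
        //   2. If the input is exhausted or the fetch limit was hit, signal
        //      end of stream.
        //   3. Otherwise pull the next input batch and push it into the
        //      coalescer, finishing early once the limit is reached.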
        loop {
            // If there is any completed batch ready, return it
            if let Some(batch) = self.coalescer.next_completed_batch() {
                return Poll::Ready(Some(Ok(batch)));
            }
            if self.completed {
                // If input is done and no batches are ready, return None to signal end of stream.
                return Poll::Ready(None);
            }
            // Attempt to pull the next batch from the input stream.
            let input_batch = ready!(self.input.poll_next_unpin(cx));
            // Start timing the operation. The timer records time upon being dropped.
            let _timer = cloned_time.timer();

            match input_batch {
                None => {
                    // Input stream is exhausted, finalize any remaining batches
                    self.completed = true;
                    self.coalescer.finish()?;
                }
                Some(Ok(batch)) => {
                    match self.coalescer.push_batch(batch)? {
                        PushBatchStatus::Continue => {
                            // Keep pushing more batches
                        }
                        PushBatchStatus::LimitReached => {
                            // limit was reached, so stop early
                            self.completed = true;
                            self.coalescer.finish()?;
                        }
                    }
                }
                // Error case
                other => return Poll::Ready(other),
            }
        }
    }
}

impl RecordBatchStream for CoalesceBatchesStream {
    fn schema(&self) -> SchemaRef {
        self.coalescer.schema()
    }
}