// datafusion_physical_plan/coalesce_batches.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`CoalesceBatchesExec`] combines small batches into larger batches.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
use crate::{
    DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;

use crate::coalesce::{BatchCoalescer, CoalescerState};
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
    FilterPushdownPropagation,
};
use datafusion_common::config::ConfigOptions;
use futures::ready;
use futures::stream::{Stream, StreamExt};

/// `CoalesceBatchesExec` combines small batches into larger batches for more
/// efficient vectorized processing by later operators.
///
/// The operator buffers batches until it collects `target_batch_size` rows and
/// then emits a single concatenated batch. When only a limited number of rows
/// is needed (specified by the `fetch` parameter), the operator stops
/// buffering and returns the final batch once the number of collected rows
/// reaches the `fetch` value.
///
/// See [`BatchCoalescer`] for more information.
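///
/// # Example
///
/// A minimal sketch of wrapping an input plan with this operator; the
/// `input` plan, the batch size of 8192, and the fetch of 100 are
/// illustrative assumptions, not values taken from this file:
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
/// use datafusion_physical_plan::ExecutionPlan;
///
/// // `input` is any upstream plan that may emit many small batches,
/// // e.g. the output of a highly selective filter (hypothetical here).
/// fn coalesce_output(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
///     // Buffer rows until roughly 8192 have been collected, but stop
///     // early once 100 rows have been produced because of the fetch.
///     Arc::new(CoalesceBatchesExec::new(input, 8192).with_fetch(Some(100)))
/// }
/// ```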
#[derive(Debug, Clone)]
pub struct CoalesceBatchesExec {
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Minimum number of rows for coalescing batches
    target_batch_size: usize,
    /// Maximum number of rows to fetch, `None` means fetching all rows
    fetch: Option<usize>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties, computed from the input plan
    cache: PlanProperties,
}

impl CoalesceBatchesExec {
    /// Create a new CoalesceBatchesExec
    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
        let cache = Self::compute_properties(&input);
        Self {
            input,
            target_batch_size,
            fetch: None,
            metrics: ExecutionPlanMetricsSet::new(),
            cache,
        }
    }

    /// Update fetch with the argument
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Minimum number of rows for coalescing batches
    pub fn target_batch_size(&self) -> usize {
        self.target_batch_size
    }

    /// Creates the cache object that stores the plan properties such as schema,
    /// equivalence properties, ordering, partitioning, etc.
    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
        // The coalesce batches operator does not make any changes to the
        // partitioning of its input.
        PlanProperties::new(
            input.equivalence_properties().clone(), // Equivalence Properties
            input.output_partitioning().clone(),    // Output Partitioning
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }
}

impl DisplayAs for CoalesceBatchesExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "CoalesceBatchesExec: target_batch_size={}",
                    self.target_batch_size,
                )?;
                if let Some(fetch) = self.fetch {
                    write!(f, ", fetch={fetch}")?;
                };

                Ok(())
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "target_batch_size={}", self.target_batch_size)?;
                if let Some(fetch) = self.fetch {
                    write!(f, "limit={fetch}")?;
                };
                Ok(())
            }
        }
    }
}

impl ExecutionPlan for CoalesceBatchesExec {
    fn name(&self) -> &'static str {
        "CoalesceBatchesExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(
            CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size)
                .with_fetch(self.fetch),
        ))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(CoalesceBatchesStream {
            input: self.input.execute(partition, context)?,
            coalescer: BatchCoalescer::new(
                self.input.schema(),
                self.target_batch_size,
                self.fetch,
            ),
            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
            // Start by pulling data
            inner_state: CoalesceBatchesStreamState::Pull,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.input.partition_statistics(partition)?.with_fetch(
            self.schema(),
            self.fetch,
            0,
            1,
        )
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(CoalesceBatchesExec {
            input: Arc::clone(&self.input),
            target_batch_size: self.target_batch_size,
            fetch: limit,
            metrics: self.metrics.clone(),
            cache: self.cache.clone(),
        }))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }

    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        FilterDescription::from_children(parent_filters, &self.children())
    }

    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
    }
}

/// Stream for [`CoalesceBatchesExec`]; see [`CoalesceBatchesExec`] for more details.
struct CoalesceBatchesStream {
    /// The input stream
    input: SendableRecordBatchStream,
    /// Buffer for combining batches
    coalescer: BatchCoalescer,
    /// Execution metrics
    baseline_metrics: BaselineMetrics,
    /// The current inner state of the stream. This state dictates the current
    /// action or operation to be performed in the streaming process.
    inner_state: CoalesceBatchesStreamState,
}

impl Stream for CoalesceBatchesStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll = self.poll_next_inner(cx);
        self.baseline_metrics.record_poll(poll)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // we can't predict the size of incoming batches so re-use the size hint from the input
        self.input.size_hint()
    }
}

/// Enumeration of possible states for `CoalesceBatchesStream`.
/// It represents different stages in the lifecycle of a stream of record batches.
///
/// An example of a state transition, using the following notation:
/// - `[3000]`: a batch with 3000 rows
/// - `{[2000], [3000]}`: `CoalesceBatchesStream`'s internal buffer with two batches buffered
///
/// The input of `CoalesceBatchesStream` generates three batches `[2000], [3000], [4000]`.
/// With a coalescing threshold of 4096, the coalescing procedure goes through the following steps:
/// 1. Read the first batch and buffer it.
///    - initial state: `Pull`
///    - initial buffer: `{}`
///    - updated buffer: `{[2000]}`
///    - next state: `Pull`
/// 2. Read the second batch; the coalescing target is reached since 2000 + 3000 > 4096.
///    - initial state: `Pull`
///    - initial buffer: `{[2000]}`
///    - updated buffer: `{[2000], [3000]}`
///    - next state: `ReturnBuffer`
/// 3. The two buffered batches are merged into one batch and consumed by the parent operator.
///    - initial state: `ReturnBuffer`
///    - initial buffer: `{[2000], [3000]}`
///    - updated buffer: `{}`
///    - next state: `Pull`
/// 4. Read the third input batch.
///    - initial state: `Pull`
///    - initial buffer: `{}`
///    - updated buffer: `{[4000]}`
///    - next state: `Pull`
/// 5. The input is now exhausted. Jump to the `Exhausted` state to prepare the remaining buffered data.
///    - initial state: `Pull`
///    - initial buffer: `{[4000]}`
///    - updated buffer: `{[4000]}`
///    - next state: `Exhausted`
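///
/// The sketch below is a standalone model of these transitions, using plain
/// row counts in place of `RecordBatch`es; it is illustrative only and does
/// not call into the real [`BatchCoalescer`]:
///
/// ```
/// #[derive(Debug, PartialEq)]
/// enum State {
///     Pull,
///     ReturnBuffer,
///     Exhausted,
/// }
///
/// // Decide the next state after `buffered` rows have been collected
/// // towards `target`, given whether the input stream has ended.
/// fn next_state(buffered: usize, target: usize, input_done: bool) -> State {
///     if input_done {
///         State::Exhausted
///     } else if buffered >= target {
///         State::ReturnBuffer
///     } else {
///         State::Pull
///     }
/// }
///
/// // Mirrors the walk-through: 2000 buffered rows keep pulling,
/// // 2000 + 3000 rows cross the 4096 target, and end of input
/// // moves to `Exhausted` even though 4000 rows are still buffered.
/// assert_eq!(next_state(2000, 4096, false), State::Pull);
/// assert_eq!(next_state(2000 + 3000, 4096, false), State::ReturnBuffer);
/// assert_eq!(next_state(4000, 4096, true), State::Exhausted);
/// ```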
#[derive(Debug, Clone, Eq, PartialEq)]
enum CoalesceBatchesStreamState {
    /// State to pull a new batch from the input stream.
    Pull,
    /// State to return a buffered batch.
    ReturnBuffer,
    /// State indicating that the stream is exhausted.
    Exhausted,
}

impl CoalesceBatchesStream {
    fn poll_next_inner(
        self: &mut Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
        loop {
            match &self.inner_state {
                CoalesceBatchesStreamState::Pull => {
                    // Attempt to pull the next batch from the input stream.
                    let input_batch = ready!(self.input.poll_next_unpin(cx));
                    // Start timing the operation. The timer records time upon being dropped.
                    let _timer = cloned_time.timer();

                    match input_batch {
                        Some(Ok(batch)) => match self.coalescer.push_batch(batch) {
                            CoalescerState::Continue => {}
                            CoalescerState::LimitReached => {
                                self.inner_state = CoalesceBatchesStreamState::Exhausted;
                            }
                            CoalescerState::TargetReached => {
                                self.inner_state =
                                    CoalesceBatchesStreamState::ReturnBuffer;
                            }
                        },
                        None => {
                            // End of input stream, but buffered batches might still be present.
                            self.inner_state = CoalesceBatchesStreamState::Exhausted;
                        }
                        other => return Poll::Ready(other),
                    }
                }
                CoalesceBatchesStreamState::ReturnBuffer => {
                    let _timer = cloned_time.timer();
                    // Combine buffered batches into one batch and return it.
                    let batch = self.coalescer.finish_batch()?;
                    // Set to pull state for the next iteration.
                    self.inner_state = CoalesceBatchesStreamState::Pull;
                    return Poll::Ready(Some(Ok(batch)));
                }
                CoalesceBatchesStreamState::Exhausted => {
                    // Handle the end of the input stream.
                    return if self.coalescer.is_empty() {
                        // If buffer is empty, return None indicating the stream is fully consumed.
                        Poll::Ready(None)
                    } else {
                        let _timer = cloned_time.timer();
                        // If the buffer still contains batches, prepare to return them.
                        let batch = self.coalescer.finish_batch()?;
                        Poll::Ready(Some(Ok(batch)))
                    };
                }
            }
        }
    }
}

impl RecordBatchStream for CoalesceBatchesStream {
    fn schema(&self) -> SchemaRef {
        self.coalescer.schema()
    }
}