datafusion_physical_plan/
coalesce_batches.rs1use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
26use crate::projection::ProjectionExec;
27use crate::stream::EmptyRecordBatchStream;
28use crate::{
29 DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
30 check_if_same_properties,
31};
32
33use arrow::datatypes::SchemaRef;
34use arrow::record_batch::RecordBatch;
35use datafusion_common::Result;
36use datafusion_execution::TaskContext;
37use datafusion_physical_expr::PhysicalExpr;
38
39use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
40use crate::execution_plan::CardinalityEffect;
41use crate::filter_pushdown::{
42 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
43 FilterPushdownPropagation,
44};
45use crate::sort_pushdown::SortOrderPushdownResult;
46use datafusion_common::config::ConfigOptions;
47use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
48use futures::ready;
49use futures::stream::{Stream, StreamExt};
50
51#[deprecated(
62 since = "52.0.0",
63 note = "We now use BatchCoalescer from arrow-rs instead of a dedicated operator"
64)]
65#[derive(Debug, Clone)]
66pub struct CoalesceBatchesExec {
67 input: Arc<dyn ExecutionPlan>,
69 target_batch_size: usize,
71 fetch: Option<usize>,
73 metrics: ExecutionPlanMetricsSet,
75 cache: Arc<PlanProperties>,
76}
77
78#[expect(deprecated)]
79impl CoalesceBatchesExec {
80 pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
82 let cache = Self::compute_properties(&input);
83 Self {
84 input,
85 target_batch_size,
86 fetch: None,
87 metrics: ExecutionPlanMetricsSet::new(),
88 cache: Arc::new(cache),
89 }
90 }
91
92 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
94 self.fetch = fetch;
95 self
96 }
97
98 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
100 &self.input
101 }
102
103 pub fn target_batch_size(&self) -> usize {
105 self.target_batch_size
106 }
107
108 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
110 PlanProperties::new(
113 input.equivalence_properties().clone(), input.output_partitioning().clone(), input.pipeline_behavior(),
116 input.boundedness(),
117 )
118 }
119
120 fn with_new_children_and_same_properties(
121 &self,
122 mut children: Vec<Arc<dyn ExecutionPlan>>,
123 ) -> Self {
124 Self {
125 input: children.swap_remove(0),
126 metrics: ExecutionPlanMetricsSet::new(),
127 ..Self::clone(self)
128 }
129 }
130}
131
132#[expect(deprecated)]
133impl DisplayAs for CoalesceBatchesExec {
134 fn fmt_as(
135 &self,
136 t: DisplayFormatType,
137 f: &mut std::fmt::Formatter,
138 ) -> std::fmt::Result {
139 match t {
140 DisplayFormatType::Default | DisplayFormatType::Verbose => {
141 write!(
142 f,
143 "CoalesceBatchesExec: target_batch_size={}",
144 self.target_batch_size,
145 )?;
146 if let Some(fetch) = self.fetch {
147 write!(f, ", fetch={fetch}")?;
148 };
149
150 Ok(())
151 }
152 DisplayFormatType::TreeRender => {
153 writeln!(f, "target_batch_size={}", self.target_batch_size)?;
154 if let Some(fetch) = self.fetch {
155 write!(f, "limit={fetch}")?;
156 };
157 Ok(())
158 }
159 }
160 }
161}
162
163#[expect(deprecated)]
164impl ExecutionPlan for CoalesceBatchesExec {
165 fn name(&self) -> &'static str {
166 "CoalesceBatchesExec"
167 }
168
169 fn properties(&self) -> &Arc<PlanProperties> {
171 &self.cache
172 }
173
174 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
175 vec![&self.input]
176 }
177
178 fn maintains_input_order(&self) -> Vec<bool> {
179 vec![true]
180 }
181
182 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
183 vec![false]
184 }
185
186 fn with_new_children(
187 self: Arc<Self>,
188 mut children: Vec<Arc<dyn ExecutionPlan>>,
189 ) -> Result<Arc<dyn ExecutionPlan>> {
190 check_if_same_properties!(self, children);
191 Ok(Arc::new(
192 CoalesceBatchesExec::new(children.swap_remove(0), self.target_batch_size)
193 .with_fetch(self.fetch),
194 ))
195 }
196
197 fn execute(
198 &self,
199 partition: usize,
200 context: Arc<TaskContext>,
201 ) -> Result<SendableRecordBatchStream> {
202 Ok(Box::pin(CoalesceBatchesStream {
203 input: self.input.execute(partition, context)?,
204 coalescer: LimitedBatchCoalescer::new(
205 self.input.schema(),
206 self.target_batch_size,
207 self.fetch,
208 ),
209 baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
210 completed: false,
211 }))
212 }
213
214 fn metrics(&self) -> Option<MetricsSet> {
215 Some(self.metrics.clone_inner())
216 }
217
218 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
219 let stats = Arc::unwrap_or_clone(self.input.partition_statistics(partition)?);
220 Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
221 }
222
223 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
224 Some(Arc::new(CoalesceBatchesExec {
225 input: Arc::clone(&self.input),
226 target_batch_size: self.target_batch_size,
227 fetch: limit,
228 metrics: self.metrics.clone(),
229 cache: Arc::clone(&self.cache),
230 }))
231 }
232
233 fn fetch(&self) -> Option<usize> {
234 self.fetch
235 }
236
237 fn cardinality_effect(&self) -> CardinalityEffect {
238 CardinalityEffect::Equal
239 }
240
241 fn try_swapping_with_projection(
242 &self,
243 projection: &ProjectionExec,
244 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
245 match self.input.try_swapping_with_projection(projection)? {
246 Some(new_input) => Ok(Some(
247 Arc::new(self.clone()).with_new_children(vec![new_input])?,
248 )),
249 None => Ok(None),
250 }
251 }
252
253 fn gather_filters_for_pushdown(
254 &self,
255 _phase: FilterPushdownPhase,
256 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
257 _config: &ConfigOptions,
258 ) -> Result<FilterDescription> {
259 FilterDescription::from_children(parent_filters, &self.children())
260 }
261
262 fn handle_child_pushdown_result(
263 &self,
264 _phase: FilterPushdownPhase,
265 child_pushdown_result: ChildPushdownResult,
266 _config: &ConfigOptions,
267 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
268 Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
269 }
270
271 fn try_pushdown_sort(
272 &self,
273 order: &[PhysicalSortExpr],
274 ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
275 self.input.try_pushdown_sort(order)?.try_map(|new_input| {
278 Ok(Arc::new(
279 CoalesceBatchesExec::new(new_input, self.target_batch_size)
280 .with_fetch(self.fetch),
281 ) as Arc<dyn ExecutionPlan>)
282 })
283 }
284}
285
286struct CoalesceBatchesStream {
288 input: SendableRecordBatchStream,
290 coalescer: LimitedBatchCoalescer,
292 baseline_metrics: BaselineMetrics,
294 completed: bool,
296}
297
298impl Stream for CoalesceBatchesStream {
299 type Item = Result<RecordBatch>;
300
301 fn poll_next(
302 mut self: Pin<&mut Self>,
303 cx: &mut Context<'_>,
304 ) -> Poll<Option<Self::Item>> {
305 let poll = self.poll_next_inner(cx);
306 self.baseline_metrics.record_poll(poll)
307 }
308
309 fn size_hint(&self) -> (usize, Option<usize>) {
310 self.input.size_hint()
312 }
313}
314
315impl CoalesceBatchesStream {
316 fn poll_next_inner(
317 self: &mut Pin<&mut Self>,
318 cx: &mut Context<'_>,
319 ) -> Poll<Option<Result<RecordBatch>>> {
320 let cloned_time = self.baseline_metrics.elapsed_compute().clone();
321 loop {
322 if let Some(batch) = self.coalescer.next_completed_batch() {
324 return Poll::Ready(Some(Ok(batch)));
325 }
326 if self.completed {
327 return Poll::Ready(None);
329 }
330 let input_batch = ready!(self.input.poll_next_unpin(cx));
332 let _timer = cloned_time.timer();
334
335 match input_batch {
336 None => {
337 self.completed = true;
339 self.input =
340 Box::pin(EmptyRecordBatchStream::new(self.coalescer.schema()));
341 self.coalescer.finish()?;
342 }
343 Some(Ok(batch)) => {
344 match self.coalescer.push_batch(batch)? {
345 PushBatchStatus::Continue => {
346 }
348 PushBatchStatus::LimitReached => {
349 self.completed = true;
351 self.input = Box::pin(EmptyRecordBatchStream::new(
352 self.coalescer.schema(),
353 ));
354 self.coalescer.finish()?;
355 }
356 }
357 }
358 other => return Poll::Ready(other),
360 }
361 }
362 }
363}
364
365impl RecordBatchStream for CoalesceBatchesStream {
366 fn schema(&self) -> SchemaRef {
367 self.coalescer.schema()
368 }
369}