datafusion_physical_plan/
coalesce_batches.rs1use std::any::Any;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
26use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
27use crate::projection::ProjectionExec;
28use crate::{
29 DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
30 check_if_same_properties,
31};
32
33use arrow::datatypes::SchemaRef;
34use arrow::record_batch::RecordBatch;
35use datafusion_common::Result;
36use datafusion_execution::TaskContext;
37use datafusion_physical_expr::PhysicalExpr;
38
39use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
40use crate::execution_plan::CardinalityEffect;
41use crate::filter_pushdown::{
42 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
43 FilterPushdownPropagation,
44};
45use crate::sort_pushdown::SortOrderPushdownResult;
46use datafusion_common::config::ConfigOptions;
47use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
48use futures::ready;
49use futures::stream::{Stream, StreamExt};
50
51#[deprecated(
62 since = "52.0.0",
63 note = "We now use BatchCoalescer from arrow-rs instead of a dedicated operator"
64)]
65#[derive(Debug, Clone)]
66pub struct CoalesceBatchesExec {
67 input: Arc<dyn ExecutionPlan>,
69 target_batch_size: usize,
71 fetch: Option<usize>,
73 metrics: ExecutionPlanMetricsSet,
75 cache: Arc<PlanProperties>,
76}
77
78#[expect(deprecated)]
79impl CoalesceBatchesExec {
80 pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
82 let cache = Self::compute_properties(&input);
83 Self {
84 input,
85 target_batch_size,
86 fetch: None,
87 metrics: ExecutionPlanMetricsSet::new(),
88 cache: Arc::new(cache),
89 }
90 }
91
92 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
94 self.fetch = fetch;
95 self
96 }
97
98 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
100 &self.input
101 }
102
103 pub fn target_batch_size(&self) -> usize {
105 self.target_batch_size
106 }
107
108 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
110 PlanProperties::new(
113 input.equivalence_properties().clone(), input.output_partitioning().clone(), input.pipeline_behavior(),
116 input.boundedness(),
117 )
118 }
119
120 fn with_new_children_and_same_properties(
121 &self,
122 mut children: Vec<Arc<dyn ExecutionPlan>>,
123 ) -> Self {
124 Self {
125 input: children.swap_remove(0),
126 metrics: ExecutionPlanMetricsSet::new(),
127 ..Self::clone(self)
128 }
129 }
130}
131
132#[expect(deprecated)]
133impl DisplayAs for CoalesceBatchesExec {
134 fn fmt_as(
135 &self,
136 t: DisplayFormatType,
137 f: &mut std::fmt::Formatter,
138 ) -> std::fmt::Result {
139 match t {
140 DisplayFormatType::Default | DisplayFormatType::Verbose => {
141 write!(
142 f,
143 "CoalesceBatchesExec: target_batch_size={}",
144 self.target_batch_size,
145 )?;
146 if let Some(fetch) = self.fetch {
147 write!(f, ", fetch={fetch}")?;
148 };
149
150 Ok(())
151 }
152 DisplayFormatType::TreeRender => {
153 writeln!(f, "target_batch_size={}", self.target_batch_size)?;
154 if let Some(fetch) = self.fetch {
155 write!(f, "limit={fetch}")?;
156 };
157 Ok(())
158 }
159 }
160 }
161}
162
163#[expect(deprecated)]
164impl ExecutionPlan for CoalesceBatchesExec {
165 fn name(&self) -> &'static str {
166 "CoalesceBatchesExec"
167 }
168
169 fn as_any(&self) -> &dyn Any {
171 self
172 }
173
174 fn properties(&self) -> &Arc<PlanProperties> {
175 &self.cache
176 }
177
178 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
179 vec![&self.input]
180 }
181
182 fn maintains_input_order(&self) -> Vec<bool> {
183 vec![true]
184 }
185
186 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
187 vec![false]
188 }
189
190 fn with_new_children(
191 self: Arc<Self>,
192 mut children: Vec<Arc<dyn ExecutionPlan>>,
193 ) -> Result<Arc<dyn ExecutionPlan>> {
194 check_if_same_properties!(self, children);
195 Ok(Arc::new(
196 CoalesceBatchesExec::new(children.swap_remove(0), self.target_batch_size)
197 .with_fetch(self.fetch),
198 ))
199 }
200
201 fn execute(
202 &self,
203 partition: usize,
204 context: Arc<TaskContext>,
205 ) -> Result<SendableRecordBatchStream> {
206 Ok(Box::pin(CoalesceBatchesStream {
207 input: self.input.execute(partition, context)?,
208 coalescer: LimitedBatchCoalescer::new(
209 self.input.schema(),
210 self.target_batch_size,
211 self.fetch,
212 ),
213 baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
214 completed: false,
215 }))
216 }
217
218 fn metrics(&self) -> Option<MetricsSet> {
219 Some(self.metrics.clone_inner())
220 }
221
222 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
223 self.input
224 .partition_statistics(partition)?
225 .with_fetch(self.fetch, 0, 1)
226 }
227
228 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
229 Some(Arc::new(CoalesceBatchesExec {
230 input: Arc::clone(&self.input),
231 target_batch_size: self.target_batch_size,
232 fetch: limit,
233 metrics: self.metrics.clone(),
234 cache: Arc::clone(&self.cache),
235 }))
236 }
237
238 fn fetch(&self) -> Option<usize> {
239 self.fetch
240 }
241
242 fn cardinality_effect(&self) -> CardinalityEffect {
243 CardinalityEffect::Equal
244 }
245
246 fn try_swapping_with_projection(
247 &self,
248 projection: &ProjectionExec,
249 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
250 match self.input.try_swapping_with_projection(projection)? {
251 Some(new_input) => Ok(Some(
252 Arc::new(self.clone()).with_new_children(vec![new_input])?,
253 )),
254 None => Ok(None),
255 }
256 }
257
258 fn gather_filters_for_pushdown(
259 &self,
260 _phase: FilterPushdownPhase,
261 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
262 _config: &ConfigOptions,
263 ) -> Result<FilterDescription> {
264 FilterDescription::from_children(parent_filters, &self.children())
265 }
266
267 fn handle_child_pushdown_result(
268 &self,
269 _phase: FilterPushdownPhase,
270 child_pushdown_result: ChildPushdownResult,
271 _config: &ConfigOptions,
272 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
273 Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
274 }
275
276 fn try_pushdown_sort(
277 &self,
278 order: &[PhysicalSortExpr],
279 ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
280 self.input.try_pushdown_sort(order)?.try_map(|new_input| {
283 Ok(Arc::new(
284 CoalesceBatchesExec::new(new_input, self.target_batch_size)
285 .with_fetch(self.fetch),
286 ) as Arc<dyn ExecutionPlan>)
287 })
288 }
289}
290
291struct CoalesceBatchesStream {
293 input: SendableRecordBatchStream,
295 coalescer: LimitedBatchCoalescer,
297 baseline_metrics: BaselineMetrics,
299 completed: bool,
301}
302
303impl Stream for CoalesceBatchesStream {
304 type Item = Result<RecordBatch>;
305
306 fn poll_next(
307 mut self: Pin<&mut Self>,
308 cx: &mut Context<'_>,
309 ) -> Poll<Option<Self::Item>> {
310 let poll = self.poll_next_inner(cx);
311 self.baseline_metrics.record_poll(poll)
312 }
313
314 fn size_hint(&self) -> (usize, Option<usize>) {
315 self.input.size_hint()
317 }
318}
319
320impl CoalesceBatchesStream {
321 fn poll_next_inner(
322 self: &mut Pin<&mut Self>,
323 cx: &mut Context<'_>,
324 ) -> Poll<Option<Result<RecordBatch>>> {
325 let cloned_time = self.baseline_metrics.elapsed_compute().clone();
326 loop {
327 if let Some(batch) = self.coalescer.next_completed_batch() {
329 return Poll::Ready(Some(Ok(batch)));
330 }
331 if self.completed {
332 return Poll::Ready(None);
334 }
335 let input_batch = ready!(self.input.poll_next_unpin(cx));
337 let _timer = cloned_time.timer();
339
340 match input_batch {
341 None => {
342 self.completed = true;
344 self.coalescer.finish()?;
345 }
346 Some(Ok(batch)) => {
347 match self.coalescer.push_batch(batch)? {
348 PushBatchStatus::Continue => {
349 }
351 PushBatchStatus::LimitReached => {
352 self.completed = true;
354 self.coalescer.finish()?;
355 }
356 }
357 }
358 other => return Poll::Ready(other),
360 }
361 }
362 }
363}
364
365impl RecordBatchStream for CoalesceBatchesStream {
366 fn schema(&self) -> SchemaRef {
367 self.coalescer.schema()
368 }
369}