datafusion_physical_plan/
coalesce_batches.rs1use std::any::Any;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
26use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
27use crate::projection::ProjectionExec;
28use crate::{
29 DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
30};
31
32use arrow::datatypes::SchemaRef;
33use arrow::record_batch::RecordBatch;
34use datafusion_common::Result;
35use datafusion_execution::TaskContext;
36use datafusion_physical_expr::PhysicalExpr;
37
38use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
39use crate::execution_plan::CardinalityEffect;
40use crate::filter_pushdown::{
41 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
42 FilterPushdownPropagation,
43};
44use crate::sort_pushdown::SortOrderPushdownResult;
45use datafusion_common::config::ConfigOptions;
46use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
47use futures::ready;
48use futures::stream::{Stream, StreamExt};
49
/// Physical operator that buffers the batches produced by its input in a
/// [`LimitedBatchCoalescer`] and emits them as batches sized toward
/// `target_batch_size`, optionally honouring a `fetch` limit.
///
/// This node only carries configuration; the buffering happens in
/// [`CoalesceBatchesStream`], created by `execute`.
#[derive(Debug, Clone)]
pub struct CoalesceBatchesExec {
    /// The single child plan whose output batches are coalesced.
    input: Arc<dyn ExecutionPlan>,
    /// Target size handed to the coalescer for each emitted batch.
    target_batch_size: usize,
    /// Optional limit forwarded to the coalescer (and used to cap the
    /// reported statistics); `None` means unlimited.
    fetch: Option<usize>,
    /// Execution metrics collected by this operator's streams.
    metrics: ExecutionPlanMetricsSet,
    /// Plan properties derived from `input` by `compute_properties`.
    cache: PlanProperties,
}
72
73impl CoalesceBatchesExec {
74 pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
76 let cache = Self::compute_properties(&input);
77 Self {
78 input,
79 target_batch_size,
80 fetch: None,
81 metrics: ExecutionPlanMetricsSet::new(),
82 cache,
83 }
84 }
85
86 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
88 self.fetch = fetch;
89 self
90 }
91
92 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
94 &self.input
95 }
96
97 pub fn target_batch_size(&self) -> usize {
99 self.target_batch_size
100 }
101
102 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
104 PlanProperties::new(
107 input.equivalence_properties().clone(), input.output_partitioning().clone(), input.pipeline_behavior(),
110 input.boundedness(),
111 )
112 }
113}
114
115impl DisplayAs for CoalesceBatchesExec {
116 fn fmt_as(
117 &self,
118 t: DisplayFormatType,
119 f: &mut std::fmt::Formatter,
120 ) -> std::fmt::Result {
121 match t {
122 DisplayFormatType::Default | DisplayFormatType::Verbose => {
123 write!(
124 f,
125 "CoalesceBatchesExec: target_batch_size={}",
126 self.target_batch_size,
127 )?;
128 if let Some(fetch) = self.fetch {
129 write!(f, ", fetch={fetch}")?;
130 };
131
132 Ok(())
133 }
134 DisplayFormatType::TreeRender => {
135 writeln!(f, "target_batch_size={}", self.target_batch_size)?;
136 if let Some(fetch) = self.fetch {
137 write!(f, "limit={fetch}")?;
138 };
139 Ok(())
140 }
141 }
142 }
143}
144
145impl ExecutionPlan for CoalesceBatchesExec {
146 fn name(&self) -> &'static str {
147 "CoalesceBatchesExec"
148 }
149
150 fn as_any(&self) -> &dyn Any {
152 self
153 }
154
155 fn properties(&self) -> &PlanProperties {
156 &self.cache
157 }
158
159 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
160 vec![&self.input]
161 }
162
163 fn maintains_input_order(&self) -> Vec<bool> {
164 vec![true]
165 }
166
167 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
168 vec![false]
169 }
170
171 fn with_new_children(
172 self: Arc<Self>,
173 children: Vec<Arc<dyn ExecutionPlan>>,
174 ) -> Result<Arc<dyn ExecutionPlan>> {
175 Ok(Arc::new(
176 CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size)
177 .with_fetch(self.fetch),
178 ))
179 }
180
181 fn execute(
182 &self,
183 partition: usize,
184 context: Arc<TaskContext>,
185 ) -> Result<SendableRecordBatchStream> {
186 Ok(Box::pin(CoalesceBatchesStream {
187 input: self.input.execute(partition, context)?,
188 coalescer: LimitedBatchCoalescer::new(
189 self.input.schema(),
190 self.target_batch_size,
191 self.fetch,
192 ),
193 baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
194 completed: false,
195 }))
196 }
197
198 fn metrics(&self) -> Option<MetricsSet> {
199 Some(self.metrics.clone_inner())
200 }
201
202 fn statistics(&self) -> Result<Statistics> {
203 self.partition_statistics(None)
204 }
205
206 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
207 self.input
208 .partition_statistics(partition)?
209 .with_fetch(self.fetch, 0, 1)
210 }
211
212 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
213 Some(Arc::new(CoalesceBatchesExec {
214 input: Arc::clone(&self.input),
215 target_batch_size: self.target_batch_size,
216 fetch: limit,
217 metrics: self.metrics.clone(),
218 cache: self.cache.clone(),
219 }))
220 }
221
222 fn fetch(&self) -> Option<usize> {
223 self.fetch
224 }
225
226 fn cardinality_effect(&self) -> CardinalityEffect {
227 CardinalityEffect::Equal
228 }
229
230 fn try_swapping_with_projection(
231 &self,
232 projection: &ProjectionExec,
233 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
234 match self.input.try_swapping_with_projection(projection)? {
235 Some(new_input) => Ok(Some(
236 Arc::new(self.clone()).with_new_children(vec![new_input])?,
237 )),
238 None => Ok(None),
239 }
240 }
241
242 fn gather_filters_for_pushdown(
243 &self,
244 _phase: FilterPushdownPhase,
245 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
246 _config: &ConfigOptions,
247 ) -> Result<FilterDescription> {
248 FilterDescription::from_children(parent_filters, &self.children())
249 }
250
251 fn handle_child_pushdown_result(
252 &self,
253 _phase: FilterPushdownPhase,
254 child_pushdown_result: ChildPushdownResult,
255 _config: &ConfigOptions,
256 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
257 Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
258 }
259
260 fn try_pushdown_sort(
261 &self,
262 order: &[PhysicalSortExpr],
263 ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
264 self.input.try_pushdown_sort(order)?.try_map(|new_input| {
267 Ok(Arc::new(
268 CoalesceBatchesExec::new(new_input, self.target_batch_size)
269 .with_fetch(self.fetch),
270 ) as Arc<dyn ExecutionPlan>)
271 })
272 }
273}
274
/// Stream created by `CoalesceBatchesExec::execute` that feeds one input
/// partition through a [`LimitedBatchCoalescer`].
struct CoalesceBatchesStream {
    /// Batches from the corresponding input partition.
    input: SendableRecordBatchStream,
    /// Buffers incoming batches and hands back completed output batches.
    coalescer: LimitedBatchCoalescer,
    /// Records poll outcomes and compute time for this partition.
    baseline_metrics: BaselineMetrics,
    /// Set once the input is exhausted or the fetch limit is reached; after
    /// that, only batches already held by the coalescer are emitted.
    completed: bool,
}
286
287impl Stream for CoalesceBatchesStream {
288 type Item = Result<RecordBatch>;
289
290 fn poll_next(
291 mut self: Pin<&mut Self>,
292 cx: &mut Context<'_>,
293 ) -> Poll<Option<Self::Item>> {
294 let poll = self.poll_next_inner(cx);
295 self.baseline_metrics.record_poll(poll)
296 }
297
298 fn size_hint(&self) -> (usize, Option<usize>) {
299 self.input.size_hint()
301 }
302}
303
304impl CoalesceBatchesStream {
305 fn poll_next_inner(
306 self: &mut Pin<&mut Self>,
307 cx: &mut Context<'_>,
308 ) -> Poll<Option<Result<RecordBatch>>> {
309 let cloned_time = self.baseline_metrics.elapsed_compute().clone();
310 loop {
311 if let Some(batch) = self.coalescer.next_completed_batch() {
313 return Poll::Ready(Some(Ok(batch)));
314 }
315 if self.completed {
316 return Poll::Ready(None);
318 }
319 let input_batch = ready!(self.input.poll_next_unpin(cx));
321 let _timer = cloned_time.timer();
323
324 match input_batch {
325 None => {
326 self.completed = true;
328 self.coalescer.finish()?;
329 }
330 Some(Ok(batch)) => {
331 match self.coalescer.push_batch(batch)? {
332 PushBatchStatus::Continue => {
333 }
335 PushBatchStatus::LimitReached => {
336 self.completed = true;
338 self.coalescer.finish()?;
339 }
340 }
341 }
342 other => return Poll::Ready(other),
344 }
345 }
346 }
347}
348
349impl RecordBatchStream for CoalesceBatchesStream {
350 fn schema(&self) -> SchemaRef {
351 self.coalescer.schema()
352 }
353}