// datafusion_physical_plan/coalesce_batches.rs

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
use crate::{
    DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;

use crate::coalesce::{BatchCoalescer, CoalescerState};
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
    FilterPushdownPropagation,
};
use datafusion_common::config::ConfigOptions;
use futures::ready;
use futures::stream::{Stream, StreamExt};

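/// `CoalesceBatchesExec` combines the small batches produced by its input into
/// larger ones: rows are buffered until at least `target_batch_size` of them
/// have accumulated, and are then emitted as a single batch so that downstream
/// operators can use efficient vectorized processing. If `fetch` is set, the
/// operator stops after producing that many rows.
///
/// A minimal construction sketch (`input` is a hypothetical, pre-existing
/// `Arc<dyn ExecutionPlan>`; 8192 is DataFusion's default `batch_size`):
///
/// ```rust,ignore
/// let coalesced = CoalesceBatchesExec::new(input, 8192)
///     // optionally stop once 100 rows have been produced
///     .with_fetch(Some(100));
/// ```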
#[derive(Debug, Clone)]
pub struct CoalesceBatchesExec {
    /// The input plan
    input: Arc<dyn ExecutionPlan>,
    /// Minimum number of rows for coalesced batches
    target_batch_size: usize,
    /// Maximum number of rows to fetch; `None` means fetch all rows
    fetch: Option<usize>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (equivalence properties, partitioning, etc.)
    cache: PlanProperties,
}

impl CoalesceBatchesExec {
    /// Create a new `CoalesceBatchesExec` from an input plan and a target batch size
    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
        let cache = Self::compute_properties(&input);
        Self {
            input,
            target_batch_size,
            fetch: None,
            metrics: ExecutionPlanMetricsSet::new(),
            cache,
        }
    }

    /// Update the fetch (row limit) of this operator
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    /// The input plan
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Minimum number of rows for coalesced batches
    pub fn target_batch_size(&self) -> usize {
        self.target_batch_size
    }

    /// Creates the cached plan properties (equivalence properties,
    /// partitioning, pipeline behavior, and boundedness) from the input plan.
    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
        // Coalescing batches does not change any property of the input plan.
        PlanProperties::new(
            input.equivalence_properties().clone(),
            input.output_partitioning().clone(),
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }
}

impl DisplayAs for CoalesceBatchesExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "CoalesceBatchesExec: target_batch_size={}",
                    self.target_batch_size,
                )?;
                if let Some(fetch) = self.fetch {
                    write!(f, ", fetch={fetch}")?;
                };

                Ok(())
            }
            DisplayFormatType::TreeRender => {
                writeln!(f, "target_batch_size={}", self.target_batch_size)?;
                if let Some(fetch) = self.fetch {
                    write!(f, "limit={fetch}")?;
                };
                Ok(())
            }
        }
    }
}
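
// For example, with `target_batch_size=8192` and `fetch=Some(100)`, the
// Default/Verbose format above renders as
//
//   CoalesceBatchesExec: target_batch_size=8192, fetch=100
//
// while the TreeRender format emits the key/value lines
//
//   target_batch_size=8192
//   limit=100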

impl ExecutionPlan for CoalesceBatchesExec {
    fn name(&self) -> &'static str {
        "CoalesceBatchesExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // Coalescing concatenates rows in arrival order, so ordering is preserved
        vec![true]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        // Coalescing is cheap, so it does not benefit from extra input parallelism
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(
            CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size)
                .with_fetch(self.fetch),
        ))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        Ok(Box::pin(CoalesceBatchesStream {
            input: self.input.execute(partition, context)?,
            coalescer: BatchCoalescer::new(
                self.input.schema(),
                self.target_batch_size,
                self.fetch,
            ),
            baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
            // Start by pulling batches from the input
            inner_state: CoalesceBatchesStreamState::Pull,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Result<Statistics> {
        self.partition_statistics(None)
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.input.partition_statistics(partition)?.with_fetch(
            self.schema(),
            self.fetch,
            0,
            1,
        )
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Some(Arc::new(CoalesceBatchesExec {
            input: Arc::clone(&self.input),
            target_batch_size: self.target_batch_size,
            fetch: limit,
            metrics: self.metrics.clone(),
            cache: self.cache.clone(),
        }))
    }

    fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }

    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        // This operator is transparent to filter pushdown: parent filters are
        // forwarded to the child unchanged.
        FilterDescription::from_children(parent_filters, &self.children())
    }

    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        // Report a filter as handled only if every child reported it as handled
        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
    }
}

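/// Stream that drives a [`BatchCoalescer`]: it pulls batches from `input`,
/// buffers them, and emits combined batches once the target size (or the fetch
/// limit) is reached.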
struct CoalesceBatchesStream {
    /// The input plan
    input: SendableRecordBatchStream,
    /// Buffer for combining batches
    coalescer: BatchCoalescer,
    /// Execution metrics for this stream
    baseline_metrics: BaselineMetrics,
    /// The current inner state of the stream
    inner_state: CoalesceBatchesStreamState,
}

impl Stream for CoalesceBatchesStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let poll = self.poll_next_inner(cx);
        self.baseline_metrics.record_poll(poll)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.input.size_hint()
    }
}

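/// The coalescing stream is a small state machine. The sketch below shows how
/// each state reacts to the coalescer (illustrative only; it mirrors
/// `poll_next_inner` further down):
///
/// ```rust,ignore
/// match coalescer.push_batch(batch) {
///     // Not enough rows buffered yet: stay in `Pull` and keep reading input
///     CoalescerState::Continue => {}
///     // The `fetch` limit was reached: move to `Exhausted` and drain the buffer
///     CoalescerState::LimitReached => {}
///     // At least `target_batch_size` rows are buffered: move to `ReturnBuffer`
///     // and emit one large batch
///     CoalescerState::TargetReached => {}
/// }
/// ```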
#[derive(Debug, Clone, Eq, PartialEq)]
enum CoalesceBatchesStreamState {
    /// Pull the next batch from the input stream and push it into the coalescer
    Pull,
    /// A full buffer is ready: emit it and go back to `Pull`
    ReturnBuffer,
    /// The input is exhausted (or the fetch limit was hit): flush any remaining
    /// buffered rows and then end the stream
    Exhausted,
}

impl CoalesceBatchesStream {
    fn poll_next_inner(
        self: &mut Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        let cloned_time = self.baseline_metrics.elapsed_compute().clone();
        loop {
            match &self.inner_state {
                CoalesceBatchesStreamState::Pull => {
                    // Wait for the next batch from the input stream
                    let input_batch = ready!(self.input.poll_next_unpin(cx));
                    // Start timing only once a batch is available
                    let _timer = cloned_time.timer();

                    match input_batch {
                        Some(Ok(batch)) => match self.coalescer.push_batch(batch) {
                            CoalescerState::Continue => {}
                            CoalescerState::LimitReached => {
                                self.inner_state = CoalesceBatchesStreamState::Exhausted;
                            }
                            CoalescerState::TargetReached => {
                                self.inner_state =
                                    CoalesceBatchesStreamState::ReturnBuffer;
                            }
                        },
                        None => {
                            // Input is done; flush whatever is still buffered
                            self.inner_state = CoalesceBatchesStreamState::Exhausted;
                        }
                        // An error from the input is passed through unchanged
                        other => return Poll::Ready(other),
                    }
                }
                CoalesceBatchesStreamState::ReturnBuffer => {
                    let _timer = cloned_time.timer();
                    // Combine the buffered batches into one output batch
                    let batch = self.coalescer.finish_batch()?;
                    // After emitting it, continue pulling from the input
                    self.inner_state = CoalesceBatchesStreamState::Pull;
                    return Poll::Ready(Some(Ok(batch)));
                }
                CoalesceBatchesStreamState::Exhausted => {
                    return if self.coalescer.is_empty() {
                        // Nothing left to emit
                        Poll::Ready(None)
                    } else {
                        // Emit the final (possibly smaller) batch
                        let _timer = cloned_time.timer();
                        let batch = self.coalescer.finish_batch()?;
                        Poll::Ready(Some(Ok(batch)))
                    };
                }
            }
        }
    }
}

impl RecordBatchStream for CoalesceBatchesStream {
    fn schema(&self) -> SchemaRef {
        self.coalescer.schema()
    }
}