datafusion_physical_plan/
coalesce_batches.rs1use std::any::Any;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
26use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
27use crate::{
28 DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
29};
30
31use arrow::datatypes::SchemaRef;
32use arrow::record_batch::RecordBatch;
33use datafusion_common::Result;
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::PhysicalExpr;
36
37use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus};
38use crate::execution_plan::CardinalityEffect;
39use crate::filter_pushdown::{
40 ChildPushdownResult, FilterDescription, FilterPushdownPhase,
41 FilterPushdownPropagation,
42};
43use datafusion_common::config::ConfigOptions;
44use futures::ready;
45use futures::stream::{Stream, StreamExt};
46
47#[derive(Debug, Clone)]
58pub struct CoalesceBatchesExec {
59 input: Arc<dyn ExecutionPlan>,
61 target_batch_size: usize,
63 fetch: Option<usize>,
65 metrics: ExecutionPlanMetricsSet,
67 cache: PlanProperties,
68}
69
70impl CoalesceBatchesExec {
71 pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
73 let cache = Self::compute_properties(&input);
74 Self {
75 input,
76 target_batch_size,
77 fetch: None,
78 metrics: ExecutionPlanMetricsSet::new(),
79 cache,
80 }
81 }
82
83 pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
85 self.fetch = fetch;
86 self
87 }
88
89 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
91 &self.input
92 }
93
94 pub fn target_batch_size(&self) -> usize {
96 self.target_batch_size
97 }
98
99 fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
101 PlanProperties::new(
104 input.equivalence_properties().clone(), input.output_partitioning().clone(), input.pipeline_behavior(),
107 input.boundedness(),
108 )
109 }
110}
111
112impl DisplayAs for CoalesceBatchesExec {
113 fn fmt_as(
114 &self,
115 t: DisplayFormatType,
116 f: &mut std::fmt::Formatter,
117 ) -> std::fmt::Result {
118 match t {
119 DisplayFormatType::Default | DisplayFormatType::Verbose => {
120 write!(
121 f,
122 "CoalesceBatchesExec: target_batch_size={}",
123 self.target_batch_size,
124 )?;
125 if let Some(fetch) = self.fetch {
126 write!(f, ", fetch={fetch}")?;
127 };
128
129 Ok(())
130 }
131 DisplayFormatType::TreeRender => {
132 writeln!(f, "target_batch_size={}", self.target_batch_size)?;
133 if let Some(fetch) = self.fetch {
134 write!(f, "limit={fetch}")?;
135 };
136 Ok(())
137 }
138 }
139 }
140}
141
142impl ExecutionPlan for CoalesceBatchesExec {
143 fn name(&self) -> &'static str {
144 "CoalesceBatchesExec"
145 }
146
147 fn as_any(&self) -> &dyn Any {
149 self
150 }
151
152 fn properties(&self) -> &PlanProperties {
153 &self.cache
154 }
155
156 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
157 vec![&self.input]
158 }
159
160 fn maintains_input_order(&self) -> Vec<bool> {
161 vec![true]
162 }
163
164 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
165 vec![false]
166 }
167
168 fn with_new_children(
169 self: Arc<Self>,
170 children: Vec<Arc<dyn ExecutionPlan>>,
171 ) -> Result<Arc<dyn ExecutionPlan>> {
172 Ok(Arc::new(
173 CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size)
174 .with_fetch(self.fetch),
175 ))
176 }
177
178 fn execute(
179 &self,
180 partition: usize,
181 context: Arc<TaskContext>,
182 ) -> Result<SendableRecordBatchStream> {
183 Ok(Box::pin(CoalesceBatchesStream {
184 input: self.input.execute(partition, context)?,
185 coalescer: LimitedBatchCoalescer::new(
186 self.input.schema(),
187 self.target_batch_size,
188 self.fetch,
189 ),
190 baseline_metrics: BaselineMetrics::new(&self.metrics, partition),
191 completed: false,
192 }))
193 }
194
195 fn metrics(&self) -> Option<MetricsSet> {
196 Some(self.metrics.clone_inner())
197 }
198
199 fn statistics(&self) -> Result<Statistics> {
200 self.partition_statistics(None)
201 }
202
203 fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
204 self.input
205 .partition_statistics(partition)?
206 .with_fetch(self.fetch, 0, 1)
207 }
208
209 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
210 Some(Arc::new(CoalesceBatchesExec {
211 input: Arc::clone(&self.input),
212 target_batch_size: self.target_batch_size,
213 fetch: limit,
214 metrics: self.metrics.clone(),
215 cache: self.cache.clone(),
216 }))
217 }
218
219 fn fetch(&self) -> Option<usize> {
220 self.fetch
221 }
222
223 fn cardinality_effect(&self) -> CardinalityEffect {
224 CardinalityEffect::Equal
225 }
226
227 fn gather_filters_for_pushdown(
228 &self,
229 _phase: FilterPushdownPhase,
230 parent_filters: Vec<Arc<dyn PhysicalExpr>>,
231 _config: &ConfigOptions,
232 ) -> Result<FilterDescription> {
233 FilterDescription::from_children(parent_filters, &self.children())
234 }
235
236 fn handle_child_pushdown_result(
237 &self,
238 _phase: FilterPushdownPhase,
239 child_pushdown_result: ChildPushdownResult,
240 _config: &ConfigOptions,
241 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
242 Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
243 }
244}
245
246struct CoalesceBatchesStream {
248 input: SendableRecordBatchStream,
250 coalescer: LimitedBatchCoalescer,
252 baseline_metrics: BaselineMetrics,
254 completed: bool,
256}
257
258impl Stream for CoalesceBatchesStream {
259 type Item = Result<RecordBatch>;
260
261 fn poll_next(
262 mut self: Pin<&mut Self>,
263 cx: &mut Context<'_>,
264 ) -> Poll<Option<Self::Item>> {
265 let poll = self.poll_next_inner(cx);
266 self.baseline_metrics.record_poll(poll)
267 }
268
269 fn size_hint(&self) -> (usize, Option<usize>) {
270 self.input.size_hint()
272 }
273}
274
275impl CoalesceBatchesStream {
276 fn poll_next_inner(
277 self: &mut Pin<&mut Self>,
278 cx: &mut Context<'_>,
279 ) -> Poll<Option<Result<RecordBatch>>> {
280 let cloned_time = self.baseline_metrics.elapsed_compute().clone();
281 loop {
282 if let Some(batch) = self.coalescer.next_completed_batch() {
284 return Poll::Ready(Some(Ok(batch)));
285 }
286 if self.completed {
287 return Poll::Ready(None);
289 }
290 let input_batch = ready!(self.input.poll_next_unpin(cx));
292 let _timer = cloned_time.timer();
294
295 match input_batch {
296 None => {
297 self.completed = true;
299 self.coalescer.finish()?;
300 }
301 Some(Ok(batch)) => {
302 match self.coalescer.push_batch(batch)? {
303 PushBatchStatus::Continue => {
304 }
306 PushBatchStatus::LimitReached => {
307 self.completed = true;
309 self.coalescer.finish()?;
310 }
311 }
312 }
313 other => return Poll::Ready(other),
315 }
316 }
317 }
318}
319
320impl RecordBatchStream for CoalesceBatchesStream {
321 fn schema(&self) -> SchemaRef {
322 self.coalescer.schema()
323 }
324}