datafusion_physical_plan/
coalesce_partitions.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines the merge plan for executing partitions in parallel and then merging the results
19//! into a single partition
20
21use std::any::Any;
22use std::sync::Arc;
23
24use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
25use super::stream::{ObservedStream, RecordBatchReceiverStream};
26use super::{
27    DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream,
28    Statistics,
29};
30use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
31use crate::projection::{make_with_child, ProjectionExec};
32use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
33
34use datafusion_common::{internal_err, Result};
35use datafusion_execution::TaskContext;
36
/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
///
/// An optional `fetch` limit caps the number of rows emitted by the merged stream.
#[derive(Debug, Clone)]
pub struct CoalescePartitionsExec {
    /// Input execution plan
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Plan properties (equivalence properties, output partitioning, evaluation and
    /// scheduling type, ...) derived from `input` by `compute_properties`.
    cache: PlanProperties,
    /// Optional maximum number of rows to produce; `None` means no limit.
    /// Stops producing rows once this many rows have been emitted.
    pub(crate) fetch: Option<usize>,
}
49
50impl CoalescePartitionsExec {
51    /// Create a new CoalescePartitionsExec
52    pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
53        let cache = Self::compute_properties(&input);
54        CoalescePartitionsExec {
55            input,
56            metrics: ExecutionPlanMetricsSet::new(),
57            cache,
58            fetch: None,
59        }
60    }
61
62    /// Update fetch with the argument
63    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
64        self.fetch = fetch;
65        self
66    }
67
68    /// Input execution plan
69    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
70        &self.input
71    }
72
73    /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
74    fn compute_properties(input: &Arc<dyn ExecutionPlan>) -> PlanProperties {
75        let input_partitions = input.output_partitioning().partition_count();
76        let (drive, scheduling) = if input_partitions > 1 {
77            (EvaluationType::Eager, SchedulingType::Cooperative)
78        } else {
79            (
80                input.properties().evaluation_type,
81                input.properties().scheduling_type,
82            )
83        };
84
85        // Coalescing partitions loses existing orderings:
86        let mut eq_properties = input.equivalence_properties().clone();
87        eq_properties.clear_orderings();
88        eq_properties.clear_per_partition_constants();
89        PlanProperties::new(
90            eq_properties,                        // Equivalence Properties
91            Partitioning::UnknownPartitioning(1), // Output Partitioning
92            input.pipeline_behavior(),
93            input.boundedness(),
94        )
95        .with_evaluation_type(drive)
96        .with_scheduling_type(scheduling)
97    }
98}
99
100impl DisplayAs for CoalescePartitionsExec {
101    fn fmt_as(
102        &self,
103        t: DisplayFormatType,
104        f: &mut std::fmt::Formatter,
105    ) -> std::fmt::Result {
106        match t {
107            DisplayFormatType::Default | DisplayFormatType::Verbose => match self.fetch {
108                Some(fetch) => {
109                    write!(f, "CoalescePartitionsExec: fetch={fetch}")
110                }
111                None => write!(f, "CoalescePartitionsExec"),
112            },
113            DisplayFormatType::TreeRender => match self.fetch {
114                Some(fetch) => {
115                    write!(f, "limit: {fetch}")
116                }
117                None => write!(f, ""),
118            },
119        }
120    }
121}
122
123impl ExecutionPlan for CoalescePartitionsExec {
124    fn name(&self) -> &'static str {
125        "CoalescePartitionsExec"
126    }
127
128    /// Return a reference to Any that can be used for downcasting
129    fn as_any(&self) -> &dyn Any {
130        self
131    }
132
133    fn properties(&self) -> &PlanProperties {
134        &self.cache
135    }
136
137    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
138        vec![&self.input]
139    }
140
141    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
142        vec![false]
143    }
144
145    fn with_new_children(
146        self: Arc<Self>,
147        children: Vec<Arc<dyn ExecutionPlan>>,
148    ) -> Result<Arc<dyn ExecutionPlan>> {
149        let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
150        plan.fetch = self.fetch;
151        Ok(Arc::new(plan))
152    }
153
154    fn execute(
155        &self,
156        partition: usize,
157        context: Arc<TaskContext>,
158    ) -> Result<SendableRecordBatchStream> {
159        // CoalescePartitionsExec produces a single partition
160        if 0 != partition {
161            return internal_err!("CoalescePartitionsExec invalid partition {partition}");
162        }
163
164        let input_partitions = self.input.output_partitioning().partition_count();
165        match input_partitions {
166            0 => internal_err!(
167                "CoalescePartitionsExec requires at least one input partition"
168            ),
169            1 => {
170                // bypass any threading / metrics if there is a single partition
171                self.input.execute(0, context)
172            }
173            _ => {
174                let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
175                // record the (very) minimal work done so that
176                // elapsed_compute is not reported as 0
177                let elapsed_compute = baseline_metrics.elapsed_compute().clone();
178                let _timer = elapsed_compute.timer();
179
180                // use a stream that allows each sender to put in at
181                // least one result in an attempt to maximize
182                // parallelism.
183                let mut builder =
184                    RecordBatchReceiverStream::builder(self.schema(), input_partitions);
185
186                // spawn independent tasks whose resulting streams (of batches)
187                // are sent to the channel for consumption.
188                for part_i in 0..input_partitions {
189                    builder.run_input(
190                        Arc::clone(&self.input),
191                        part_i,
192                        Arc::clone(&context),
193                    );
194                }
195
196                let stream = builder.build();
197                Ok(Box::pin(ObservedStream::new(
198                    stream,
199                    baseline_metrics,
200                    self.fetch,
201                )))
202            }
203        }
204    }
205
206    fn metrics(&self) -> Option<MetricsSet> {
207        Some(self.metrics.clone_inner())
208    }
209
210    fn statistics(&self) -> Result<Statistics> {
211        self.partition_statistics(None)
212    }
213
214    fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
215        self.input
216            .partition_statistics(None)?
217            .with_fetch(self.schema(), self.fetch, 0, 1)
218    }
219
220    fn supports_limit_pushdown(&self) -> bool {
221        true
222    }
223
224    fn cardinality_effect(&self) -> CardinalityEffect {
225        CardinalityEffect::Equal
226    }
227
228    /// Tries to swap `projection` with its input, which is known to be a
229    /// [`CoalescePartitionsExec`]. If possible, performs the swap and returns
230    /// [`CoalescePartitionsExec`] as the top plan. Otherwise, returns `None`.
231    fn try_swapping_with_projection(
232        &self,
233        projection: &ProjectionExec,
234    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
235        // If the projection does not narrow the schema, we should not try to push it down:
236        if projection.expr().len() >= projection.input().schema().fields().len() {
237            return Ok(None);
238        }
239        // CoalescePartitionsExec always has a single child, so zero indexing is safe.
240        make_with_child(projection, projection.input().children()[0]).map(|e| {
241            if self.fetch.is_some() {
242                let mut plan = CoalescePartitionsExec::new(e);
243                plan.fetch = self.fetch;
244                Some(Arc::new(plan) as _)
245            } else {
246                Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
247            }
248        })
249    }
250
251    fn fetch(&self) -> Option<usize> {
252        self.fetch
253    }
254
255    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
256        Some(Arc::new(CoalescePartitionsExec {
257            input: Arc::clone(&self.input),
258            fetch: limit,
259            metrics: self.metrics.clone(),
260            cache: self.cache.clone(),
261        }))
262    }
263}
264
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::exec::{
        assert_strong_count_converges_to_zero, BlockingExec, PanicExec,
    };
    use crate::test::{self, assert_is_pending};
    use crate::{collect, common};

    use arrow::datatypes::{DataType, Field, Schema};

    use futures::FutureExt;

    /// Single nullable Float32 column `a`, shared by the cancellation and panic tests.
    fn float_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]))
    }

    #[tokio::test]
    async fn merge() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let partition_count = 4;
        let scan = test::scan_partitioned(partition_count);
        // sanity check: the scan exposes the requested number of partitions
        assert_eq!(scan.output_partitioning().partition_count(), partition_count);

        let coalesce = CoalescePartitionsExec::new(scan);
        // coalescing always yields exactly one output partition
        assert_eq!(
            coalesce.properties().output_partitioning().partition_count(),
            1
        );

        // one batch is expected per input partition
        let output = common::collect(coalesce.execute(0, ctx)?).await?;
        assert_eq!(output.len(), partition_count);

        // 100 rows per input partition, 400 in total
        let total_rows: usize = output.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 400);

        Ok(())
    }

    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());

        let blocking = Arc::new(BlockingExec::new(float_schema(), 2));
        let refs = blocking.refs();
        let plan = Arc::new(CoalescePartitionsExec::new(blocking));

        // dropping the pending future must release every reference to the input
        let mut pending = collect(plan, ctx).boxed();
        assert_is_pending(&mut pending);
        drop(pending);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }

    #[tokio::test]
    #[should_panic(expected = "PanickingStream did panic")]
    async fn test_panic() {
        let ctx = Arc::new(TaskContext::default());

        let panicking = Arc::new(PanicExec::new(float_schema(), 2));
        let plan = Arc::new(CoalescePartitionsExec::new(panicking));

        // a panic inside an input task must surface to the caller
        collect(plan, ctx).await.unwrap();
    }
}