Skip to main content

datafusion_distributed/execution_plans/
partition_isolator.rs

1use crate::DistributedTaskContext;
2use crate::common::require_one_child;
3use datafusion::execution::TaskContext;
4use datafusion::physical_expr_common::metrics::MetricsSet;
5use datafusion::physical_plan::ExecutionPlanProperties;
6use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
7use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
8use datafusion::{
9    error::Result,
10    execution::SendableRecordBatchStream,
11    physical_plan::{
12        DisplayAs, DisplayFormatType, EmptyRecordBatchStream, ExecutionPlan, Partitioning,
13        PlanProperties,
14    },
15};
16use futures::TryStreamExt;
17use std::{fmt::Formatter, sync::Arc};
18
/// This is a simple [ExecutionPlan] that isolates a set of N partitions from an input
/// [ExecutionPlan] with M partitions, where N <= M (N equals M when there is a single task).
21///
22/// It will advertise to upper nodes that only N partitions are available, even though the child
23/// plan might have more.
24///
/// The partitions exposed to upper nodes depend on:
/// 1. the number of tasks in the stage that contains the [PartitionIsolatorExec].
/// 2. the index of the task executing the [PartitionIsolatorExec] node.
28///
29/// ```text
30///                                ┌───────────────────────────┐                                   ■
31///                                │    NetworkCoalesceExec    │                                   │
32///                                │         (task 1)          │                                   │
33///                                └┬─┬┬─┬┬─┬┬─┬┬─┬┬─┬┬─┬┬─┬┬─┬┘                                Stage N+1
34///                                 │1││2││3││4││5││6││7││8││9│                                    │
35///                                 └─┘└─┘└─┘└─┘└─┘└─┘└─┘└─┘└─┘                                    │
36///                                 ▲  ▲  ▲   ▲  ▲  ▲   ▲  ▲  ▲                                    ■
37///   ┌──┬──┬───────────────────────┴──┴──┘   │  │  │   └──┴──┴──────────────────────┬──┬──┐
38///   │  │  │                                 │  │  │                                │  │  │       ■
39///  ┌─┐┌─┐┌─┐                               ┌─┐┌─┐┌─┐                              ┌─┐┌─┐┌─┐      │
40///  │1││2││3│                               │4││5││6│                              │7││8││9│      │
41/// ┌┴─┴┴─┴┴─┴──────────────────┐  ┌─────────┴─┴┴─┴┴─┴─────────┐ ┌──────────────────┴─┴┴─┴┴─┴┐     │
42/// │   PartitionIsolatorExec   │  │   PartitionIsolatorExec   │ │   PartitionIsolatorExec   │     │
43/// │         (task 1)          │  │         (task 2)          │ │         (task 3)          │     │
44/// └─▲──▲──▲───────────────────┘  └──────────▲──▲──▲──────────┘ └───────────────────▲──▲──▲─┘     │
45///   │  │  │  ◌  ◌  ◌  ◌  ◌  ◌      ◌  ◌  ◌  │  │  │  ◌  ◌  ◌     ◌  ◌  ◌  ◌  ◌  ◌  │  │  │    Stage N
46///   │  │  │  │  │  │  │  │  │      │  │  │  │  │  │  │  │  │     │  │  │  │  │  │  │  │  │       │
47///  ┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐    ┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐   ┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐      │
48///  │1││2││3││4││5││6││7││8││9│    │1││2││3││4││5││6││7││8││9│   │1││2││3││4││5││6││7││8││9│      │
49/// ┌┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┐  ┌┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┐ ┌┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┐     │
50/// │      DataSourceExec       │  │      DataSourceExec       │ │      DataSourceExec       │     │
51/// │         (task 1)          │  │         (task 2)          │ │         (task 3)          │     │
52/// └───────────────────────────┘  └───────────────────────────┘ └───────────────────────────┘     ■
53/// ```
#[derive(Debug)]
pub struct PartitionIsolatorExec {
    /// Child plan whose partitions are divided into per-task groups.
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Plan properties copied from `input`, with the partitioning replaced by an
    /// `UnknownPartitioning` over the size of the first partition group.
    pub(crate) properties: Arc<PlanProperties>,
    /// Number of tasks the input partitions are divided among.
    pub(crate) n_tasks: usize,
    /// Metrics of this node; `execute` registers an output-rows counter per requested partition.
    pub(crate) metrics: ExecutionPlanMetricsSet,
}
61
62impl PartitionIsolatorExec {
63    pub fn new(input: Arc<dyn ExecutionPlan>, n_tasks: usize) -> Self {
64        let input_partitions = input.properties().partitioning.partition_count();
65
66        let partition_count = Self::partition_groups(input_partitions, n_tasks)[0].len();
67
68        let properties = <PlanProperties as Clone>::clone(&input.properties().clone())
69            .with_partitioning(Partitioning::UnknownPartitioning(partition_count));
70
71        Self {
72            input: input.clone(),
73            properties: Arc::new(properties),
74            n_tasks,
75            metrics: ExecutionPlanMetricsSet::new(),
76        }
77    }
78
79    fn partition_groups(input_partitions: usize, n_tasks: usize) -> Vec<Vec<usize>> {
80        let q = input_partitions / n_tasks;
81        let r = input_partitions % n_tasks;
82
83        let mut off = 0;
84        (0..n_tasks)
85            .map(|i| q + if i < r { 1 } else { 0 })
86            .map(|n| {
87                let result = (off..(off + n)).collect();
88                off += n;
89                result
90            })
91            .collect()
92    }
93
94    pub(crate) fn partition_group(
95        input_partitions: usize,
96        task_i: usize,
97        n_tasks: usize,
98    ) -> Vec<usize> {
99        Self::partition_groups(input_partitions, n_tasks)[task_i].clone()
100    }
101}
102
103impl DisplayAs for PartitionIsolatorExec {
104    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
105        let input_partitions = self.input.output_partitioning().partition_count();
106        write!(f, "PartitionIsolatorExec: ")?;
107
108        match t {
109            DisplayFormatType::Verbose => {
110                let partition_groups = Self::partition_groups(input_partitions, self.n_tasks);
111
112                let n: usize = partition_groups.iter().map(|v| v.len()).sum();
113                let mut partitions = vec![];
114                for _ in 0..self.n_tasks {
115                    partitions.push(vec!["__".to_string(); n]);
116                }
117
118                for (i, partition_group) in partition_groups.iter().enumerate() {
119                    for (j, p) in partition_group.iter().enumerate() {
120                        partitions[i][*p] = format!("p{j}")
121                    }
122                    write!(f, "t{i}:[{}] ", partitions[i].join(","))?;
123                }
124            }
125            _ => {
126                write!(f, "tasks={} partitions={}", self.n_tasks, input_partitions)?;
127            }
128        }
129        Ok(())
130    }
131}
132
133impl ExecutionPlan for PartitionIsolatorExec {
134    fn name(&self) -> &str {
135        "PartitionIsolatorExec"
136    }
137
138    fn as_any(&self) -> &dyn std::any::Any {
139        self
140    }
141
142    fn properties(&self) -> &Arc<PlanProperties> {
143        &self.properties
144    }
145
146    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
147        vec![&self.input]
148    }
149
150    fn with_new_children(
151        self: Arc<Self>,
152        children: Vec<Arc<dyn ExecutionPlan>>,
153    ) -> Result<Arc<dyn ExecutionPlan>> {
154        let input = require_one_child(children)?;
155        Ok(Arc::new(Self::new(input, self.n_tasks)))
156    }
157
158    fn execute(
159        &self,
160        partition: usize,
161        context: Arc<TaskContext>,
162    ) -> Result<SendableRecordBatchStream> {
163        let task_context = DistributedTaskContext::from_ctx(&context);
164
165        let metric = MetricBuilder::new(&self.metrics).output_rows(partition);
166        let input_partitions = self.input.output_partitioning().partition_count();
167
168        let partition_group = Self::partition_group(
169            input_partitions,
170            task_context.task_index,
171            task_context.task_count,
172        );
173
174        // if our partition group is [7,8,9] and we are asked for parittion 1,
175        // then look up that index in our group and execute that partition, in this
176        // example partition 8
177
178        match partition_group.get(partition) {
179            Some(actual_partition_number) => {
180                if *actual_partition_number >= input_partitions {
181                    //trace!("{} returning empty stream", ctx_name);
182                    Ok(Box::pin(EmptyRecordBatchStream::new(self.input.schema()))
183                        as SendableRecordBatchStream)
184                } else {
185                    Ok(Box::pin(RecordBatchStreamAdapter::new(
186                        self.schema(),
187                        self.input
188                            .execute(*actual_partition_number, context)?
189                            .inspect_ok(move |v| metric.add(v.num_rows())),
190                    )))
191                }
192            }
193            None => Ok(Box::pin(EmptyRecordBatchStream::new(self.input.schema()))
194                as SendableRecordBatchStream),
195        }
196    }
197
198    fn metrics(&self) -> Option<MetricsSet> {
199        Some(self.metrics.clone_inner())
200    }
201}
202
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the grouping for a single task, even splits, and uneven splits.
    #[test]
    fn test_partition_groups() {
        // (input_partitions, n_tasks, expected groups)
        let cases: [(usize, usize, Vec<Vec<usize>>); 6] = [
            (2, 1, vec![vec![0, 1]]),
            (6, 2, vec![vec![0, 1, 2], vec![3, 4, 5]]),
            (6, 3, vec![vec![0, 1], vec![2, 3], vec![4, 5]]),
            (6, 4, vec![vec![0, 1], vec![2, 3], vec![4], vec![5]]),
            (10, 3, vec![vec![0, 1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]),
            (10, 4, vec![vec![0, 1, 2], vec![3, 4, 5], vec![6, 7], vec![8, 9]]),
        ];

        for (input_partitions, n_tasks, expected) in cases {
            assert_eq!(
                PartitionIsolatorExec::partition_groups(input_partitions, n_tasks),
                expected
            );
        }
    }
}