datafusion_physical_expr/partitioning.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`Partitioning`] and [`Distribution`] for `ExecutionPlans`
19
20use crate::{
21 equivalence::ProjectionMapping, expressions::UnKnownColumn, physical_exprs_equal,
22 EquivalenceProperties, PhysicalExpr,
23};
24use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
25use std::fmt;
26use std::fmt::Display;
27use std::sync::Arc;
28
29/// Output partitioning supported by [`ExecutionPlan`]s.
30///
31/// Calling [`ExecutionPlan::execute`] produce one or more independent streams of
32/// [`RecordBatch`]es in parallel, referred to as partitions. The streams are Rust
33/// `async` [`Stream`]s (a special kind of future). The number of output
34/// partitions varies based on the input and the operation performed.
35///
36/// For example, an `ExecutionPlan` that has output partitioning of 3 will
37/// produce 3 distinct output streams as the result of calling
38/// `ExecutionPlan::execute(0)`, `ExecutionPlan::execute(1)`, and
39/// `ExecutionPlan::execute(2)`, as shown below:
40///
41/// ```text
42/// ... ... ...
43/// ... ▲ ▲ ▲
44/// │ │ │
45/// ▲ │ │ │
46/// │ │ │ │
47/// │ ┌───┴────┐ ┌───┴────┐ ┌───┴────┐
48/// ┌────────────────────┐ │ Stream │ │ Stream │ │ Stream │
49/// │ ExecutionPlan │ │ (0) │ │ (1) │ │ (2) │
50/// └────────────────────┘ └────────┘ └────────┘ └────────┘
51/// ▲ ▲ ▲ ▲
52/// │ │ │ │
53/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │
54/// Input │ │ │ │
55/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ │ │ │
56/// ▲ ┌ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ┌ ─ ─ ─ ─
57/// │ Input │ Input │ Input │
58/// │ │ Stream │ Stream │ Stream
59/// (0) │ (1) │ (2) │
60/// ... └ ─ ▲ ─ ─ └ ─ ▲ ─ ─ └ ─ ▲ ─ ─
61/// │ │ │
62/// │ │ │
63/// │ │ │
64///
65/// ExecutionPlan with 1 input 3 (async) streams, one for each
66/// that has 3 partitions, which itself output partition
67/// has 3 output partitions
68/// ```
69///
70/// It is common (but not required) that an `ExecutionPlan` has the same number
71/// of input partitions as output partitions. However, some plans have different
72/// numbers such as the `RepartitionExec` that redistributes batches from some
73/// number of inputs to some number of outputs
74///
75/// ```text
76/// ... ... ... ...
77///
78/// ▲ ▲ ▲
79/// ▲ │ │ │
80/// │ │ │ │
81/// ┌────────┴───────────┐ │ │ │
82/// │ RepartitionExec │ ┌────┴───┐ ┌────┴───┐ ┌────┴───┐
83/// └────────────────────┘ │ Stream │ │ Stream │ │ Stream │
84/// ▲ │ (0) │ │ (1) │ │ (2) │
85/// │ └────────┘ └────────┘ └────────┘
86/// │ ▲ ▲ ▲
87/// ... │ │ │
88/// └──────────┐│┌──────────┘
89/// │││
90/// │││
91/// RepartitionExec with 1 input
92/// partition and 3 output partitions 3 (async) streams, that internally
93/// pull from the same input stream
94/// ...
95/// ```
96///
97/// # Additional Examples
98///
99/// A simple `FileScanExec` might produce one output stream (partition) for each
100/// file (note the actual DataFusion file scanners can read individual files in
101/// parallel, potentially producing multiple partitions per file)
102///
103/// Plans such as `SortPreservingMerge` produce a single output stream
104/// (1 output partition) by combining some number of input streams (input partitions)
105///
106/// Plans such as `FilterExec` produce the same number of output streams
107/// (partitions) as input streams (partitions).
108///
109/// [`RecordBatch`]: arrow::record_batch::RecordBatch
110/// [`ExecutionPlan::execute`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.execute
111/// [`ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
112/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
113#[derive(Debug, Clone)]
114pub enum Partitioning {
115 /// Allocate batches using a round-robin algorithm and the specified number of partitions
116 RoundRobinBatch(usize),
117 /// Allocate rows based on a hash of one of more expressions and the specified number of
118 /// partitions
119 Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
120 /// Unknown partitioning scheme with a known number of partitions
121 UnknownPartitioning(usize),
122}
123
124impl Display for Partitioning {
125 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126 match self {
127 Partitioning::RoundRobinBatch(size) => write!(f, "RoundRobinBatch({size})"),
128 Partitioning::Hash(phy_exprs, size) => {
129 let phy_exprs_str = phy_exprs
130 .iter()
131 .map(|e| format!("{e}"))
132 .collect::<Vec<String>>()
133 .join(", ");
134 write!(f, "Hash([{phy_exprs_str}], {size})")
135 }
136 Partitioning::UnknownPartitioning(size) => {
137 write!(f, "UnknownPartitioning({size})")
138 }
139 }
140 }
141}
142impl Partitioning {
143 /// Returns the number of partitions in this partitioning scheme
144 pub fn partition_count(&self) -> usize {
145 use Partitioning::*;
146 match self {
147 RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
148 }
149 }
150
151 /// Returns true when the guarantees made by this [`Partitioning`] are sufficient to
152 /// satisfy the partitioning scheme mandated by the `required` [`Distribution`].
153 pub fn satisfy(
154 &self,
155 required: &Distribution,
156 eq_properties: &EquivalenceProperties,
157 ) -> bool {
158 match required {
159 Distribution::UnspecifiedDistribution => true,
160 Distribution::SinglePartition if self.partition_count() == 1 => true,
161 // When partition count is 1, hash requirement is satisfied.
162 Distribution::HashPartitioned(_) if self.partition_count() == 1 => true,
163 Distribution::HashPartitioned(required_exprs) => {
164 match self {
165 // Here we do not check the partition count for hash partitioning and assumes the partition count
166 // and hash functions in the system are the same. In future if we plan to support storage partition-wise joins,
167 // then we need to have the partition count and hash functions validation.
168 Partitioning::Hash(partition_exprs, _) => {
169 let fast_match =
170 physical_exprs_equal(required_exprs, partition_exprs);
171 // If the required exprs do not match, need to leverage the eq_properties provided by the child
172 // and normalize both exprs based on the equivalent groups.
173 if !fast_match {
174 let eq_groups = eq_properties.eq_group();
175 if !eq_groups.is_empty() {
176 let normalized_required_exprs = required_exprs
177 .iter()
178 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
179 .collect::<Vec<_>>();
180 let normalized_partition_exprs = partition_exprs
181 .iter()
182 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
183 .collect::<Vec<_>>();
184 return physical_exprs_equal(
185 &normalized_required_exprs,
186 &normalized_partition_exprs,
187 );
188 }
189 }
190 fast_match
191 }
192 _ => false,
193 }
194 }
195 _ => false,
196 }
197 }
198
199 /// Calculate the output partitioning after applying the given projection.
200 pub fn project(
201 &self,
202 mapping: &ProjectionMapping,
203 input_eq_properties: &EquivalenceProperties,
204 ) -> Self {
205 if let Partitioning::Hash(exprs, part) = self {
206 let normalized_exprs = input_eq_properties
207 .project_expressions(exprs, mapping)
208 .zip(exprs)
209 .map(|(proj_expr, expr)| {
210 proj_expr.unwrap_or_else(|| {
211 Arc::new(UnKnownColumn::new(&expr.to_string()))
212 })
213 })
214 .collect();
215 Partitioning::Hash(normalized_exprs, *part)
216 } else {
217 self.clone()
218 }
219 }
220}
221
222impl PartialEq for Partitioning {
223 fn eq(&self, other: &Partitioning) -> bool {
224 match (self, other) {
225 (
226 Partitioning::RoundRobinBatch(count1),
227 Partitioning::RoundRobinBatch(count2),
228 ) if count1 == count2 => true,
229 (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2))
230 if physical_exprs_equal(exprs1, exprs2) && (count1 == count2) =>
231 {
232 true
233 }
234 _ => false,
235 }
236 }
237}
238
239/// How data is distributed amongst partitions. See [`Partitioning`] for more
240/// details.
241#[derive(Debug, Clone)]
242pub enum Distribution {
243 /// Unspecified distribution
244 UnspecifiedDistribution,
245 /// A single partition is required
246 SinglePartition,
247 /// Requires children to be distributed in such a way that the same
248 /// values of the keys end up in the same partition
249 HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
250}
251
252impl Distribution {
253 /// Creates a `Partitioning` that satisfies this `Distribution`
254 pub fn create_partitioning(self, partition_count: usize) -> Partitioning {
255 match self {
256 Distribution::UnspecifiedDistribution => {
257 Partitioning::UnknownPartitioning(partition_count)
258 }
259 Distribution::SinglePartition => Partitioning::UnknownPartitioning(1),
260 Distribution::HashPartitioned(expr) => {
261 Partitioning::Hash(expr, partition_count)
262 }
263 }
264 }
265}
266
267impl Display for Distribution {
268 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
269 match self {
270 Distribution::UnspecifiedDistribution => write!(f, "Unspecified"),
271 Distribution::SinglePartition => write!(f, "SinglePartition"),
272 Distribution::HashPartitioned(exprs) => {
273 write!(f, "HashPartitioned[{}])", format_physical_expr_list(exprs))
274 }
275 }
276 }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use crate::expressions::Column;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;

    /// Checks a handful of representative partitionings against every kind of
    /// required distribution.
    #[test]
    fn partitioning_satisfy_distribution() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("column_1", DataType::Int64, false),
            Field::new("column_2", DataType::Utf8, false),
        ]));

        let column_1: Arc<dyn PhysicalExpr> =
            Arc::new(Column::new_with_schema("column_1", &schema)?);
        let column_2: Arc<dyn PhysicalExpr> =
            Arc::new(Column::new_with_schema("column_2", &schema)?);

        // Same columns in opposite orders; only the first ordering is the one
        // required by the hash distribution below.
        let ordered_exprs = vec![Arc::clone(&column_1), Arc::clone(&column_2)];
        let reversed_exprs = vec![column_2, column_1];

        // The order of these candidates matches the order of each expected row.
        let candidates = [
            Partitioning::UnknownPartitioning(1),
            Partitioning::UnknownPartitioning(10),
            Partitioning::RoundRobinBatch(10),
            Partitioning::Hash(ordered_exprs.clone(), 10),
            Partitioning::Hash(reversed_exprs, 10),
        ];
        let eq_properties = EquivalenceProperties::new(schema);

        let cases = [
            (
                Distribution::UnspecifiedDistribution,
                [true, true, true, true, true],
            ),
            (
                Distribution::SinglePartition,
                [true, false, false, false, false],
            ),
            (
                Distribution::HashPartitioned(ordered_exprs),
                [true, false, false, true, false],
            ),
        ];

        for (required, expected) in cases {
            let actual: Vec<bool> = candidates
                .iter()
                .map(|candidate| candidate.satisfy(&required, &eq_properties))
                .collect();
            assert_eq!(actual, expected, "required distribution: {required:?}");
        }

        Ok(())
    }
}