datafusion_physical_expr/
partitioning.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`Partitioning`] and [`Distribution`] for `ExecutionPlans`
19
20use crate::{
21    EquivalenceProperties, PhysicalExpr, equivalence::ProjectionMapping,
22    expressions::UnKnownColumn, physical_exprs_equal,
23};
24use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
25use std::fmt;
26use std::fmt::Display;
27use std::sync::Arc;
28
29/// Output partitioning supported by [`ExecutionPlan`]s.
30///
/// Calling [`ExecutionPlan::execute`] produces one or more independent streams of
32/// [`RecordBatch`]es in parallel, referred to as partitions. The streams are Rust
33/// `async` [`Stream`]s (a special kind of future). The number of output
34/// partitions varies based on the input and the operation performed.
35///
36/// For example, an `ExecutionPlan` that has output partitioning of 3 will
37/// produce 3 distinct output streams as the result of calling
38/// `ExecutionPlan::execute(0)`, `ExecutionPlan::execute(1)`, and
39/// `ExecutionPlan::execute(2)`, as shown below:
40///
41/// ```text
42///                                                   ...         ...        ...
43///               ...                                  ▲           ▲           ▲
44///                                                    │           │           │
45///                ▲                                   │           │           │
46///                │                                   │           │           │
47///                │                               ┌───┴────┐  ┌───┴────┐  ┌───┴────┐
48///     ┌────────────────────┐                     │ Stream │  │ Stream │  │ Stream │
49///     │   ExecutionPlan    │                     │  (0)   │  │  (1)   │  │  (2)   │
50///     └────────────────────┘                     └────────┘  └────────┘  └────────┘
51///                ▲                                   ▲           ▲           ▲
52///                │                                   │           │           │
53///     ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                          │           │           │
54///             Input        │                         │           │           │
55///     └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                          │           │           │
56///                ▲                               ┌ ─ ─ ─ ─   ┌ ─ ─ ─ ─   ┌ ─ ─ ─ ─
57///                │                                 Input  │    Input  │    Input  │
58///                │                               │ Stream    │ Stream    │ Stream
59///                                                   (0)   │     (1)   │     (2)   │
60///               ...                              └ ─ ▲ ─ ─   └ ─ ▲ ─ ─   └ ─ ▲ ─ ─
61///                                                    │           │           │
62///                                                    │           │           │
63///                                                    │           │           │
64///
65/// ExecutionPlan with 1 input                      3 (async) streams, one for each
66/// that has 3 partitions, which itself             output partition
67/// has 3 output partitions
68/// ```
69///
70/// It is common (but not required) that an `ExecutionPlan` has the same number
71/// of input partitions as output partitions. However, some plans have different
72/// numbers such as the `RepartitionExec` that redistributes batches from some
73/// number of inputs to some number of outputs
74///
75/// ```text
76///               ...                                     ...         ...        ...
77///
78///                                                        ▲           ▲           ▲
79///                ▲                                       │           │           │
80///                │                                       │           │           │
81///       ┌────────┴───────────┐                           │           │           │
82///       │  RepartitionExec   │                      ┌────┴───┐  ┌────┴───┐  ┌────┴───┐
83///       └────────────────────┘                      │ Stream │  │ Stream │  │ Stream │
84///                ▲                                  │  (0)   │  │  (1)   │  │  (2)   │
85///                │                                  └────────┘  └────────┘  └────────┘
86///                │                                       ▲           ▲           ▲
87///                ...                                     │           │           │
88///                                                        └──────────┐│┌──────────┘
89///                                                                   │││
90///                                                                   │││
91/// RepartitionExec with 1 input
92/// partition and 3 output partitions                 3 (async) streams, that internally
93///                                                    pull from the same input stream
94///                                                                  ...
95/// ```
96///
97/// # Additional Examples
98///
99/// A simple `FileScanExec` might produce one output stream (partition) for each
100/// file (note the actual DataFusion file scanners can read individual files in
101/// parallel, potentially producing multiple partitions per file)
102///
103/// Plans such as `SortPreservingMerge` produce a single output stream
104/// (1 output partition) by combining some number of input streams (input partitions)
105///
106/// Plans such as `FilterExec` produce the same number of output streams
107/// (partitions) as input streams (partitions).
108///
109/// [`RecordBatch`]: arrow::record_batch::RecordBatch
110/// [`ExecutionPlan::execute`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.execute
111/// [`ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
112/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
#[derive(Debug, Clone)]
pub enum Partitioning {
    /// Allocate batches using a round-robin algorithm and the specified number of partitions
    RoundRobinBatch(usize),
    /// Allocate rows based on a hash of one or more expressions and the specified number of
    /// partitions
    Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
    /// Unknown partitioning scheme with a known number of partitions
    UnknownPartitioning(usize),
}
123
124impl Display for Partitioning {
125    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126        match self {
127            Partitioning::RoundRobinBatch(size) => write!(f, "RoundRobinBatch({size})"),
128            Partitioning::Hash(phy_exprs, size) => {
129                let phy_exprs_str = phy_exprs
130                    .iter()
131                    .map(|e| format!("{e}"))
132                    .collect::<Vec<String>>()
133                    .join(", ");
134                write!(f, "Hash([{phy_exprs_str}], {size})")
135            }
136            Partitioning::UnknownPartitioning(size) => {
137                write!(f, "UnknownPartitioning({size})")
138            }
139        }
140    }
141}
142
/// Describes the way in which a [`Partitioning`] meets a [`Distribution`] requirement.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitioningSatisfaction {
    /// The distribution requirement is not met by the partitioning
    NotSatisfied,
    /// The partitioning is an exact match for the distribution requirement
    Exact,
    /// The distribution requirement is met through subset logic
    Subset,
}

impl PartitioningSatisfaction {
    /// Returns `true` for [`Self::Exact`] and [`Self::Subset`], and `false` for
    /// [`Self::NotSatisfied`].
    pub fn is_satisfied(&self) -> bool {
        match self {
            Self::Exact | Self::Subset => true,
            Self::NotSatisfied => false,
        }
    }

    /// Returns `true` only for [`Self::Subset`].
    pub fn is_subset(&self) -> bool {
        *self == Self::Subset
    }
}
163
164impl Partitioning {
165    /// Returns the number of partitions in this partitioning scheme
166    pub fn partition_count(&self) -> usize {
167        use Partitioning::*;
168        match self {
169            RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
170        }
171    }
172
173    /// Returns true if `subset_exprs` is a subset of `exprs`.
174    /// For example: Hash(a, b) is subset of Hash(a) since a partition with all occurrences of
175    /// a distinct (a) must also contain all occurrences of a distinct (a, b) with the same (a).
176    fn is_subset_partitioning(
177        subset_exprs: &[Arc<dyn PhysicalExpr>],
178        superset_exprs: &[Arc<dyn PhysicalExpr>],
179    ) -> bool {
180        // Require strict subset: fewer expressions, not equal
181        if subset_exprs.is_empty() || subset_exprs.len() >= superset_exprs.len() {
182            return false;
183        }
184
185        subset_exprs.iter().all(|subset_expr| {
186            superset_exprs
187                .iter()
188                .any(|superset_expr| subset_expr.eq(superset_expr))
189        })
190    }
191
192    #[deprecated(since = "52.0.0", note = "Use satisfaction instead")]
193    pub fn satisfy(
194        &self,
195        required: &Distribution,
196        eq_properties: &EquivalenceProperties,
197    ) -> bool {
198        self.satisfaction(required, eq_properties, false)
199            == PartitioningSatisfaction::Exact
200    }
201
202    /// Returns how this [`Partitioning`] satisfies the partitioning scheme mandated
203    /// by the `required` [`Distribution`].
204    pub fn satisfaction(
205        &self,
206        required: &Distribution,
207        eq_properties: &EquivalenceProperties,
208        allow_subset: bool,
209    ) -> PartitioningSatisfaction {
210        match required {
211            Distribution::UnspecifiedDistribution => PartitioningSatisfaction::Exact,
212            Distribution::SinglePartition if self.partition_count() == 1 => {
213                PartitioningSatisfaction::Exact
214            }
215            // When partition count is 1, hash requirement is satisfied.
216            Distribution::HashPartitioned(_) if self.partition_count() == 1 => {
217                PartitioningSatisfaction::Exact
218            }
219            Distribution::HashPartitioned(required_exprs) => match self {
220                // Here we do not check the partition count for hash partitioning and assumes the partition count
221                // and hash functions in the system are the same. In future if we plan to support storage partition-wise joins,
222                // then we need to have the partition count and hash functions validation.
223                Partitioning::Hash(partition_exprs, _) => {
224                    // Empty hash partitioning is invalid
225                    if partition_exprs.is_empty() || required_exprs.is_empty() {
226                        return PartitioningSatisfaction::NotSatisfied;
227                    }
228
229                    // Fast path: exact match
230                    if physical_exprs_equal(required_exprs, partition_exprs) {
231                        return PartitioningSatisfaction::Exact;
232                    }
233
234                    // Normalization path using equivalence groups
235                    let eq_groups = eq_properties.eq_group();
236                    if !eq_groups.is_empty() {
237                        let normalized_required_exprs = required_exprs
238                            .iter()
239                            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
240                            .collect::<Vec<_>>();
241                        let normalized_partition_exprs = partition_exprs
242                            .iter()
243                            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
244                            .collect::<Vec<_>>();
245                        if physical_exprs_equal(
246                            &normalized_required_exprs,
247                            &normalized_partition_exprs,
248                        ) {
249                            return PartitioningSatisfaction::Exact;
250                        }
251
252                        if allow_subset
253                            && Self::is_subset_partitioning(
254                                &normalized_partition_exprs,
255                                &normalized_required_exprs,
256                            )
257                        {
258                            return PartitioningSatisfaction::Subset;
259                        }
260                    } else if allow_subset
261                        && Self::is_subset_partitioning(partition_exprs, required_exprs)
262                    {
263                        return PartitioningSatisfaction::Subset;
264                    }
265
266                    PartitioningSatisfaction::NotSatisfied
267                }
268                _ => PartitioningSatisfaction::NotSatisfied,
269            },
270            _ => PartitioningSatisfaction::NotSatisfied,
271        }
272    }
273
274    /// Calculate the output partitioning after applying the given projection.
275    pub fn project(
276        &self,
277        mapping: &ProjectionMapping,
278        input_eq_properties: &EquivalenceProperties,
279    ) -> Self {
280        if let Partitioning::Hash(exprs, part) = self {
281            let normalized_exprs = input_eq_properties
282                .project_expressions(exprs, mapping)
283                .zip(exprs)
284                .map(|(proj_expr, expr)| {
285                    proj_expr.unwrap_or_else(|| {
286                        Arc::new(UnKnownColumn::new(&expr.to_string()))
287                    })
288                })
289                .collect();
290            Partitioning::Hash(normalized_exprs, *part)
291        } else {
292            self.clone()
293        }
294    }
295}
296
297impl PartialEq for Partitioning {
298    fn eq(&self, other: &Partitioning) -> bool {
299        match (self, other) {
300            (
301                Partitioning::RoundRobinBatch(count1),
302                Partitioning::RoundRobinBatch(count2),
303            ) if count1 == count2 => true,
304            (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2))
305                if physical_exprs_equal(exprs1, exprs2) && (count1 == count2) =>
306            {
307                true
308            }
309            _ => false,
310        }
311    }
312}
313
/// How data is distributed amongst partitions. See [`Partitioning`] for more
/// details.
///
/// A `Distribution` expresses a requirement; [`Partitioning::satisfaction`]
/// reports whether (and how) a concrete [`Partitioning`] meets it.
#[derive(Debug, Clone)]
pub enum Distribution {
    /// Unspecified distribution
    UnspecifiedDistribution,
    /// A single partition is required
    SinglePartition,
    /// Requires children to be distributed in such a way that the same
    /// values of the keys end up in the same partition
    HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}
326
327impl Distribution {
328    /// Creates a `Partitioning` that satisfies this `Distribution`
329    pub fn create_partitioning(self, partition_count: usize) -> Partitioning {
330        match self {
331            Distribution::UnspecifiedDistribution => {
332                Partitioning::UnknownPartitioning(partition_count)
333            }
334            Distribution::SinglePartition => Partitioning::UnknownPartitioning(1),
335            Distribution::HashPartitioned(expr) => {
336                Partitioning::Hash(expr, partition_count)
337            }
338        }
339    }
340}
341
342impl Display for Distribution {
343    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
344        match self {
345            Distribution::UnspecifiedDistribution => write!(f, "Unspecified"),
346            Distribution::SinglePartition => write!(f, "SinglePartition"),
347            Distribution::HashPartitioned(exprs) => {
348                write!(f, "HashPartitioned[{}])", format_physical_expr_list(exprs))
349            }
350        }
351    }
352}
353
354#[cfg(test)]
355mod tests {
356
357    use super::*;
358    use crate::expressions::Column;
359
360    use arrow::datatypes::{DataType, Field, Schema};
361    use datafusion_common::Result;
362
363    #[test]
364    fn partitioning_satisfy_distribution() -> Result<()> {
365        let schema = Arc::new(Schema::new(vec![
366            Field::new("column_1", DataType::Int64, false),
367            Field::new("column_2", DataType::Utf8, false),
368        ]));
369
370        let partition_exprs1: Vec<Arc<dyn PhysicalExpr>> = vec![
371            Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
372            Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
373        ];
374
375        let partition_exprs2: Vec<Arc<dyn PhysicalExpr>> = vec![
376            Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
377            Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
378        ];
379
380        let distribution_types = vec![
381            Distribution::UnspecifiedDistribution,
382            Distribution::SinglePartition,
383            Distribution::HashPartitioned(partition_exprs1.clone()),
384        ];
385
386        let single_partition = Partitioning::UnknownPartitioning(1);
387        let unspecified_partition = Partitioning::UnknownPartitioning(10);
388        let round_robin_partition = Partitioning::RoundRobinBatch(10);
389        let hash_partition1 = Partitioning::Hash(partition_exprs1, 10);
390        let hash_partition2 = Partitioning::Hash(partition_exprs2, 10);
391        let eq_properties = EquivalenceProperties::new(schema);
392
393        for distribution in distribution_types {
394            let result = (
395                single_partition
396                    .satisfaction(&distribution, &eq_properties, true)
397                    .is_satisfied(),
398                unspecified_partition
399                    .satisfaction(&distribution, &eq_properties, true)
400                    .is_satisfied(),
401                round_robin_partition
402                    .satisfaction(&distribution, &eq_properties, true)
403                    .is_satisfied(),
404                hash_partition1
405                    .satisfaction(&distribution, &eq_properties, true)
406                    .is_satisfied(),
407                hash_partition2
408                    .satisfaction(&distribution, &eq_properties, true)
409                    .is_satisfied(),
410            );
411
412            match distribution {
413                Distribution::UnspecifiedDistribution => {
414                    assert_eq!(result, (true, true, true, true, true))
415                }
416                Distribution::SinglePartition => {
417                    assert_eq!(result, (true, false, false, false, false))
418                }
419                Distribution::HashPartitioned(_) => {
420                    assert_eq!(result, (true, false, false, true, false))
421                }
422            }
423        }
424
425        Ok(())
426    }
427
428    #[test]
429    fn test_partitioning_satisfy_by_subset() -> Result<()> {
430        let schema = Arc::new(Schema::new(vec![
431            Field::new("a", DataType::Int64, false),
432            Field::new("b", DataType::Int64, false),
433            Field::new("c", DataType::Int64, false),
434        ]));
435
436        let col_a: Arc<dyn PhysicalExpr> =
437            Arc::new(Column::new_with_schema("a", &schema)?);
438        let col_b: Arc<dyn PhysicalExpr> =
439            Arc::new(Column::new_with_schema("b", &schema)?);
440        let col_c: Arc<dyn PhysicalExpr> =
441            Arc::new(Column::new_with_schema("c", &schema)?);
442        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
443
444        let test_cases = vec![
445            (
446                "Hash([a]) vs Hash([a, b])",
447                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
448                Distribution::HashPartitioned(vec![
449                    Arc::clone(&col_a),
450                    Arc::clone(&col_b),
451                ]),
452                PartitioningSatisfaction::Subset,
453                PartitioningSatisfaction::NotSatisfied,
454            ),
455            (
456                "Hash([a]) vs Hash([a, b, c])",
457                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
458                Distribution::HashPartitioned(vec![
459                    Arc::clone(&col_a),
460                    Arc::clone(&col_b),
461                    Arc::clone(&col_c),
462                ]),
463                PartitioningSatisfaction::Subset,
464                PartitioningSatisfaction::NotSatisfied,
465            ),
466            (
467                "Hash([a, b]) vs Hash([a, b, c])",
468                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
469                Distribution::HashPartitioned(vec![
470                    Arc::clone(&col_a),
471                    Arc::clone(&col_b),
472                    Arc::clone(&col_c),
473                ]),
474                PartitioningSatisfaction::Subset,
475                PartitioningSatisfaction::NotSatisfied,
476            ),
477            (
478                "Hash([b]) vs Hash([a, b, c])",
479                Partitioning::Hash(vec![Arc::clone(&col_b)], 4),
480                Distribution::HashPartitioned(vec![
481                    Arc::clone(&col_a),
482                    Arc::clone(&col_b),
483                    Arc::clone(&col_c),
484                ]),
485                PartitioningSatisfaction::Subset,
486                PartitioningSatisfaction::NotSatisfied,
487            ),
488            (
489                "Hash([b, a]) vs Hash([a, b, c])",
490                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
491                Distribution::HashPartitioned(vec![
492                    Arc::clone(&col_a),
493                    Arc::clone(&col_b),
494                    Arc::clone(&col_c),
495                ]),
496                PartitioningSatisfaction::Subset,
497                PartitioningSatisfaction::NotSatisfied,
498            ),
499        ];
500
501        for (desc, partition, required, expected_with_subset, expected_without_subset) in
502            test_cases
503        {
504            let result = partition.satisfaction(&required, &eq_properties, true);
505            assert_eq!(
506                result, expected_with_subset,
507                "Failed for {desc} with subset enabled"
508            );
509
510            let result = partition.satisfaction(&required, &eq_properties, false);
511            assert_eq!(
512                result, expected_without_subset,
513                "Failed for {desc} with subset disabled"
514            );
515        }
516
517        Ok(())
518    }
519
520    #[test]
521    fn test_partitioning_current_superset() -> Result<()> {
522        let schema = Arc::new(Schema::new(vec![
523            Field::new("a", DataType::Int64, false),
524            Field::new("b", DataType::Int64, false),
525            Field::new("c", DataType::Int64, false),
526        ]));
527
528        let col_a: Arc<dyn PhysicalExpr> =
529            Arc::new(Column::new_with_schema("a", &schema)?);
530        let col_b: Arc<dyn PhysicalExpr> =
531            Arc::new(Column::new_with_schema("b", &schema)?);
532        let col_c: Arc<dyn PhysicalExpr> =
533            Arc::new(Column::new_with_schema("c", &schema)?);
534        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
535
536        let test_cases = vec![
537            (
538                "Hash([a, b]) vs Hash([a])",
539                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
540                Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
541                PartitioningSatisfaction::NotSatisfied,
542                PartitioningSatisfaction::NotSatisfied,
543            ),
544            (
545                "Hash([a, b, c]) vs Hash([a])",
546                Partitioning::Hash(
547                    vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_c)],
548                    4,
549                ),
550                Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
551                PartitioningSatisfaction::NotSatisfied,
552                PartitioningSatisfaction::NotSatisfied,
553            ),
554            (
555                "Hash([a, b, c]) vs Hash([a, b])",
556                Partitioning::Hash(
557                    vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_c)],
558                    4,
559                ),
560                Distribution::HashPartitioned(vec![
561                    Arc::clone(&col_a),
562                    Arc::clone(&col_b),
563                ]),
564                PartitioningSatisfaction::NotSatisfied,
565                PartitioningSatisfaction::NotSatisfied,
566            ),
567        ];
568
569        for (desc, partition, required, expected_with_subset, expected_without_subset) in
570            test_cases
571        {
572            let result = partition.satisfaction(&required, &eq_properties, true);
573            assert_eq!(
574                result, expected_with_subset,
575                "Failed for {desc} with subset enabled"
576            );
577
578            let result = partition.satisfaction(&required, &eq_properties, false);
579            assert_eq!(
580                result, expected_without_subset,
581                "Failed for {desc} with subset disabled"
582            );
583        }
584
585        Ok(())
586    }
587
588    #[test]
589    fn test_partitioning_partial_overlap() -> Result<()> {
590        let schema = Arc::new(Schema::new(vec![
591            Field::new("a", DataType::Int64, false),
592            Field::new("b", DataType::Int64, false),
593            Field::new("c", DataType::Int64, false),
594        ]));
595
596        let col_a: Arc<dyn PhysicalExpr> =
597            Arc::new(Column::new_with_schema("a", &schema)?);
598        let col_b: Arc<dyn PhysicalExpr> =
599            Arc::new(Column::new_with_schema("b", &schema)?);
600        let col_c: Arc<dyn PhysicalExpr> =
601            Arc::new(Column::new_with_schema("c", &schema)?);
602        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
603
604        let test_cases = vec![(
605            "Partial overlap: Hash([a, c]) vs Hash([a, b])",
606            Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_c)], 4),
607            Distribution::HashPartitioned(vec![Arc::clone(&col_a), Arc::clone(&col_b)]),
608            PartitioningSatisfaction::NotSatisfied,
609            PartitioningSatisfaction::NotSatisfied,
610        )];
611
612        for (desc, partition, required, expected_with_subset, expected_without_subset) in
613            test_cases
614        {
615            let result = partition.satisfaction(&required, &eq_properties, true);
616            assert_eq!(
617                result, expected_with_subset,
618                "Failed for {desc} with subset enabled"
619            );
620
621            let result = partition.satisfaction(&required, &eq_properties, false);
622            assert_eq!(
623                result, expected_without_subset,
624                "Failed for {desc} with subset disabled"
625            );
626        }
627
628        Ok(())
629    }
630
631    #[test]
632    fn test_partitioning_no_overlap() -> Result<()> {
633        let schema = Arc::new(Schema::new(vec![
634            Field::new("a", DataType::Int64, false),
635            Field::new("b", DataType::Int64, false),
636            Field::new("c", DataType::Int64, false),
637        ]));
638
639        let col_a: Arc<dyn PhysicalExpr> =
640            Arc::new(Column::new_with_schema("a", &schema)?);
641        let col_b: Arc<dyn PhysicalExpr> =
642            Arc::new(Column::new_with_schema("b", &schema)?);
643        let col_c: Arc<dyn PhysicalExpr> =
644            Arc::new(Column::new_with_schema("c", &schema)?);
645        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
646
647        let test_cases = vec![
648            (
649                "Hash([a]) vs Hash([b, c])",
650                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
651                Distribution::HashPartitioned(vec![
652                    Arc::clone(&col_b),
653                    Arc::clone(&col_c),
654                ]),
655                PartitioningSatisfaction::NotSatisfied,
656                PartitioningSatisfaction::NotSatisfied,
657            ),
658            (
659                "Hash([a, b]) vs Hash([c])",
660                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
661                Distribution::HashPartitioned(vec![Arc::clone(&col_c)]),
662                PartitioningSatisfaction::NotSatisfied,
663                PartitioningSatisfaction::NotSatisfied,
664            ),
665        ];
666
667        for (desc, partition, required, expected_with_subset, expected_without_subset) in
668            test_cases
669        {
670            let result = partition.satisfaction(&required, &eq_properties, true);
671            assert_eq!(
672                result, expected_with_subset,
673                "Failed for {desc} with subset enabled"
674            );
675
676            let result = partition.satisfaction(&required, &eq_properties, false);
677            assert_eq!(
678                result, expected_without_subset,
679                "Failed for {desc} with subset disabled"
680            );
681        }
682
683        Ok(())
684    }
685
686    #[test]
687    fn test_partitioning_exact_match() -> Result<()> {
688        let schema = Arc::new(Schema::new(vec![
689            Field::new("a", DataType::Int64, false),
690            Field::new("b", DataType::Int64, false),
691        ]));
692
693        let col_a: Arc<dyn PhysicalExpr> =
694            Arc::new(Column::new_with_schema("a", &schema)?);
695        let col_b: Arc<dyn PhysicalExpr> =
696            Arc::new(Column::new_with_schema("b", &schema)?);
697        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
698
699        let test_cases = vec![
700            (
701                "Hash([a, b]) vs Hash([a, b])",
702                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
703                Distribution::HashPartitioned(vec![
704                    Arc::clone(&col_a),
705                    Arc::clone(&col_b),
706                ]),
707                PartitioningSatisfaction::Exact,
708                PartitioningSatisfaction::Exact,
709            ),
710            (
711                "Hash([a]) vs Hash([a])",
712                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
713                Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
714                PartitioningSatisfaction::Exact,
715                PartitioningSatisfaction::Exact,
716            ),
717        ];
718
719        for (desc, partition, required, expected_with_subset, expected_without_subset) in
720            test_cases
721        {
722            let result = partition.satisfaction(&required, &eq_properties, true);
723            assert_eq!(
724                result, expected_with_subset,
725                "Failed for {desc} with subset enabled"
726            );
727
728            let result = partition.satisfaction(&required, &eq_properties, false);
729            assert_eq!(
730                result, expected_without_subset,
731                "Failed for {desc} with subset disabled"
732            );
733        }
734
735        Ok(())
736    }
737
738    #[test]
739    fn test_partitioning_unknown() -> Result<()> {
740        let schema = Arc::new(Schema::new(vec![
741            Field::new("a", DataType::Int64, false),
742            Field::new("b", DataType::Int64, false),
743        ]));
744
745        let col_a: Arc<dyn PhysicalExpr> =
746            Arc::new(Column::new_with_schema("a", &schema)?);
747        let col_b: Arc<dyn PhysicalExpr> =
748            Arc::new(Column::new_with_schema("b", &schema)?);
749        let unknown: Arc<dyn PhysicalExpr> = Arc::new(UnKnownColumn::new("dropped"));
750        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
751
752        let test_cases = vec![
753            (
754                "Hash([unknown]) vs Hash([a, b])",
755                Partitioning::Hash(vec![Arc::clone(&unknown)], 4),
756                Distribution::HashPartitioned(vec![
757                    Arc::clone(&col_a),
758                    Arc::clone(&col_b),
759                ]),
760                PartitioningSatisfaction::NotSatisfied,
761                PartitioningSatisfaction::NotSatisfied,
762            ),
763            (
764                "Hash([a, b]) vs Hash([unknown])",
765                Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
766                Distribution::HashPartitioned(vec![Arc::clone(&unknown)]),
767                PartitioningSatisfaction::NotSatisfied,
768                PartitioningSatisfaction::NotSatisfied,
769            ),
770            (
771                "Hash([unknown]) vs Hash([unknown])",
772                Partitioning::Hash(vec![Arc::clone(&unknown)], 4),
773                Distribution::HashPartitioned(vec![Arc::clone(&unknown)]),
774                PartitioningSatisfaction::NotSatisfied,
775                PartitioningSatisfaction::NotSatisfied,
776            ),
777        ];
778
779        for (desc, partition, required, expected_with_subset, expected_without_subset) in
780            test_cases
781        {
782            let result = partition.satisfaction(&required, &eq_properties, true);
783            assert_eq!(
784                result, expected_with_subset,
785                "Failed for {desc} with subset enabled"
786            );
787
788            let result = partition.satisfaction(&required, &eq_properties, false);
789            assert_eq!(
790                result, expected_without_subset,
791                "Failed for {desc} with subset disabled"
792            );
793        }
794
795        Ok(())
796    }
797
798    #[test]
799    fn test_partitioning_empty_hash() -> Result<()> {
800        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
801
802        let col_a: Arc<dyn PhysicalExpr> =
803            Arc::new(Column::new_with_schema("a", &schema)?);
804        let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
805
806        let test_cases = vec![
807            (
808                "Hash([]) vs Hash([a])",
809                Partitioning::Hash(vec![], 4),
810                Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
811                PartitioningSatisfaction::NotSatisfied,
812                PartitioningSatisfaction::NotSatisfied,
813            ),
814            (
815                "Hash([a]) vs Hash([])",
816                Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
817                Distribution::HashPartitioned(vec![]),
818                PartitioningSatisfaction::NotSatisfied,
819                PartitioningSatisfaction::NotSatisfied,
820            ),
821            (
822                "Hash([]) vs Hash([])",
823                Partitioning::Hash(vec![], 4),
824                Distribution::HashPartitioned(vec![]),
825                PartitioningSatisfaction::NotSatisfied,
826                PartitioningSatisfaction::NotSatisfied,
827            ),
828        ];
829
830        for (desc, partition, required, expected_with_subset, expected_without_subset) in
831            test_cases
832        {
833            let result = partition.satisfaction(&required, &eq_properties, true);
834            assert_eq!(
835                result, expected_with_subset,
836                "Failed for {desc} with subset enabled"
837            );
838
839            let result = partition.satisfaction(&required, &eq_properties, false);
840            assert_eq!(
841                result, expected_without_subset,
842                "Failed for {desc} with subset disabled"
843            );
844        }
845
846        Ok(())
847    }
848}