1use crate::{
21 EquivalenceProperties, PhysicalExpr, equivalence::ProjectionMapping,
22 expressions::UnKnownColumn, physical_exprs_equal,
23};
24use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
25use std::fmt;
26use std::fmt::Display;
27use std::sync::Arc;
28
29#[derive(Debug, Clone)]
114pub enum Partitioning {
115 RoundRobinBatch(usize),
117 Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
120 UnknownPartitioning(usize),
122}
123
124impl Display for Partitioning {
125 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126 match self {
127 Partitioning::RoundRobinBatch(size) => write!(f, "RoundRobinBatch({size})"),
128 Partitioning::Hash(phy_exprs, size) => {
129 let phy_exprs_str = phy_exprs
130 .iter()
131 .map(|e| format!("{e}"))
132 .collect::<Vec<String>>()
133 .join(", ");
134 write!(f, "Hash([{phy_exprs_str}], {size})")
135 }
136 Partitioning::UnknownPartitioning(size) => {
137 write!(f, "UnknownPartitioning({size})")
138 }
139 }
140 }
141}
142
143#[derive(Debug, Clone, Copy, PartialEq, Eq)]
145pub enum PartitioningSatisfaction {
146 NotSatisfied,
148 Exact,
150 Subset,
152}
153
154impl PartitioningSatisfaction {
155 pub fn is_satisfied(&self) -> bool {
156 matches!(self, Self::Exact | Self::Subset)
157 }
158
159 pub fn is_subset(&self) -> bool {
160 matches!(self, Self::Subset)
161 }
162}
163
164impl Partitioning {
165 pub fn partition_count(&self) -> usize {
167 use Partitioning::*;
168 match self {
169 RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
170 }
171 }
172
173 fn is_subset_partitioning(
177 subset_exprs: &[Arc<dyn PhysicalExpr>],
178 superset_exprs: &[Arc<dyn PhysicalExpr>],
179 ) -> bool {
180 if subset_exprs.is_empty() || subset_exprs.len() >= superset_exprs.len() {
182 return false;
183 }
184
185 subset_exprs.iter().all(|subset_expr| {
186 superset_exprs
187 .iter()
188 .any(|superset_expr| subset_expr.eq(superset_expr))
189 })
190 }
191
192 #[deprecated(since = "52.0.0", note = "Use satisfaction instead")]
193 pub fn satisfy(
194 &self,
195 required: &Distribution,
196 eq_properties: &EquivalenceProperties,
197 ) -> bool {
198 self.satisfaction(required, eq_properties, false)
199 == PartitioningSatisfaction::Exact
200 }
201
202 pub fn satisfaction(
205 &self,
206 required: &Distribution,
207 eq_properties: &EquivalenceProperties,
208 allow_subset: bool,
209 ) -> PartitioningSatisfaction {
210 match required {
211 Distribution::UnspecifiedDistribution => PartitioningSatisfaction::Exact,
212 Distribution::SinglePartition if self.partition_count() == 1 => {
213 PartitioningSatisfaction::Exact
214 }
215 Distribution::HashPartitioned(_) if self.partition_count() == 1 => {
217 PartitioningSatisfaction::Exact
218 }
219 Distribution::HashPartitioned(required_exprs) => match self {
220 Partitioning::Hash(partition_exprs, _) => {
224 if partition_exprs.is_empty() || required_exprs.is_empty() {
226 return PartitioningSatisfaction::NotSatisfied;
227 }
228
229 if physical_exprs_equal(required_exprs, partition_exprs) {
231 return PartitioningSatisfaction::Exact;
232 }
233
234 let eq_groups = eq_properties.eq_group();
236 if !eq_groups.is_empty() {
237 let normalized_required_exprs = required_exprs
238 .iter()
239 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
240 .collect::<Vec<_>>();
241 let normalized_partition_exprs = partition_exprs
242 .iter()
243 .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
244 .collect::<Vec<_>>();
245 if physical_exprs_equal(
246 &normalized_required_exprs,
247 &normalized_partition_exprs,
248 ) {
249 return PartitioningSatisfaction::Exact;
250 }
251
252 if allow_subset
253 && Self::is_subset_partitioning(
254 &normalized_partition_exprs,
255 &normalized_required_exprs,
256 )
257 {
258 return PartitioningSatisfaction::Subset;
259 }
260 } else if allow_subset
261 && Self::is_subset_partitioning(partition_exprs, required_exprs)
262 {
263 return PartitioningSatisfaction::Subset;
264 }
265
266 PartitioningSatisfaction::NotSatisfied
267 }
268 _ => PartitioningSatisfaction::NotSatisfied,
269 },
270 _ => PartitioningSatisfaction::NotSatisfied,
271 }
272 }
273
274 pub fn project(
276 &self,
277 mapping: &ProjectionMapping,
278 input_eq_properties: &EquivalenceProperties,
279 ) -> Self {
280 if let Partitioning::Hash(exprs, part) = self {
281 let normalized_exprs = input_eq_properties
282 .project_expressions(exprs, mapping)
283 .zip(exprs)
284 .map(|(proj_expr, expr)| {
285 proj_expr.unwrap_or_else(|| {
286 Arc::new(UnKnownColumn::new(&expr.to_string()))
287 })
288 })
289 .collect();
290 Partitioning::Hash(normalized_exprs, *part)
291 } else {
292 self.clone()
293 }
294 }
295}
296
297impl PartialEq for Partitioning {
298 fn eq(&self, other: &Partitioning) -> bool {
299 match (self, other) {
300 (
301 Partitioning::RoundRobinBatch(count1),
302 Partitioning::RoundRobinBatch(count2),
303 ) if count1 == count2 => true,
304 (Partitioning::Hash(exprs1, count1), Partitioning::Hash(exprs2, count2))
305 if physical_exprs_equal(exprs1, exprs2) && (count1 == count2) =>
306 {
307 true
308 }
309 _ => false,
310 }
311 }
312}
313
314#[derive(Debug, Clone)]
317pub enum Distribution {
318 UnspecifiedDistribution,
320 SinglePartition,
322 HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
325}
326
327impl Distribution {
328 pub fn create_partitioning(self, partition_count: usize) -> Partitioning {
330 match self {
331 Distribution::UnspecifiedDistribution => {
332 Partitioning::UnknownPartitioning(partition_count)
333 }
334 Distribution::SinglePartition => Partitioning::UnknownPartitioning(1),
335 Distribution::HashPartitioned(expr) => {
336 Partitioning::Hash(expr, partition_count)
337 }
338 }
339 }
340}
341
342impl Display for Distribution {
343 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
344 match self {
345 Distribution::UnspecifiedDistribution => write!(f, "Unspecified"),
346 Distribution::SinglePartition => write!(f, "SinglePartition"),
347 Distribution::HashPartitioned(exprs) => {
348 write!(f, "HashPartitioned[{}])", format_physical_expr_list(exprs))
349 }
350 }
351 }
352}
353
354#[cfg(test)]
355mod tests {
356
357 use super::*;
358 use crate::expressions::Column;
359
360 use arrow::datatypes::{DataType, Field, Schema};
361 use datafusion_common::Result;
362
363 #[test]
364 fn partitioning_satisfy_distribution() -> Result<()> {
365 let schema = Arc::new(Schema::new(vec![
366 Field::new("column_1", DataType::Int64, false),
367 Field::new("column_2", DataType::Utf8, false),
368 ]));
369
370 let partition_exprs1: Vec<Arc<dyn PhysicalExpr>> = vec![
371 Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
372 Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
373 ];
374
375 let partition_exprs2: Vec<Arc<dyn PhysicalExpr>> = vec![
376 Arc::new(Column::new_with_schema("column_2", &schema).unwrap()),
377 Arc::new(Column::new_with_schema("column_1", &schema).unwrap()),
378 ];
379
380 let distribution_types = vec![
381 Distribution::UnspecifiedDistribution,
382 Distribution::SinglePartition,
383 Distribution::HashPartitioned(partition_exprs1.clone()),
384 ];
385
386 let single_partition = Partitioning::UnknownPartitioning(1);
387 let unspecified_partition = Partitioning::UnknownPartitioning(10);
388 let round_robin_partition = Partitioning::RoundRobinBatch(10);
389 let hash_partition1 = Partitioning::Hash(partition_exprs1, 10);
390 let hash_partition2 = Partitioning::Hash(partition_exprs2, 10);
391 let eq_properties = EquivalenceProperties::new(schema);
392
393 for distribution in distribution_types {
394 let result = (
395 single_partition
396 .satisfaction(&distribution, &eq_properties, true)
397 .is_satisfied(),
398 unspecified_partition
399 .satisfaction(&distribution, &eq_properties, true)
400 .is_satisfied(),
401 round_robin_partition
402 .satisfaction(&distribution, &eq_properties, true)
403 .is_satisfied(),
404 hash_partition1
405 .satisfaction(&distribution, &eq_properties, true)
406 .is_satisfied(),
407 hash_partition2
408 .satisfaction(&distribution, &eq_properties, true)
409 .is_satisfied(),
410 );
411
412 match distribution {
413 Distribution::UnspecifiedDistribution => {
414 assert_eq!(result, (true, true, true, true, true))
415 }
416 Distribution::SinglePartition => {
417 assert_eq!(result, (true, false, false, false, false))
418 }
419 Distribution::HashPartitioned(_) => {
420 assert_eq!(result, (true, false, false, true, false))
421 }
422 }
423 }
424
425 Ok(())
426 }
427
428 #[test]
429 fn test_partitioning_satisfy_by_subset() -> Result<()> {
430 let schema = Arc::new(Schema::new(vec![
431 Field::new("a", DataType::Int64, false),
432 Field::new("b", DataType::Int64, false),
433 Field::new("c", DataType::Int64, false),
434 ]));
435
436 let col_a: Arc<dyn PhysicalExpr> =
437 Arc::new(Column::new_with_schema("a", &schema)?);
438 let col_b: Arc<dyn PhysicalExpr> =
439 Arc::new(Column::new_with_schema("b", &schema)?);
440 let col_c: Arc<dyn PhysicalExpr> =
441 Arc::new(Column::new_with_schema("c", &schema)?);
442 let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
443
444 let test_cases = vec![
445 (
446 "Hash([a]) vs Hash([a, b])",
447 Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
448 Distribution::HashPartitioned(vec![
449 Arc::clone(&col_a),
450 Arc::clone(&col_b),
451 ]),
452 PartitioningSatisfaction::Subset,
453 PartitioningSatisfaction::NotSatisfied,
454 ),
455 (
456 "Hash([a]) vs Hash([a, b, c])",
457 Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
458 Distribution::HashPartitioned(vec![
459 Arc::clone(&col_a),
460 Arc::clone(&col_b),
461 Arc::clone(&col_c),
462 ]),
463 PartitioningSatisfaction::Subset,
464 PartitioningSatisfaction::NotSatisfied,
465 ),
466 (
467 "Hash([a, b]) vs Hash([a, b, c])",
468 Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
469 Distribution::HashPartitioned(vec![
470 Arc::clone(&col_a),
471 Arc::clone(&col_b),
472 Arc::clone(&col_c),
473 ]),
474 PartitioningSatisfaction::Subset,
475 PartitioningSatisfaction::NotSatisfied,
476 ),
477 (
478 "Hash([b]) vs Hash([a, b, c])",
479 Partitioning::Hash(vec![Arc::clone(&col_b)], 4),
480 Distribution::HashPartitioned(vec![
481 Arc::clone(&col_a),
482 Arc::clone(&col_b),
483 Arc::clone(&col_c),
484 ]),
485 PartitioningSatisfaction::Subset,
486 PartitioningSatisfaction::NotSatisfied,
487 ),
488 (
489 "Hash([b, a]) vs Hash([a, b, c])",
490 Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
491 Distribution::HashPartitioned(vec![
492 Arc::clone(&col_a),
493 Arc::clone(&col_b),
494 Arc::clone(&col_c),
495 ]),
496 PartitioningSatisfaction::Subset,
497 PartitioningSatisfaction::NotSatisfied,
498 ),
499 ];
500
501 for (desc, partition, required, expected_with_subset, expected_without_subset) in
502 test_cases
503 {
504 let result = partition.satisfaction(&required, &eq_properties, true);
505 assert_eq!(
506 result, expected_with_subset,
507 "Failed for {desc} with subset enabled"
508 );
509
510 let result = partition.satisfaction(&required, &eq_properties, false);
511 assert_eq!(
512 result, expected_without_subset,
513 "Failed for {desc} with subset disabled"
514 );
515 }
516
517 Ok(())
518 }
519
520 #[test]
521 fn test_partitioning_current_superset() -> Result<()> {
522 let schema = Arc::new(Schema::new(vec![
523 Field::new("a", DataType::Int64, false),
524 Field::new("b", DataType::Int64, false),
525 Field::new("c", DataType::Int64, false),
526 ]));
527
528 let col_a: Arc<dyn PhysicalExpr> =
529 Arc::new(Column::new_with_schema("a", &schema)?);
530 let col_b: Arc<dyn PhysicalExpr> =
531 Arc::new(Column::new_with_schema("b", &schema)?);
532 let col_c: Arc<dyn PhysicalExpr> =
533 Arc::new(Column::new_with_schema("c", &schema)?);
534 let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
535
536 let test_cases = vec![
537 (
538 "Hash([a, b]) vs Hash([a])",
539 Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
540 Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
541 PartitioningSatisfaction::NotSatisfied,
542 PartitioningSatisfaction::NotSatisfied,
543 ),
544 (
545 "Hash([a, b, c]) vs Hash([a])",
546 Partitioning::Hash(
547 vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_c)],
548 4,
549 ),
550 Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
551 PartitioningSatisfaction::NotSatisfied,
552 PartitioningSatisfaction::NotSatisfied,
553 ),
554 (
555 "Hash([a, b, c]) vs Hash([a, b])",
556 Partitioning::Hash(
557 vec![Arc::clone(&col_a), Arc::clone(&col_b), Arc::clone(&col_c)],
558 4,
559 ),
560 Distribution::HashPartitioned(vec![
561 Arc::clone(&col_a),
562 Arc::clone(&col_b),
563 ]),
564 PartitioningSatisfaction::NotSatisfied,
565 PartitioningSatisfaction::NotSatisfied,
566 ),
567 ];
568
569 for (desc, partition, required, expected_with_subset, expected_without_subset) in
570 test_cases
571 {
572 let result = partition.satisfaction(&required, &eq_properties, true);
573 assert_eq!(
574 result, expected_with_subset,
575 "Failed for {desc} with subset enabled"
576 );
577
578 let result = partition.satisfaction(&required, &eq_properties, false);
579 assert_eq!(
580 result, expected_without_subset,
581 "Failed for {desc} with subset disabled"
582 );
583 }
584
585 Ok(())
586 }
587
588 #[test]
589 fn test_partitioning_partial_overlap() -> Result<()> {
590 let schema = Arc::new(Schema::new(vec![
591 Field::new("a", DataType::Int64, false),
592 Field::new("b", DataType::Int64, false),
593 Field::new("c", DataType::Int64, false),
594 ]));
595
596 let col_a: Arc<dyn PhysicalExpr> =
597 Arc::new(Column::new_with_schema("a", &schema)?);
598 let col_b: Arc<dyn PhysicalExpr> =
599 Arc::new(Column::new_with_schema("b", &schema)?);
600 let col_c: Arc<dyn PhysicalExpr> =
601 Arc::new(Column::new_with_schema("c", &schema)?);
602 let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
603
604 let test_cases = vec![(
605 "Partial overlap: Hash([a, c]) vs Hash([a, b])",
606 Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_c)], 4),
607 Distribution::HashPartitioned(vec![Arc::clone(&col_a), Arc::clone(&col_b)]),
608 PartitioningSatisfaction::NotSatisfied,
609 PartitioningSatisfaction::NotSatisfied,
610 )];
611
612 for (desc, partition, required, expected_with_subset, expected_without_subset) in
613 test_cases
614 {
615 let result = partition.satisfaction(&required, &eq_properties, true);
616 assert_eq!(
617 result, expected_with_subset,
618 "Failed for {desc} with subset enabled"
619 );
620
621 let result = partition.satisfaction(&required, &eq_properties, false);
622 assert_eq!(
623 result, expected_without_subset,
624 "Failed for {desc} with subset disabled"
625 );
626 }
627
628 Ok(())
629 }
630
631 #[test]
632 fn test_partitioning_no_overlap() -> Result<()> {
633 let schema = Arc::new(Schema::new(vec![
634 Field::new("a", DataType::Int64, false),
635 Field::new("b", DataType::Int64, false),
636 Field::new("c", DataType::Int64, false),
637 ]));
638
639 let col_a: Arc<dyn PhysicalExpr> =
640 Arc::new(Column::new_with_schema("a", &schema)?);
641 let col_b: Arc<dyn PhysicalExpr> =
642 Arc::new(Column::new_with_schema("b", &schema)?);
643 let col_c: Arc<dyn PhysicalExpr> =
644 Arc::new(Column::new_with_schema("c", &schema)?);
645 let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
646
647 let test_cases = vec![
648 (
649 "Hash([a]) vs Hash([b, c])",
650 Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
651 Distribution::HashPartitioned(vec![
652 Arc::clone(&col_b),
653 Arc::clone(&col_c),
654 ]),
655 PartitioningSatisfaction::NotSatisfied,
656 PartitioningSatisfaction::NotSatisfied,
657 ),
658 (
659 "Hash([a, b]) vs Hash([c])",
660 Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
661 Distribution::HashPartitioned(vec![Arc::clone(&col_c)]),
662 PartitioningSatisfaction::NotSatisfied,
663 PartitioningSatisfaction::NotSatisfied,
664 ),
665 ];
666
667 for (desc, partition, required, expected_with_subset, expected_without_subset) in
668 test_cases
669 {
670 let result = partition.satisfaction(&required, &eq_properties, true);
671 assert_eq!(
672 result, expected_with_subset,
673 "Failed for {desc} with subset enabled"
674 );
675
676 let result = partition.satisfaction(&required, &eq_properties, false);
677 assert_eq!(
678 result, expected_without_subset,
679 "Failed for {desc} with subset disabled"
680 );
681 }
682
683 Ok(())
684 }
685
686 #[test]
687 fn test_partitioning_exact_match() -> Result<()> {
688 let schema = Arc::new(Schema::new(vec![
689 Field::new("a", DataType::Int64, false),
690 Field::new("b", DataType::Int64, false),
691 ]));
692
693 let col_a: Arc<dyn PhysicalExpr> =
694 Arc::new(Column::new_with_schema("a", &schema)?);
695 let col_b: Arc<dyn PhysicalExpr> =
696 Arc::new(Column::new_with_schema("b", &schema)?);
697 let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
698
699 let test_cases = vec![
700 (
701 "Hash([a, b]) vs Hash([a, b])",
702 Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
703 Distribution::HashPartitioned(vec![
704 Arc::clone(&col_a),
705 Arc::clone(&col_b),
706 ]),
707 PartitioningSatisfaction::Exact,
708 PartitioningSatisfaction::Exact,
709 ),
710 (
711 "Hash([a]) vs Hash([a])",
712 Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
713 Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
714 PartitioningSatisfaction::Exact,
715 PartitioningSatisfaction::Exact,
716 ),
717 ];
718
719 for (desc, partition, required, expected_with_subset, expected_without_subset) in
720 test_cases
721 {
722 let result = partition.satisfaction(&required, &eq_properties, true);
723 assert_eq!(
724 result, expected_with_subset,
725 "Failed for {desc} with subset enabled"
726 );
727
728 let result = partition.satisfaction(&required, &eq_properties, false);
729 assert_eq!(
730 result, expected_without_subset,
731 "Failed for {desc} with subset disabled"
732 );
733 }
734
735 Ok(())
736 }
737
738 #[test]
739 fn test_partitioning_unknown() -> Result<()> {
740 let schema = Arc::new(Schema::new(vec![
741 Field::new("a", DataType::Int64, false),
742 Field::new("b", DataType::Int64, false),
743 ]));
744
745 let col_a: Arc<dyn PhysicalExpr> =
746 Arc::new(Column::new_with_schema("a", &schema)?);
747 let col_b: Arc<dyn PhysicalExpr> =
748 Arc::new(Column::new_with_schema("b", &schema)?);
749 let unknown: Arc<dyn PhysicalExpr> = Arc::new(UnKnownColumn::new("dropped"));
750 let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
751
752 let test_cases = vec![
753 (
754 "Hash([unknown]) vs Hash([a, b])",
755 Partitioning::Hash(vec![Arc::clone(&unknown)], 4),
756 Distribution::HashPartitioned(vec![
757 Arc::clone(&col_a),
758 Arc::clone(&col_b),
759 ]),
760 PartitioningSatisfaction::NotSatisfied,
761 PartitioningSatisfaction::NotSatisfied,
762 ),
763 (
764 "Hash([a, b]) vs Hash([unknown])",
765 Partitioning::Hash(vec![Arc::clone(&col_a), Arc::clone(&col_b)], 4),
766 Distribution::HashPartitioned(vec![Arc::clone(&unknown)]),
767 PartitioningSatisfaction::NotSatisfied,
768 PartitioningSatisfaction::NotSatisfied,
769 ),
770 (
771 "Hash([unknown]) vs Hash([unknown])",
772 Partitioning::Hash(vec![Arc::clone(&unknown)], 4),
773 Distribution::HashPartitioned(vec![Arc::clone(&unknown)]),
774 PartitioningSatisfaction::NotSatisfied,
775 PartitioningSatisfaction::NotSatisfied,
776 ),
777 ];
778
779 for (desc, partition, required, expected_with_subset, expected_without_subset) in
780 test_cases
781 {
782 let result = partition.satisfaction(&required, &eq_properties, true);
783 assert_eq!(
784 result, expected_with_subset,
785 "Failed for {desc} with subset enabled"
786 );
787
788 let result = partition.satisfaction(&required, &eq_properties, false);
789 assert_eq!(
790 result, expected_without_subset,
791 "Failed for {desc} with subset disabled"
792 );
793 }
794
795 Ok(())
796 }
797
798 #[test]
799 fn test_partitioning_empty_hash() -> Result<()> {
800 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
801
802 let col_a: Arc<dyn PhysicalExpr> =
803 Arc::new(Column::new_with_schema("a", &schema)?);
804 let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
805
806 let test_cases = vec![
807 (
808 "Hash([]) vs Hash([a])",
809 Partitioning::Hash(vec![], 4),
810 Distribution::HashPartitioned(vec![Arc::clone(&col_a)]),
811 PartitioningSatisfaction::NotSatisfied,
812 PartitioningSatisfaction::NotSatisfied,
813 ),
814 (
815 "Hash([a]) vs Hash([])",
816 Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
817 Distribution::HashPartitioned(vec![]),
818 PartitioningSatisfaction::NotSatisfied,
819 PartitioningSatisfaction::NotSatisfied,
820 ),
821 (
822 "Hash([]) vs Hash([])",
823 Partitioning::Hash(vec![], 4),
824 Distribution::HashPartitioned(vec![]),
825 PartitioningSatisfaction::NotSatisfied,
826 PartitioningSatisfaction::NotSatisfied,
827 ),
828 ];
829
830 for (desc, partition, required, expected_with_subset, expected_without_subset) in
831 test_cases
832 {
833 let result = partition.satisfaction(&required, &eq_properties, true);
834 assert_eq!(
835 result, expected_with_subset,
836 "Failed for {desc} with subset enabled"
837 );
838
839 let result = partition.satisfaction(&required, &eq_properties, false);
840 assert_eq!(
841 result, expected_without_subset,
842 "Failed for {desc} with subset disabled"
843 );
844 }
845
846 Ok(())
847 }
848}