1use crate::{FileRange, PartitionedFile};
21use arrow::compute::SortOptions;
22use datafusion_common::Statistics;
23use datafusion_common::utils::compare_rows;
24use itertools::Itertools;
25use std::cmp::{Ordering, min};
26use std::collections::{BinaryHeap, HashMap};
27use std::iter::repeat_with;
28use std::mem;
29use std::ops::{Deref, DerefMut, Index, IndexMut};
30use std::sync::Arc;
31
/// Settings that control how files are redistributed across file groups.
///
/// Values are configured through the `with_*` builder methods and consumed by
/// `repartition_file_groups`.
#[derive(Debug, Clone, Copy)]
pub struct FileGroupPartitioner {
    /// Number of file groups the repartitioning aims to produce.
    target_partitions: usize,
    /// Threshold for size-based repartitioning: when the summed effective size
    /// of all files is below this value, nothing is repartitioned.
    repartition_file_min_size: usize,
    /// Selects the strategy: `true` uses the order-preserving algorithm,
    /// `false` splits evenly by size.
    preserve_order_within_groups: bool,
}
139
140impl Default for FileGroupPartitioner {
141 fn default() -> Self {
142 Self::new()
143 }
144}
145
146impl FileGroupPartitioner {
147 pub fn new() -> Self {
152 Self {
153 target_partitions: 1,
154 repartition_file_min_size: 10 * 1024 * 1024,
155 preserve_order_within_groups: false,
156 }
157 }
158
159 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
161 self.target_partitions = target_partitions;
162 self
163 }
164
165 pub fn with_repartition_file_min_size(
167 mut self,
168 repartition_file_min_size: usize,
169 ) -> Self {
170 self.repartition_file_min_size = repartition_file_min_size;
171 self
172 }
173
174 pub fn with_preserve_order_within_groups(
176 mut self,
177 preserve_order_within_groups: bool,
178 ) -> Self {
179 self.preserve_order_within_groups = preserve_order_within_groups;
180 self
181 }
182
183 pub fn repartition_file_groups(
187 &self,
188 file_groups: &[FileGroup],
189 ) -> Option<Vec<FileGroup>> {
190 if file_groups.is_empty() {
191 return None;
192 }
193
194 if self.preserve_order_within_groups {
196 self.repartition_preserving_order(file_groups)
197 } else {
198 self.repartition_evenly_by_size(file_groups)
199 }
200 }
201
202 fn repartition_evenly_by_size(
205 &self,
206 file_groups: &[FileGroup],
207 ) -> Option<Vec<FileGroup>> {
208 let target_partitions = self.target_partitions;
209 let repartition_file_min_size = self.repartition_file_min_size;
210 let flattened_files = file_groups.iter().flat_map(FileGroup::iter).collect_vec();
211
212 let total_size = flattened_files
213 .iter()
214 .map(|f| f.effective_size())
215 .sum::<u64>();
216 if total_size < (repartition_file_min_size as u64) || total_size == 0 {
217 return None;
218 }
219
220 let target_partition_size = total_size.div_ceil(target_partitions as u64);
221
222 let current_partition_index: usize = 0;
223 let current_partition_size: u64 = 0;
224
225 let repartitioned_files = flattened_files
227 .into_iter()
228 .scan(
229 (current_partition_index, current_partition_size),
230 |(current_partition_index, current_partition_size), source_file| {
231 let mut produced_files = vec![];
232 let (mut range_start, file_end) = source_file.range();
233 while range_start < file_end {
234 let range_end = min(
235 range_start
236 + (target_partition_size - *current_partition_size),
237 file_end,
238 );
239
240 let mut produced_file = source_file.clone();
241 produced_file.range = Some(FileRange {
242 start: range_start as i64,
243 end: range_end as i64,
244 });
245 produced_files.push((*current_partition_index, produced_file));
246
247 if *current_partition_size + (range_end - range_start)
248 >= target_partition_size
249 {
250 *current_partition_index += 1;
251 *current_partition_size = 0;
252 } else {
253 *current_partition_size += range_end - range_start;
254 }
255 range_start = range_end;
256 }
257 Some(produced_files)
258 },
259 )
260 .flatten()
261 .chunk_by(|(partition_idx, _)| *partition_idx)
262 .into_iter()
263 .map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec()))
264 .collect_vec();
265
266 Some(repartitioned_files)
267 }
268
269 fn repartition_preserving_order(
271 &self,
272 file_groups: &[FileGroup],
273 ) -> Option<Vec<FileGroup>> {
274 if file_groups.len() >= self.target_partitions {
277 return None;
278 }
279 let num_new_groups = self.target_partitions - file_groups.len();
280
281 if file_groups.len() == 1 && file_groups[0].len() == 1 {
283 return self.repartition_evenly_by_size(file_groups);
284 }
285
286 let mut heap: BinaryHeap<_> = file_groups
288 .iter()
289 .enumerate()
290 .filter_map(|(group_index, group)| {
291 if group.len() == 1 {
293 Some(ToRepartition {
294 source_index: group_index,
295 file_size: group[0].effective_size(),
296 new_groups: vec![group_index],
297 })
298 } else {
299 None
300 }
301 })
302 .map(CompareByRangeSize)
303 .collect();
304
305 if heap.is_empty() {
307 return None;
308 }
309
310 let mut file_groups: Vec<_> = file_groups
313 .iter()
314 .cloned()
315 .chain(repeat_with(|| FileGroup::new(Vec::new())).take(num_new_groups))
316 .collect();
317
318 for (group_index, group) in file_groups.iter().enumerate() {
320 if !group.is_empty() {
321 continue;
322 }
323 let mut largest_group = heap.pop().unwrap();
325 largest_group.new_groups.push(group_index);
326 heap.push(largest_group);
327 }
328
329 while let Some(to_repartition) = heap.pop() {
331 let range_size = to_repartition.range_size();
332 let ToRepartition {
333 source_index,
334 file_size: _,
335 new_groups,
336 } = to_repartition.into_inner();
337 assert_eq!(file_groups[source_index].len(), 1);
338 let original_file = file_groups[source_index].pop().unwrap();
339
340 let last_group = new_groups.len() - 1;
341 let (mut range_start, file_end) = original_file.range();
342 let mut range_end = range_start + range_size;
343 for (i, group_index) in new_groups.into_iter().enumerate() {
344 let target_group = &mut file_groups[group_index];
345 assert!(target_group.is_empty());
346
347 if i == last_group {
349 range_end = file_end;
350 }
351 target_group.push(
352 original_file
353 .clone()
354 .with_range(range_start as i64, range_end as i64),
355 );
356 range_start = range_end;
357 range_end += range_size;
358 }
359 }
360
361 Some(file_groups)
362 }
363}
364
/// An ordered collection of [`PartitionedFile`]s, optionally carrying
/// statistics attached to the group itself.
#[derive(Debug, Clone)]
pub struct FileGroup {
    /// The files in this group, in order.
    files: Vec<PartitionedFile>,
    /// Statistics attached to the group (as opposed to the per-file statistics
    /// stored on each file); `None` until set via `with_statistics`.
    statistics: Option<Arc<Statistics>>,
}
390
391impl FileGroup {
392 pub fn new(files: Vec<PartitionedFile>) -> Self {
394 Self {
395 files,
396 statistics: None,
397 }
398 }
399
400 pub fn len(&self) -> usize {
402 self.files.len()
403 }
404
405 pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
407 self.statistics = Some(statistics);
408 self
409 }
410
411 pub fn files(&self) -> &[PartitionedFile] {
413 &self.files
414 }
415
416 pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
417 self.files.iter()
418 }
419
420 pub fn into_inner(self) -> Vec<PartitionedFile> {
421 self.files
422 }
423
424 pub fn is_empty(&self) -> bool {
425 self.files.is_empty()
426 }
427
428 pub fn pop(&mut self) -> Option<PartitionedFile> {
430 self.files.pop()
431 }
432
433 pub fn push(&mut self, partitioned_file: PartitionedFile) {
435 self.files.push(partitioned_file);
436 }
437
438 pub fn file_statistics(&self, index: Option<usize>) -> Option<&Statistics> {
441 if let Some(index) = index {
442 self.files.get(index).and_then(|f| f.statistics.as_deref())
443 } else {
444 self.statistics.as_deref()
445 }
446 }
447
448 pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
450 self.statistics.as_mut().map(Arc::make_mut)
451 }
452
453 pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
455 if self.is_empty() {
456 return vec![];
457 }
458
459 self.files.sort_by(|a, b| a.path().cmp(b.path()));
463
464 let chunk_size = self.len().div_ceil(n);
466 let mut chunks = Vec::with_capacity(n);
467 let mut current_chunk = Vec::with_capacity(chunk_size);
468 for file in self.files.drain(..) {
469 current_chunk.push(file);
470 if current_chunk.len() == chunk_size {
471 let full_chunk = FileGroup::new(mem::replace(
472 &mut current_chunk,
473 Vec::with_capacity(chunk_size),
474 ));
475 chunks.push(full_chunk);
476 }
477 }
478
479 if !current_chunk.is_empty() {
480 chunks.push(FileGroup::new(current_chunk))
481 }
482
483 chunks
484 }
485
486 #[allow(clippy::allow_attributes, clippy::mutable_key_type)] pub fn group_by_partition_values(
493 self,
494 max_target_partitions: usize,
495 ) -> Vec<FileGroup> {
496 if self.is_empty() || max_target_partitions == 0 {
497 return vec![];
498 }
499
500 let mut partition_groups: HashMap<
501 Vec<datafusion_common::ScalarValue>,
502 Vec<PartitionedFile>,
503 > = HashMap::new();
504
505 for file in self.files {
506 partition_groups
507 .entry(file.partition_values.clone())
508 .or_default()
509 .push(file);
510 }
511
512 let num_unique_partitions = partition_groups.len();
513
514 let mut sorted_partitions: Vec<_> = partition_groups.into_iter().collect();
516 let sort_options =
517 vec![
518 SortOptions::default();
519 sorted_partitions.first().map(|(k, _)| k.len()).unwrap_or(0)
520 ];
521 sorted_partitions.sort_by(|a, b| {
522 compare_rows(&a.0, &b.0, &sort_options).unwrap_or(Ordering::Equal)
523 });
524
525 if num_unique_partitions <= max_target_partitions {
526 sorted_partitions
527 .into_iter()
528 .map(|(_, files)| FileGroup::new(files))
529 .collect()
530 } else {
531 let mut target_groups = vec![vec![]; max_target_partitions];
535
536 for (idx, (_, files)) in sorted_partitions.into_iter().enumerate() {
537 let bucket = idx % max_target_partitions;
538 target_groups[bucket].extend(files);
539 }
540
541 target_groups.into_iter().map(FileGroup::new).collect()
542 }
543 }
544}
545
546impl Index<usize> for FileGroup {
547 type Output = PartitionedFile;
548
549 fn index(&self, index: usize) -> &Self::Output {
550 &self.files[index]
551 }
552}
553
554impl IndexMut<usize> for FileGroup {
555 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
556 &mut self.files[index]
557 }
558}
559
560impl FromIterator<PartitionedFile> for FileGroup {
561 fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
562 let files = iter.into_iter().collect();
563 FileGroup::new(files)
564 }
565}
566
567impl From<Vec<PartitionedFile>> for FileGroup {
568 fn from(files: Vec<PartitionedFile>) -> Self {
569 FileGroup::new(files)
570 }
571}
572
573impl Default for FileGroup {
574 fn default() -> Self {
575 Self::new(Vec::new())
576 }
577}
578
/// Bookkeeping for one file that may be split across several groups.
#[derive(Debug, Clone)]
struct ToRepartition {
    /// Index of the group that currently holds the file.
    source_index: usize,
    /// Effective size of the file.
    file_size: u64,
    /// Indices of the groups that will each receive one range of the file.
    new_groups: Vec<usize>,
}

impl ToRepartition {
    /// Size of each range when the file is divided evenly among `new_groups`
    /// (integer division; any remainder is not included).
    fn range_size(&self) -> u64 {
        let num_ranges = self.new_groups.len() as u64;
        self.file_size / num_ranges
    }
}
596
597struct CompareByRangeSize(ToRepartition);
598impl CompareByRangeSize {
599 fn into_inner(self) -> ToRepartition {
600 self.0
601 }
602}
603impl Ord for CompareByRangeSize {
604 fn cmp(&self, other: &Self) -> Ordering {
605 self.0.range_size().cmp(&other.0.range_size())
606 }
607}
608impl PartialOrd for CompareByRangeSize {
609 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
610 Some(self.cmp(other))
611 }
612}
613impl PartialEq for CompareByRangeSize {
614 fn eq(&self, other: &Self) -> bool {
615 self.cmp(other) == Ordering::Equal
617 }
618}
619impl Eq for CompareByRangeSize {}
620impl Deref for CompareByRangeSize {
621 type Target = ToRepartition;
622 fn deref(&self) -> &Self::Target {
623 &self.0
624 }
625}
626impl DerefMut for CompareByRangeSize {
627 fn deref_mut(&mut self) -> &mut Self::Target {
628 &mut self.0
629 }
630}
631
#[cfg(test)]
mod test {
    use super::*;
    use datafusion_common::ScalarValue;

    // ---------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------

    /// Creates a file named `path` with size `file_size` and no explicit range.
    fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
        PartitionedFile::new(path, file_size)
    }

    /// Same as `pfile`, restricted to the range `start..end`.
    fn ranged(path: &str, file_size: u64, start: i64, end: i64) -> PartitionedFile {
        pfile(path, file_size).with_range(start, end)
    }

    /// Creates a file of size 10 whose only partition value is `pv`.
    fn pfile_with_pv(path: &str, pv: &str) -> PartitionedFile {
        let mut file = pfile(path, 10);
        file.partition_values = vec![ScalarValue::from(pv)];
        file
    }

    /// Puts every file into its own single-file group.
    fn one_per_group(files: Vec<PartitionedFile>) -> Vec<FileGroup> {
        files
            .into_iter()
            .map(|file| FileGroup::new(vec![file]))
            .collect()
    }

    /// Runs the size-based strategy with the given settings.
    fn by_size(
        target_partitions: usize,
        min_size: usize,
        groups: &[FileGroup],
    ) -> Option<Vec<FileGroup>> {
        FileGroupPartitioner::new()
            .with_target_partitions(target_partitions)
            .with_repartition_file_min_size(min_size)
            .repartition_file_groups(groups)
    }

    /// Runs the order-preserving strategy with the given settings.
    fn ordered(
        target_partitions: usize,
        min_size: usize,
        groups: &[FileGroup],
    ) -> Option<Vec<FileGroup>> {
        FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(target_partitions)
            .with_repartition_file_min_size(min_size)
            .repartition_file_groups(groups)
    }

    /// Asserts that both results are `None`, or that both are `Some` with
    /// identical pretty-printed debug output.
    fn assert_partitioned_files(
        expected: Option<Vec<FileGroup>>,
        actual: Option<Vec<FileGroup>>,
    ) {
        match (expected, actual) {
            (None, None) => {}
            (Some(_), None) => panic!("Expected Some, got None"),
            (None, Some(_)) => panic!("Expected None, got Some"),
            (Some(expected), Some(actual)) => {
                assert_eq!(format!("{expected:#?}"), format!("{actual:#?}"));
            }
        }
    }

    /// Runs `partitioner` both with and without order preservation, asserts
    /// that the two results agree, and returns the result.
    fn repartition_test(
        partitioner: FileGroupPartitioner,
        file_groups: Vec<FileGroup>,
    ) -> Option<Vec<FileGroup>> {
        let plain = partitioner.repartition_file_groups(&file_groups);
        let order_preserving = partitioner
            .with_preserve_order_within_groups(true)
            .repartition_file_groups(&file_groups);

        assert_partitioned_files(plain.clone(), order_preserving);
        plain
    }

    // ---------------------------------------------------------------------
    // Size-based repartitioning
    // ---------------------------------------------------------------------

    #[test]
    fn repartition_empty_file_only() {
        let input = one_per_group(vec![pfile("empty", 0)]);

        assert_partitioned_files(None, by_size(4, 0, &input));
    }

    #[test]
    fn repartition_empty_files() {
        let a = || pfile("a", 10);
        let b = || pfile("b", 10);
        let empty = || pfile("empty", 0);

        // The empty file is placed first, in the middle, and last.
        let inputs = [
            one_per_group(vec![empty(), a(), b()]),
            one_per_group(vec![a(), empty(), b()]),
            one_per_group(vec![a(), b(), empty()]),
        ];

        let expected_2 = one_per_group(vec![
            ranged("a", 10, 0, 10),
            ranged("b", 10, 0, 10),
        ]);
        let expected_3 = vec![
            FileGroup::new(vec![ranged("a", 10, 0, 7)]),
            FileGroup::new(vec![ranged("a", 10, 7, 10), ranged("b", 10, 0, 4)]),
            FileGroup::new(vec![ranged("b", 10, 4, 10)]),
        ];

        for groups in inputs {
            for (target, expected) in [(2, &expected_2), (3, &expected_3)] {
                let actual = by_size(target, 10, &groups);
                assert_partitioned_files(Some(expected.clone()), actual);
            }
        }
    }

    #[test]
    fn repartition_single_file() {
        let input = one_per_group(vec![pfile("a", 123)]);

        let expected = one_per_group(vec![
            ranged("a", 123, 0, 31),
            ranged("a", 123, 31, 62),
            ranged("a", 123, 62, 93),
            ranged("a", 123, 93, 123),
        ]);
        assert_partitioned_files(Some(expected), by_size(4, 10, &input));
    }

    #[test]
    fn repartition_single_file_with_range() {
        let input = one_per_group(vec![ranged("a", 123, 0, 123)]);

        let expected = one_per_group(vec![
            ranged("a", 123, 0, 31),
            ranged("a", 123, 31, 62),
            ranged("a", 123, 62, 93),
            ranged("a", 123, 93, 123),
        ]);
        assert_partitioned_files(Some(expected), by_size(4, 10, &input));
    }

    #[test]
    fn repartition_single_file_with_incomplete_range() {
        let input = one_per_group(vec![ranged("a", 123, 10, 100)]);

        let expected = one_per_group(vec![
            ranged("a", 123, 10, 33),
            ranged("a", 123, 33, 56),
            ranged("a", 123, 56, 79),
            ranged("a", 123, 79, 100),
        ]);
        assert_partitioned_files(Some(expected), by_size(4, 10, &input));
    }

    #[test]
    fn repartition_single_file_duplicated_with_range() {
        // One group that already holds both halves of the same file.
        let input = vec![FileGroup::new(vec![
            ranged("a", 100, 0, 50),
            ranged("a", 100, 50, 100),
        ])];

        let expected = one_per_group(vec![
            ranged("a", 100, 0, 25),
            ranged("a", 100, 25, 50),
            ranged("a", 100, 50, 75),
            ranged("a", 100, 75, 100),
        ]);
        assert_partitioned_files(Some(expected), by_size(4, 10, &input));
    }

    #[test]
    fn repartition_too_much_partitions() {
        let input = one_per_group(vec![pfile("a", 8)]);

        // Far more partitions requested than the file has size: one unit each.
        let expected = one_per_group(
            (0..8).map(|start| ranged("a", 8, start, start + 1)).collect(),
        );
        assert_partitioned_files(Some(expected), by_size(96, 5, &input));
    }

    #[test]
    fn repartition_multiple_partitions() {
        let input = one_per_group(vec![pfile("a", 40), pfile("b", 60)]);

        let expected = vec![
            FileGroup::new(vec![ranged("a", 40, 0, 34)]),
            FileGroup::new(vec![ranged("a", 40, 34, 40), ranged("b", 60, 0, 28)]),
            FileGroup::new(vec![ranged("b", 60, 28, 60)]),
        ];
        assert_partitioned_files(Some(expected), by_size(3, 10, &input));
    }

    #[test]
    fn repartition_same_num_partitions() {
        let input = one_per_group(vec![pfile("a", 40), pfile("b", 60)]);

        let expected = vec![
            FileGroup::new(vec![ranged("a", 40, 0, 40), ranged("b", 60, 0, 10)]),
            FileGroup::new(vec![ranged("b", 60, 10, 60)]),
        ];
        assert_partitioned_files(Some(expected), by_size(2, 10, &input));
    }

    #[test]
    fn repartition_no_action_min_size() {
        let input = one_per_group(vec![pfile("a", 123)]);

        assert_partitioned_files(None, by_size(65, 500, &input))
    }

    #[test]
    fn repartition_no_action_zero_files() {
        let no_groups = vec![];

        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(65)
            .with_repartition_file_min_size(500);

        assert_partitioned_files(None, repartition_test(partitioner, no_groups))
    }

    // ---------------------------------------------------------------------
    // Order-preserving repartitioning
    // ---------------------------------------------------------------------

    #[test]
    fn repartition_ordered_no_action_too_few_partitions() {
        let input = one_per_group(vec![pfile("a", 100), pfile("b", 200)]);

        assert_partitioned_files(None, ordered(2, 10, &input))
    }

    #[test]
    fn repartition_ordered_no_action_file_too_small() {
        let input = one_per_group(vec![pfile("a", 100)]);

        assert_partitioned_files(None, ordered(2, 1000, &input))
    }

    #[test]
    fn repartition_ordered_one_large_file() {
        let input = one_per_group(vec![pfile("a", 100)]);

        let expected = one_per_group(vec![
            ranged("a", 100, 0, 34),
            ranged("a", 100, 34, 68),
            ranged("a", 100, 68, 100),
        ]);
        assert_partitioned_files(Some(expected), ordered(3, 10, &input));
    }

    #[test]
    fn repartition_ordered_one_large_file_with_range() {
        let input = one_per_group(vec![ranged("a", 100, 0, 100)]);

        let expected = one_per_group(vec![
            ranged("a", 100, 0, 34),
            ranged("a", 100, 34, 68),
            ranged("a", 100, 68, 100),
        ]);
        assert_partitioned_files(Some(expected), ordered(3, 10, &input));
    }

    #[test]
    fn repartition_ordered_one_large_one_small_file() {
        let input = one_per_group(vec![pfile("a", 100), pfile("b", 30)]);

        let expected = one_per_group(vec![
            ranged("a", 100, 0, 33),
            ranged("b", 30, 0, 30),
            ranged("a", 100, 33, 66),
            ranged("a", 100, 66, 100),
        ]);
        assert_partitioned_files(Some(expected), ordered(4, 10, &input));
    }

    #[test]
    fn repartition_ordered_one_large_one_small_file_with_full_range() {
        let input = one_per_group(vec![ranged("a", 100, 0, 100), pfile("b", 30)]);

        let expected = one_per_group(vec![
            ranged("a", 100, 0, 33),
            ranged("b", 30, 0, 30),
            ranged("a", 100, 33, 66),
            ranged("a", 100, 66, 100),
        ]);
        assert_partitioned_files(Some(expected), ordered(4, 10, &input));
    }

    #[test]
    fn repartition_ordered_one_large_one_small_file_with_split_range() {
        let input = one_per_group(vec![
            ranged("a", 100, 0, 50),
            ranged("a", 100, 50, 100),
            pfile("b", 30),
        ]);

        let expected = one_per_group(vec![
            ranged("a", 100, 0, 25),
            ranged("a", 100, 50, 100),
            ranged("b", 30, 0, 30),
            ranged("a", 100, 25, 50),
        ]);
        assert_partitioned_files(Some(expected), ordered(4, 10, &input));
    }

    #[test]
    fn repartition_ordered_one_large_one_small_file_with_non_full_range() {
        let input = one_per_group(vec![
            ranged("a", 100, 20, 80),
            ranged("b", 30, 5, 25),
        ]);

        let expected = one_per_group(vec![
            ranged("a", 100, 20, 40),
            ranged("b", 30, 5, 25),
            ranged("a", 100, 40, 60),
            ranged("a", 100, 60, 80),
        ]);
        assert_partitioned_files(Some(expected), ordered(4, 10, &input));
    }

    #[test]
    fn repartition_ordered_two_large_files() {
        let input = one_per_group(vec![pfile("a", 100), pfile("b", 100)]);

        let expected = one_per_group(vec![
            ranged("a", 100, 0, 50),
            ranged("b", 100, 0, 50),
            ranged("a", 100, 50, 100),
            ranged("b", 100, 50, 100),
        ]);
        assert_partitioned_files(Some(expected), ordered(4, 10, &input));
    }

    #[test]
    fn repartition_ordered_two_large_one_small_files() {
        let input = one_per_group(vec![
            pfile("a", 100),
            pfile("b", 100),
            pfile("c", 30),
        ]);

        let partitioner = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_repartition_file_min_size(10);

        // Four partitions: only one extra group, so only one file is split.
        let with_four = partitioner
            .with_target_partitions(4)
            .repartition_file_groups(&input);
        let expected_four = one_per_group(vec![
            ranged("a", 100, 0, 50),
            ranged("b", 100, 0, 100),
            ranged("c", 30, 0, 30),
            ranged("a", 100, 50, 100),
        ]);
        assert_partitioned_files(Some(expected_four), with_four);

        // Five partitions: both large files are split.
        let with_five = partitioner
            .with_target_partitions(5)
            .repartition_file_groups(&input);
        let expected_five = one_per_group(vec![
            ranged("a", 100, 0, 50),
            ranged("b", 100, 0, 50),
            ranged("c", 30, 0, 30),
            ranged("a", 100, 50, 100),
            ranged("b", 100, 50, 100),
        ]);
        assert_partitioned_files(Some(expected_five), with_five);
    }

    #[test]
    fn repartition_ordered_one_large_one_small_existing_empty() {
        // Input already contains empty groups, which are filled as well.
        let input = vec![
            FileGroup::new(vec![pfile("a", 100)]),
            FileGroup::default(),
            FileGroup::new(vec![pfile("b", 40)]),
            FileGroup::default(),
        ];

        let expected = one_per_group(vec![
            ranged("a", 100, 0, 33),
            ranged("a", 100, 33, 66),
            ranged("b", 40, 0, 20),
            ranged("a", 100, 66, 100),
            ranged("b", 40, 20, 40),
        ]);
        assert_partitioned_files(Some(expected), ordered(5, 10, &input));
    }

    #[test]
    fn repartition_ordered_existing_group_multiple_files() {
        let input = vec![
            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
            FileGroup::new(vec![pfile("c", 40)]),
        ];

        // The multi-file group is left untouched; only "c" is split.
        let expected = vec![
            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
            FileGroup::new(vec![ranged("c", 40, 0, 20)]),
            FileGroup::new(vec![ranged("c", 40, 20, 40)]),
        ];
        assert_partitioned_files(Some(expected), ordered(3, 10, &input));
    }

    // ---------------------------------------------------------------------
    // group_by_partition_values
    // ---------------------------------------------------------------------

    #[test]
    fn test_group_by_partition_values_edge_cases() {
        let from_empty = FileGroup::default().group_by_partition_values(4);
        assert!(from_empty.is_empty());

        let zero_targets =
            FileGroup::new(vec![pfile("a", 100)]).group_by_partition_values(0);
        assert!(zero_targets.is_empty());
    }

    #[test]
    fn test_group_by_partition_values_less_groups_than_target() {
        let groups = FileGroup::new(vec![
            pfile_with_pv("a", "p1"),
            pfile_with_pv("b", "p1"),
            pfile_with_pv("c", "p2"),
        ])
        .group_by_partition_values(4);

        let sizes: Vec<usize> = groups.iter().map(FileGroup::len).collect();
        assert_eq!(sizes, vec![2, 1]);
    }

    #[test]
    fn test_group_by_partition_values_more_groups_than_target() {
        let groups = FileGroup::new(vec![
            pfile_with_pv("a", "p1"),
            pfile_with_pv("b", "p2"),
            pfile_with_pv("c", "p3"),
            pfile_with_pv("d", "p4"),
            pfile_with_pv("e", "p5"),
        ])
        .group_by_partition_values(3);

        let sizes: Vec<usize> = groups.iter().map(FileGroup::len).collect();
        assert_eq!(sizes, vec![2, 2, 1]);
    }
}