1use crate::{FileRange, PartitionedFile};
21use arrow::compute::SortOptions;
22use datafusion_common::Statistics;
23use datafusion_common::utils::compare_rows;
24use itertools::Itertools;
25use std::cmp::{Ordering, min};
26use std::collections::{BinaryHeap, HashMap};
27use std::iter::repeat_with;
28use std::mem;
29use std::ops::{Deref, DerefMut, Index, IndexMut};
30use std::sync::Arc;
31
/// Settings that control how [`FileGroup`]s are redistributed into a target
/// number of partitions.
///
/// Start from [`FileGroupPartitioner::new`] (or `Default`), adjust it with the
/// `with_*` builder methods, then call `repartition_file_groups`.
#[derive(Clone, Copy, Debug)]
pub struct FileGroupPartitioner {
    /// Number of partitions the caller would like to end up with.
    target_partitions: usize,
    /// Minimum total effective size of all files; below it the size-based
    /// strategy leaves the input unchanged.
    repartition_file_min_size: usize,
    /// If `true`, existing groups are kept as they are and only single-file
    /// groups are split, so file order within every group is preserved.
    preserve_order_within_groups: bool,
}
139
140impl Default for FileGroupPartitioner {
141 fn default() -> Self {
142 Self::new()
143 }
144}
145
146impl FileGroupPartitioner {
147 pub fn new() -> Self {
152 Self {
153 target_partitions: 1,
154 repartition_file_min_size: 10 * 1024 * 1024,
155 preserve_order_within_groups: false,
156 }
157 }
158
159 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
161 self.target_partitions = target_partitions;
162 self
163 }
164
165 pub fn with_repartition_file_min_size(
167 mut self,
168 repartition_file_min_size: usize,
169 ) -> Self {
170 self.repartition_file_min_size = repartition_file_min_size;
171 self
172 }
173
174 pub fn with_preserve_order_within_groups(
176 mut self,
177 preserve_order_within_groups: bool,
178 ) -> Self {
179 self.preserve_order_within_groups = preserve_order_within_groups;
180 self
181 }
182
183 pub fn repartition_file_groups(
187 &self,
188 file_groups: &[FileGroup],
189 ) -> Option<Vec<FileGroup>> {
190 if file_groups.is_empty() {
191 return None;
192 }
193
194 if self.preserve_order_within_groups {
196 self.repartition_preserving_order(file_groups)
197 } else {
198 self.repartition_evenly_by_size(file_groups)
199 }
200 }
201
202 fn repartition_evenly_by_size(
205 &self,
206 file_groups: &[FileGroup],
207 ) -> Option<Vec<FileGroup>> {
208 let target_partitions = self.target_partitions;
209 let repartition_file_min_size = self.repartition_file_min_size;
210 let flattened_files = file_groups.iter().flat_map(FileGroup::iter).collect_vec();
211
212 let total_size = flattened_files
213 .iter()
214 .map(|f| f.effective_size())
215 .sum::<u64>();
216 if total_size < (repartition_file_min_size as u64) || total_size == 0 {
217 return None;
218 }
219
220 let target_partition_size = total_size.div_ceil(target_partitions as u64);
221
222 let current_partition_index: usize = 0;
223 let current_partition_size: u64 = 0;
224
225 let repartitioned_files = flattened_files
227 .into_iter()
228 .scan(
229 (current_partition_index, current_partition_size),
230 |(current_partition_index, current_partition_size), source_file| {
231 let mut produced_files = vec![];
232 let (mut range_start, file_end) = source_file.range();
233 while range_start < file_end {
234 let range_end = min(
235 range_start
236 + (target_partition_size - *current_partition_size),
237 file_end,
238 );
239
240 let mut produced_file = source_file.clone();
241 produced_file.range = Some(FileRange {
242 start: range_start as i64,
243 end: range_end as i64,
244 });
245 produced_files.push((*current_partition_index, produced_file));
246
247 if *current_partition_size + (range_end - range_start)
248 >= target_partition_size
249 {
250 *current_partition_index += 1;
251 *current_partition_size = 0;
252 } else {
253 *current_partition_size += range_end - range_start;
254 }
255 range_start = range_end;
256 }
257 Some(produced_files)
258 },
259 )
260 .flatten()
261 .chunk_by(|(partition_idx, _)| *partition_idx)
262 .into_iter()
263 .map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec()))
264 .collect_vec();
265
266 Some(repartitioned_files)
267 }
268
269 fn repartition_preserving_order(
271 &self,
272 file_groups: &[FileGroup],
273 ) -> Option<Vec<FileGroup>> {
274 if file_groups.len() >= self.target_partitions {
277 return None;
278 }
279 let num_new_groups = self.target_partitions - file_groups.len();
280
281 if file_groups.len() == 1 && file_groups[0].len() == 1 {
283 return self.repartition_evenly_by_size(file_groups);
284 }
285
286 let mut heap: BinaryHeap<_> = file_groups
288 .iter()
289 .enumerate()
290 .filter_map(|(group_index, group)| {
291 if group.len() == 1 {
293 Some(ToRepartition {
294 source_index: group_index,
295 file_size: group[0].effective_size(),
296 new_groups: vec![group_index],
297 })
298 } else {
299 None
300 }
301 })
302 .map(CompareByRangeSize)
303 .collect();
304
305 if heap.is_empty() {
307 return None;
308 }
309
310 let mut file_groups: Vec<_> = file_groups
313 .iter()
314 .cloned()
315 .chain(repeat_with(|| FileGroup::new(Vec::new())).take(num_new_groups))
316 .collect();
317
318 for (group_index, group) in file_groups.iter().enumerate() {
320 if !group.is_empty() {
321 continue;
322 }
323 let mut largest_group = heap.pop().unwrap();
325 largest_group.new_groups.push(group_index);
326 heap.push(largest_group);
327 }
328
329 while let Some(to_repartition) = heap.pop() {
331 let range_size = to_repartition.range_size();
332 let ToRepartition {
333 source_index,
334 file_size: _,
335 new_groups,
336 } = to_repartition.into_inner();
337 assert_eq!(file_groups[source_index].len(), 1);
338 let original_file = file_groups[source_index].pop().unwrap();
339
340 let last_group = new_groups.len() - 1;
341 let (mut range_start, file_end) = original_file.range();
342 let mut range_end = range_start + range_size;
343 for (i, group_index) in new_groups.into_iter().enumerate() {
344 let target_group = &mut file_groups[group_index];
345 assert!(target_group.is_empty());
346
347 if i == last_group {
349 range_end = file_end;
350 }
351 target_group.push(
352 original_file
353 .clone()
354 .with_range(range_start as i64, range_end as i64),
355 );
356 range_start = range_end;
357 range_end += range_size;
358 }
359 }
360
361 Some(file_groups)
362 }
363}
364
365#[derive(Debug, Clone)]
380pub struct FileGroup {
381 files: Vec<PartitionedFile>,
383 statistics: Option<Arc<Statistics>>,
389}
390
391impl FileGroup {
392 pub fn new(files: Vec<PartitionedFile>) -> Self {
394 Self {
395 files,
396 statistics: None,
397 }
398 }
399
400 pub fn len(&self) -> usize {
402 self.files.len()
403 }
404
405 pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
407 self.statistics = Some(statistics);
408 self
409 }
410
411 pub fn files(&self) -> &[PartitionedFile] {
413 &self.files
414 }
415
416 pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
417 self.files.iter()
418 }
419
420 pub fn into_inner(self) -> Vec<PartitionedFile> {
421 self.files
422 }
423
424 pub fn is_empty(&self) -> bool {
425 self.files.is_empty()
426 }
427
428 pub fn pop(&mut self) -> Option<PartitionedFile> {
430 self.files.pop()
431 }
432
433 pub fn push(&mut self, partitioned_file: PartitionedFile) {
435 self.files.push(partitioned_file);
436 }
437
438 pub fn file_statistics(&self, index: Option<usize>) -> Option<&Statistics> {
441 if let Some(index) = index {
442 self.files.get(index).and_then(|f| f.statistics.as_deref())
443 } else {
444 self.statistics.as_deref()
445 }
446 }
447
448 pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
450 self.statistics.as_mut().map(Arc::make_mut)
451 }
452
453 pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
455 if self.is_empty() {
456 return vec![];
457 }
458
459 self.files.sort_by(|a, b| a.path().cmp(b.path()));
463
464 let chunk_size = self.len().div_ceil(n);
466 let mut chunks = Vec::with_capacity(n);
467 let mut current_chunk = Vec::with_capacity(chunk_size);
468 for file in self.files.drain(..) {
469 current_chunk.push(file);
470 if current_chunk.len() == chunk_size {
471 let full_chunk = FileGroup::new(mem::replace(
472 &mut current_chunk,
473 Vec::with_capacity(chunk_size),
474 ));
475 chunks.push(full_chunk);
476 }
477 }
478
479 if !current_chunk.is_empty() {
480 chunks.push(FileGroup::new(current_chunk))
481 }
482
483 chunks
484 }
485
486 pub fn group_by_partition_values(
492 self,
493 max_target_partitions: usize,
494 ) -> Vec<FileGroup> {
495 if self.is_empty() || max_target_partitions == 0 {
496 return vec![];
497 }
498
499 let mut partition_groups: HashMap<
500 Vec<datafusion_common::ScalarValue>,
501 Vec<PartitionedFile>,
502 > = HashMap::new();
503
504 for file in self.files {
505 partition_groups
506 .entry(file.partition_values.clone())
507 .or_default()
508 .push(file);
509 }
510
511 let num_unique_partitions = partition_groups.len();
512
513 let mut sorted_partitions: Vec<_> = partition_groups.into_iter().collect();
515 let sort_options =
516 vec![
517 SortOptions::default();
518 sorted_partitions.first().map(|(k, _)| k.len()).unwrap_or(0)
519 ];
520 sorted_partitions.sort_by(|a, b| {
521 compare_rows(&a.0, &b.0, &sort_options).unwrap_or(Ordering::Equal)
522 });
523
524 if num_unique_partitions <= max_target_partitions {
525 sorted_partitions
526 .into_iter()
527 .map(|(_, files)| FileGroup::new(files))
528 .collect()
529 } else {
530 let mut target_groups = vec![vec![]; max_target_partitions];
534
535 for (idx, (_, files)) in sorted_partitions.into_iter().enumerate() {
536 let bucket = idx % max_target_partitions;
537 target_groups[bucket].extend(files);
538 }
539
540 target_groups.into_iter().map(FileGroup::new).collect()
541 }
542 }
543}
544
545impl Index<usize> for FileGroup {
546 type Output = PartitionedFile;
547
548 fn index(&self, index: usize) -> &Self::Output {
549 &self.files[index]
550 }
551}
552
553impl IndexMut<usize> for FileGroup {
554 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
555 &mut self.files[index]
556 }
557}
558
559impl FromIterator<PartitionedFile> for FileGroup {
560 fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
561 let files = iter.into_iter().collect();
562 FileGroup::new(files)
563 }
564}
565
566impl From<Vec<PartitionedFile>> for FileGroup {
567 fn from(files: Vec<PartitionedFile>) -> Self {
568 FileGroup::new(files)
569 }
570}
571
572impl Default for FileGroup {
573 fn default() -> Self {
574 Self::new(Vec::new())
575 }
576}
577
/// Bookkeeping for one single-file group that the order-preserving
/// repartitioner may split.
#[derive(Clone, Debug)]
struct ToRepartition {
    /// Index of the group that originally holds the file.
    source_index: usize,
    /// Effective size of that file.
    file_size: u64,
    /// Groups that will each receive one consecutive range of the file; the
    /// first entry is `source_index` itself.
    new_groups: Vec<usize>,
}
588
589impl ToRepartition {
590 fn range_size(&self) -> u64 {
592 self.file_size / (self.new_groups.len() as u64)
593 }
594}
595
596struct CompareByRangeSize(ToRepartition);
597impl CompareByRangeSize {
598 fn into_inner(self) -> ToRepartition {
599 self.0
600 }
601}
602impl Ord for CompareByRangeSize {
603 fn cmp(&self, other: &Self) -> Ordering {
604 self.0.range_size().cmp(&other.0.range_size())
605 }
606}
607impl PartialOrd for CompareByRangeSize {
608 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
609 Some(self.cmp(other))
610 }
611}
612impl PartialEq for CompareByRangeSize {
613 fn eq(&self, other: &Self) -> bool {
614 self.cmp(other) == Ordering::Equal
616 }
617}
618impl Eq for CompareByRangeSize {}
619impl Deref for CompareByRangeSize {
620 type Target = ToRepartition;
621 fn deref(&self) -> &Self::Target {
622 &self.0
623 }
624}
625impl DerefMut for CompareByRangeSize {
626 fn deref_mut(&mut self) -> &mut Self::Target {
627 &mut self.0
628 }
629}
630
/// Unit tests for `FileGroupPartitioner` and `FileGroup`.
///
/// Expected ranges are written out explicitly, so these tests pin the exact
/// split points produced by each strategy.
#[cfg(test)]
mod test {
    use super::*;
    use datafusion_common::ScalarValue;

    /// A single zero-length file gives a total size of zero, so no
    /// repartitioning is performed.
    #[test]
    fn repartition_empty_file_only() {
        let partitioned_file_empty = pfile("empty", 0);
        let file_group = vec![FileGroup::new(vec![partitioned_file_empty])];

        let partitioned_files = FileGroupPartitioner::new()
            .with_target_partitions(4)
            .with_repartition_file_min_size(0)
            .repartition_file_groups(&file_group);

        assert_partitioned_files(None, partitioned_files);
    }

    /// A zero-length file contributes no ranges, regardless of whether it
    /// comes first, in the middle, or last.
    #[test]
    fn repartition_empty_files() {
        let pfile_a = pfile("a", 10);
        let pfile_b = pfile("b", 10);
        let pfile_empty = pfile("empty", 0);

        let empty_first = vec![
            FileGroup::new(vec![pfile_empty.clone()]),
            FileGroup::new(vec![pfile_a.clone()]),
            FileGroup::new(vec![pfile_b.clone()]),
        ];
        let empty_middle = vec![
            FileGroup::new(vec![pfile_a.clone()]),
            FileGroup::new(vec![pfile_empty.clone()]),
            FileGroup::new(vec![pfile_b.clone()]),
        ];
        let empty_last = vec![
            FileGroup::new(vec![pfile_a]),
            FileGroup::new(vec![pfile_b]),
            FileGroup::new(vec![pfile_empty]),
        ];

        // Two partitions: ceil(20 / 2) = 10 per partition.
        let expected_2 = vec![
            FileGroup::new(vec![pfile("a", 10).with_range(0, 10)]),
            FileGroup::new(vec![pfile("b", 10).with_range(0, 10)]),
        ];
        // Three partitions: ceil(20 / 3) = 7 per partition; partition 1 spans
        // the end of `a` and the start of `b`.
        let expected_3 = vec![
            FileGroup::new(vec![pfile("a", 10).with_range(0, 7)]),
            FileGroup::new(vec![
                pfile("a", 10).with_range(7, 10),
                pfile("b", 10).with_range(0, 4),
            ]),
            FileGroup::new(vec![pfile("b", 10).with_range(4, 10)]),
        ];

        let file_groups_tests = [empty_first, empty_middle, empty_last];

        for fg in file_groups_tests {
            let all_expected = [(2, expected_2.clone()), (3, expected_3.clone())];
            for (n_partition, expected) in all_expected {
                let actual = FileGroupPartitioner::new()
                    .with_target_partitions(n_partition)
                    .with_repartition_file_min_size(10)
                    .repartition_file_groups(&fg);

                assert_partitioned_files(Some(expected), actual);
            }
        }
    }

    /// One 123-byte file into 4 partitions: ceil(123 / 4) = 31 per range,
    /// and the last range is shorter.
    #[test]
    fn repartition_single_file() {
        let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&single_partition);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
            FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
            FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
            FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// An explicit full-file range behaves the same as no range.
    #[test]
    fn repartition_single_file_with_range() {
        let single_partition =
            vec![FileGroup::new(vec![pfile("a", 123).with_range(0, 123)])];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&single_partition);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
            FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
            FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
            FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// A partial range (10..100) is split on its own 90-byte length:
    /// ceil(90 / 4) = 23 per range, starting at offset 10.
    #[test]
    fn repartition_single_file_with_incomplete_range() {
        let single_partition =
            vec![FileGroup::new(vec![pfile("a", 123).with_range(10, 100)])];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&single_partition);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 123).with_range(10, 33)]),
            FileGroup::new(vec![pfile("a", 123).with_range(33, 56)]),
            FileGroup::new(vec![pfile("a", 123).with_range(56, 79)]),
            FileGroup::new(vec![pfile("a", 123).with_range(79, 100)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Two range entries for the same file are split as separate inputs.
    #[test]
    fn repartition_single_file_duplicated_with_range() {
        let single_partition = vec![FileGroup::new(vec![
            pfile("a", 100).with_range(0, 50),
            pfile("a", 100).with_range(50, 100),
        ])];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&single_partition);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 25)]),
            FileGroup::new(vec![pfile("a", 100).with_range(25, 50)]),
            FileGroup::new(vec![pfile("a", 100).with_range(50, 75)]),
            FileGroup::new(vec![pfile("a", 100).with_range(75, 100)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Requesting more partitions than bytes yields one 1-byte range per
    /// partition (8 groups, not 96).
    #[test]
    fn repartition_too_much_partitions() {
        let partitioned_file = pfile("a", 8);
        let single_partition = vec![FileGroup::new(vec![partitioned_file])];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(96)
            .with_repartition_file_min_size(5)
            .repartition_file_groups(&single_partition);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 8).with_range(0, 1)]),
            FileGroup::new(vec![pfile("a", 8).with_range(1, 2)]),
            FileGroup::new(vec![pfile("a", 8).with_range(2, 3)]),
            FileGroup::new(vec![pfile("a", 8).with_range(3, 4)]),
            FileGroup::new(vec![pfile("a", 8).with_range(4, 5)]),
            FileGroup::new(vec![pfile("a", 8).with_range(5, 6)]),
            FileGroup::new(vec![pfile("a", 8).with_range(6, 7)]),
            FileGroup::new(vec![pfile("a", 8).with_range(7, 8)]),
        ]);

        assert_partitioned_files(expected, actual);
    }

    /// Two groups into three partitions: ceil(100 / 3) = 34 per partition,
    /// with the middle partition spanning both files.
    #[test]
    fn repartition_multiple_partitions() {
        let source_partitions = vec![
            FileGroup::new(vec![pfile("a", 40)]),
            FileGroup::new(vec![pfile("b", 60)]),
        ];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(3)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 40).with_range(0, 34)]),
            FileGroup::new(vec![
                pfile("a", 40).with_range(34, 40),
                pfile("b", 60).with_range(0, 28),
            ]),
            FileGroup::new(vec![pfile("b", 60).with_range(28, 60)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Without order preservation, even the same number of groups is
    /// rebalanced by size (50 per partition).
    #[test]
    fn repartition_same_num_partitions() {
        let source_partitions = vec![
            FileGroup::new(vec![pfile("a", 40)]),
            FileGroup::new(vec![pfile("b", 60)]),
        ];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(2)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![
                pfile("a", 40).with_range(0, 40),
                pfile("b", 60).with_range(0, 10),
            ]),
            FileGroup::new(vec![pfile("b", 60).with_range(10, 60)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Total size (123) below the minimum (500): no repartitioning.
    #[test]
    fn repartition_no_action_min_size() {
        let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(65)
            .with_repartition_file_min_size(500)
            .repartition_file_groups(&single_partition);

        assert_partitioned_files(None, actual)
    }

    /// Empty input yields `None` for both strategies.
    #[test]
    fn repartition_no_action_zero_files() {
        let empty_partition = vec![];

        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(65)
            .with_repartition_file_min_size(500);

        assert_partitioned_files(None, repartition_test(partitioner, empty_partition))
    }

    /// Order preservation cannot add groups when the input already has
    /// `target_partitions` groups.
    #[test]
    fn repartition_ordered_no_action_too_few_partitions() {
        let input_partitions = vec![
            FileGroup::new(vec![pfile("a", 100)]),
            FileGroup::new(vec![pfile("b", 200)]),
        ];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(2)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&input_partitions);

        assert_partitioned_files(None, actual)
    }

    /// A single file is delegated to the size-based path, which respects
    /// the minimum size (100 < 1000).
    #[test]
    fn repartition_ordered_no_action_file_too_small() {
        let single_partition = vec![FileGroup::new(vec![pfile("a", 100)])];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(2)
            .with_repartition_file_min_size(1000)
            .repartition_file_groups(&single_partition);

        assert_partitioned_files(None, actual)
    }

    /// A single large file with order preservation is split like the
    /// size-based path: ceil(100 / 3) = 34 per range.
    #[test]
    fn repartition_ordered_one_large_file() {
        let source_partitions = vec![FileGroup::new(vec![pfile("a", 100)])];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(3)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
            FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
            FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Same as above with an explicit full-file range.
    #[test]
    fn repartition_ordered_one_large_file_with_range() {
        let source_partitions =
            vec![FileGroup::new(vec![pfile("a", 100).with_range(0, 100)])];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(3)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
            FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
            FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Both new groups go to the larger file: `a` keeps group 0 and also
    /// fills groups 2 and 3, while `b` stays whole in group 1.
    #[test]
    fn repartition_ordered_one_large_one_small_file() {
        let source_partitions = vec![
            FileGroup::new(vec![pfile("a", 100)]),
            FileGroup::new(vec![pfile("b", 30)]),
        ];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
            FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Same as above with an explicit full-file range on `a`.
    #[test]
    fn repartition_ordered_one_large_one_small_file_with_full_range() {
        let source_partitions = vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 100)]),
            FileGroup::new(vec![pfile("b", 30)]),
        ];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
            FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Two 50-byte halves of `a` tie for the single new group; the expected
    /// output pins that the half in group 0 is the one that gets split.
    #[test]
    fn repartition_ordered_one_large_one_small_file_with_split_range() {
        let source_partitions = vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
            FileGroup::new(vec![pfile("b", 30)]),
        ];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 25)]),
            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
            FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
            FileGroup::new(vec![pfile("a", 100).with_range(25, 50)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Partial ranges are split relative to their own start/end offsets.
    #[test]
    fn repartition_ordered_one_large_one_small_file_with_non_full_range() {
        let source_partitions = vec![
            FileGroup::new(vec![pfile("a", 100).with_range(20, 80)]),
            FileGroup::new(vec![pfile("b", 30).with_range(5, 25)]),
        ];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100).with_range(20, 40)]),
            FileGroup::new(vec![pfile("b", 30).with_range(5, 25)]),
            FileGroup::new(vec![pfile("a", 100).with_range(40, 60)]),
            FileGroup::new(vec![pfile("a", 100).with_range(60, 80)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Two equally sized files each receive one new group.
    #[test]
    fn repartition_ordered_two_large_files() {
        let source_partitions = vec![
            FileGroup::new(vec![pfile("a", 100)]),
            FileGroup::new(vec![pfile("b", 100)]),
        ];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(4)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
            FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
            FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// With one spare group only `a` is split; with two spare groups both
    /// large files are split. `FileGroupPartitioner` is `Copy`, so the same
    /// base configuration is reused.
    #[test]
    fn repartition_ordered_two_large_one_small_files() {
        let source_partitions = vec![
            FileGroup::new(vec![pfile("a", 100)]),
            FileGroup::new(vec![pfile("b", 100)]),
            FileGroup::new(vec![pfile("c", 30)]),
        ];

        let partitioner = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_repartition_file_min_size(10);

        let actual = partitioner
            .with_target_partitions(4)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
            FileGroup::new(vec![pfile("b", 100).with_range(0, 100)]),
            FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
        ]);
        assert_partitioned_files(expected, actual);

        let actual = partitioner
            .with_target_partitions(5)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
            FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
            FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
            FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
            FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Pre-existing empty groups (indexes 1 and 3) are filled as well as the
    /// newly appended group (index 4).
    #[test]
    fn repartition_ordered_one_large_one_small_existing_empty() {
        let source_partitions = vec![
            FileGroup::new(vec![pfile("a", 100)]),
            FileGroup::default(),
            FileGroup::new(vec![pfile("b", 40)]),
            FileGroup::default(),
        ];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(5)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
            FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
            FileGroup::new(vec![pfile("b", 40).with_range(0, 20)]),
            FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
            FileGroup::new(vec![pfile("b", 40).with_range(20, 40)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// A group with several files is left untouched; only the single-file
    /// group `c` is split.
    #[test]
    fn repartition_ordered_existing_group_multiple_files() {
        let source_partitions = vec![
            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
            FileGroup::new(vec![pfile("c", 40)]),
        ];

        let actual = FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(3)
            .with_repartition_file_min_size(10)
            .repartition_file_groups(&source_partitions);

        let expected = Some(vec![
            FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
            FileGroup::new(vec![pfile("c", 40).with_range(0, 20)]),
            FileGroup::new(vec![pfile("c", 40).with_range(20, 40)]),
        ]);
        assert_partitioned_files(expected, actual);
    }

    /// Asserts that both sides are `None`, or that both are `Some` with
    /// identical pretty-printed (`{:#?}`) `Debug` output.
    fn assert_partitioned_files(
        expected: Option<Vec<FileGroup>>,
        actual: Option<Vec<FileGroup>>,
    ) {
        match (expected, actual) {
            (None, None) => {}
            (Some(_), None) => panic!("Expected Some, got None"),
            (None, Some(_)) => panic!("Expected None, got Some"),
            (Some(expected), Some(actual)) => {
                let expected_string = format!("{expected:#?}");
                let actual_string = format!("{actual:#?}");
                assert_eq!(expected_string, actual_string);
            }
        }
    }

    /// Shorthand for `PartitionedFile::new(path, file_size)`.
    fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
        PartitionedFile::new(path, file_size)
    }

    /// A 10-byte file whose only partition value is the string `pv`.
    fn pfile_with_pv(path: &str, pv: &str) -> PartitionedFile {
        let mut file = pfile(path, 10);
        file.partition_values = vec![ScalarValue::from(pv)];
        file
    }

    /// Runs `partitioner` as configured and again with
    /// `preserve_order_within_groups(true)`, asserts that both results are
    /// identical, and returns the first.
    fn repartition_test(
        partitioner: FileGroupPartitioner,
        file_groups: Vec<FileGroup>,
    ) -> Option<Vec<FileGroup>> {
        let repartitioned = partitioner.repartition_file_groups(&file_groups);

        let repartitioned_preserving_sort = partitioner
            .with_preserve_order_within_groups(true)
            .repartition_file_groups(&file_groups);

        assert_partitioned_files(repartitioned.clone(), repartitioned_preserving_sort);
        repartitioned
    }

    /// Empty input and a zero target both produce no groups.
    #[test]
    fn test_group_by_partition_values_edge_cases() {
        assert!(FileGroup::default().group_by_partition_values(4).is_empty());
        assert!(
            FileGroup::new(vec![pfile("a", 100)])
                .group_by_partition_values(0)
                .is_empty()
        );
    }

    /// Two distinct values with a target of 4: one group per value, sorted.
    #[test]
    fn test_group_by_partition_values_less_groups_than_target() {
        let fg = FileGroup::new(vec![
            pfile_with_pv("a", "p1"),
            pfile_with_pv("b", "p1"),
            pfile_with_pv("c", "p2"),
        ]);
        let groups = fg.group_by_partition_values(4);
        assert_eq!(groups.len(), 2);
        assert_eq!(groups[0].len(), 2);
        assert_eq!(groups[1].len(), 1);
    }

    /// Five distinct values into 3 groups: dealt round-robin (2, 2, 1).
    #[test]
    fn test_group_by_partition_values_more_groups_than_target() {
        let fg = FileGroup::new(vec![
            pfile_with_pv("a", "p1"),
            pfile_with_pv("b", "p2"),
            pfile_with_pv("c", "p3"),
            pfile_with_pv("d", "p4"),
            pfile_with_pv("e", "p5"),
        ]);
        let groups = fg.group_by_partition_values(3);
        assert_eq!(groups.len(), 3);
        assert_eq!(groups[0].len(), 2);
        assert_eq!(groups[1].len(), 2);
        assert_eq!(groups[2].len(), 1);
    }
}