1use crate::{FileRange, PartitionedFile};
21use datafusion_common::Statistics;
22use itertools::Itertools;
23use std::cmp::{min, Ordering};
24use std::collections::BinaryHeap;
25use std::iter::repeat_with;
26use std::mem;
27use std::ops::{Deref, DerefMut, Index, IndexMut};
28use std::sync::Arc;
29
/// Settings that control how a list of [`FileGroup`]s is redistributed into a
/// different number of groups.
///
/// Values are created with [`FileGroupPartitioner::new`], adjusted with the
/// `with_*` builder methods and applied with
/// [`FileGroupPartitioner::repartition_file_groups`].
#[derive(Debug, Clone, Copy)]
pub struct FileGroupPartitioner {
    /// Number of output groups the repartitioning aims for.
    target_partitions: usize,
    /// Lower bound on the combined `object_meta.size` of all files; the
    /// size-based strategy does nothing when the total is below this value.
    repartition_file_min_size: usize,
    /// Selects the order-preserving strategy (`true`) instead of the
    /// size-based one (`false`).
    preserve_order_within_groups: bool,
}
137
138impl Default for FileGroupPartitioner {
139 fn default() -> Self {
140 Self::new()
141 }
142}
143
144impl FileGroupPartitioner {
145 pub fn new() -> Self {
150 Self {
151 target_partitions: 1,
152 repartition_file_min_size: 10 * 1024 * 1024,
153 preserve_order_within_groups: false,
154 }
155 }
156
157 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
159 self.target_partitions = target_partitions;
160 self
161 }
162
163 pub fn with_repartition_file_min_size(
165 mut self,
166 repartition_file_min_size: usize,
167 ) -> Self {
168 self.repartition_file_min_size = repartition_file_min_size;
169 self
170 }
171
172 pub fn with_preserve_order_within_groups(
174 mut self,
175 preserve_order_within_groups: bool,
176 ) -> Self {
177 self.preserve_order_within_groups = preserve_order_within_groups;
178 self
179 }
180
181 pub fn repartition_file_groups(
185 &self,
186 file_groups: &[FileGroup],
187 ) -> Option<Vec<FileGroup>> {
188 if file_groups.is_empty() {
189 return None;
190 }
191
192 let has_ranges = file_groups
194 .iter()
195 .flat_map(FileGroup::iter)
196 .any(|f| f.range.is_some());
197 if has_ranges {
198 return None;
199 }
200
201 if self.preserve_order_within_groups {
203 self.repartition_preserving_order(file_groups)
204 } else {
205 self.repartition_evenly_by_size(file_groups)
206 }
207 }
208
209 fn repartition_evenly_by_size(
212 &self,
213 file_groups: &[FileGroup],
214 ) -> Option<Vec<FileGroup>> {
215 let target_partitions = self.target_partitions;
216 let repartition_file_min_size = self.repartition_file_min_size;
217 let flattened_files = file_groups.iter().flat_map(FileGroup::iter).collect_vec();
218
219 let total_size = flattened_files
220 .iter()
221 .map(|f| f.object_meta.size as i64)
222 .sum::<i64>();
223 if total_size < (repartition_file_min_size as i64) || total_size == 0 {
224 return None;
225 }
226
227 let target_partition_size =
228 (total_size as u64).div_ceil(target_partitions as u64);
229
230 let current_partition_index: usize = 0;
231 let current_partition_size: u64 = 0;
232
233 let repartitioned_files = flattened_files
235 .into_iter()
236 .scan(
237 (current_partition_index, current_partition_size),
238 |state, source_file| {
239 let mut produced_files = vec![];
240 let mut range_start = 0;
241 while range_start < source_file.object_meta.size {
242 let range_end = min(
243 range_start + (target_partition_size - state.1),
244 source_file.object_meta.size,
245 );
246
247 let mut produced_file = source_file.clone();
248 produced_file.range = Some(FileRange {
249 start: range_start as i64,
250 end: range_end as i64,
251 });
252 produced_files.push((state.0, produced_file));
253
254 if state.1 + (range_end - range_start) >= target_partition_size {
255 state.0 += 1;
256 state.1 = 0;
257 } else {
258 state.1 += range_end - range_start;
259 }
260 range_start = range_end;
261 }
262 Some(produced_files)
263 },
264 )
265 .flatten()
266 .chunk_by(|(partition_idx, _)| *partition_idx)
267 .into_iter()
268 .map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec()))
269 .collect_vec();
270
271 Some(repartitioned_files)
272 }
273
274 fn repartition_preserving_order(
276 &self,
277 file_groups: &[FileGroup],
278 ) -> Option<Vec<FileGroup>> {
279 if file_groups.len() >= self.target_partitions {
282 return None;
283 }
284 let num_new_groups = self.target_partitions - file_groups.len();
285
286 if file_groups.len() == 1 && file_groups[0].len() == 1 {
288 return self.repartition_evenly_by_size(file_groups);
289 }
290
291 let mut heap: BinaryHeap<_> = file_groups
293 .iter()
294 .enumerate()
295 .filter_map(|(group_index, group)| {
296 if group.len() == 1 {
298 Some(ToRepartition {
299 source_index: group_index,
300 file_size: group[0].object_meta.size,
301 new_groups: vec![group_index],
302 })
303 } else {
304 None
305 }
306 })
307 .map(CompareByRangeSize)
308 .collect();
309
310 if heap.is_empty() {
312 return None;
313 }
314
315 let mut file_groups: Vec<_> = file_groups
318 .iter()
319 .cloned()
320 .chain(repeat_with(|| FileGroup::new(Vec::new())).take(num_new_groups))
321 .collect();
322
323 for (group_index, group) in file_groups.iter().enumerate() {
325 if !group.is_empty() {
326 continue;
327 }
328 let mut largest_group = heap.pop().unwrap();
330 largest_group.new_groups.push(group_index);
331 heap.push(largest_group);
332 }
333
334 while let Some(to_repartition) = heap.pop() {
336 let range_size = to_repartition.range_size() as i64;
337 let ToRepartition {
338 source_index,
339 file_size,
340 new_groups,
341 } = to_repartition.into_inner();
342 assert_eq!(file_groups[source_index].len(), 1);
343 let original_file = file_groups[source_index].pop().unwrap();
344
345 let last_group = new_groups.len() - 1;
346 let mut range_start: i64 = 0;
347 let mut range_end: i64 = range_size;
348 for (i, group_index) in new_groups.into_iter().enumerate() {
349 let target_group = &mut file_groups[group_index];
350 assert!(target_group.is_empty());
351
352 if i == last_group {
354 range_end = file_size as i64;
355 }
356 target_group
357 .push(original_file.clone().with_range(range_start, range_end));
358 range_start = range_end;
359 range_end += range_size;
360 }
361 }
362
363 Some(file_groups)
364 }
365}
366
367#[derive(Debug, Clone)]
370pub struct FileGroup {
371 files: Vec<PartitionedFile>,
373 statistics: Option<Arc<Statistics>>,
375}
376
377impl FileGroup {
378 pub fn new(files: Vec<PartitionedFile>) -> Self {
380 Self {
381 files,
382 statistics: None,
383 }
384 }
385
386 pub fn len(&self) -> usize {
388 self.files.len()
389 }
390
391 pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
393 self.statistics = Some(statistics);
394 self
395 }
396
397 pub fn files(&self) -> &[PartitionedFile] {
399 &self.files
400 }
401
402 pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
403 self.files.iter()
404 }
405
406 pub fn into_inner(self) -> Vec<PartitionedFile> {
407 self.files
408 }
409
410 pub fn is_empty(&self) -> bool {
411 self.files.is_empty()
412 }
413
414 pub fn pop(&mut self) -> Option<PartitionedFile> {
416 self.files.pop()
417 }
418
419 pub fn push(&mut self, file: PartitionedFile) {
421 self.files.push(file);
422 }
423
424 pub fn file_statistics(&self, index: Option<usize>) -> Option<&Statistics> {
427 if let Some(index) = index {
428 self.files.get(index).and_then(|f| f.statistics.as_deref())
429 } else {
430 self.statistics.as_deref()
431 }
432 }
433
434 pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
436 self.statistics.as_mut().map(Arc::make_mut)
437 }
438
439 pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
441 if self.is_empty() {
442 return vec![];
443 }
444
445 self.files.sort_by(|a, b| a.path().cmp(b.path()));
449
450 let chunk_size = self.len().div_ceil(n);
452 let mut chunks = Vec::with_capacity(n);
453 let mut current_chunk = Vec::with_capacity(chunk_size);
454 for file in self.files.drain(..) {
455 current_chunk.push(file);
456 if current_chunk.len() == chunk_size {
457 let full_chunk = FileGroup::new(mem::replace(
458 &mut current_chunk,
459 Vec::with_capacity(chunk_size),
460 ));
461 chunks.push(full_chunk);
462 }
463 }
464
465 if !current_chunk.is_empty() {
466 chunks.push(FileGroup::new(current_chunk))
467 }
468
469 chunks
470 }
471}
472
473impl Index<usize> for FileGroup {
474 type Output = PartitionedFile;
475
476 fn index(&self, index: usize) -> &Self::Output {
477 &self.files[index]
478 }
479}
480
481impl IndexMut<usize> for FileGroup {
482 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
483 &mut self.files[index]
484 }
485}
486
487impl FromIterator<PartitionedFile> for FileGroup {
488 fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
489 let files = iter.into_iter().collect();
490 FileGroup::new(files)
491 }
492}
493
494impl From<Vec<PartitionedFile>> for FileGroup {
495 fn from(files: Vec<PartitionedFile>) -> Self {
496 FileGroup::new(files)
497 }
498}
499
500impl Default for FileGroup {
501 fn default() -> Self {
502 Self::new(Vec::new())
503 }
504}
505
/// Bookkeeping for one file that may be split across several groups.
#[derive(Debug, Clone)]
struct ToRepartition {
    /// Index of the group that currently holds the file.
    source_index: usize,
    /// Size of the file.
    file_size: u64,
    /// Indexes of the groups that will each receive one range of the file.
    new_groups: Vec<usize>,
}

impl ToRepartition {
    /// Size of one range when the file is divided among `new_groups`, using
    /// integer division (the remainder is not included).
    ///
    /// Panics if `new_groups` is empty.
    fn range_size(&self) -> u64 {
        let num_groups = self.new_groups.len() as u64;
        self.file_size / num_groups
    }
}
523
524struct CompareByRangeSize(ToRepartition);
525impl CompareByRangeSize {
526 fn into_inner(self) -> ToRepartition {
527 self.0
528 }
529}
530impl Ord for CompareByRangeSize {
531 fn cmp(&self, other: &Self) -> Ordering {
532 self.0.range_size().cmp(&other.0.range_size())
533 }
534}
535impl PartialOrd for CompareByRangeSize {
536 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
537 Some(self.cmp(other))
538 }
539}
540impl PartialEq for CompareByRangeSize {
541 fn eq(&self, other: &Self) -> bool {
542 self.cmp(other) == Ordering::Equal
544 }
545}
546impl Eq for CompareByRangeSize {}
547impl Deref for CompareByRangeSize {
548 type Target = ToRepartition;
549 fn deref(&self) -> &Self::Target {
550 &self.0
551 }
552}
553impl DerefMut for CompareByRangeSize {
554 fn deref_mut(&mut self) -> &mut Self::Target {
555 &mut self.0
556 }
557}
558
559#[cfg(test)]
560mod test {
561 use super::*;
562
563 #[test]
565 fn repartition_empty_file_only() {
566 let partitioned_file_empty = pfile("empty", 0);
567 let file_group = vec![FileGroup::new(vec![partitioned_file_empty])];
568
569 let partitioned_files = FileGroupPartitioner::new()
570 .with_target_partitions(4)
571 .with_repartition_file_min_size(0)
572 .repartition_file_groups(&file_group);
573
574 assert_partitioned_files(None, partitioned_files);
575 }
576
577 #[test]
579 fn repartition_empty_files() {
580 let pfile_a = pfile("a", 10);
581 let pfile_b = pfile("b", 10);
582 let pfile_empty = pfile("empty", 0);
583
584 let empty_first = vec![
585 FileGroup::new(vec![pfile_empty.clone()]),
586 FileGroup::new(vec![pfile_a.clone()]),
587 FileGroup::new(vec![pfile_b.clone()]),
588 ];
589 let empty_middle = vec![
590 FileGroup::new(vec![pfile_a.clone()]),
591 FileGroup::new(vec![pfile_empty.clone()]),
592 FileGroup::new(vec![pfile_b.clone()]),
593 ];
594 let empty_last = vec![
595 FileGroup::new(vec![pfile_a]),
596 FileGroup::new(vec![pfile_b]),
597 FileGroup::new(vec![pfile_empty]),
598 ];
599
600 let expected_2 = vec![
602 FileGroup::new(vec![pfile("a", 10).with_range(0, 10)]),
603 FileGroup::new(vec![pfile("b", 10).with_range(0, 10)]),
604 ];
605 let expected_3 = vec![
606 FileGroup::new(vec![pfile("a", 10).with_range(0, 7)]),
607 FileGroup::new(vec![
608 pfile("a", 10).with_range(7, 10),
609 pfile("b", 10).with_range(0, 4),
610 ]),
611 FileGroup::new(vec![pfile("b", 10).with_range(4, 10)]),
612 ];
613
614 let file_groups_tests = [empty_first, empty_middle, empty_last];
615
616 for fg in file_groups_tests {
617 let all_expected = [(2, expected_2.clone()), (3, expected_3.clone())];
618 for (n_partition, expected) in all_expected {
619 let actual = FileGroupPartitioner::new()
620 .with_target_partitions(n_partition)
621 .with_repartition_file_min_size(10)
622 .repartition_file_groups(&fg);
623
624 assert_partitioned_files(Some(expected), actual);
625 }
626 }
627 }
628
629 #[test]
630 fn repartition_single_file() {
631 let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
633
634 let actual = FileGroupPartitioner::new()
635 .with_target_partitions(4)
636 .with_repartition_file_min_size(10)
637 .repartition_file_groups(&single_partition);
638
639 let expected = Some(vec![
640 FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
641 FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
642 FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
643 FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
644 ]);
645 assert_partitioned_files(expected, actual);
646 }
647
648 #[test]
649 fn repartition_too_much_partitions() {
650 let partitioned_file = pfile("a", 8);
652 let single_partition = vec![FileGroup::new(vec![partitioned_file])];
653
654 let actual = FileGroupPartitioner::new()
655 .with_target_partitions(96)
656 .with_repartition_file_min_size(5)
657 .repartition_file_groups(&single_partition);
658
659 let expected = Some(vec![
660 FileGroup::new(vec![pfile("a", 8).with_range(0, 1)]),
661 FileGroup::new(vec![pfile("a", 8).with_range(1, 2)]),
662 FileGroup::new(vec![pfile("a", 8).with_range(2, 3)]),
663 FileGroup::new(vec![pfile("a", 8).with_range(3, 4)]),
664 FileGroup::new(vec![pfile("a", 8).with_range(4, 5)]),
665 FileGroup::new(vec![pfile("a", 8).with_range(5, 6)]),
666 FileGroup::new(vec![pfile("a", 8).with_range(6, 7)]),
667 FileGroup::new(vec![pfile("a", 8).with_range(7, 8)]),
668 ]);
669
670 assert_partitioned_files(expected, actual);
671 }
672
673 #[test]
674 fn repartition_multiple_partitions() {
675 let source_partitions = vec![
677 FileGroup::new(vec![pfile("a", 40)]),
678 FileGroup::new(vec![pfile("b", 60)]),
679 ];
680
681 let actual = FileGroupPartitioner::new()
682 .with_target_partitions(3)
683 .with_repartition_file_min_size(10)
684 .repartition_file_groups(&source_partitions);
685
686 let expected = Some(vec![
687 FileGroup::new(vec![pfile("a", 40).with_range(0, 34)]),
688 FileGroup::new(vec![
689 pfile("a", 40).with_range(34, 40),
690 pfile("b", 60).with_range(0, 28),
691 ]),
692 FileGroup::new(vec![pfile("b", 60).with_range(28, 60)]),
693 ]);
694 assert_partitioned_files(expected, actual);
695 }
696
697 #[test]
698 fn repartition_same_num_partitions() {
699 let source_partitions = vec![
701 FileGroup::new(vec![pfile("a", 40)]),
702 FileGroup::new(vec![pfile("b", 60)]),
703 ];
704
705 let actual = FileGroupPartitioner::new()
706 .with_target_partitions(2)
707 .with_repartition_file_min_size(10)
708 .repartition_file_groups(&source_partitions);
709
710 let expected = Some(vec![
711 FileGroup::new(vec![
712 pfile("a", 40).with_range(0, 40),
713 pfile("b", 60).with_range(0, 10),
714 ]),
715 FileGroup::new(vec![pfile("b", 60).with_range(10, 60)]),
716 ]);
717 assert_partitioned_files(expected, actual);
718 }
719
720 #[test]
721 fn repartition_no_action_ranges() {
722 let source_partitions = vec![
724 FileGroup::new(vec![pfile("a", 123)]),
725 FileGroup::new(vec![pfile("b", 144).with_range(1, 50)]),
726 ];
727
728 let actual = FileGroupPartitioner::new()
729 .with_target_partitions(65)
730 .with_repartition_file_min_size(10)
731 .repartition_file_groups(&source_partitions);
732
733 assert_partitioned_files(None, actual)
734 }
735
736 #[test]
737 fn repartition_no_action_min_size() {
738 let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
740
741 let actual = FileGroupPartitioner::new()
742 .with_target_partitions(65)
743 .with_repartition_file_min_size(500)
744 .repartition_file_groups(&single_partition);
745
746 assert_partitioned_files(None, actual)
747 }
748
749 #[test]
750 fn repartition_no_action_zero_files() {
751 let empty_partition = vec![];
753
754 let partitioner = FileGroupPartitioner::new()
755 .with_target_partitions(65)
756 .with_repartition_file_min_size(500);
757
758 assert_partitioned_files(None, repartition_test(partitioner, empty_partition))
759 }
760
761 #[test]
762 fn repartition_ordered_no_action_too_few_partitions() {
763 let input_partitions = vec![
765 FileGroup::new(vec![pfile("a", 100)]),
766 FileGroup::new(vec![pfile("b", 200)]),
767 ];
768
769 let actual = FileGroupPartitioner::new()
770 .with_preserve_order_within_groups(true)
771 .with_target_partitions(2)
772 .with_repartition_file_min_size(10)
773 .repartition_file_groups(&input_partitions);
774
775 assert_partitioned_files(None, actual)
776 }
777
778 #[test]
779 fn repartition_ordered_no_action_file_too_small() {
780 let single_partition = vec![FileGroup::new(vec![pfile("a", 100)])];
782
783 let actual = FileGroupPartitioner::new()
784 .with_preserve_order_within_groups(true)
785 .with_target_partitions(2)
786 .with_repartition_file_min_size(1000)
788 .repartition_file_groups(&single_partition);
789
790 assert_partitioned_files(None, actual)
791 }
792
793 #[test]
794 fn repartition_ordered_one_large_file() {
795 let source_partitions = vec![FileGroup::new(vec![pfile("a", 100)])];
797
798 let actual = FileGroupPartitioner::new()
799 .with_preserve_order_within_groups(true)
800 .with_target_partitions(3)
801 .with_repartition_file_min_size(10)
802 .repartition_file_groups(&source_partitions);
803
804 let expected = Some(vec![
805 FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
806 FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
807 FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
808 ]);
809 assert_partitioned_files(expected, actual);
810 }
811
812 #[test]
813 fn repartition_ordered_one_large_one_small_file() {
814 let source_partitions = vec![
817 FileGroup::new(vec![pfile("a", 100)]),
818 FileGroup::new(vec![pfile("b", 30)]),
819 ];
820
821 let actual = FileGroupPartitioner::new()
822 .with_preserve_order_within_groups(true)
823 .with_target_partitions(4)
824 .with_repartition_file_min_size(10)
825 .repartition_file_groups(&source_partitions);
826
827 let expected = Some(vec![
828 FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
830 FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
832 FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
834 FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
836 ]);
837 assert_partitioned_files(expected, actual);
838 }
839
840 #[test]
841 fn repartition_ordered_two_large_files() {
842 let source_partitions = vec![
844 FileGroup::new(vec![pfile("a", 100)]),
845 FileGroup::new(vec![pfile("b", 100)]),
846 ];
847
848 let actual = FileGroupPartitioner::new()
849 .with_preserve_order_within_groups(true)
850 .with_target_partitions(4)
851 .with_repartition_file_min_size(10)
852 .repartition_file_groups(&source_partitions);
853
854 let expected = Some(vec![
855 FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
857 FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
859 FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
861 FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
863 ]);
864 assert_partitioned_files(expected, actual);
865 }
866
867 #[test]
868 fn repartition_ordered_two_large_one_small_files() {
869 let source_partitions = vec![
871 FileGroup::new(vec![pfile("a", 100)]),
872 FileGroup::new(vec![pfile("b", 100)]),
873 FileGroup::new(vec![pfile("c", 30)]),
874 ];
875
876 let partitioner = FileGroupPartitioner::new()
877 .with_preserve_order_within_groups(true)
878 .with_repartition_file_min_size(10);
879
880 let actual = partitioner
882 .with_target_partitions(4)
883 .repartition_file_groups(&source_partitions);
884
885 let expected = Some(vec![
886 FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
888 FileGroup::new(vec![pfile("b", 100).with_range(0, 100)]),
890 FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
892 FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
894 ]);
895 assert_partitioned_files(expected, actual);
896
897 let actual = partitioner
899 .with_target_partitions(5)
900 .repartition_file_groups(&source_partitions);
901
902 let expected = Some(vec![
903 FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
905 FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
907 FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
909 FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
911 FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
913 ]);
914 assert_partitioned_files(expected, actual);
915 }
916
917 #[test]
918 fn repartition_ordered_one_large_one_small_existing_empty() {
919 let source_partitions = vec![
921 FileGroup::new(vec![pfile("a", 100)]),
922 FileGroup::default(),
923 FileGroup::new(vec![pfile("b", 40)]),
924 FileGroup::default(),
925 ];
926
927 let actual = FileGroupPartitioner::new()
928 .with_preserve_order_within_groups(true)
929 .with_target_partitions(5)
930 .with_repartition_file_min_size(10)
931 .repartition_file_groups(&source_partitions);
932
933 let expected = Some(vec![
936 FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
938 FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
939 FileGroup::new(vec![pfile("b", 40).with_range(0, 20)]),
941 FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
943 FileGroup::new(vec![pfile("b", 40).with_range(20, 40)]),
945 ]);
946 assert_partitioned_files(expected, actual);
947 }
948 #[test]
949 fn repartition_ordered_existing_group_multiple_files() {
950 let source_partitions = vec![
952 FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
954 FileGroup::new(vec![pfile("c", 40)]),
955 ];
956
957 let actual = FileGroupPartitioner::new()
958 .with_preserve_order_within_groups(true)
959 .with_target_partitions(3)
960 .with_repartition_file_min_size(10)
961 .repartition_file_groups(&source_partitions);
962
963 let expected = Some(vec![
966 FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
970 FileGroup::new(vec![pfile("c", 40).with_range(0, 20)]),
972 FileGroup::new(vec![pfile("c", 40).with_range(20, 40)]),
974 ]);
975 assert_partitioned_files(expected, actual);
976 }
977
978 fn assert_partitioned_files(
981 expected: Option<Vec<FileGroup>>,
982 actual: Option<Vec<FileGroup>>,
983 ) {
984 match (expected, actual) {
985 (None, None) => {}
986 (Some(_), None) => panic!("Expected Some, got None"),
987 (None, Some(_)) => panic!("Expected None, got Some"),
988 (Some(expected), Some(actual)) => {
989 let expected_string = format!("{expected:#?}");
990 let actual_string = format!("{actual:#?}");
991 assert_eq!(expected_string, actual_string);
992 }
993 }
994 }
995
996 fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
998 PartitionedFile::new(path, file_size)
999 }
1000
1001 fn repartition_test(
1004 partitioner: FileGroupPartitioner,
1005 file_groups: Vec<FileGroup>,
1006 ) -> Option<Vec<FileGroup>> {
1007 let repartitioned = partitioner.repartition_file_groups(&file_groups);
1008
1009 let repartitioned_preserving_sort = partitioner
1010 .with_preserve_order_within_groups(true)
1011 .repartition_file_groups(&file_groups);
1012
1013 assert_partitioned_files(repartitioned.clone(), repartitioned_preserving_sort);
1014 repartitioned
1015 }
1016}