1use crate::{FileRange, PartitionedFile};
21use datafusion_common::Statistics;
22use itertools::Itertools;
23use std::cmp::min;
24use std::collections::BinaryHeap;
25use std::iter::repeat_with;
26use std::mem;
27use std::ops::{Index, IndexMut};
28use std::sync::Arc;
29
30#[derive(Debug, Clone, Copy)]
129pub struct FileGroupPartitioner {
130 target_partitions: usize,
132 repartition_file_min_size: usize,
134 preserve_order_within_groups: bool,
136}
137
138impl Default for FileGroupPartitioner {
139 fn default() -> Self {
140 Self::new()
141 }
142}
143
144impl FileGroupPartitioner {
145 pub fn new() -> Self {
150 Self {
151 target_partitions: 1,
152 repartition_file_min_size: 10 * 1024 * 1024,
153 preserve_order_within_groups: false,
154 }
155 }
156
157 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
159 self.target_partitions = target_partitions;
160 self
161 }
162
163 pub fn with_repartition_file_min_size(
165 mut self,
166 repartition_file_min_size: usize,
167 ) -> Self {
168 self.repartition_file_min_size = repartition_file_min_size;
169 self
170 }
171
172 pub fn with_preserve_order_within_groups(
174 mut self,
175 preserve_order_within_groups: bool,
176 ) -> Self {
177 self.preserve_order_within_groups = preserve_order_within_groups;
178 self
179 }
180
181 pub fn repartition_file_groups(
185 &self,
186 file_groups: &[FileGroup],
187 ) -> Option<Vec<FileGroup>> {
188 if file_groups.is_empty() {
189 return None;
190 }
191
192 let has_ranges = file_groups
194 .iter()
195 .flat_map(FileGroup::iter)
196 .any(|f| f.range.is_some());
197 if has_ranges {
198 return None;
199 }
200
201 if self.preserve_order_within_groups {
203 self.repartition_preserving_order(file_groups)
204 } else {
205 self.repartition_evenly_by_size(file_groups)
206 }
207 }
208
209 fn repartition_evenly_by_size(
212 &self,
213 file_groups: &[FileGroup],
214 ) -> Option<Vec<FileGroup>> {
215 let target_partitions = self.target_partitions;
216 let repartition_file_min_size = self.repartition_file_min_size;
217 let flattened_files = file_groups.iter().flat_map(FileGroup::iter).collect_vec();
218
219 let total_size = flattened_files
220 .iter()
221 .map(|f| f.object_meta.size as i64)
222 .sum::<i64>();
223 if total_size < (repartition_file_min_size as i64) || total_size == 0 {
224 return None;
225 }
226
227 let target_partition_size =
228 (total_size as u64).div_ceil(target_partitions as u64);
229
230 let current_partition_index: usize = 0;
231 let current_partition_size: u64 = 0;
232
233 let repartitioned_files = flattened_files
235 .into_iter()
236 .scan(
237 (current_partition_index, current_partition_size),
238 |state, source_file| {
239 let mut produced_files = vec![];
240 let mut range_start = 0;
241 while range_start < source_file.object_meta.size {
242 let range_end = min(
243 range_start + (target_partition_size - state.1),
244 source_file.object_meta.size,
245 );
246
247 let mut produced_file = source_file.clone();
248 produced_file.range = Some(FileRange {
249 start: range_start as i64,
250 end: range_end as i64,
251 });
252 produced_files.push((state.0, produced_file));
253
254 if state.1 + (range_end - range_start) >= target_partition_size {
255 state.0 += 1;
256 state.1 = 0;
257 } else {
258 state.1 += range_end - range_start;
259 }
260 range_start = range_end;
261 }
262 Some(produced_files)
263 },
264 )
265 .flatten()
266 .chunk_by(|(partition_idx, _)| *partition_idx)
267 .into_iter()
268 .map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec()))
269 .collect_vec();
270
271 Some(repartitioned_files)
272 }
273
274 fn repartition_preserving_order(
276 &self,
277 file_groups: &[FileGroup],
278 ) -> Option<Vec<FileGroup>> {
279 if file_groups.len() >= self.target_partitions {
282 return None;
283 }
284 let num_new_groups = self.target_partitions - file_groups.len();
285
286 if file_groups.len() == 1 && file_groups[0].len() == 1 {
288 return self.repartition_evenly_by_size(file_groups);
289 }
290
291 let mut heap: BinaryHeap<_> = file_groups
293 .iter()
294 .enumerate()
295 .filter_map(|(group_index, group)| {
296 if group.len() == 1 {
298 Some(ToRepartition {
299 source_index: group_index,
300 file_size: group[0].object_meta.size,
301 new_groups: vec![group_index],
302 })
303 } else {
304 None
305 }
306 })
307 .collect();
308
309 if heap.is_empty() {
311 return None;
312 }
313
314 let mut file_groups: Vec<_> = file_groups
317 .iter()
318 .cloned()
319 .chain(repeat_with(|| FileGroup::new(Vec::new())).take(num_new_groups))
320 .collect();
321
322 for (group_index, group) in file_groups.iter().enumerate() {
324 if !group.is_empty() {
325 continue;
326 }
327 let mut largest_group = heap.pop().unwrap();
329 largest_group.new_groups.push(group_index);
330 heap.push(largest_group);
331 }
332
333 while let Some(to_repartition) = heap.pop() {
335 let range_size = to_repartition.range_size() as i64;
336 let ToRepartition {
337 source_index,
338 file_size,
339 new_groups,
340 } = to_repartition;
341 assert_eq!(file_groups[source_index].len(), 1);
342 let original_file = file_groups[source_index].pop().unwrap();
343
344 let last_group = new_groups.len() - 1;
345 let mut range_start: i64 = 0;
346 let mut range_end: i64 = range_size;
347 for (i, group_index) in new_groups.into_iter().enumerate() {
348 let target_group = &mut file_groups[group_index];
349 assert!(target_group.is_empty());
350
351 if i == last_group {
353 range_end = file_size as i64;
354 }
355 target_group
356 .push(original_file.clone().with_range(range_start, range_end));
357 range_start = range_end;
358 range_end += range_size;
359 }
360 }
361
362 Some(file_groups)
363 }
364}
365
366#[derive(Debug, Clone)]
369pub struct FileGroup {
370 files: Vec<PartitionedFile>,
372 statistics: Option<Arc<Statistics>>,
374}
375
376impl FileGroup {
377 pub fn new(files: Vec<PartitionedFile>) -> Self {
379 Self {
380 files,
381 statistics: None,
382 }
383 }
384
385 pub fn len(&self) -> usize {
387 self.files.len()
388 }
389
390 pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
392 self.statistics = Some(statistics);
393 self
394 }
395
396 pub fn files(&self) -> &[PartitionedFile] {
398 &self.files
399 }
400
401 pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
402 self.files.iter()
403 }
404
405 pub fn into_inner(self) -> Vec<PartitionedFile> {
406 self.files
407 }
408
409 pub fn is_empty(&self) -> bool {
410 self.files.is_empty()
411 }
412
413 pub fn pop(&mut self) -> Option<PartitionedFile> {
415 self.files.pop()
416 }
417
418 pub fn push(&mut self, file: PartitionedFile) {
420 self.files.push(file);
421 }
422
423 pub fn file_statistics(&self, index: Option<usize>) -> Option<&Statistics> {
426 if let Some(index) = index {
427 self.files.get(index).and_then(|f| f.statistics.as_deref())
428 } else {
429 self.statistics.as_deref()
430 }
431 }
432
433 pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
435 self.statistics.as_mut().map(Arc::make_mut)
436 }
437
438 pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
440 if self.is_empty() {
441 return vec![];
442 }
443
444 self.files.sort_by(|a, b| a.path().cmp(b.path()));
448
449 let chunk_size = self.len().div_ceil(n);
451 let mut chunks = Vec::with_capacity(n);
452 let mut current_chunk = Vec::with_capacity(chunk_size);
453 for file in self.files.drain(..) {
454 current_chunk.push(file);
455 if current_chunk.len() == chunk_size {
456 let full_chunk = FileGroup::new(mem::replace(
457 &mut current_chunk,
458 Vec::with_capacity(chunk_size),
459 ));
460 chunks.push(full_chunk);
461 }
462 }
463
464 if !current_chunk.is_empty() {
465 chunks.push(FileGroup::new(current_chunk))
466 }
467
468 chunks
469 }
470}
471
472impl Index<usize> for FileGroup {
473 type Output = PartitionedFile;
474
475 fn index(&self, index: usize) -> &Self::Output {
476 &self.files[index]
477 }
478}
479
480impl IndexMut<usize> for FileGroup {
481 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
482 &mut self.files[index]
483 }
484}
485
486impl FromIterator<PartitionedFile> for FileGroup {
487 fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
488 let files = iter.into_iter().collect();
489 FileGroup::new(files)
490 }
491}
492
493impl From<Vec<PartitionedFile>> for FileGroup {
494 fn from(files: Vec<PartitionedFile>) -> Self {
495 FileGroup::new(files)
496 }
497}
498
499impl Default for FileGroup {
500 fn default() -> Self {
501 Self::new(Vec::new())
502 }
503}
504
/// Bookkeeping for one single-file group being split by the order-preserving
/// repartitioning strategy.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ToRepartition {
    /// Index of the file group that holds the file to split.
    source_index: usize,
    /// Size of that file.
    file_size: u64,
    /// Indexes of the groups that will each receive one range of the file.
    new_groups: Vec<usize>,
}

impl ToRepartition {
    /// Size of each range when the file is divided evenly among `new_groups`
    /// (truncating division). The divisor is clamped to at least 1 so an empty
    /// `new_groups` cannot trigger a division-by-zero panic.
    fn range_size(&self) -> u64 {
        let parts = (self.new_groups.len() as u64).max(1);
        self.file_size / parts
    }
}

impl PartialOrd for ToRepartition {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for ToRepartition {
    /// Orders primarily by `range_size`, so a max-`BinaryHeap` pops the entry
    /// whose ranges are currently the largest.
    ///
    /// Ties prefer the lower `source_index` (it compares as greater), making
    /// heap pops deterministic instead of depending on heap internals. The
    /// remaining fields are compared last, so `cmp` returns `Equal` exactly
    /// when the derived `PartialEq` reports equality, as `Ord` requires.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.range_size()
            .cmp(&other.range_size())
            .then_with(|| other.source_index.cmp(&self.source_index))
            .then_with(|| self.file_size.cmp(&other.file_size))
            .then_with(|| self.new_groups.cmp(&other.new_groups))
    }
}
535
#[cfg(test)]
mod test {
    use super::*;

    /// Shorthand for a `PartitionedFile` without a byte range.
    fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
        PartitionedFile::new(path, file_size)
    }

    /// Shorthand for a `PartitionedFile` restricted to `start..end`.
    fn ranged(path: &str, file_size: u64, start: i64, end: i64) -> PartitionedFile {
        pfile(path, file_size).with_range(start, end)
    }

    /// Wraps `files` into a `FileGroup`.
    fn group(files: Vec<PartitionedFile>) -> FileGroup {
        FileGroup::new(files)
    }

    /// Runs a freshly configured partitioner over `input`.
    fn partition(
        target_partitions: usize,
        min_size: usize,
        preserve_order: bool,
        input: &[FileGroup],
    ) -> Option<Vec<FileGroup>> {
        FileGroupPartitioner::new()
            .with_target_partitions(target_partitions)
            .with_repartition_file_min_size(min_size)
            .with_preserve_order_within_groups(preserve_order)
            .repartition_file_groups(input)
    }

    /// Compares two optional partitionings via their pretty `Debug` output.
    fn assert_partitioned_files(
        expected: Option<Vec<FileGroup>>,
        actual: Option<Vec<FileGroup>>,
    ) {
        match (expected, actual) {
            (Some(expected), Some(actual)) => {
                assert_eq!(format!("{expected:#?}"), format!("{actual:#?}"))
            }
            (Some(_), None) => panic!("Expected Some, got None"),
            (None, Some(_)) => panic!("Expected None, got Some"),
            (None, None) => {}
        }
    }

    /// Repartitions `file_groups` with the given partitioner and again with
    /// order preservation enabled, asserts both agree, and returns the first.
    fn repartition_in_both_modes(
        partitioner: FileGroupPartitioner,
        file_groups: Vec<FileGroup>,
    ) -> Option<Vec<FileGroup>> {
        let as_configured = partitioner.repartition_file_groups(&file_groups);
        let order_preserving = partitioner
            .with_preserve_order_within_groups(true)
            .repartition_file_groups(&file_groups);
        assert_partitioned_files(as_configured.clone(), order_preserving);
        as_configured
    }

    #[test]
    fn repartition_empty_file_only() {
        let input = vec![group(vec![pfile("empty", 0)])];
        assert_partitioned_files(None, partition(4, 0, false, &input));
    }

    #[test]
    fn repartition_empty_files() {
        let a = || pfile("a", 10);
        let b = || pfile("b", 10);
        let empty = || pfile("empty", 0);

        let layouts = [
            vec![group(vec![empty()]), group(vec![a()]), group(vec![b()])],
            vec![group(vec![a()]), group(vec![empty()]), group(vec![b()])],
            vec![group(vec![a()]), group(vec![b()]), group(vec![empty()])],
        ];

        let two_partitions = vec![
            group(vec![ranged("a", 10, 0, 10)]),
            group(vec![ranged("b", 10, 0, 10)]),
        ];
        let three_partitions = vec![
            group(vec![ranged("a", 10, 0, 7)]),
            group(vec![ranged("a", 10, 7, 10), ranged("b", 10, 0, 4)]),
            group(vec![ranged("b", 10, 4, 10)]),
        ];

        for input in &layouts {
            for (target, expected) in [(2, &two_partitions), (3, &three_partitions)] {
                assert_partitioned_files(
                    Some(expected.clone()),
                    partition(target, 10, false, input),
                );
            }
        }
    }

    #[test]
    fn repartition_single_file() {
        let input = vec![group(vec![pfile("a", 123)])];
        let expected: Vec<FileGroup> = [(0, 31), (31, 62), (62, 93), (93, 123)]
            .into_iter()
            .map(|(start, end)| group(vec![ranged("a", 123, start, end)]))
            .collect();
        assert_partitioned_files(Some(expected), partition(4, 10, false, &input));
    }

    #[test]
    fn repartition_too_much_partitions() {
        let input = vec![group(vec![pfile("a", 8)])];
        let expected: Vec<FileGroup> = (0..8)
            .map(|start| group(vec![ranged("a", 8, start, start + 1)]))
            .collect();
        assert_partitioned_files(Some(expected), partition(96, 5, false, &input));
    }

    #[test]
    fn repartition_multiple_partitions() {
        let input = vec![group(vec![pfile("a", 40)]), group(vec![pfile("b", 60)])];
        let expected = vec![
            group(vec![ranged("a", 40, 0, 34)]),
            group(vec![ranged("a", 40, 34, 40), ranged("b", 60, 0, 28)]),
            group(vec![ranged("b", 60, 28, 60)]),
        ];
        assert_partitioned_files(Some(expected), partition(3, 10, false, &input));
    }

    #[test]
    fn repartition_same_num_partitions() {
        let input = vec![group(vec![pfile("a", 40)]), group(vec![pfile("b", 60)])];
        let expected = vec![
            group(vec![ranged("a", 40, 0, 40), ranged("b", 60, 0, 10)]),
            group(vec![ranged("b", 60, 10, 60)]),
        ];
        assert_partitioned_files(Some(expected), partition(2, 10, false, &input));
    }

    #[test]
    fn repartition_no_action_ranges() {
        let input = vec![
            group(vec![pfile("a", 123)]),
            group(vec![ranged("b", 144, 1, 50)]),
        ];
        assert_partitioned_files(None, partition(65, 10, false, &input))
    }

    #[test]
    fn repartition_no_action_min_size() {
        let input = vec![group(vec![pfile("a", 123)])];
        assert_partitioned_files(None, partition(65, 500, false, &input))
    }

    #[test]
    fn repartition_no_action_zero_files() {
        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(65)
            .with_repartition_file_min_size(500);
        assert_partitioned_files(None, repartition_in_both_modes(partitioner, vec![]))
    }

    #[test]
    fn repartition_ordered_no_action_too_few_partitions() {
        let input = vec![group(vec![pfile("a", 100)]), group(vec![pfile("b", 200)])];
        assert_partitioned_files(None, partition(2, 10, true, &input))
    }

    #[test]
    fn repartition_ordered_no_action_file_too_small() {
        let input = vec![group(vec![pfile("a", 100)])];
        assert_partitioned_files(None, partition(2, 1000, true, &input))
    }

    #[test]
    fn repartition_ordered_one_large_file() {
        let input = vec![group(vec![pfile("a", 100)])];
        let expected = vec![
            group(vec![ranged("a", 100, 0, 34)]),
            group(vec![ranged("a", 100, 34, 68)]),
            group(vec![ranged("a", 100, 68, 100)]),
        ];
        assert_partitioned_files(Some(expected), partition(3, 10, true, &input));
    }

    #[test]
    fn repartition_ordered_one_large_one_small_file() {
        let input = vec![group(vec![pfile("a", 100)]), group(vec![pfile("b", 30)])];
        let expected = vec![
            group(vec![ranged("a", 100, 0, 33)]),
            group(vec![ranged("b", 30, 0, 30)]),
            group(vec![ranged("a", 100, 33, 66)]),
            group(vec![ranged("a", 100, 66, 100)]),
        ];
        assert_partitioned_files(Some(expected), partition(4, 10, true, &input));
    }

    #[test]
    fn repartition_ordered_two_large_files() {
        let input = vec![group(vec![pfile("a", 100)]), group(vec![pfile("b", 100)])];
        let expected = vec![
            group(vec![ranged("a", 100, 0, 50)]),
            group(vec![ranged("b", 100, 0, 50)]),
            group(vec![ranged("a", 100, 50, 100)]),
            group(vec![ranged("b", 100, 50, 100)]),
        ];
        assert_partitioned_files(Some(expected), partition(4, 10, true, &input));
    }

    #[test]
    fn repartition_ordered_two_large_one_small_files() {
        let input = vec![
            group(vec![pfile("a", 100)]),
            group(vec![pfile("b", 100)]),
            group(vec![pfile("c", 30)]),
        ];

        let four_partitions = vec![
            group(vec![ranged("a", 100, 0, 50)]),
            group(vec![ranged("b", 100, 0, 100)]),
            group(vec![ranged("c", 30, 0, 30)]),
            group(vec![ranged("a", 100, 50, 100)]),
        ];
        assert_partitioned_files(Some(four_partitions), partition(4, 10, true, &input));

        let five_partitions = vec![
            group(vec![ranged("a", 100, 0, 50)]),
            group(vec![ranged("b", 100, 0, 50)]),
            group(vec![ranged("c", 30, 0, 30)]),
            group(vec![ranged("a", 100, 50, 100)]),
            group(vec![ranged("b", 100, 50, 100)]),
        ];
        assert_partitioned_files(Some(five_partitions), partition(5, 10, true, &input));
    }

    #[test]
    fn repartition_ordered_one_large_one_small_existing_empty() {
        let input = vec![
            group(vec![pfile("a", 100)]),
            FileGroup::default(),
            group(vec![pfile("b", 40)]),
            FileGroup::default(),
        ];
        let expected = vec![
            group(vec![ranged("a", 100, 0, 33)]),
            group(vec![ranged("a", 100, 33, 66)]),
            group(vec![ranged("b", 40, 0, 20)]),
            group(vec![ranged("a", 100, 66, 100)]),
            group(vec![ranged("b", 40, 20, 40)]),
        ];
        assert_partitioned_files(Some(expected), partition(5, 10, true, &input));
    }

    #[test]
    fn repartition_ordered_existing_group_multiple_files() {
        let input = vec![
            group(vec![pfile("a", 100), pfile("b", 100)]),
            group(vec![pfile("c", 40)]),
        ];
        let expected = vec![
            // multi-file groups are left untouched
            group(vec![pfile("a", 100), pfile("b", 100)]),
            group(vec![ranged("c", 40, 0, 20)]),
            group(vec![ranged("c", 40, 20, 40)]),
        ];
        assert_partitioned_files(Some(expected), partition(3, 10, true, &input));
    }
}