1use crate::{FileRange, PartitionedFile};
21use itertools::Itertools;
22use std::cmp::min;
23use std::collections::BinaryHeap;
24use std::iter::repeat_with;
25
/// Splits the files of a scan into byte ranges so that they can be read by
/// (up to) `target_partitions` parallel file groups.
///
/// Build one with [`FileGroupPartitioner::new`] and the `with_*` setters, then
/// call `repartition_file_groups`.
#[derive(Clone, Copy, Debug)]
pub struct FileGroupPartitioner {
    /// Number of file groups the output should contain.
    target_partitions: usize,
    /// Minimum summed `object_meta.size` of all input files below which the
    /// size-balanced strategy declines to split anything.
    repartition_file_min_size: usize,
    /// When `true`, files are never reordered within their group; only groups
    /// holding a single file are split.
    preserve_order_within_groups: bool,
}
133
134impl Default for FileGroupPartitioner {
135 fn default() -> Self {
136 Self::new()
137 }
138}
139
140impl FileGroupPartitioner {
141 pub fn new() -> Self {
146 Self {
147 target_partitions: 1,
148 repartition_file_min_size: 10 * 1024 * 1024,
149 preserve_order_within_groups: false,
150 }
151 }
152
153 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
155 self.target_partitions = target_partitions;
156 self
157 }
158
159 pub fn with_repartition_file_min_size(
161 mut self,
162 repartition_file_min_size: usize,
163 ) -> Self {
164 self.repartition_file_min_size = repartition_file_min_size;
165 self
166 }
167
168 pub fn with_preserve_order_within_groups(
170 mut self,
171 preserve_order_within_groups: bool,
172 ) -> Self {
173 self.preserve_order_within_groups = preserve_order_within_groups;
174 self
175 }
176
177 pub fn repartition_file_groups(
181 &self,
182 file_groups: &[Vec<PartitionedFile>],
183 ) -> Option<Vec<Vec<PartitionedFile>>> {
184 if file_groups.is_empty() {
185 return None;
186 }
187
188 let has_ranges = file_groups.iter().flatten().any(|f| f.range.is_some());
190 if has_ranges {
191 return None;
192 }
193
194 if self.preserve_order_within_groups {
196 self.repartition_preserving_order(file_groups)
197 } else {
198 self.repartition_evenly_by_size(file_groups)
199 }
200 }
201
202 fn repartition_evenly_by_size(
205 &self,
206 file_groups: &[Vec<PartitionedFile>],
207 ) -> Option<Vec<Vec<PartitionedFile>>> {
208 let target_partitions = self.target_partitions;
209 let repartition_file_min_size = self.repartition_file_min_size;
210 let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
211
212 let total_size = flattened_files
213 .iter()
214 .map(|f| f.object_meta.size as i64)
215 .sum::<i64>();
216 if total_size < (repartition_file_min_size as i64) || total_size == 0 {
217 return None;
218 }
219
220 let target_partition_size = (total_size as usize).div_ceil(target_partitions);
221
222 let current_partition_index: usize = 0;
223 let current_partition_size: usize = 0;
224
225 let repartitioned_files = flattened_files
227 .into_iter()
228 .scan(
229 (current_partition_index, current_partition_size),
230 |state, source_file| {
231 let mut produced_files = vec![];
232 let mut range_start = 0;
233 while range_start < source_file.object_meta.size {
234 let range_end = min(
235 range_start + (target_partition_size - state.1),
236 source_file.object_meta.size,
237 );
238
239 let mut produced_file = source_file.clone();
240 produced_file.range = Some(FileRange {
241 start: range_start as i64,
242 end: range_end as i64,
243 });
244 produced_files.push((state.0, produced_file));
245
246 if state.1 + (range_end - range_start) >= target_partition_size {
247 state.0 += 1;
248 state.1 = 0;
249 } else {
250 state.1 += range_end - range_start;
251 }
252 range_start = range_end;
253 }
254 Some(produced_files)
255 },
256 )
257 .flatten()
258 .chunk_by(|(partition_idx, _)| *partition_idx)
259 .into_iter()
260 .map(|(_, group)| group.map(|(_, vals)| vals).collect_vec())
261 .collect_vec();
262
263 Some(repartitioned_files)
264 }
265
266 fn repartition_preserving_order(
268 &self,
269 file_groups: &[Vec<PartitionedFile>],
270 ) -> Option<Vec<Vec<PartitionedFile>>> {
271 if file_groups.len() >= self.target_partitions {
274 return None;
275 }
276 let num_new_groups = self.target_partitions - file_groups.len();
277
278 if file_groups.len() == 1 && file_groups[0].len() == 1 {
280 return self.repartition_evenly_by_size(file_groups);
281 }
282
283 let mut heap: BinaryHeap<_> = file_groups
285 .iter()
286 .enumerate()
287 .filter_map(|(group_index, group)| {
288 if group.len() == 1 {
290 Some(ToRepartition {
291 source_index: group_index,
292 file_size: group[0].object_meta.size,
293 new_groups: vec![group_index],
294 })
295 } else {
296 None
297 }
298 })
299 .collect();
300
301 if heap.is_empty() {
303 return None;
304 }
305
306 let mut file_groups: Vec<_> = file_groups
308 .iter()
309 .cloned()
310 .chain(repeat_with(Vec::new).take(num_new_groups))
311 .collect();
312
313 for (group_index, group) in file_groups.iter().enumerate() {
315 if !group.is_empty() {
316 continue;
317 }
318 let mut largest_group = heap.pop().unwrap();
320 largest_group.new_groups.push(group_index);
321 heap.push(largest_group);
322 }
323
324 while let Some(to_repartition) = heap.pop() {
326 let range_size = to_repartition.range_size() as i64;
327 let ToRepartition {
328 source_index,
329 file_size,
330 new_groups,
331 } = to_repartition;
332 assert_eq!(file_groups[source_index].len(), 1);
333 let original_file = file_groups[source_index].pop().unwrap();
334
335 let last_group = new_groups.len() - 1;
336 let mut range_start: i64 = 0;
337 let mut range_end: i64 = range_size;
338 for (i, group_index) in new_groups.into_iter().enumerate() {
339 let target_group = &mut file_groups[group_index];
340 assert!(target_group.is_empty());
341
342 if i == last_group {
344 range_end = file_size as i64;
345 }
346 target_group
347 .push(original_file.clone().with_range(range_start, range_end));
348 range_start = range_end;
349 range_end += range_size;
350 }
351 }
352
353 Some(file_groups)
354 }
355}
356
/// A single-file group scheduled to be cut into ranges spread over several
/// output groups by the order-preserving strategy.
#[derive(Clone, Debug, Eq, PartialEq)]
struct ToRepartition {
    /// Index of the group that originally held the file.
    source_index: usize,
    /// `object_meta.size` of the file being split.
    file_size: usize,
    /// Output group indexes that each receive one range of the file, in
    /// order; the first entry is always `source_index`.
    new_groups: Vec<usize>,
}
367
368impl ToRepartition {
369 fn range_size(&self) -> usize {
371 self.file_size / self.new_groups.len()
372 }
373}
374
375impl PartialOrd for ToRepartition {
376 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
377 Some(self.cmp(other))
378 }
379}
380
381impl Ord for ToRepartition {
383 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
384 self.range_size().cmp(&other.range_size())
385 }
386}
387
#[cfg(test)]
mod test {
    use super::*;

    /// Builds an order-preserving partitioner with the given target group
    /// count and minimum repartition size.
    fn ordered_partitioner(
        target_partitions: usize,
        repartition_file_min_size: usize,
    ) -> FileGroupPartitioner {
        FileGroupPartitioner::new()
            .with_preserve_order_within_groups(true)
            .with_target_partitions(target_partitions)
            .with_repartition_file_min_size(repartition_file_min_size)
    }

    /// A single zero-byte file has nothing to split.
    #[test]
    fn repartition_empty_file_only() {
        let input = vec![vec![pfile("empty", 0)]];

        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(4)
            .with_repartition_file_min_size(0);

        assert_partitioned_files(None, partitioner.repartition_file_groups(&input));
    }

    /// Zero-byte files are dropped regardless of where they appear.
    #[test]
    fn repartition_empty_files() {
        let a = pfile("a", 10);
        let b = pfile("b", 10);
        let empty = pfile("empty", 0);

        let layouts = [
            vec![vec![empty.clone()], vec![a.clone()], vec![b.clone()]],
            vec![vec![a.clone()], vec![empty.clone()], vec![b.clone()]],
            vec![vec![a], vec![b], vec![empty]],
        ];

        let two_way = vec![
            vec![pfile("a", 10).with_range(0, 10)],
            vec![pfile("b", 10).with_range(0, 10)],
        ];
        let three_way = vec![
            vec![pfile("a", 10).with_range(0, 7)],
            vec![
                pfile("a", 10).with_range(7, 10),
                pfile("b", 10).with_range(0, 4),
            ],
            vec![pfile("b", 10).with_range(4, 10)],
        ];

        for layout in &layouts {
            for (n_partitions, expected) in [(2, &two_way), (3, &three_way)] {
                let actual = FileGroupPartitioner::new()
                    .with_target_partitions(n_partitions)
                    .with_repartition_file_min_size(10)
                    .repartition_file_groups(layout);

                assert_partitioned_files(Some(expected.clone()), actual);
            }
        }
    }

    #[test]
    fn repartition_single_file() {
        let input = vec![vec![pfile("a", 123)]];

        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(4)
            .with_repartition_file_min_size(10);
        let actual = partitioner.repartition_file_groups(&input);

        let expected = vec![
            vec![pfile("a", 123).with_range(0, 31)],
            vec![pfile("a", 123).with_range(31, 62)],
            vec![pfile("a", 123).with_range(62, 93)],
            vec![pfile("a", 123).with_range(93, 123)],
        ];
        assert_partitioned_files(Some(expected), actual);
    }

    #[test]
    fn repartition_too_much_partitions() {
        let input = vec![vec![pfile("a", 8)]];

        let actual = FileGroupPartitioner::new()
            .with_target_partitions(96)
            .with_repartition_file_min_size(5)
            .repartition_file_groups(&input);

        // Only 8 bytes exist, so at most 8 one-byte groups are produced.
        let expected: Vec<_> = (0..8)
            .map(|start| vec![pfile("a", 8).with_range(start, start + 1)])
            .collect();
        assert_partitioned_files(Some(expected), actual);
    }

    #[test]
    fn repartition_multiple_partitions() {
        let input = vec![vec![pfile("a", 40)], vec![pfile("b", 60)]];

        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(3)
            .with_repartition_file_min_size(10);
        let actual = partitioner.repartition_file_groups(&input);

        let expected = vec![
            vec![pfile("a", 40).with_range(0, 34)],
            vec![
                pfile("a", 40).with_range(34, 40),
                pfile("b", 60).with_range(0, 28),
            ],
            vec![pfile("b", 60).with_range(28, 60)],
        ];
        assert_partitioned_files(Some(expected), actual);
    }

    #[test]
    fn repartition_same_num_partitions() {
        let input = vec![vec![pfile("a", 40)], vec![pfile("b", 60)]];

        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(2)
            .with_repartition_file_min_size(10);
        let actual = partitioner.repartition_file_groups(&input);

        let expected = vec![
            vec![
                pfile("a", 40).with_range(0, 40),
                pfile("b", 60).with_range(0, 10),
            ],
            vec![pfile("b", 60).with_range(10, 60)],
        ];
        assert_partitioned_files(Some(expected), actual);
    }

    /// Inputs that already carry ranges are left alone.
    #[test]
    fn repartition_no_action_ranges() {
        let input = vec![
            vec![pfile("a", 123)],
            vec![pfile("b", 144).with_range(1, 50)],
        ];

        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(65)
            .with_repartition_file_min_size(10);

        assert_partitioned_files(None, partitioner.repartition_file_groups(&input));
    }

    #[test]
    fn repartition_no_action_min_size() {
        let input = vec![vec![pfile("a", 123)]];

        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(65)
            .with_repartition_file_min_size(500);

        assert_partitioned_files(None, partitioner.repartition_file_groups(&input));
    }

    #[test]
    fn repartition_no_action_zero_files() {
        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(65)
            .with_repartition_file_min_size(500);
        let no_groups: Vec<Vec<PartitionedFile>> = Vec::new();

        assert_partitioned_files(None, repartition_test(partitioner, no_groups));
    }

    #[test]
    fn repartition_ordered_no_action_too_few_partitions() {
        let input = vec![vec![pfile("a", 100)], vec![pfile("b", 200)]];
        let actual = ordered_partitioner(2, 10).repartition_file_groups(&input);
        assert_partitioned_files(None, actual);
    }

    #[test]
    fn repartition_ordered_no_action_file_too_small() {
        let input = vec![vec![pfile("a", 100)]];
        let actual = ordered_partitioner(2, 1000).repartition_file_groups(&input);
        assert_partitioned_files(None, actual);
    }

    #[test]
    fn repartition_ordered_one_large_file() {
        let input = vec![vec![pfile("a", 100)]];
        let actual = ordered_partitioner(3, 10).repartition_file_groups(&input);

        let expected = vec![
            vec![pfile("a", 100).with_range(0, 34)],
            vec![pfile("a", 100).with_range(34, 68)],
            vec![pfile("a", 100).with_range(68, 100)],
        ];
        assert_partitioned_files(Some(expected), actual);
    }

    #[test]
    fn repartition_ordered_one_large_one_small_file() {
        let input = vec![vec![pfile("a", 100)], vec![pfile("b", 30)]];
        let actual = ordered_partitioner(4, 10).repartition_file_groups(&input);

        let expected = vec![
            // "a" keeps its original slot for its first range
            vec![pfile("a", 100).with_range(0, 33)],
            vec![pfile("b", 30).with_range(0, 30)],
            vec![pfile("a", 100).with_range(33, 66)],
            vec![pfile("a", 100).with_range(66, 100)],
        ];
        assert_partitioned_files(Some(expected), actual);
    }

    #[test]
    fn repartition_ordered_two_large_files() {
        let input = vec![vec![pfile("a", 100)], vec![pfile("b", 100)]];
        let actual = ordered_partitioner(4, 10).repartition_file_groups(&input);

        let expected = vec![
            vec![pfile("a", 100).with_range(0, 50)],
            vec![pfile("b", 100).with_range(0, 50)],
            vec![pfile("a", 100).with_range(50, 100)],
            vec![pfile("b", 100).with_range(50, 100)],
        ];
        assert_partitioned_files(Some(expected), actual);
    }

    #[test]
    fn repartition_ordered_two_large_one_small_files() {
        let input = vec![
            vec![pfile("a", 100)],
            vec![pfile("b", 100)],
            vec![pfile("c", 30)],
        ];

        let four_way = vec![
            vec![pfile("a", 100).with_range(0, 50)],
            vec![pfile("b", 100).with_range(0, 100)],
            vec![pfile("c", 30).with_range(0, 30)],
            vec![pfile("a", 100).with_range(50, 100)],
        ];
        assert_partitioned_files(
            Some(four_way),
            ordered_partitioner(4, 10).repartition_file_groups(&input),
        );

        let five_way = vec![
            vec![pfile("a", 100).with_range(0, 50)],
            vec![pfile("b", 100).with_range(0, 50)],
            vec![pfile("c", 30).with_range(0, 30)],
            vec![pfile("a", 100).with_range(50, 100)],
            vec![pfile("b", 100).with_range(50, 100)],
        ];
        assert_partitioned_files(
            Some(five_way),
            ordered_partitioner(5, 10).repartition_file_groups(&input),
        );
    }

    #[test]
    fn repartition_ordered_one_large_one_small_existing_empty() {
        let input = vec![vec![pfile("a", 100)], vec![], vec![pfile("b", 40)], vec![]];
        let actual = ordered_partitioner(5, 10).repartition_file_groups(&input);

        let expected = vec![
            vec![pfile("a", 100).with_range(0, 33)],
            vec![pfile("a", 100).with_range(33, 66)],
            vec![pfile("b", 40).with_range(0, 20)],
            vec![pfile("a", 100).with_range(66, 100)],
            vec![pfile("b", 40).with_range(20, 40)],
        ];
        assert_partitioned_files(Some(expected), actual);
    }

    #[test]
    fn repartition_ordered_existing_group_multiple_files() {
        let input = vec![
            vec![pfile("a", 100), pfile("b", 100)],
            vec![pfile("c", 40)],
        ];
        let actual = ordered_partitioner(3, 10).repartition_file_groups(&input);

        let expected = vec![
            // the multi-file group is returned untouched (no ranges added)
            vec![pfile("a", 100), pfile("b", 100)],
            vec![pfile("c", 40).with_range(0, 20)],
            vec![pfile("c", 40).with_range(20, 40)],
        ];
        assert_partitioned_files(Some(expected), actual);
    }

    /// Asserts both options agree on `Some`/`None` and, when both are `Some`,
    /// that their pretty-printed `Debug` output is identical.
    fn assert_partitioned_files(
        expected: Option<Vec<Vec<PartitionedFile>>>,
        actual: Option<Vec<Vec<PartitionedFile>>>,
    ) {
        let (expected, actual) = match (expected, actual) {
            (None, None) => return,
            (Some(_), None) => panic!("Expected Some, got None"),
            (None, Some(_)) => panic!("Expected None, got Some"),
            (Some(expected), Some(actual)) => (expected, actual),
        };
        assert_eq!(format!("{expected:#?}"), format!("{actual:#?}"));
    }

    /// Shorthand for an unranged `PartitionedFile` of the given size.
    fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
        PartitionedFile::new(path, file_size)
    }

    /// Runs `partitioner` as configured and again with order preservation
    /// enabled, asserts both produce the same result, and returns it.
    fn repartition_test(
        partitioner: FileGroupPartitioner,
        file_groups: Vec<Vec<PartitionedFile>>,
    ) -> Option<Vec<Vec<PartitionedFile>>> {
        let order_preserving = partitioner.with_preserve_order_within_groups(true);

        let as_configured = partitioner.repartition_file_groups(&file_groups);
        let preserving_sort = order_preserving.repartition_file_groups(&file_groups);

        assert_partitioned_files(as_configured.clone(), preserving_sort);
        as_configured
    }
}