use crate::datasource::listing::{FileRange, PartitionedFile};
use itertools::Itertools;
use std::cmp::min;
use std::collections::BinaryHeap;
use std::iter::repeat_with;
/// Settings that control how groups of files are split into byte ranges
/// and redistributed across a target number of groups.
///
/// Values are set through the `with_*` builder methods and consumed by
/// `repartition_file_groups`.
#[derive(Debug, Clone, Copy)]
pub struct FileGroupPartitioner {
/// Number of output groups to aim for (`new` sets this to 1).
target_partitions: usize,
/// When splitting evenly by size, inputs whose combined size is below
/// this many bytes are left untouched (`new` sets this to 10 * 1024 * 1024).
repartition_file_min_size: usize,
/// Selects the strategy: `true` uses the order-preserving split, `false`
/// (the value set by `new`) splits evenly by size.
preserve_order_within_groups: bool,
}
impl Default for FileGroupPartitioner {
fn default() -> Self {
Self::new()
}
}
impl FileGroupPartitioner {
/// Creates a partitioner with the baseline settings: a single target
/// partition, a 10 MiB minimum size for repartitioning, and no ordering
/// preservation.
pub fn new() -> Self {
    // Inputs smaller than this are not worth splitting by default.
    const DEFAULT_MIN_SIZE: usize = 10 * 1024 * 1024;

    let target_partitions = 1;
    let preserve_order_within_groups = false;
    Self {
        target_partitions,
        repartition_file_min_size: DEFAULT_MIN_SIZE,
        preserve_order_within_groups,
    }
}
pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
self.target_partitions = target_partitions;
self
}
pub fn with_repartition_file_min_size(
mut self,
repartition_file_min_size: usize,
) -> Self {
self.repartition_file_min_size = repartition_file_min_size;
self
}
pub fn with_preserve_order_within_groups(
mut self,
preserve_order_within_groups: bool,
) -> Self {
self.preserve_order_within_groups = preserve_order_within_groups;
self
}
/// Redistributes `file_groups` according to this partitioner's settings.
///
/// Returns `None` when no repartitioning is performed: the input has no
/// groups, any file already carries a byte range, or the selected
/// strategy itself declines. Otherwise returns the new groups.
pub fn repartition_file_groups(
    &self,
    file_groups: &[Vec<PartitionedFile>],
) -> Option<Vec<Vec<PartitionedFile>>> {
    // Files that were already narrowed to a range are never split again.
    let already_ranged = || {
        file_groups
            .iter()
            .flatten()
            .any(|file| file.range.is_some())
    };
    if file_groups.is_empty() || already_ranged() {
        return None;
    }

    if !self.preserve_order_within_groups {
        return self.repartition_evenly_by_size(file_groups);
    }
    self.repartition_preserving_order(file_groups)
}
/// Splits the files into byte ranges so that every output group covers
/// about `ceil(total_size / target_partitions)` bytes.
///
/// The input grouping is ignored: files are visited in flattened order
/// and consecutive ranges are packed into one group until it reaches the
/// target size, at which point the next group is started. Zero-sized
/// files produce no ranges and therefore vanish from the output.
///
/// Returns `None` (no repartitioning) when `target_partitions` is zero,
/// when the combined file size is zero, or when the combined size is
/// smaller than `repartition_file_min_size`.
fn repartition_evenly_by_size(
    &self,
    file_groups: &[Vec<PartitionedFile>],
) -> Option<Vec<Vec<PartitionedFile>>> {
    let target_partitions = self.target_partitions;
    // `div_ceil` below panics on a zero divisor. With no partitions to
    // fill there is nothing to produce, so decline instead of panicking.
    if target_partitions == 0 {
        return None;
    }
    let repartition_file_min_size = self.repartition_file_min_size;

    let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
    let total_size = flattened_files
        .iter()
        .map(|f| f.object_meta.size as i64)
        .sum::<i64>();
    if total_size < (repartition_file_min_size as i64) || total_size == 0 {
        return None;
    }

    let target_partition_size = (total_size as usize).div_ceil(target_partitions);

    let repartitioned_files = flattened_files
        .into_iter()
        // State: (index of the group being filled, bytes already in it).
        .scan((0_usize, 0_usize), |state, source_file| {
            let (partition_index, partition_size) = state;
            let file_size = source_file.object_meta.size;

            let mut produced_files = vec![];
            let mut range_start = 0;
            while range_start < file_size {
                // Take as much as the current group can still hold,
                // capped at the end of the file.
                let range_end = min(
                    range_start + (target_partition_size - *partition_size),
                    file_size,
                );

                let mut produced_file = source_file.clone();
                produced_file.range = Some(FileRange {
                    start: range_start as i64,
                    end: range_end as i64,
                });
                produced_files.push((*partition_index, produced_file));

                *partition_size += range_end - range_start;
                if *partition_size >= target_partition_size {
                    // Current group is full: move on to a fresh one.
                    *partition_index += 1;
                    *partition_size = 0;
                }
                range_start = range_end;
            }
            Some(produced_files)
        })
        .flatten()
        // Group indices only ever increase, so consecutive runs with the
        // same index are exactly the output groups.
        .chunk_by(|(partition_idx, _)| *partition_idx)
        .into_iter()
        .map(|(_, group)| group.map(|(_, vals)| vals).collect_vec())
        .collect_vec();

    Some(repartitioned_files)
}
fn repartition_preserving_order(
&self,
file_groups: &[Vec<PartitionedFile>],
) -> Option<Vec<Vec<PartitionedFile>>> {
if file_groups.len() >= self.target_partitions {
return None;
}
let num_new_groups = self.target_partitions - file_groups.len();
if file_groups.len() == 1 && file_groups[0].len() == 1 {
return self.repartition_evenly_by_size(file_groups);
}
let mut heap: BinaryHeap<_> = file_groups
.iter()
.enumerate()
.filter_map(|(group_index, group)| {
if group.len() == 1 {
Some(ToRepartition {
source_index: group_index,
file_size: group[0].object_meta.size,
new_groups: vec![group_index],
})
} else {
None
}
})
.collect();
if heap.is_empty() {
return None;
}
let mut file_groups: Vec<_> = file_groups
.iter()
.cloned()
.chain(repeat_with(Vec::new).take(num_new_groups))
.collect();
for (group_index, group) in file_groups.iter().enumerate() {
if !group.is_empty() {
continue;
}
let mut largest_group = heap.pop().unwrap();
largest_group.new_groups.push(group_index);
heap.push(largest_group);
}
while let Some(to_repartition) = heap.pop() {
let range_size = to_repartition.range_size() as i64;
let ToRepartition {
source_index,
file_size,
new_groups,
} = to_repartition;
assert_eq!(file_groups[source_index].len(), 1);
let original_file = file_groups[source_index].pop().unwrap();
let last_group = new_groups.len() - 1;
let mut range_start: i64 = 0;
let mut range_end: i64 = range_size;
for (i, group_index) in new_groups.into_iter().enumerate() {
let target_group = &mut file_groups[group_index];
assert!(target_group.is_empty());
if i == last_group {
range_end = file_size as i64;
}
target_group
.push(original_file.clone().with_range(range_start, range_end));
range_start = range_end;
range_end += range_size;
}
}
Some(file_groups)
}
}
/// Bookkeeping for one file that is going to be split across groups.
///
/// Ordering compares `range_size()` only, whereas the derived equality
/// compares every field; two values can therefore compare as `Equal`
/// without being `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ToRepartition {
    /// Index of the group the file originally lives in.
    source_index: usize,
    /// Size of the file in bytes.
    file_size: usize,
    /// Indices of the groups that will each receive one range of the file.
    new_groups: Vec<usize>,
}

impl ToRepartition {
    /// Bytes per group if the file were divided equally among
    /// `new_groups` (integer division, remainder discarded).
    fn range_size(&self) -> usize {
        let group_count = self.new_groups.len();
        self.file_size / group_count
    }
}

impl PartialOrd for ToRepartition {
    /// Defers to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for ToRepartition {
    /// Orders by `range_size()`, so a max-heap yields the entry whose
    /// per-group range is currently the largest.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (mine, theirs) = (self.range_size(), other.range_size());
        mine.cmp(&theirs)
    }
}
#[cfg(test)]
mod test {
use super::*;
/// A single zero-byte file must not be repartitioned, even with a zero
/// minimum size.
#[test]
fn repartition_empty_file_only() {
    let groups = vec![vec![pfile("empty", 0)]];
    let partitioner = FileGroupPartitioner::new()
        .with_target_partitions(4)
        .with_repartition_file_min_size(0);

    let actual = partitioner.repartition_file_groups(&groups);

    assert_partitioned_files(None, actual);
}
/// A zero-byte file is dropped from the result regardless of where it
/// sits among the input groups.
#[test]
fn repartition_empty_files() {
    let file_a = pfile("a", 10);
    let file_b = pfile("b", 10);
    let file_empty = pfile("empty", 0);

    // The empty file placed first, in the middle, and last.
    let layouts = [
        vec![
            vec![file_empty.clone()],
            vec![file_a.clone()],
            vec![file_b.clone()],
        ],
        vec![
            vec![file_a.clone()],
            vec![file_empty.clone()],
            vec![file_b.clone()],
        ],
        vec![vec![file_a], vec![file_b], vec![file_empty]],
    ];

    let two_way = vec![
        vec![pfile("a", 10).with_range(0, 10)],
        vec![pfile("b", 10).with_range(0, 10)],
    ];
    let three_way = vec![
        vec![pfile("a", 10).with_range(0, 7)],
        vec![
            pfile("a", 10).with_range(7, 10),
            pfile("b", 10).with_range(0, 4),
        ],
        vec![pfile("b", 10).with_range(4, 10)],
    ];

    for layout in &layouts {
        for (n_partition, expected) in [(2, &two_way), (3, &three_way)] {
            let actual = FileGroupPartitioner::new()
                .with_target_partitions(n_partition)
                .with_repartition_file_min_size(10)
                .repartition_file_groups(layout);
            assert_partitioned_files(Some(expected.clone()), actual);
        }
    }
}
/// One 123-byte file split into four groups.
#[test]
fn repartition_single_file() {
    let groups = vec![vec![pfile("a", 123)]];
    let partitioner = FileGroupPartitioner::new()
        .with_target_partitions(4)
        .with_repartition_file_min_size(10);

    let actual = partitioner.repartition_file_groups(&groups);

    let expected: Vec<Vec<PartitionedFile>> = [(0, 31), (31, 62), (62, 93), (93, 123)]
        .into_iter()
        .map(|(start, end)| vec![pfile("a", 123).with_range(start, end)])
        .collect();
    assert_partitioned_files(Some(expected), actual);
}
/// Asking for far more partitions than the file has bytes yields one
/// single-byte range per byte.
#[test]
fn repartition_too_much_partitions() {
    let groups = vec![vec![pfile("a", 8)]];
    let partitioner = FileGroupPartitioner::new()
        .with_target_partitions(96)
        .with_repartition_file_min_size(5);

    let actual = partitioner.repartition_file_groups(&groups);

    let expected: Vec<Vec<PartitionedFile>> = (0..8)
        .map(|start| vec![pfile("a", 8).with_range(start, start + 1)])
        .collect();
    assert_partitioned_files(Some(expected), actual);
}
/// Two files (40 and 60 bytes) spread over three groups; the middle
/// group holds the tail of the first file and the head of the second.
#[test]
fn repartition_multiple_partitions() {
    let groups = vec![vec![pfile("a", 40)], vec![pfile("b", 60)]];
    let partitioner = FileGroupPartitioner::new()
        .with_target_partitions(3)
        .with_repartition_file_min_size(10);

    let actual = partitioner.repartition_file_groups(&groups);

    let first = vec![pfile("a", 40).with_range(0, 34)];
    let second = vec![
        pfile("a", 40).with_range(34, 40),
        pfile("b", 60).with_range(0, 28),
    ];
    let third = vec![pfile("b", 60).with_range(28, 60)];
    assert_partitioned_files(Some(vec![first, second, third]), actual);
}
/// With as many target partitions as input groups, the bytes are still
/// rebalanced evenly between the two groups.
#[test]
fn repartition_same_num_partitions() {
    let groups = vec![vec![pfile("a", 40)], vec![pfile("b", 60)]];
    let partitioner = FileGroupPartitioner::new()
        .with_target_partitions(2)
        .with_repartition_file_min_size(10);

    let actual = partitioner.repartition_file_groups(&groups);

    let first = vec![
        pfile("a", 40).with_range(0, 40),
        pfile("b", 60).with_range(0, 10),
    ];
    let second = vec![pfile("b", 60).with_range(10, 60)];
    assert_partitioned_files(Some(vec![first, second]), actual);
}
/// Input that already contains a ranged file is left alone.
#[test]
fn repartition_no_action_ranges() {
    let unranged = vec![pfile("a", 123)];
    let ranged = vec![pfile("b", 144).with_range(1, 50)];
    let groups = vec![unranged, ranged];
    let partitioner = FileGroupPartitioner::new()
        .with_target_partitions(65)
        .with_repartition_file_min_size(10);

    let actual = partitioner.repartition_file_groups(&groups);

    assert_partitioned_files(None, actual)
}
/// A file smaller than the configured minimum size is not split.
#[test]
fn repartition_no_action_min_size() {
    let groups = vec![vec![pfile("a", 123)]];
    let partitioner = FileGroupPartitioner::new()
        .with_target_partitions(65)
        .with_repartition_file_min_size(500);

    let actual = partitioner.repartition_file_groups(&groups);

    assert_partitioned_files(None, actual)
}
/// With no groups at all, both strategies decline to repartition.
#[test]
fn repartition_no_action_zero_files() {
    let no_groups: Vec<Vec<PartitionedFile>> = Vec::new();
    let partitioner = FileGroupPartitioner::new()
        .with_target_partitions(65)
        .with_repartition_file_min_size(500);

    let actual = repartition_test(partitioner, no_groups);

    assert_partitioned_files(None, actual)
}
/// Order-preserving mode does nothing when the input already has as many
/// groups as target partitions.
#[test]
fn repartition_ordered_no_action_too_few_partitions() {
    let groups = vec![vec![pfile("a", 100)], vec![pfile("b", 200)]];
    let partitioner = FileGroupPartitioner::new()
        .with_preserve_order_within_groups(true)
        .with_target_partitions(2)
        .with_repartition_file_min_size(10);

    let actual = partitioner.repartition_file_groups(&groups);

    assert_partitioned_files(None, actual)
}
/// Order-preserving mode leaves a lone file below the minimum size intact.
#[test]
fn repartition_ordered_no_action_file_too_small() {
    let groups = vec![vec![pfile("a", 100)]];
    let partitioner = FileGroupPartitioner::new()
        .with_preserve_order_within_groups(true)
        .with_target_partitions(2)
        .with_repartition_file_min_size(1000);

    let actual = partitioner.repartition_file_groups(&groups);

    assert_partitioned_files(None, actual)
}
/// Order-preserving mode with a single 100-byte file and three targets.
#[test]
fn repartition_ordered_one_large_file() {
    let groups = vec![vec![pfile("a", 100)]];
    let partitioner = FileGroupPartitioner::new()
        .with_preserve_order_within_groups(true)
        .with_target_partitions(3)
        .with_repartition_file_min_size(10);

    let actual = partitioner.repartition_file_groups(&groups);

    let expected: Vec<Vec<PartitionedFile>> = [(0, 34), (34, 68), (68, 100)]
        .into_iter()
        .map(|(start, end)| vec![pfile("a", 100).with_range(start, end)])
        .collect();
    assert_partitioned_files(Some(expected), actual);
}
/// Both new groups go to the large file; the small file stays whole.
#[test]
fn repartition_ordered_one_large_one_small_file() {
    let groups = vec![vec![pfile("a", 100)], vec![pfile("b", 30)]];
    let partitioner = FileGroupPartitioner::new()
        .with_preserve_order_within_groups(true)
        .with_target_partitions(4)
        .with_repartition_file_min_size(10);

    let actual = partitioner.repartition_file_groups(&groups);

    let large = |start, end| vec![pfile("a", 100).with_range(start, end)];
    let expected = vec![
        large(0, 33),
        vec![pfile("b", 30).with_range(0, 30)],
        large(33, 66),
        large(66, 100),
    ];
    assert_partitioned_files(Some(expected), actual);
}
/// Two equally sized files each receive one of the two new groups.
#[test]
fn repartition_ordered_two_large_files() {
    let groups = vec![vec![pfile("a", 100)], vec![pfile("b", 100)]];
    let partitioner = FileGroupPartitioner::new()
        .with_preserve_order_within_groups(true)
        .with_target_partitions(4)
        .with_repartition_file_min_size(10);

    let actual = partitioner.repartition_file_groups(&groups);

    let half = |name: &str, start, end| vec![pfile(name, 100).with_range(start, end)];
    let expected = vec![
        half("a", 0, 50),
        half("b", 0, 50),
        half("a", 50, 100),
        half("b", 50, 100),
    ];
    assert_partitioned_files(Some(expected), actual);
}
/// Two large files and one small file, checked with four and then five
/// target partitions.
#[test]
fn repartition_ordered_two_large_one_small_files() {
    let groups = vec![
        vec![pfile("a", 100)],
        vec![pfile("b", 100)],
        vec![pfile("c", 30)],
    ];
    let base = FileGroupPartitioner::new()
        .with_preserve_order_within_groups(true)
        .with_repartition_file_min_size(10);

    // Four targets: only one extra group, which goes to file "a".
    let with_four = base
        .with_target_partitions(4)
        .repartition_file_groups(&groups);
    let expected_four = vec![
        vec![pfile("a", 100).with_range(0, 50)],
        vec![pfile("b", 100).with_range(0, 100)],
        vec![pfile("c", 30).with_range(0, 30)],
        vec![pfile("a", 100).with_range(50, 100)],
    ];
    assert_partitioned_files(Some(expected_four), with_four);

    // Five targets: "a" and "b" each get one extra group.
    let with_five = base
        .with_target_partitions(5)
        .repartition_file_groups(&groups);
    let expected_five = vec![
        vec![pfile("a", 100).with_range(0, 50)],
        vec![pfile("b", 100).with_range(0, 50)],
        vec![pfile("c", 30).with_range(0, 30)],
        vec![pfile("a", 100).with_range(50, 100)],
        vec![pfile("b", 100).with_range(50, 100)],
    ];
    assert_partitioned_files(Some(expected_five), with_five);
}
/// Empty groups already present in the input are filled just like the
/// newly added one.
#[test]
fn repartition_ordered_one_large_one_small_existing_empty() {
    let groups = vec![
        vec![pfile("a", 100)],
        vec![],
        vec![pfile("b", 40)],
        vec![],
    ];
    let partitioner = FileGroupPartitioner::new()
        .with_preserve_order_within_groups(true)
        .with_target_partitions(5)
        .with_repartition_file_min_size(10);

    let actual = partitioner.repartition_file_groups(&groups);

    let piece_a = |start, end| vec![pfile("a", 100).with_range(start, end)];
    let piece_b = |start, end| vec![pfile("b", 40).with_range(start, end)];
    let expected = vec![
        piece_a(0, 33),
        piece_a(33, 66),
        piece_b(0, 20),
        piece_a(66, 100),
        piece_b(20, 40),
    ];
    assert_partitioned_files(Some(expected), actual);
}
/// A group holding several files is passed through unchanged; only the
/// single-file group is split.
#[test]
fn repartition_ordered_existing_group_multiple_files() {
    let multi_file_group = vec![pfile("a", 100), pfile("b", 100)];
    let single_file_group = vec![pfile("c", 40)];
    let groups = vec![multi_file_group, single_file_group];
    let partitioner = FileGroupPartitioner::new()
        .with_preserve_order_within_groups(true)
        .with_target_partitions(3)
        .with_repartition_file_min_size(10);

    let actual = partitioner.repartition_file_groups(&groups);

    let expected = vec![
        vec![pfile("a", 100), pfile("b", 100)],
        vec![pfile("c", 40).with_range(0, 20)],
        vec![pfile("c", 40).with_range(20, 40)],
    ];
    assert_partitioned_files(Some(expected), actual);
}
/// Asserts that `expected` and `actual` agree: both `None`, or both
/// `Some` with identical pretty-printed `Debug` output.
fn assert_partitioned_files(
    expected: Option<Vec<Vec<PartitionedFile>>>,
    actual: Option<Vec<Vec<PartitionedFile>>>,
) {
    match (expected, actual) {
        (Some(expected), Some(actual)) => {
            // Compare the debug renderings of the two results.
            assert_eq!(format!("{expected:#?}"), format!("{actual:#?}"));
        }
        (Some(_), None) => panic!("Expected Some, got None"),
        (None, Some(_)) => panic!("Expected None, got Some"),
        (None, None) => {}
    }
}
/// Test shorthand: builds a `PartitionedFile` for `path` with the given
/// size by forwarding to `PartitionedFile::new`.
fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
PartitionedFile::new(path, file_size)
}
/// Runs `partitioner` on `file_groups` as configured and again with order
/// preservation enabled, asserts that both runs agree, and returns the
/// result of the first run.
fn repartition_test(
    partitioner: FileGroupPartitioner,
    file_groups: Vec<Vec<PartitionedFile>>,
) -> Option<Vec<Vec<PartitionedFile>>> {
    let as_configured = partitioner.repartition_file_groups(&file_groups);

    let order_preserving = partitioner.with_preserve_order_within_groups(true);
    let with_order = order_preserving.repartition_file_groups(&file_groups);

    assert_partitioned_files(as_configured.clone(), with_order);
    as_configured
}
}