use crate::{FileRange, PartitionedFile};
use datafusion_common::Statistics;
use itertools::Itertools;
use std::cmp::min;
use std::collections::BinaryHeap;
use std::iter::repeat_with;
use std::mem;
use std::ops::{Index, IndexMut};
use std::sync::Arc;
/// Settings for redistributing the files of a set of [`FileGroup`]s over a
/// target number of groups by assigning byte ranges of files to groups.
///
/// Built with [`FileGroupPartitioner::new`] plus the `with_*` setters and
/// applied with [`FileGroupPartitioner::repartition_file_groups`].
#[derive(Debug, Clone, Copy)]
pub struct FileGroupPartitioner {
/// Number of groups the repartitioning aims for.
target_partitions: usize,
/// Minimum combined size of all files for the size-based strategy to act;
/// expressed in the same unit as `object_meta.size` (presumably bytes).
repartition_file_min_size: usize,
/// Selects the strategy: `true` uses the order-preserving strategy (only
/// groups holding exactly one file are split), `false` splits everything
/// evenly by size.
preserve_order_within_groups: bool,
}
impl Default for FileGroupPartitioner {
fn default() -> Self {
Self::new()
}
}
impl FileGroupPartitioner {
/// Creates a partitioner with the default settings: one target partition,
/// a minimum size of `10 * 1024 * 1024`, and order preservation disabled.
pub fn new() -> Self {
    // Default threshold below which size-based repartitioning is skipped.
    const DEFAULT_REPARTITION_FILE_MIN_SIZE: usize = 10 * 1024 * 1024;
    Self {
        preserve_order_within_groups: false,
        repartition_file_min_size: DEFAULT_REPARTITION_FILE_MIN_SIZE,
        target_partitions: 1,
    }
}
pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
self.target_partitions = target_partitions;
self
}
pub fn with_repartition_file_min_size(
mut self,
repartition_file_min_size: usize,
) -> Self {
self.repartition_file_min_size = repartition_file_min_size;
self
}
pub fn with_preserve_order_within_groups(
mut self,
preserve_order_within_groups: bool,
) -> Self {
self.preserve_order_within_groups = preserve_order_within_groups;
self
}
pub fn repartition_file_groups(
&self,
file_groups: &[FileGroup],
) -> Option<Vec<FileGroup>> {
if file_groups.is_empty() {
return None;
}
let has_ranges = file_groups
.iter()
.flat_map(FileGroup::iter)
.any(|f| f.range.is_some());
if has_ranges {
return None;
}
if self.preserve_order_within_groups {
self.repartition_preserving_order(file_groups)
} else {
self.repartition_evenly_by_size(file_groups)
}
}
fn repartition_evenly_by_size(
&self,
file_groups: &[FileGroup],
) -> Option<Vec<FileGroup>> {
let target_partitions = self.target_partitions;
let repartition_file_min_size = self.repartition_file_min_size;
let flattened_files = file_groups.iter().flat_map(FileGroup::iter).collect_vec();
let total_size = flattened_files
.iter()
.map(|f| f.object_meta.size as i64)
.sum::<i64>();
if total_size < (repartition_file_min_size as i64) || total_size == 0 {
return None;
}
let target_partition_size =
(total_size as u64).div_ceil(target_partitions as u64);
let current_partition_index: usize = 0;
let current_partition_size: u64 = 0;
let repartitioned_files = flattened_files
.into_iter()
.scan(
(current_partition_index, current_partition_size),
|state, source_file| {
let mut produced_files = vec![];
let mut range_start = 0;
while range_start < source_file.object_meta.size {
let range_end = min(
range_start + (target_partition_size - state.1),
source_file.object_meta.size,
);
let mut produced_file = source_file.clone();
produced_file.range = Some(FileRange {
start: range_start as i64,
end: range_end as i64,
});
produced_files.push((state.0, produced_file));
if state.1 + (range_end - range_start) >= target_partition_size {
state.0 += 1;
state.1 = 0;
} else {
state.1 += range_end - range_start;
}
range_start = range_end;
}
Some(produced_files)
},
)
.flatten()
.chunk_by(|(partition_idx, _)| *partition_idx)
.into_iter()
.map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec()))
.collect_vec();
Some(repartitioned_files)
}
/// Repartitions without ever placing ranges of different files into the same
/// group: only groups holding exactly one file are split, and each range of
/// such a file becomes the sole entry of its own group.
///
/// Returns `None` when the input already has at least `target_partitions`
/// groups or when no group holds exactly one file. If the input is a single
/// group with a single file, the work is delegated to
/// `repartition_evenly_by_size`. Otherwise the result has exactly
/// `target_partitions` groups; groups with several files are copied over
/// unchanged. Apart from that delegation, `repartition_file_min_size` is not
/// consulted here.
///
/// # Panics
///
/// Panics if an internal invariant is violated: a group chosen for splitting
/// does not hold exactly one file, or a destination group is not empty.
fn repartition_preserving_order(
&self,
file_groups: &[FileGroup],
) -> Option<Vec<FileGroup>> {
// With no spare groups to fill there is nothing to gain from splitting.
if file_groups.len() >= self.target_partitions {
return None;
}
let num_new_groups = self.target_partitions - file_groups.len();
// A single file in a single group has no cross-file order to protect, so
// the size-based strategy can be used directly.
if file_groups.len() == 1 && file_groups[0].len() == 1 {
return self.repartition_evenly_by_size(file_groups);
}
// Candidates for splitting: every group that holds exactly one file. Each
// candidate starts out assigned to its own group only. The heap is a
// max-heap ordered by `ToRepartition::range_size`.
let mut heap: BinaryHeap<_> = file_groups
.iter()
.enumerate()
.filter_map(|(group_index, group)| {
if group.len() == 1 {
Some(ToRepartition {
source_index: group_index,
file_size: group[0].object_meta.size,
new_groups: vec![group_index],
})
} else {
None
}
})
.collect();
// No group can be split.
if heap.is_empty() {
return None;
}
// Working copy of the input, extended with empty groups up to
// `target_partitions` entries.
let mut file_groups: Vec<_> = file_groups
.iter()
.cloned()
.chain(repeat_with(|| FileGroup::new(Vec::new())).take(num_new_groups))
.collect();
// Hand every empty group (newly added or already empty in the input) to
// the candidate whose ranges are currently the largest; re-inserting the
// candidate re-ranks it with its now smaller range size.
for (group_index, group) in file_groups.iter().enumerate() {
if !group.is_empty() {
continue;
}
// The heap was checked to be non-empty above and every pop is followed by
// a push, so this cannot fail.
let mut largest_group = heap.pop().unwrap();
largest_group.new_groups.push(group_index);
heap.push(largest_group);
}
// Move each candidate file out of its source group and write one range of
// it into each of its assigned groups.
while let Some(to_repartition) = heap.pop() {
let range_size = to_repartition.range_size() as i64;
let ToRepartition {
source_index,
file_size,
new_groups,
} = to_repartition;
assert_eq!(file_groups[source_index].len(), 1);
// Empties the source group, which is also the first entry of `new_groups`.
let original_file = file_groups[source_index].pop().unwrap();
let last_group = new_groups.len() - 1;
let mut range_start: i64 = 0;
let mut range_end: i64 = range_size;
for (i, group_index) in new_groups.into_iter().enumerate() {
let target_group = &mut file_groups[group_index];
assert!(target_group.is_empty());
// `range_size` comes from an integer division, so the last range is
// stretched to the end of the file to cover the remainder.
if i == last_group {
range_end = file_size as i64;
}
target_group
.push(original_file.clone().with_range(range_start, range_end));
range_start = range_end;
range_end += range_size;
}
}
Some(file_groups)
}
}
/// An ordered collection of [`PartitionedFile`]s, optionally accompanied by
/// statistics stored on the group itself.
#[derive(Debug, Clone)]
pub struct FileGroup {
/// The files that make up this group.
files: Vec<PartitionedFile>,
/// Statistics attached to the group itself (as opposed to the statistics
/// stored on individual files); `None` until set with `with_statistics`.
statistics: Option<Arc<Statistics>>,
}
impl FileGroup {
/// Creates a group holding `files`, with no group statistics attached.
pub fn new(files: Vec<PartitionedFile>) -> Self {
    let statistics = None;
    Self { files, statistics }
}
pub fn len(&self) -> usize {
self.files.len()
}
pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
self.statistics = Some(statistics);
self
}
/// Borrows the files of this group as a slice.
pub fn files(&self) -> &[PartitionedFile] {
    self.files.as_slice()
}
/// Iterates over the files of this group by reference, in stored order.
pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
    let files: &[PartitionedFile] = &self.files;
    files.iter()
}
pub fn into_inner(self) -> Vec<PartitionedFile> {
self.files
}
pub fn is_empty(&self) -> bool {
self.files.is_empty()
}
/// Removes and returns the last file of the group, or `None` if the group
/// is empty.
pub fn pop(&mut self) -> Option<PartitionedFile> {
    let files = &mut self.files;
    files.pop()
}
/// Appends `file` to the end of the group.
pub fn push(&mut self, file: PartitionedFile) {
    let files = &mut self.files;
    files.push(file);
}
/// Looks up statistics.
///
/// With `Some(i)` this returns the statistics stored on the `i`-th file, or
/// `None` if the index is out of range or that file has none. With `None`
/// it returns the statistics stored on the group itself, if any.
pub fn file_statistics(&self, index: Option<usize>) -> Option<&Statistics> {
    match index {
        Some(position) => self.files.get(position)?.statistics.as_deref(),
        None => self.statistics.as_deref(),
    }
}
/// Gives mutable access to the group statistics, or `None` if none are set.
///
/// Access goes through [`Arc::make_mut`], which clones the statistics first
/// when the `Arc` is shared with other owners.
pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
    let shared = self.statistics.as_mut()?;
    Some(Arc::make_mut(shared))
}
/// Splits the files of this group into at most `n` groups of (nearly) equal
/// file count.
///
/// The files are first sorted by path, so the outcome does not depend on the
/// order in which they were added. Every resulting group except possibly the
/// last holds `ceil(len / n)` files. An empty group yields an empty vector.
/// `n == 0` is treated like `n == 1`, i.e. all files end up in one group.
/// The returned groups carry no group statistics.
pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
    if self.is_empty() {
        return vec![];
    }
    // Sort by path so equal sets of files always produce the same chunks.
    self.files.sort_by(|a, b| a.path().cmp(b.path()));

    // `div_ceil` panics on a zero divisor; fall back to a single group rather
    // than crashing or discarding files.
    let n = n.max(1);
    // Division rounding up; at least 1 because the group is not empty.
    let chunk_size = self.len().div_ceil(n);
    // Reserve for the chunks that will really be produced, not for `n`,
    // which may be far larger than the number of files.
    let mut chunks = Vec::with_capacity(self.len().div_ceil(chunk_size));
    let mut current_chunk = Vec::with_capacity(chunk_size);
    for file in self.files.drain(..) {
        current_chunk.push(file);
        if current_chunk.len() == chunk_size {
            // Hand the full chunk over and continue with a fresh buffer.
            let full_chunk = FileGroup::new(mem::replace(
                &mut current_chunk,
                Vec::with_capacity(chunk_size),
            ));
            chunks.push(full_chunk);
        }
    }
    // Leftover files that did not fill a whole chunk.
    if !current_chunk.is_empty() {
        chunks.push(FileGroup::new(current_chunk));
    }
    chunks
}
}
/// `group[i]` borrows the `i`-th file; like slice indexing it panics when
/// `i` is out of bounds.
impl Index<usize> for FileGroup {
    type Output = PartitionedFile;

    fn index(&self, index: usize) -> &Self::Output {
        let files: &[PartitionedFile] = &self.files;
        &files[index]
    }
}
impl IndexMut<usize> for FileGroup {
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
&mut self.files[index]
}
}
impl FromIterator<PartitionedFile> for FileGroup {
fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
let files = iter.into_iter().collect();
FileGroup::new(files)
}
}
impl From<Vec<PartitionedFile>> for FileGroup {
fn from(files: Vec<PartitionedFile>) -> Self {
FileGroup::new(files)
}
}
impl Default for FileGroup {
fn default() -> Self {
Self::new(Vec::new())
}
}
/// Bookkeeping for one file that is a candidate for being split across
/// several groups.
///
/// Note that the derived equality compares all fields, whereas the ordering
/// implemented for this type only looks at `range_size()`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ToRepartition {
/// Index of the group the file is taken from.
source_index: usize,
/// Size of the whole file.
file_size: u64,
/// Indexes of the groups that will each receive one range of the file;
/// starts out containing `source_index` itself.
new_groups: Vec<usize>,
}
impl ToRepartition {
    /// Size of one range if the file is divided equally (integer division)
    /// among all groups currently assigned to it.
    ///
    /// Panics with a division by zero if `new_groups` is empty.
    fn range_size(&self) -> u64 {
        let group_count = self.new_groups.len() as u64;
        self.file_size / group_count
    }
}
/// Partial ordering that simply forwards to the total ordering.
impl PartialOrd for ToRepartition {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Orders candidates by the size of their individual ranges, so the entry
/// with the largest range is the greatest.
impl Ord for ToRepartition {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (mine, theirs) = (self.range_size(), other.range_size());
        mine.cmp(&theirs)
    }
}
#[cfg(test)]
mod test {
use super::*;
/// A lone zero-size file offers nothing to split, so the input is left alone
/// even with a minimum size of zero.
#[test]
fn repartition_empty_file_only() {
    let input = vec![FileGroup::new(vec![pfile("empty", 0)])];
    let partitioner = FileGroupPartitioner::new()
        .with_repartition_file_min_size(0)
        .with_target_partitions(4);
    let actual = partitioner.repartition_file_groups(&input);
    assert_partitioned_files(None, actual);
}
/// A zero-size file among the inputs is dropped from the result wherever it
/// sits (first, middle or last); the other files are split as usual.
#[test]
fn repartition_empty_files() {
    let single = |name: &str, size: u64| FileGroup::new(vec![pfile(name, size)]);
    let layouts = [
        // zero-size file first
        vec![single("empty", 0), single("a", 10), single("b", 10)],
        // zero-size file in the middle
        vec![single("a", 10), single("empty", 0), single("b", 10)],
        // zero-size file last
        vec![single("a", 10), single("b", 10), single("empty", 0)],
    ];

    let a = |start: i64, end: i64| pfile("a", 10).with_range(start, end);
    let b = |start: i64, end: i64| pfile("b", 10).with_range(start, end);
    let two_way = vec![
        FileGroup::new(vec![a(0, 10)]),
        FileGroup::new(vec![b(0, 10)]),
    ];
    let three_way = vec![
        FileGroup::new(vec![a(0, 7)]),
        FileGroup::new(vec![a(7, 10), b(0, 4)]),
        FileGroup::new(vec![b(4, 10)]),
    ];

    for layout in &layouts {
        for (target, expected) in [(2, &two_way), (3, &three_way)] {
            let actual = FileGroupPartitioner::new()
                .with_repartition_file_min_size(10)
                .with_target_partitions(target)
                .repartition_file_groups(layout);
            assert_partitioned_files(Some(expected.clone()), actual);
        }
    }
}
/// One file of size 123 spread over four partitions: three ranges of 31 and
/// a final one of 30.
#[test]
fn repartition_single_file() {
    let input = vec![FileGroup::new(vec![pfile("a", 123)])];
    let bounds: [(i64, i64); 4] = [(0, 31), (31, 62), (62, 93), (93, 123)];
    let expected: Vec<FileGroup> = bounds
        .into_iter()
        .map(|(start, end)| FileGroup::new(vec![pfile("a", 123).with_range(start, end)]))
        .collect();
    let actual = FileGroupPartitioner::new()
        .with_repartition_file_min_size(10)
        .with_target_partitions(4)
        .repartition_file_groups(&input);
    assert_partitioned_files(Some(expected), actual);
}
/// Asking for far more partitions (96) than the file has units of size (8)
/// yields one group per unit, i.e. only eight groups.
#[test]
fn repartition_too_much_partitions() {
    let input = vec![FileGroup::new(vec![pfile("a", 8)])];
    let expected: Vec<FileGroup> = (0..8)
        .map(|start: i64| FileGroup::new(vec![pfile("a", 8).with_range(start, start + 1)]))
        .collect();
    let actual = FileGroupPartitioner::new()
        .with_repartition_file_min_size(5)
        .with_target_partitions(96)
        .repartition_file_groups(&input);
    assert_partitioned_files(Some(expected), actual);
}
/// Two single-file groups (40 + 60) spread over three partitions; the middle
/// partition receives the tail of "a" and the head of "b".
#[test]
fn repartition_multiple_partitions() {
    let input = vec![
        FileGroup::new(vec![pfile("a", 40)]),
        FileGroup::new(vec![pfile("b", 60)]),
    ];
    let a = |start: i64, end: i64| pfile("a", 40).with_range(start, end);
    let b = |start: i64, end: i64| pfile("b", 60).with_range(start, end);
    let expected = vec![
        FileGroup::new(vec![a(0, 34)]),
        FileGroup::new(vec![a(34, 40), b(0, 28)]),
        FileGroup::new(vec![b(28, 60)]),
    ];
    let actual = FileGroupPartitioner::new()
        .with_repartition_file_min_size(10)
        .with_target_partitions(3)
        .repartition_file_groups(&input);
    assert_partitioned_files(Some(expected), actual);
}
/// With as many target partitions as input groups the size-based strategy
/// still rebalances: 40 + 60 becomes two groups of 50 each.
#[test]
fn repartition_same_num_partitions() {
    let input = vec![
        FileGroup::new(vec![pfile("a", 40)]),
        FileGroup::new(vec![pfile("b", 60)]),
    ];
    let b = |start: i64, end: i64| pfile("b", 60).with_range(start, end);
    let expected = vec![
        FileGroup::new(vec![pfile("a", 40).with_range(0, 40), b(0, 10)]),
        FileGroup::new(vec![b(10, 60)]),
    ];
    let actual = FileGroupPartitioner::new()
        .with_repartition_file_min_size(10)
        .with_target_partitions(2)
        .repartition_file_groups(&input);
    assert_partitioned_files(Some(expected), actual);
}
/// If any input file already carries a range, nothing is repartitioned.
#[test]
fn repartition_no_action_ranges() {
    let already_ranged = pfile("b", 144).with_range(1, 50);
    let input = vec![
        FileGroup::new(vec![pfile("a", 123)]),
        FileGroup::new(vec![already_ranged]),
    ];
    let partitioner = FileGroupPartitioner::new()
        .with_repartition_file_min_size(10)
        .with_target_partitions(65);
    assert_partitioned_files(None, partitioner.repartition_file_groups(&input))
}
/// A total size (123) below the configured minimum (500) means no
/// repartitioning.
#[test]
fn repartition_no_action_min_size() {
    let input = vec![FileGroup::new(vec![pfile("a", 123)])];
    let partitioner = FileGroupPartitioner::new()
        .with_repartition_file_min_size(500)
        .with_target_partitions(65);
    assert_partitioned_files(None, partitioner.repartition_file_groups(&input))
}
/// An empty list of groups is left alone by both strategies.
#[test]
fn repartition_no_action_zero_files() {
    let partitioner = FileGroupPartitioner::new()
        .with_repartition_file_min_size(500)
        .with_target_partitions(65);
    let actual = repartition_test(partitioner, Vec::new());
    assert_partitioned_files(None, actual)
}
/// Order-preserving mode does nothing when there are already as many groups
/// as target partitions.
#[test]
fn repartition_ordered_no_action_too_few_partitions() {
    let input = vec![
        FileGroup::new(vec![pfile("a", 100)]),
        FileGroup::new(vec![pfile("b", 200)]),
    ];
    let partitioner = FileGroupPartitioner::new()
        .with_repartition_file_min_size(10)
        .with_target_partitions(2)
        .with_preserve_order_within_groups(true);
    assert_partitioned_files(None, partitioner.repartition_file_groups(&input))
}
/// Order-preserving mode with a single file smaller (100) than the minimum
/// size (1000) leaves the input alone.
#[test]
fn repartition_ordered_no_action_file_too_small() {
    let input = vec![FileGroup::new(vec![pfile("a", 100)])];
    let partitioner = FileGroupPartitioner::new()
        .with_repartition_file_min_size(1000)
        .with_target_partitions(2)
        .with_preserve_order_within_groups(true);
    assert_partitioned_files(None, partitioner.repartition_file_groups(&input))
}
/// Order-preserving mode with a single file: the file is cut into three
/// ranges, one per target partition.
#[test]
fn repartition_ordered_one_large_file() {
    let input = vec![FileGroup::new(vec![pfile("a", 100)])];
    let bounds: [(i64, i64); 3] = [(0, 34), (34, 68), (68, 100)];
    let expected: Vec<FileGroup> = bounds
        .into_iter()
        .map(|(start, end)| FileGroup::new(vec![pfile("a", 100).with_range(start, end)]))
        .collect();
    let actual = FileGroupPartitioner::new()
        .with_repartition_file_min_size(10)
        .with_target_partitions(3)
        .with_preserve_order_within_groups(true)
        .repartition_file_groups(&input);
    assert_partitioned_files(Some(expected), actual);
}
/// Order-preserving mode, one large and one small file, four partitions:
/// both extra groups go to the large file, the small one stays whole.
#[test]
fn repartition_ordered_one_large_one_small_file() {
    let input = vec![
        FileGroup::new(vec![pfile("a", 100)]),
        FileGroup::new(vec![pfile("b", 30)]),
    ];
    let a = |start: i64, end: i64| pfile("a", 100).with_range(start, end);
    let expected = vec![
        FileGroup::new(vec![a(0, 33)]),
        FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
        FileGroup::new(vec![a(33, 66)]),
        FileGroup::new(vec![a(66, 100)]),
    ];
    let actual = FileGroupPartitioner::new()
        .with_repartition_file_min_size(10)
        .with_target_partitions(4)
        .with_preserve_order_within_groups(true)
        .repartition_file_groups(&input);
    assert_partitioned_files(Some(expected), actual);
}
/// Order-preserving mode, two equally large files, four partitions: each
/// file is cut in half.
#[test]
fn repartition_ordered_two_large_files() {
    let input = vec![
        FileGroup::new(vec![pfile("a", 100)]),
        FileGroup::new(vec![pfile("b", 100)]),
    ];
    let half = |name: &str, start: i64, end: i64| {
        FileGroup::new(vec![pfile(name, 100).with_range(start, end)])
    };
    let expected = vec![
        half("a", 0, 50),
        half("b", 0, 50),
        half("a", 50, 100),
        half("b", 50, 100),
    ];
    let actual = FileGroupPartitioner::new()
        .with_repartition_file_min_size(10)
        .with_target_partitions(4)
        .with_preserve_order_within_groups(true)
        .repartition_file_groups(&input);
    assert_partitioned_files(Some(expected), actual);
}
/// Order-preserving mode, two large files and one small file. With four
/// partitions only "a" is split; with five, both "a" and "b" are split. The
/// small file "c" stays whole in both cases.
#[test]
fn repartition_ordered_two_large_one_small_files() {
    let input = vec![
        FileGroup::new(vec![pfile("a", 100)]),
        FileGroup::new(vec![pfile("b", 100)]),
        FileGroup::new(vec![pfile("c", 30)]),
    ];
    let base = FileGroupPartitioner::new()
        .with_repartition_file_min_size(10)
        .with_preserve_order_within_groups(true);
    let part = |name: &str, size: u64, start: i64, end: i64| {
        FileGroup::new(vec![pfile(name, size).with_range(start, end)])
    };

    // One spare group: it goes to "a".
    let expected_four = vec![
        part("a", 100, 0, 50),
        part("b", 100, 0, 100),
        part("c", 30, 0, 30),
        part("a", 100, 50, 100),
    ];
    let actual_four = base
        .with_target_partitions(4)
        .repartition_file_groups(&input);
    assert_partitioned_files(Some(expected_four), actual_four);

    // Two spare groups: one each for "a" and "b".
    let expected_five = vec![
        part("a", 100, 0, 50),
        part("b", 100, 0, 50),
        part("c", 30, 0, 30),
        part("a", 100, 50, 100),
        part("b", 100, 50, 100),
    ];
    let actual_five = base
        .with_target_partitions(5)
        .repartition_file_groups(&input);
    assert_partitioned_files(Some(expected_five), actual_five);
}
/// Order-preserving mode where the input already contains empty groups:
/// those are filled with ranges just like the newly added group.
#[test]
fn repartition_ordered_one_large_one_small_existing_empty() {
    let input = vec![
        FileGroup::new(vec![pfile("a", 100)]),
        FileGroup::default(),
        FileGroup::new(vec![pfile("b", 40)]),
        FileGroup::default(),
    ];
    let a = |start: i64, end: i64| pfile("a", 100).with_range(start, end);
    let b = |start: i64, end: i64| pfile("b", 40).with_range(start, end);
    let expected = vec![
        FileGroup::new(vec![a(0, 33)]),
        FileGroup::new(vec![a(33, 66)]),
        FileGroup::new(vec![b(0, 20)]),
        FileGroup::new(vec![a(66, 100)]),
        FileGroup::new(vec![b(20, 40)]),
    ];
    let actual = FileGroupPartitioner::new()
        .with_repartition_file_min_size(10)
        .with_target_partitions(5)
        .with_preserve_order_within_groups(true)
        .repartition_file_groups(&input);
    assert_partitioned_files(Some(expected), actual);
}
/// Order-preserving mode never touches a group holding several files; only
/// the single-file group "c" is split.
#[test]
fn repartition_ordered_existing_group_multiple_files() {
    let input = vec![
        FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
        FileGroup::new(vec![pfile("c", 40)]),
    ];
    let c = |start: i64, end: i64| pfile("c", 40).with_range(start, end);
    let expected = vec![
        FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
        FileGroup::new(vec![c(0, 20)]),
        FileGroup::new(vec![c(20, 40)]),
    ];
    let actual = FileGroupPartitioner::new()
        .with_repartition_file_min_size(10)
        .with_target_partitions(3)
        .with_preserve_order_within_groups(true)
        .repartition_file_groups(&input);
    assert_partitioned_files(Some(expected), actual);
}
/// Asserts that `actual` matches `expected`: both `None`, or both `Some`
/// with identical pretty-printed `Debug` output. Panics otherwise.
fn assert_partitioned_files(
    expected: Option<Vec<FileGroup>>,
    actual: Option<Vec<FileGroup>>,
) {
    let (expected, actual) = match (expected, actual) {
        (None, None) => return,
        (Some(_), None) => panic!("Expected Some, got None"),
        (None, Some(_)) => panic!("Expected None, got Some"),
        (Some(expected), Some(actual)) => (expected, actual),
    };
    // Compare through the `Debug` rendering of both sides.
    let expected_string = format!("{expected:#?}");
    let actual_string = format!("{actual:#?}");
    assert_eq!(expected_string, actual_string);
}
/// Shorthand for building a [`PartitionedFile`] with the given path and size.
fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
    let path: String = path.into();
    PartitionedFile::new(path, file_size)
}
/// Runs `partitioner` on `file_groups` both as configured and with order
/// preservation forced on, asserts that the two results agree, and returns
/// the result of the run as configured.
fn repartition_test(
    partitioner: FileGroupPartitioner,
    file_groups: Vec<FileGroup>,
) -> Option<Vec<FileGroup>> {
    let groups = file_groups.as_slice();
    let as_configured = partitioner.repartition_file_groups(groups);
    let order_preserving = partitioner
        .with_preserve_order_within_groups(true)
        .repartition_file_groups(groups);
    assert_partitioned_files(as_configured.clone(), order_preserving);
    as_configured
}
}