use crate::{FileRange, PartitionedFile};
use arrow::compute::SortOptions;
use datafusion_common::Statistics;
use datafusion_common::utils::compare_rows;
use itertools::Itertools;
use std::cmp::{Ordering, min};
use std::collections::{BinaryHeap, HashMap};
use std::iter::repeat_with;
use std::mem;
use std::ops::{Deref, DerefMut, Index, IndexMut};
use std::sync::Arc;
/// Repartitions a set of [`FileGroup`]s into (approximately)
/// `target_partitions` groups by splitting files into ranges.
///
/// Configure it with the `with_*` builder methods (see `new` for the
/// defaults) and then call `repartition_file_groups`.
#[derive(Debug, Clone, Copy)]
pub struct FileGroupPartitioner {
/// Number of output groups to aim for.
target_partitions: usize,
/// Minimum total effective size of all input files required by the
/// size-balancing strategy; below it, nothing is repartitioned.
repartition_file_min_size: usize,
/// When true, the order-preserving strategy is used: only groups holding a
/// single file are split, and files from different groups are never merged.
preserve_order_within_groups: bool,
}
impl Default for FileGroupPartitioner {
fn default() -> Self {
Self::new()
}
}
impl FileGroupPartitioner {
    /// Creates a partitioner with the default settings:
    ///
    /// - one target partition;
    /// - a minimum repartition size of `10 * 1024 * 1024` (10 MiB);
    /// - ordering within groups not preserved.
    pub fn new() -> Self {
        Self {
            target_partitions: 1,
            repartition_file_min_size: 10 * 1024 * 1024,
            preserve_order_within_groups: false,
        }
    }

    /// Sets the number of output groups to aim for.
    ///
    /// A value of `0` disables repartitioning:
    /// [`Self::repartition_file_groups`] then returns `None`.
    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
        self.target_partitions = target_partitions;
        self
    }

    /// Sets the minimum total effective size of all input files. Below this
    /// size, the size-balancing strategy leaves the input untouched.
    pub fn with_repartition_file_min_size(
        mut self,
        repartition_file_min_size: usize,
    ) -> Self {
        self.repartition_file_min_size = repartition_file_min_size;
        self
    }

    /// Selects the order-preserving strategy (`true`) or the size-balancing
    /// strategy (`false`, the default).
    pub fn with_preserve_order_within_groups(
        mut self,
        preserve_order_within_groups: bool,
    ) -> Self {
        self.preserve_order_within_groups = preserve_order_within_groups;
        self
    }

    /// Redistributes `file_groups` across `target_partitions` groups using the
    /// configured strategy.
    ///
    /// Returns `None` when no repartitioning is performed. This happens when
    /// `file_groups` is empty, when `target_partitions` is `0`, or when the
    /// chosen strategy decides that splitting is not possible or worthwhile.
    /// Otherwise it returns the new groups; files that were split carry
    /// explicit ranges.
    pub fn repartition_file_groups(
        &self,
        file_groups: &[FileGroup],
    ) -> Option<Vec<FileGroup>> {
        if file_groups.is_empty() {
            return None;
        }
        if self.preserve_order_within_groups {
            self.repartition_preserving_order(file_groups)
        } else {
            self.repartition_evenly_by_size(file_groups)
        }
    }

    /// Size-balancing strategy.
    ///
    /// Flattens all groups into one list of files and cuts that list into
    /// consecutive chunks of `ceil(total_size / target_partitions)` effective
    /// size each, splitting files at chunk boundaries. Original group
    /// boundaries are not kept.
    ///
    /// Returns `None` if `target_partitions` is `0`, or if the total effective
    /// size is zero or below `repartition_file_min_size`. The result may hold
    /// fewer than `target_partitions` groups.
    fn repartition_evenly_by_size(
        &self,
        file_groups: &[FileGroup],
    ) -> Option<Vec<FileGroup>> {
        let target_partitions = self.target_partitions;
        // A zero target cannot be split into and would divide by zero below.
        // Returning None mirrors the order-preserving strategy, which also
        // does nothing in this case.
        if target_partitions == 0 {
            return None;
        }
        let repartition_file_min_size = self.repartition_file_min_size;
        let flattened_files = file_groups.iter().flat_map(FileGroup::iter).collect_vec();
        let total_size = flattened_files
            .iter()
            .map(|f| f.effective_size())
            .sum::<u64>();
        if total_size < (repartition_file_min_size as u64) || total_size == 0 {
            return None;
        }
        let target_partition_size = total_size.div_ceil(target_partitions as u64);

        // Scan state: (index of the partition being filled, size already in it).
        let repartitioned_files = flattened_files
            .into_iter()
            .scan(
                (0usize, 0u64),
                |(current_partition_index, current_partition_size), source_file| {
                    let mut produced_files = vec![];
                    let (mut range_start, file_end) = source_file.range();
                    while range_start < file_end {
                        // Fill the current partition up to `target_partition_size`,
                        // but never read past the end of this file's range.
                        let range_end = min(
                            range_start + (target_partition_size - *current_partition_size),
                            file_end,
                        );
                        let mut produced_file = source_file.clone();
                        produced_file.range = Some(FileRange {
                            start: range_start as i64,
                            end: range_end as i64,
                        });
                        produced_files.push((*current_partition_index, produced_file));
                        let chunk_len = range_end - range_start;
                        if *current_partition_size + chunk_len >= target_partition_size {
                            // The partition is full; start filling the next one.
                            *current_partition_index += 1;
                            *current_partition_size = 0;
                        } else {
                            *current_partition_size += chunk_len;
                        }
                        range_start = range_end;
                    }
                    Some(produced_files)
                },
            )
            .flatten()
            // Partition indices are produced in non-decreasing order, so adjacent
            // grouping collects each partition's ranges together.
            .chunk_by(|(partition_idx, _)| *partition_idx)
            .into_iter()
            .map(|(_, group)| FileGroup::new(group.map(|(_, file)| file).collect_vec()))
            .collect_vec();
        Some(repartitioned_files)
    }

    /// Order-preserving strategy. Files from different groups are never merged.
    ///
    /// Each group that holds exactly one file may have that file cut into
    /// contiguous ranges. Each extra range goes into a previously empty group:
    /// either an empty group already in the input, or one of the groups
    /// appended to reach `target_partitions`. Empty groups are handed out in
    /// index order, each to the file whose ranges are currently the largest.
    ///
    /// Returns `None` if there are already at least `target_partitions` groups,
    /// or if no group holds exactly one file. A single group with a single file
    /// is delegated to the size-balancing strategy instead.
    fn repartition_preserving_order(
        &self,
        file_groups: &[FileGroup],
    ) -> Option<Vec<FileGroup>> {
        if file_groups.len() >= self.target_partitions {
            return None;
        }
        let num_new_groups = self.target_partitions - file_groups.len();

        if file_groups.len() == 1 && file_groups[0].len() == 1 {
            return self.repartition_evenly_by_size(file_groups);
        }

        // Candidates for splitting: groups with exactly one file, ordered by the
        // size of the ranges they would currently produce (largest first).
        let mut heap: BinaryHeap<_> = file_groups
            .iter()
            .enumerate()
            .filter_map(|(group_index, group)| {
                if group.len() == 1 {
                    Some(ToRepartition {
                        source_index: group_index,
                        file_size: group[0].effective_size(),
                        new_groups: vec![group_index],
                    })
                } else {
                    None
                }
            })
            .map(CompareByRangeSize)
            .collect();
        if heap.is_empty() {
            return None;
        }

        // Append the new, initially empty groups that will receive ranges.
        let mut file_groups: Vec<_> = file_groups
            .iter()
            .cloned()
            .chain(repeat_with(|| FileGroup::new(Vec::new())).take(num_new_groups))
            .collect();

        // Assign every empty group to the candidate with the largest ranges.
        for (group_index, group) in file_groups.iter().enumerate() {
            if !group.is_empty() {
                continue;
            }
            let mut largest_group = heap
                .pop()
                .expect("heap is non-empty: checked above and every pop is re-pushed");
            largest_group.new_groups.push(group_index);
            heap.push(largest_group);
        }

        // Cut each chosen file into contiguous ranges, one per assigned group.
        while let Some(to_repartition) = heap.pop() {
            let range_size = to_repartition.range_size();
            let ToRepartition {
                source_index,
                file_size: _,
                new_groups,
            } = to_repartition.into_inner();
            assert_eq!(file_groups[source_index].len(), 1);
            let original_file = file_groups[source_index]
                .pop()
                .expect("source group holds exactly one file");
            let last_group = new_groups.len() - 1;
            let (mut range_start, file_end) = original_file.range();
            let mut range_end = range_start + range_size;
            for (i, group_index) in new_groups.into_iter().enumerate() {
                let target_group = &mut file_groups[group_index];
                assert!(target_group.is_empty());
                // The last range absorbs the remainder of the integer division,
                // so the whole original range is covered.
                if i == last_group {
                    range_end = file_end;
                }
                target_group.push(
                    original_file
                        .clone()
                        .with_range(range_start as i64, range_end as i64),
                );
                range_start = range_end;
                range_end += range_size;
            }
        }
        Some(file_groups)
    }
}
/// An ordered list of [`PartitionedFile`]s, optionally with statistics that
/// describe the group as a whole.
#[derive(Debug, Clone)]
pub struct FileGroup {
/// Files in this group, in order.
files: Vec<PartitionedFile>,
/// Group-level statistics, if known. Per-file statistics are stored on each
/// `PartitionedFile`.
statistics: Option<Arc<Statistics>>,
}
impl FileGroup {
    /// Creates a group from `files` with no group-level statistics.
    pub fn new(files: Vec<PartitionedFile>) -> Self {
        Self {
            files,
            statistics: None,
        }
    }

    /// Returns the number of files in the group.
    pub fn len(&self) -> usize {
        self.files.len()
    }

    /// Attaches group-level statistics, replacing any that were set before.
    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
        self.statistics = Some(statistics);
        self
    }

    /// Returns the files as a slice.
    pub fn files(&self) -> &[PartitionedFile] {
        &self.files
    }

    /// Iterates over the files by reference.
    pub fn iter(&self) -> impl Iterator<Item = &PartitionedFile> {
        self.files.iter()
    }

    /// Consumes the group and returns its files. Group-level statistics are
    /// discarded.
    pub fn into_inner(self) -> Vec<PartitionedFile> {
        self.files
    }

    /// Returns `true` if the group holds no files.
    pub fn is_empty(&self) -> bool {
        self.files.is_empty()
    }

    /// Removes and returns the last file, or `None` if the group is empty.
    pub fn pop(&mut self) -> Option<PartitionedFile> {
        self.files.pop()
    }

    /// Appends a file to the end of the group.
    pub fn push(&mut self, partitioned_file: PartitionedFile) {
        self.files.push(partitioned_file);
    }

    /// Returns statistics for a file or for the whole group.
    ///
    /// - With `Some(index)`, returns the statistics of the file at `index`.
    ///   The result is `None` if the index is out of range or the file has no
    ///   statistics.
    /// - With `None`, returns the group-level statistics.
    pub fn file_statistics(&self, index: Option<usize>) -> Option<&Statistics> {
        if let Some(index) = index {
            self.files.get(index).and_then(|f| f.statistics.as_deref())
        } else {
            self.statistics.as_deref()
        }
    }

    /// Returns mutable access to the group-level statistics, if any.
    ///
    /// Uses [`Arc::make_mut`], so statistics shared with other `Arc` holders
    /// are cloned first and those holders are unaffected.
    pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
        self.statistics.as_mut().map(Arc::make_mut)
    }

    /// Splits the group into at most `n` groups of consecutive files.
    ///
    /// - Files are first sorted by path, so the grouping does not depend on the
    ///   order in which the files were supplied.
    /// - Every output group holds `ceil(len / n)` files, except possibly the
    ///   last one.
    /// - An `n` of `0` is treated as `1` (a single group with every file)
    ///   instead of panicking on a division by zero.
    /// - Group-level statistics are not carried over.
    /// - Returns an empty vector if the group is empty.
    pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
        if self.is_empty() {
            return vec![];
        }
        // `div_ceil` below panics on a zero divisor.
        let n = n.max(1);

        // Sort so that planning the same set of files twice gives the same split.
        self.files.sort_by(|a, b| a.path().cmp(b.path()));

        let chunk_size = self.len().div_ceil(n);
        let mut chunks = Vec::with_capacity(n);
        let mut current_chunk = Vec::with_capacity(chunk_size);
        for file in self.files.drain(..) {
            current_chunk.push(file);
            if current_chunk.len() == chunk_size {
                let full_chunk = FileGroup::new(mem::replace(
                    &mut current_chunk,
                    Vec::with_capacity(chunk_size),
                ));
                chunks.push(full_chunk);
            }
        }
        if !current_chunk.is_empty() {
            chunks.push(FileGroup::new(current_chunk))
        }
        chunks
    }

    /// Groups files that have identical `partition_values`.
    ///
    /// Distinct partition-value tuples are sorted with `compare_rows`, using
    /// default [`SortOptions`] for every column.
    ///
    /// - If there are at most `max_target_partitions` distinct tuples, each
    ///   tuple becomes its own group, in sorted order.
    /// - Otherwise the tuples are dealt round-robin into
    ///   `max_target_partitions` groups.
    ///
    /// Returns an empty vector, and therefore no files, if the group is empty
    /// or `max_target_partitions` is `0`. Group-level statistics are not
    /// carried over.
    pub fn group_by_partition_values(
        self,
        max_target_partitions: usize,
    ) -> Vec<FileGroup> {
        if self.is_empty() || max_target_partitions == 0 {
            return vec![];
        }
        let mut partition_groups: HashMap<
            Vec<datafusion_common::ScalarValue>,
            Vec<PartitionedFile>,
        > = HashMap::new();
        for file in self.files {
            partition_groups
                .entry(file.partition_values.clone())
                .or_default()
                .push(file);
        }
        let num_unique_partitions = partition_groups.len();

        // HashMap iteration order is unspecified; sort the keys to get a
        // deterministic assignment of partitions to groups.
        let mut sorted_partitions: Vec<_> = partition_groups.into_iter().collect();
        let sort_options =
            vec![
                SortOptions::default();
                sorted_partitions.first().map(|(k, _)| k.len()).unwrap_or(0)
            ];
        sorted_partitions.sort_by(|a, b| {
            // Keys that cannot be compared are treated as equal.
            compare_rows(&a.0, &b.0, &sort_options).unwrap_or(Ordering::Equal)
        });
        if num_unique_partitions <= max_target_partitions {
            sorted_partitions
                .into_iter()
                .map(|(_, files)| FileGroup::new(files))
                .collect()
        } else {
            let mut target_groups = vec![vec![]; max_target_partitions];
            for (idx, (_, files)) in sorted_partitions.into_iter().enumerate() {
                let bucket = idx % max_target_partitions;
                target_groups[bucket].extend(files);
            }
            target_groups.into_iter().map(FileGroup::new).collect()
        }
    }
}
impl Index<usize> for FileGroup {
    type Output = PartitionedFile;

    /// Returns the file at position `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds.
    fn index(&self, index: usize) -> &Self::Output {
        self.files.index(index)
    }
}
impl IndexMut<usize> for FileGroup {
    /// Returns a mutable reference to the file at position `index`.
    ///
    /// # Panics
    /// Panics if `index` is out of bounds.
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        self.files.index_mut(index)
    }
}
impl FromIterator<PartitionedFile> for FileGroup {
fn from_iter<I: IntoIterator<Item = PartitionedFile>>(iter: I) -> Self {
let files = iter.into_iter().collect();
FileGroup::new(files)
}
}
impl From<Vec<PartitionedFile>> for FileGroup {
fn from(files: Vec<PartitionedFile>) -> Self {
FileGroup::new(files)
}
}
impl Default for FileGroup {
fn default() -> Self {
Self::new(Vec::new())
}
}
/// A single-file group queued for splitting across several output groups.
#[derive(Debug, Clone)]
struct ToRepartition {
    /// Index of the group that currently holds the file.
    source_index: usize,
    /// Effective size of the file to be split.
    file_size: u64,
    /// Indices of the groups that will each receive one contiguous range of
    /// the file.
    new_groups: Vec<usize>,
}

impl ToRepartition {
    /// Size of each range when the file is divided across `new_groups`.
    ///
    /// Uses truncating integer division; any remainder is left for the caller
    /// to place in the final range.
    fn range_size(&self) -> u64 {
        let parts = self.new_groups.len() as u64;
        self.file_size / parts
    }
}

/// Orders [`ToRepartition`] entries by their current range size, so that a
/// max-`BinaryHeap` yields the file with the largest ranges first.
struct CompareByRangeSize(ToRepartition);

impl CompareByRangeSize {
    /// Unwraps the inner entry.
    fn into_inner(self) -> ToRepartition {
        let CompareByRangeSize(inner) = self;
        inner
    }
}

impl Ord for CompareByRangeSize {
    fn cmp(&self, other: &Self) -> Ordering {
        let (mine, theirs) = (self.range_size(), other.range_size());
        mine.cmp(&theirs)
    }
}

impl PartialOrd for CompareByRangeSize {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl PartialEq for CompareByRangeSize {
    /// Two entries are equal when their range sizes are equal.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other).is_eq()
    }
}

impl Eq for CompareByRangeSize {}

impl Deref for CompareByRangeSize {
    type Target = ToRepartition;

    fn deref(&self) -> &Self::Target {
        let Self(inner) = self;
        inner
    }
}

impl DerefMut for CompareByRangeSize {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(inner) = self;
        inner
    }
}
#[cfg(test)]
mod test {
// Unit tests for `FileGroupPartitioner` and `FileGroup`.
// For the size-balancing strategy, expected ranges follow from a per-partition
// size of ceil(total_effective_size / target_partitions).
use super::*;
use datafusion_common::ScalarValue;
// A lone zero-sized file has nothing to split, so the result is None even with
// a zero minimum size.
#[test]
fn repartition_empty_file_only() {
let partitioned_file_empty = pfile("empty", 0);
let file_group = vec![FileGroup::new(vec![partitioned_file_empty])];
let partitioned_files = FileGroupPartitioner::new()
.with_target_partitions(4)
.with_repartition_file_min_size(0)
.repartition_file_groups(&file_group);
assert_partitioned_files(None, partitioned_files);
}
// Zero-sized files contribute no ranges wherever they appear.
// Total size is 20: with 2 partitions each gets 10; with 3 partitions each
// gets ceil(20/3) = 7.
#[test]
fn repartition_empty_files() {
let pfile_a = pfile("a", 10);
let pfile_b = pfile("b", 10);
let pfile_empty = pfile("empty", 0);
let empty_first = vec![
FileGroup::new(vec![pfile_empty.clone()]),
FileGroup::new(vec![pfile_a.clone()]),
FileGroup::new(vec![pfile_b.clone()]),
];
let empty_middle = vec![
FileGroup::new(vec![pfile_a.clone()]),
FileGroup::new(vec![pfile_empty.clone()]),
FileGroup::new(vec![pfile_b.clone()]),
];
let empty_last = vec![
FileGroup::new(vec![pfile_a]),
FileGroup::new(vec![pfile_b]),
FileGroup::new(vec![pfile_empty]),
];
let expected_2 = vec![
FileGroup::new(vec![pfile("a", 10).with_range(0, 10)]),
FileGroup::new(vec![pfile("b", 10).with_range(0, 10)]),
];
let expected_3 = vec![
FileGroup::new(vec![pfile("a", 10).with_range(0, 7)]),
FileGroup::new(vec![
pfile("a", 10).with_range(7, 10),
pfile("b", 10).with_range(0, 4),
]),
FileGroup::new(vec![pfile("b", 10).with_range(4, 10)]),
];
let file_groups_tests = [empty_first, empty_middle, empty_last];
for fg in file_groups_tests {
let all_expected = [(2, expected_2.clone()), (3, expected_3.clone())];
for (n_partition, expected) in all_expected {
let actual = FileGroupPartitioner::new()
.with_target_partitions(n_partition)
.with_repartition_file_min_size(10)
.repartition_file_groups(&fg);
assert_partitioned_files(Some(expected), actual);
}
}
}
// Size 123 over 4 partitions gives ceil(123/4) = 31 per partition; the last
// partition gets the remaining 30.
#[test]
fn repartition_single_file() {
let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
let actual = FileGroupPartitioner::new()
.with_target_partitions(4)
.with_repartition_file_min_size(10)
.repartition_file_groups(&single_partition);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
]);
assert_partitioned_files(expected, actual);
}
// An explicit range covering the whole file behaves like no range.
#[test]
fn repartition_single_file_with_range() {
let single_partition =
vec![FileGroup::new(vec![pfile("a", 123).with_range(0, 123)])];
let actual = FileGroupPartitioner::new()
.with_target_partitions(4)
.with_repartition_file_min_size(10)
.repartition_file_groups(&single_partition);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
]);
assert_partitioned_files(expected, actual);
}
// Only the 10..100 range (size 90) is split: ceil(90/4) = 23 per partition.
#[test]
fn repartition_single_file_with_incomplete_range() {
let single_partition =
vec![FileGroup::new(vec![pfile("a", 123).with_range(10, 100)])];
let actual = FileGroupPartitioner::new()
.with_target_partitions(4)
.with_repartition_file_min_size(10)
.repartition_file_groups(&single_partition);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 123).with_range(10, 33)]),
FileGroup::new(vec![pfile("a", 123).with_range(33, 56)]),
FileGroup::new(vec![pfile("a", 123).with_range(56, 79)]),
FileGroup::new(vec![pfile("a", 123).with_range(79, 100)]),
]);
assert_partitioned_files(expected, actual);
}
// Two ranges of the same file count as two inputs of size 50:
// ceil(100/4) = 25 per partition.
#[test]
fn repartition_single_file_duplicated_with_range() {
let single_partition = vec![FileGroup::new(vec![
pfile("a", 100).with_range(0, 50),
pfile("a", 100).with_range(50, 100),
])];
let actual = FileGroupPartitioner::new()
.with_target_partitions(4)
.with_repartition_file_min_size(10)
.repartition_file_groups(&single_partition);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 25)]),
FileGroup::new(vec![pfile("a", 100).with_range(25, 50)]),
FileGroup::new(vec![pfile("a", 100).with_range(50, 75)]),
FileGroup::new(vec![pfile("a", 100).with_range(75, 100)]),
]);
assert_partitioned_files(expected, actual);
}
// ceil(8/96) = 1, so only 8 non-empty partitions are produced, not 96.
#[test]
fn repartition_too_much_partitions() {
let partitioned_file = pfile("a", 8);
let single_partition = vec![FileGroup::new(vec![partitioned_file])];
let actual = FileGroupPartitioner::new()
.with_target_partitions(96)
.with_repartition_file_min_size(5)
.repartition_file_groups(&single_partition);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 8).with_range(0, 1)]),
FileGroup::new(vec![pfile("a", 8).with_range(1, 2)]),
FileGroup::new(vec![pfile("a", 8).with_range(2, 3)]),
FileGroup::new(vec![pfile("a", 8).with_range(3, 4)]),
FileGroup::new(vec![pfile("a", 8).with_range(4, 5)]),
FileGroup::new(vec![pfile("a", 8).with_range(5, 6)]),
FileGroup::new(vec![pfile("a", 8).with_range(6, 7)]),
FileGroup::new(vec![pfile("a", 8).with_range(7, 8)]),
]);
assert_partitioned_files(expected, actual);
}
// Total size 100 over 3 partitions gives 34 per partition; the middle
// partition spans the boundary between `a` and `b`.
#[test]
fn repartition_multiple_partitions() {
let source_partitions = vec![
FileGroup::new(vec![pfile("a", 40)]),
FileGroup::new(vec![pfile("b", 60)]),
];
let actual = FileGroupPartitioner::new()
.with_target_partitions(3)
.with_repartition_file_min_size(10)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 40).with_range(0, 34)]),
FileGroup::new(vec![
pfile("a", 40).with_range(34, 40),
pfile("b", 60).with_range(0, 28),
]),
FileGroup::new(vec![pfile("b", 60).with_range(28, 60)]),
]);
assert_partitioned_files(expected, actual);
}
// Even when the group count already equals the target, the size-balancing
// strategy rebalances into 50 + 50.
#[test]
fn repartition_same_num_partitions() {
let source_partitions = vec![
FileGroup::new(vec![pfile("a", 40)]),
FileGroup::new(vec![pfile("b", 60)]),
];
let actual = FileGroupPartitioner::new()
.with_target_partitions(2)
.with_repartition_file_min_size(10)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![
pfile("a", 40).with_range(0, 40),
pfile("b", 60).with_range(0, 10),
]),
FileGroup::new(vec![pfile("b", 60).with_range(10, 60)]),
]);
assert_partitioned_files(expected, actual);
}
// Total size 123 is below the 500 minimum, so nothing is repartitioned.
#[test]
fn repartition_no_action_min_size() {
let single_partition = vec![FileGroup::new(vec![pfile("a", 123)])];
let actual = FileGroupPartitioner::new()
.with_target_partitions(65)
.with_repartition_file_min_size(500)
.repartition_file_groups(&single_partition);
assert_partitioned_files(None, actual)
}
// Empty input yields None under both strategies.
#[test]
fn repartition_no_action_zero_files() {
let empty_partition = vec![];
let partitioner = FileGroupPartitioner::new()
.with_target_partitions(65)
.with_repartition_file_min_size(500);
assert_partitioned_files(None, repartition_test(partitioner, empty_partition))
}
// Order-preserving: two groups already meet the target of 2.
#[test]
fn repartition_ordered_no_action_too_few_partitions() {
let input_partitions = vec![
FileGroup::new(vec![pfile("a", 100)]),
FileGroup::new(vec![pfile("b", 200)]),
];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(2)
.with_repartition_file_min_size(10)
.repartition_file_groups(&input_partitions);
assert_partitioned_files(None, actual)
}
// A lone file is delegated to the size-balancing strategy, which honours the
// minimum size.
#[test]
fn repartition_ordered_no_action_file_too_small() {
let single_partition = vec![FileGroup::new(vec![pfile("a", 100)])];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(2)
.with_repartition_file_min_size(1000)
.repartition_file_groups(&single_partition);
assert_partitioned_files(None, actual)
}
// A lone file is delegated to the size-balancing strategy: ceil(100/3) = 34.
#[test]
fn repartition_ordered_one_large_file() {
let source_partitions = vec![FileGroup::new(vec![pfile("a", 100)])];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(3)
.with_repartition_file_min_size(10)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
]);
assert_partitioned_files(expected, actual);
}
#[test]
fn repartition_ordered_one_large_file_with_range() {
let source_partitions =
vec![FileGroup::new(vec![pfile("a", 100).with_range(0, 100)])];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(3)
.with_repartition_file_min_size(10)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
]);
assert_partitioned_files(expected, actual);
}
// Both new groups go to `a`: its range size is 100, then 50, both above
// `b`'s 30. So `a` is cut with range size 33 and `b` stays whole.
#[test]
fn repartition_ordered_one_large_one_small_file() {
let source_partitions = vec![
FileGroup::new(vec![pfile("a", 100)]),
FileGroup::new(vec![pfile("b", 30)]),
];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(4)
.with_repartition_file_min_size(10)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
]);
assert_partitioned_files(expected, actual);
}
#[test]
fn repartition_ordered_one_large_one_small_file_with_full_range() {
let source_partitions = vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 100)]),
FileGroup::new(vec![pfile("b", 30)]),
];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(4)
.with_repartition_file_min_size(10)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
]);
assert_partitioned_files(expected, actual);
}
// The two halves of `a` tie at size 50. The single new group goes to the
// first half (group 0), which is cut into two ranges of 25.
#[test]
fn repartition_ordered_one_large_one_small_file_with_split_range() {
let source_partitions = vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
FileGroup::new(vec![pfile("b", 30)]),
];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(4)
.with_repartition_file_min_size(10)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 25)]),
FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
FileGroup::new(vec![pfile("a", 100).with_range(25, 50)]),
]);
assert_partitioned_files(expected, actual);
}
// Only the 20..80 range (size 60) of `a` is split, into ranges of 20.
#[test]
fn repartition_ordered_one_large_one_small_file_with_non_full_range() {
let source_partitions = vec![
FileGroup::new(vec![pfile("a", 100).with_range(20, 80)]),
FileGroup::new(vec![pfile("b", 30).with_range(5, 25)]),
];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(4)
.with_repartition_file_min_size(10)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100).with_range(20, 40)]),
FileGroup::new(vec![pfile("b", 30).with_range(5, 25)]),
FileGroup::new(vec![pfile("a", 100).with_range(40, 60)]),
FileGroup::new(vec![pfile("a", 100).with_range(60, 80)]),
]);
assert_partitioned_files(expected, actual);
}
// `a` and `b` tie at 100. `a` gets the first new group; `b` then gets the
// second, since 100 > 50.
#[test]
fn repartition_ordered_two_large_files() {
let source_partitions = vec![
FileGroup::new(vec![pfile("a", 100)]),
FileGroup::new(vec![pfile("b", 100)]),
];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(4)
.with_repartition_file_min_size(10)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
]);
assert_partitioned_files(expected, actual);
}
// With 4 targets, the single new group goes to `a`. With 5 targets, `b` also
// gets one.
#[test]
fn repartition_ordered_two_large_one_small_files() {
let source_partitions = vec![
FileGroup::new(vec![pfile("a", 100)]),
FileGroup::new(vec![pfile("b", 100)]),
FileGroup::new(vec![pfile("c", 30)]),
];
let partitioner = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_repartition_file_min_size(10);
let actual = partitioner
.with_target_partitions(4)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
FileGroup::new(vec![pfile("b", 100).with_range(0, 100)]),
FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
]);
assert_partitioned_files(expected, actual);
let actual = partitioner
.with_target_partitions(5)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
FileGroup::new(vec![pfile("b", 100).with_range(0, 50)]),
FileGroup::new(vec![pfile("c", 30).with_range(0, 30)]),
FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
FileGroup::new(vec![pfile("b", 100).with_range(50, 100)]),
]);
assert_partitioned_files(expected, actual);
}
// Empty groups (existing or appended) are filled in index order:
// - groups 1 and 3 go to `a`, giving range size 33;
// - group 4 goes to `b`, since 40 > 33.
#[test]
fn repartition_ordered_one_large_one_small_existing_empty() {
let source_partitions = vec![
FileGroup::new(vec![pfile("a", 100)]),
FileGroup::default(),
FileGroup::new(vec![pfile("b", 40)]),
FileGroup::default(),
];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(5)
.with_repartition_file_min_size(10)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
FileGroup::new(vec![pfile("b", 40).with_range(0, 20)]),
FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
FileGroup::new(vec![pfile("b", 40).with_range(20, 40)]),
]);
assert_partitioned_files(expected, actual);
}
// Groups holding several files are left intact; only single-file groups are
// split.
#[test]
fn repartition_ordered_existing_group_multiple_files() {
let source_partitions = vec![
FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
FileGroup::new(vec![pfile("c", 40)]),
];
let actual = FileGroupPartitioner::new()
.with_preserve_order_within_groups(true)
.with_target_partitions(3)
.with_repartition_file_min_size(10)
.repartition_file_groups(&source_partitions);
let expected = Some(vec![
FileGroup::new(vec![pfile("a", 100), pfile("b", 100)]),
FileGroup::new(vec![pfile("c", 40).with_range(0, 20)]),
FileGroup::new(vec![pfile("c", 40).with_range(20, 40)]),
]);
assert_partitioned_files(expected, actual);
}
// Compares results through their pretty-printed Debug output, so any
// difference (including ranges) fails the assertion.
fn assert_partitioned_files(
expected: Option<Vec<FileGroup>>,
actual: Option<Vec<FileGroup>>,
) {
match (expected, actual) {
(None, None) => {}
(Some(_), None) => panic!("Expected Some, got None"),
(None, Some(_)) => panic!("Expected None, got Some"),
(Some(expected), Some(actual)) => {
let expected_string = format!("{expected:#?}");
let actual_string = format!("{actual:#?}");
assert_eq!(expected_string, actual_string);
}
}
}
// Builds a file of the given size with no explicit range.
fn pfile(path: impl Into<String>, file_size: u64) -> PartitionedFile {
PartitionedFile::new(path, file_size)
}
// Builds a size-10 file with a single string partition value.
fn pfile_with_pv(path: &str, pv: &str) -> PartitionedFile {
let mut file = pfile(path, 10);
file.partition_values = vec![ScalarValue::from(pv)];
file
}
// Runs both strategies on the same input, asserts that they agree, and
// returns the shared result.
fn repartition_test(
partitioner: FileGroupPartitioner,
file_groups: Vec<FileGroup>,
) -> Option<Vec<FileGroup>> {
let repartitioned = partitioner.repartition_file_groups(&file_groups);
let repartitioned_preserving_sort = partitioner
.with_preserve_order_within_groups(true)
.repartition_file_groups(&file_groups);
assert_partitioned_files(repartitioned.clone(), repartitioned_preserving_sort);
repartitioned
}
#[test]
fn test_group_by_partition_values_edge_cases() {
assert!(FileGroup::default().group_by_partition_values(4).is_empty());
assert!(
FileGroup::new(vec![pfile("a", 100)])
.group_by_partition_values(0)
.is_empty()
);
}
// Two distinct partitions with a limit of 4: one group per partition.
#[test]
fn test_group_by_partition_values_less_groups_than_target() {
let fg = FileGroup::new(vec![
pfile_with_pv("a", "p1"),
pfile_with_pv("b", "p1"),
pfile_with_pv("c", "p2"),
]);
let groups = fg.group_by_partition_values(4);
assert_eq!(groups.len(), 2);
assert_eq!(groups[0].len(), 2);
assert_eq!(groups[1].len(), 1);
}
// Five partitions dealt round-robin into 3 buckets: {p1, p4}, {p2, p5}, {p3}.
#[test]
fn test_group_by_partition_values_more_groups_than_target() {
let fg = FileGroup::new(vec![
pfile_with_pv("a", "p1"),
pfile_with_pv("b", "p2"),
pfile_with_pv("c", "p3"),
pfile_with_pv("d", "p4"),
pfile_with_pv("e", "p5"),
]);
let groups = fg.group_by_partition_values(3);
assert_eq!(groups.len(), 3);
assert_eq!(groups[0].len(), 2);
assert_eq!(groups[1].len(), 2);
assert_eq!(groups[2].len(), 1);
}
}