use crate::spec::DataFileMeta;
use std::cmp;
/// Greedily packs `items` into consecutive bins while preserving their order.
///
/// Items are visited in the order given. Each item's weight is obtained from
/// `weight_func`. An item is appended to the current bin unless doing so would
/// push the bin's accumulated weight above `target_weight`, in which case the
/// current bin is closed and the item starts a new one.
///
/// A bin is never emitted empty: an item whose own weight exceeds
/// `target_weight` still gets a bin (possibly all to itself).
///
/// Weights are accumulated with saturating arithmetic, so very large weights
/// (e.g. `i64::MAX`) clamp at the numeric bound instead of overflowing.
///
/// Returns the bins in the order they were filled; an empty input yields an
/// empty result.
pub fn pack_for_ordered<T>(
    items: Vec<T>,
    weight_func: impl Fn(&T) -> i64,
    target_weight: i64,
) -> Vec<Vec<T>> {
    let mut packed: Vec<Vec<T>> = Vec::new();
    let mut bin_items: Vec<T> = Vec::new();
    let mut bin_weight: i64 = 0;

    for item in items {
        let weight = weight_func(&item);

        // Saturate rather than overflow: a wrapped (negative) sum would look
        // like it fits and wrongly merge items into the current bin.
        let would_overflow_bin = bin_weight.saturating_add(weight) > target_weight;
        if would_overflow_bin && !bin_items.is_empty() {
            // Close the current bin and start a fresh one for this item.
            packed.push(std::mem::take(&mut bin_items));
            bin_weight = 0;
        }

        bin_weight = bin_weight.saturating_add(weight);
        bin_items.push(item);
    }

    // Flush the trailing, partially filled bin.
    if !bin_items.is_empty() {
        packed.push(bin_items);
    }
    packed
}
/// Groups data files into batch splits.
///
/// The files are first ordered by ascending `min_sequence_number` (stable, so
/// files with equal sequence numbers keep their relative input order). They
/// are then handed to `pack_for_ordered` with `target_split_size` as the bin
/// target, where each file weighs the larger of its `file_size` and
/// `open_file_cost`.
pub fn split_for_batch(
    mut files: Vec<DataFileMeta>,
    target_split_size: i64,
    open_file_cost: i64,
) -> Vec<Vec<DataFileMeta>> {
    files.sort_by(|lhs, rhs| lhs.min_sequence_number.cmp(&rhs.min_sequence_number));

    pack_for_ordered(
        files,
        // A file never weighs less than the cost of opening it.
        move |file: &DataFileMeta| cmp::max(file.file_size, open_file_cost),
        target_split_size,
    )
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::spec::stats::BinaryTableStats;
    use chrono::{DateTime, Utc};

    /// Builds a minimal `DataFileMeta` where only the name, size and sequence
    /// range vary; every other field gets a fixed placeholder value.
    fn make_file(name: &str, size: i64, min_seq: i64, max_seq: i64) -> DataFileMeta {
        DataFileMeta {
            file_name: name.to_string(),
            file_size: size,
            row_count: 100,
            min_key: Vec::new(),
            max_key: Vec::new(),
            key_stats: BinaryTableStats::new(Vec::new(), Vec::new(), Vec::new()),
            value_stats: BinaryTableStats::new(Vec::new(), Vec::new(), Vec::new()),
            min_sequence_number: min_seq,
            max_sequence_number: max_seq,
            schema_id: 0,
            level: 0,
            extra_files: Vec::new(),
            creation_time: DateTime::<Utc>::from_timestamp(0, 0),
            delete_row_count: None,
            embedded_index: None,
            first_row_id: None,
            write_cols: None,
            external_path: None,
            file_source: None,
            value_stats_cols: None,
        }
    }

    /// Six files named "1".."6" with sizes 11, 13, 46, 23, 4, 101 and
    /// increasing, non-overlapping sequence ranges.
    fn sample_files() -> Vec<DataFileMeta> {
        vec![
            make_file("1", 11, 0, 20),
            make_file("2", 13, 21, 30),
            make_file("3", 46, 31, 40),
            make_file("4", 23, 41, 50),
            make_file("5", 4, 51, 60),
            make_file("6", 101, 61, 100),
        ]
    }

    /// Projects each group of files down to the file names it contains.
    fn group_names(groups: &[Vec<DataFileMeta>]) -> Vec<Vec<&str>> {
        groups
            .iter()
            .map(|group| group.iter().map(|file| file.file_name.as_str()).collect())
            .collect()
    }

    #[test]
    fn test_pack_for_ordered_empty() {
        let packed = pack_for_ordered(Vec::<i32>::new(), |_| 1, 10);
        assert!(packed.is_empty());
    }

    #[test]
    fn test_pack_for_ordered_single_oversized() {
        // An item heavier than the target still gets its own bin.
        let packed = pack_for_ordered(vec![100], |x| *x, 10);
        assert_eq!(packed, vec![vec![100]]);
    }

    #[test]
    fn test_pack_for_ordered_all_fit_in_one_bin() {
        let packed = pack_for_ordered(vec![1, 2, 3], |x| *x, 100);
        assert_eq!(packed, vec![vec![1, 2, 3]]);
    }

    #[test]
    fn test_split_for_batch_low_open_cost() {
        // Open cost (2) is below every file size, so sizes alone drive packing.
        let groups = split_for_batch(sample_files(), 40, 2);
        assert_eq!(
            group_names(&groups),
            vec![vec!["1", "2"], vec!["3"], vec!["4", "5"], vec!["6"]]
        );
    }

    #[test]
    fn test_split_for_batch_high_open_cost() {
        // Open cost (20) exceeds the sizes of files "1", "2" and "5", so those
        // files weigh 20 each instead of their actual size.
        let groups = split_for_batch(sample_files(), 40, 20);
        assert_eq!(
            group_names(&groups),
            vec![vec!["1", "2"], vec!["3"], vec!["4"], vec!["5"], vec!["6"]]
        );
    }

    #[test]
    fn test_split_for_batch_sorts_by_sequence() {
        let unordered = vec![
            make_file("c", 10, 30, 40),
            make_file("a", 10, 10, 20),
            make_file("b", 10, 20, 30),
        ];
        let groups = split_for_batch(unordered, 1000, 0);
        // Everything fits in one group, ordered by min sequence number.
        assert_eq!(group_names(&groups), vec![vec!["a", "b", "c"]]);
    }

    #[test]
    fn test_split_for_batch_empty() {
        let groups = split_for_batch(Vec::new(), 128, 4);
        assert!(groups.is_empty());
    }
}