use crate::basic::Type;
use crate::column::writer::LevelDataRef;
use crate::column::writer::encoder::ColumnValueEncoder;
use crate::file::properties::WriterProperties;
use crate::schema::types::ColumnDescriptor;
/// Picks how many levels of a chunk should be processed as one sub-batch so
/// that the values in it stay within a byte budget derived from the column's
/// data page / dictionary page size limits.
///
/// The per-column limits and the "fixed-width values always fit" shortcuts are
/// computed once in `new` and reused by every sizing query.
pub(crate) struct ByteBudgetChunker {
/// Data page size limit (bytes) configured for this column.
page_byte_limit: usize,
/// Maximum definition level of the column; passed to
/// `LevelDataRef::value_count` to count the values in a chunk.
max_def_level: i16,
/// `true` when the column has a fixed per-value width and a full base batch
/// of such values does not exceed `page_byte_limit`.
static_always_fits: bool,
/// Dictionary page size limit (bytes) configured for this column.
dict_page_byte_limit: usize,
/// `true` when the column has a fixed per-value width and a full base batch
/// of such values does not exceed `dict_page_byte_limit`.
static_dict_always_fits: bool,
}
impl ByteBudgetChunker {
    /// Creates a chunker for the column described by `descr`.
    ///
    /// The data page and dictionary page byte limits are read from `props`
    /// for this column's path. For physical types with a fixed per-value
    /// width, the constructor also records whether `base_batch_size` values
    /// can never exceed each limit, so that later queries can skip the
    /// per-value byte counting entirely.
    #[inline]
    pub(crate) fn new(
        descr: &ColumnDescriptor,
        props: &WriterProperties,
        base_batch_size: usize,
    ) -> Self {
        let page_byte_limit = props.column_data_page_size_limit(descr.path());
        let dict_page_byte_limit = props.column_dictionary_page_size_limit(descr.path());

        // Width in bytes charged per value; `None` for variable-length data.
        let fixed_width: Option<usize> = match descr.physical_type() {
            Type::BYTE_ARRAY => None,
            // Booleans are charged one byte per value here.
            Type::BOOLEAN => Some(1),
            Type::INT32 | Type::FLOAT => Some(std::mem::size_of::<i32>()),
            Type::INT64 | Type::DOUBLE => Some(std::mem::size_of::<i64>()),
            Type::INT96 => Some(12),
            // A negative declared length is treated as zero.
            Type::FIXED_LEN_BYTE_ARRAY => Some(descr.type_length().max(0) as usize),
        };

        // A full base batch fits when `width * base_batch_size` (saturating)
        // is within `limit`; variable-length columns never qualify.
        let fits_within = |limit: usize| -> bool {
            match fixed_width {
                Some(width) => width.saturating_mul(base_batch_size) <= limit,
                None => false,
            }
        };

        let max_def_level = descr.max_def_level();
        let static_always_fits = fits_within(page_byte_limit);
        let static_dict_always_fits = fits_within(dict_page_byte_limit);

        Self {
            page_byte_limit,
            max_def_level,
            static_always_fits,
            dict_page_byte_limit,
            static_dict_always_fits,
        }
    }

    /// Returns how many levels of the current chunk should be processed as
    /// one sub-batch.
    ///
    /// `chunk_size` is returned unchanged when the chunk is empty, when the
    /// precomputed static check says a batch always fits the relevant limit,
    /// or when the encoder reports a dictionary but provides no estimate of
    /// the dictionary page size. Otherwise the byte budget is:
    ///
    /// * with a dictionary: the dictionary page limit minus the encoder's
    ///   estimated dictionary page size (saturating at zero);
    /// * without a dictionary: the data page limit.
    ///
    /// The actual sizing is delegated to `byte_budget_sub_batch_size`.
    #[inline]
    pub(crate) fn pick_sub_batch_size<E: ColumnValueEncoder>(
        &self,
        encoder: &E,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        chunk_def: LevelDataRef<'_>,
        values_offset: usize,
        chunk_size: usize,
    ) -> usize {
        if chunk_size == 0 {
            return 0;
        }

        let dictionary_active = encoder.has_dictionary();
        let always_fits = if dictionary_active {
            self.static_dict_always_fits
        } else {
            self.static_always_fits
        };
        if always_fits {
            return chunk_size;
        }

        let budget = if dictionary_active {
            // Without a size estimate there is nothing to budget against.
            let Some(used) = encoder.estimated_dict_page_size() else {
                return chunk_size;
            };
            self.dict_page_byte_limit.saturating_sub(used)
        } else {
            self.page_byte_limit
        };

        self.byte_budget_sub_batch_size::<E>(
            values,
            value_indices,
            chunk_def,
            values_offset,
            chunk_size,
            budget,
        )
    }

    /// Computes the sub-batch size (in levels) for a chunk given a byte
    /// `budget`.
    ///
    /// The number of values in the chunk is obtained from `chunk_def`. The
    /// encoder's counting helper is then asked how many of those values stay
    /// within `budget`, either through the index slice (`value_indices`) or
    /// directly from `values_offset`. A `None` answer, or a chunk without any
    /// values, yields `chunk_size`. Otherwise the value count is converted to
    /// a level count and clamped to the range `1..=chunk_size`.
    #[inline(never)]
    fn byte_budget_sub_batch_size<E: ColumnValueEncoder>(
        &self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        chunk_def: LevelDataRef<'_>,
        values_offset: usize,
        chunk_size: usize,
        budget: usize,
    ) -> usize {
        let value_count = chunk_def.value_count(chunk_size, self.max_def_level);
        if value_count == 0 {
            return chunk_size;
        }

        let fitting = if let Some(indices) = value_indices {
            // Clamp both bounds so the slice never runs past `indices`.
            let hi = indices.len().min(values_offset + value_count);
            let lo = hi.min(values_offset);
            E::count_values_within_byte_budget_gather(values, &indices[lo..hi], budget)
        } else {
            E::count_values_within_byte_budget(values, values_offset, value_count, budget)
        };

        let Some(values_that_fit) = fitting else {
            return chunk_size;
        };

        let levels = if value_count == chunk_size {
            // Every level carries a value, so the counts coincide.
            values_that_fit
        } else {
            // Scale by the chunk's levels-per-value ratio, rounding up.
            (values_that_fit * chunk_size).div_ceil(value_count)
        };

        // Always advance by at least one level, never beyond the chunk.
        levels.max(1).min(chunk_size)
    }
}