use crate::compute::sort::SortOptions;
use super::{null_sentinel, Rows};
pub const BLOCK_SIZE: usize = 32;
pub const BLOCK_CONTINUATION: u8 = 0xFF;
pub const EMPTY_SENTINEL: u8 = 1;
pub const NON_EMPTY_SENTINEL: u8 = 2;
#[inline]
fn div_ceil(value: usize, divisor: usize) -> usize {
value / divisor + (0 != value % divisor) as usize
}
pub fn encoded_len(a: Option<&[u8]>) -> usize {
match a {
Some(a) => 1 + div_ceil(a.len(), BLOCK_SIZE) * (BLOCK_SIZE + 1),
None => 1,
}
}
pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(out: &mut Rows, i: I, opts: SortOptions) {
for (offset, maybe_val) in out.offsets.iter_mut().skip(1).zip(i) {
match maybe_val {
Some(val) if val.is_empty() => {
out.buffer[*offset] = match opts.descending {
true => !EMPTY_SENTINEL,
false => EMPTY_SENTINEL,
};
*offset += 1;
}
Some(val) => {
let block_count = div_ceil(val.len(), BLOCK_SIZE);
let end_offset = *offset + 1 + block_count * (BLOCK_SIZE + 1);
let to_write = &mut out.buffer[*offset..end_offset];
to_write[0] = NON_EMPTY_SENTINEL;
let mut chunks = val.chunks_exact(BLOCK_SIZE);
for (input, output) in chunks
.by_ref()
.zip(to_write[1..].chunks_exact_mut(BLOCK_SIZE + 1))
{
let input: &[u8; BLOCK_SIZE] = input.try_into().unwrap();
let out_block: &mut [u8; BLOCK_SIZE] =
(&mut output[..BLOCK_SIZE]).try_into().unwrap();
*out_block = *input;
output[BLOCK_SIZE] = BLOCK_CONTINUATION;
}
let remainder = chunks.remainder();
if !remainder.is_empty() {
let start_offset = 1 + (block_count - 1) * (BLOCK_SIZE + 1);
to_write[start_offset..start_offset + remainder.len()]
.copy_from_slice(remainder);
*to_write.last_mut().unwrap() = remainder.len() as u8;
} else {
*to_write.last_mut().unwrap() = BLOCK_SIZE as u8;
}
*offset = end_offset;
if opts.descending {
to_write.iter_mut().for_each(|v| *v = !*v)
}
}
None => {
out.buffer[*offset] = null_sentinel(opts);
*offset += 1;
}
}
}
}