use crate::null_sentinel;
use arrow_array::builder::BufferBuilder;
use arrow_array::types::ByteArrayType;
use arrow_array::*;
use arrow_buffer::bit_util::ceil;
use arrow_buffer::{ArrowNativeType, MutableBuffer};
use arrow_data::{ArrayDataBuilder, MAX_INLINE_VIEW_LEN};
use arrow_schema::{DataType, SortOptions};
use builder::make_view;
/// The block size, in bytes, used for the part of a value beyond its first `BLOCK_SIZE` bytes.
pub const BLOCK_SIZE: usize = 32;
/// The number of mini-blocks that the first `BLOCK_SIZE` bytes of a value are split into.
pub const MINI_BLOCK_COUNT: usize = 4;
/// The size, in bytes, of each mini-block.
pub const MINI_BLOCK_SIZE: usize = BLOCK_SIZE / MINI_BLOCK_COUNT;
/// The byte written after a block to signal that another block follows.
pub const BLOCK_CONTINUATION: u8 = 0xFF;
/// The first byte of an encoded empty (zero-length) value in ascending order.
pub const EMPTY_SENTINEL: u8 = 1;
/// The first byte of an encoded non-empty value in ascending order.
pub const NON_EMPTY_SENTINEL: u8 = 2;
/// The second byte of the two-byte marker written by `encode_null_value` in ascending order.
pub const NULL_VALUE_SENTINEL: u8 = 3;
#[inline]
pub fn padded_length(a: Option<usize>) -> usize {
match a {
Some(a) => non_null_padded_length(a),
None => 1,
}
}
/// Returns the number of bytes needed to encode a non-null value of `len` bytes.
///
/// Values of at most `BLOCK_SIZE` bytes are costed as one leading byte plus one
/// `MINI_BLOCK_SIZE + 1` byte slot per mini-block. Longer values are costed as one
/// `BLOCK_SIZE + 1` byte slot per block plus `MINI_BLOCK_COUNT` extra bytes: the first
/// `BLOCK_SIZE` bytes really occupy `1 + MINI_BLOCK_COUNT * (MINI_BLOCK_SIZE + 1)` bytes,
/// which is `MINI_BLOCK_COUNT` more than a single `BLOCK_SIZE + 1` slot.
#[inline]
pub(crate) fn non_null_padded_length(len: usize) -> usize {
    let (block_size, header) = if len > BLOCK_SIZE {
        (BLOCK_SIZE, MINI_BLOCK_COUNT)
    } else {
        (MINI_BLOCK_SIZE, 1)
    };
    header + ceil(len, block_size) * (block_size + 1)
}
/// Encodes every value yielded by `i` into `data`.
///
/// The first entry of `offsets` is skipped. Each following entry is paired with one value:
/// the value is written into `data` starting at that entry's current position, and the
/// entry is then advanced by the number of bytes written.
pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
    data: &mut [u8],
    offsets: &mut [usize],
    i: I,
    opts: SortOptions,
) {
    let cursors = offsets.iter_mut().skip(1);
    cursors.zip(i).for_each(|(cursor, value)| {
        let written = encode_one(&mut data[*cursor..], value, opts);
        *cursor += written;
    });
}
/// Encodes every element of `input_array` into `data`, reading the values directly from
/// the array's offsets and values buffers.
///
/// A separate, validity-free iteration is used when the array has no null buffer or the
/// null buffer reports zero nulls.
pub(crate) fn encode_generic_byte_array<T: ByteArrayType>(
    data: &mut [u8],
    offsets: &mut [usize],
    input_array: &GenericByteArray<T>,
    opts: SortOptions,
) {
    let value_offsets = input_array.value_offsets();
    let value_bytes = input_array.values().as_slice();

    // Slices out one value given a `[start, end]` window of the offsets buffer.
    let value_at = |bounds: &[T::Offset]| {
        let range = bounds[0].as_usize()..bounds[1].as_usize();
        // SAFETY: relies on the array's offsets lying within its values buffer
        // (presumably an invariant of `GenericByteArray` -- not re-checked here).
        unsafe { value_bytes.get_unchecked(range) }
    };

    match input_array.nulls().filter(|nulls| nulls.null_count() > 0) {
        Some(validity) => {
            let values = value_offsets
                .windows(2)
                .zip(validity.iter())
                .map(|(bounds, is_valid)| is_valid.then(|| value_at(bounds)));
            encode(data, offsets, values, opts);
        }
        None => {
            let values = value_offsets
                .windows(2)
                .map(|bounds| Some(value_at(bounds)));
            encode(data, offsets, values, opts);
        }
    }
}
/// Encodes a null value as the single byte returned by `null_sentinel(opts)`.
///
/// Returns the number of bytes written, which is always 1.
pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
    let marker = null_sentinel(opts);
    out[0] = marker;
    1
}
/// Encodes an empty (zero-length) value as a single sentinel byte.
///
/// The sentinel is bit-inverted for descending order. Returns the number of bytes
/// written, which is always 1.
pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
    out[0] = if opts.descending {
        !EMPTY_SENTINEL
    } else {
        EMPTY_SENTINEL
    };
    1
}
/// Writes the two-byte marker `NON_EMPTY_SENTINEL`, `NULL_VALUE_SENTINEL` to the start of `out`.
///
/// Both bytes are bit-inverted for descending order. Returns the number of bytes
/// written, which is always 2.
pub fn encode_null_value(out: &mut [u8], opts: SortOptions) -> usize {
    let ascending = [NON_EMPTY_SENTINEL, NULL_VALUE_SENTINEL];
    let marker = if opts.descending {
        ascending.map(|byte| !byte)
    } else {
        ascending
    };
    out[0] = marker[0];
    out[1] = marker[1];
    2
}
/// Encodes a single optional byte string to the start of `out`, returning the number of
/// bytes written.
///
/// Nulls and empty values take one byte each. Any other value is written as
/// `NON_EMPTY_SENTINEL` followed by its bytes in blocks: values of up to `BLOCK_SIZE` bytes
/// use mini-blocks only, longer values use mini-blocks for the first `BLOCK_SIZE` bytes and
/// full-size blocks for the rest. For descending order every written byte is inverted.
#[inline]
pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
    let bytes = match val {
        None => return encode_null(out, opts),
        Some([]) => return encode_empty(out, opts),
        Some(bytes) => bytes,
    };

    // Mark the value as neither null nor empty.
    out[0] = NON_EMPTY_SENTINEL;

    let written = if bytes.len() > BLOCK_SIZE {
        let (head, tail) = bytes.split_at(BLOCK_SIZE);
        let head_len = encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], head);
        // `head_len` is relative to `out[1..]`, so `out[head_len]` is the final byte of the
        // last mini-block; replace its length marker because more blocks follow.
        out[head_len] = BLOCK_CONTINUATION;
        let tail_len = encode_blocks::<BLOCK_SIZE>(&mut out[1 + head_len..], tail);
        1 + head_len + tail_len
    } else {
        1 + encode_blocks::<MINI_BLOCK_SIZE>(&mut out[1..], bytes)
    };

    if opts.descending {
        // Flip every bit so that byte-wise comparison yields the reverse order.
        for byte in &mut out[..written] {
            *byte = !*byte;
        }
    }
    written
}
/// Writes `val` to `out` as consecutive slots of `SIZE` data bytes plus one marker byte,
/// returning the total number of bytes the slots occupy.
///
/// Every slot but the last ends in `BLOCK_CONTINUATION`; the last ends in the number of
/// data bytes it holds. Unused data bytes of a partial last slot are left untouched.
///
/// # Panics
///
/// Panics if `val` is empty or if `out` is too short.
#[inline]
fn encode_blocks<const SIZE: usize>(out: &mut [u8], val: &[u8]) -> usize {
    let block_count = ceil(val.len(), SIZE);
    let encoded_len = block_count * (SIZE + 1);
    let dst = &mut out[..encoded_len];

    let full_blocks = val.chunks_exact(SIZE);
    let tail = full_blocks.remainder();
    for (src_block, dst_slot) in full_blocks.zip(dst.chunks_exact_mut(SIZE + 1)) {
        dst_slot[..SIZE].copy_from_slice(src_block);
        // Assume more blocks follow; the final marker is fixed up below.
        dst_slot[SIZE] = BLOCK_CONTINUATION;
    }

    let final_block_len = if tail.is_empty() {
        // The last full block currently carries a continuation marker; it is overwritten.
        SIZE
    } else {
        let start = (block_count - 1) * (SIZE + 1);
        dst[start..start + tail.len()].copy_from_slice(tail);
        tail.len()
    };
    *dst.last_mut().unwrap() = final_block_len as u8;

    encoded_len
}
/// Walks the encoded value at the start of `row`, passing the data bytes of each block to
/// `f`, and returns the number of bytes the encoded value occupies.
///
/// The bytes handed to `f` are exactly as stored in `row`; they are not un-inverted for
/// descending order. If the first byte is not the non-empty sentinel (a null or empty
/// value), `f` is never called and 1 is returned.
///
/// # Panics
///
/// Panics if `row` is shorter than the encoding it claims to contain.
pub fn decode_blocks(row: &[u8], options: SortOptions, mut f: impl FnMut(&[u8])) -> usize {
    let descending = options.descending;
    // Maps a byte between its ascending and stored form (bit inversion is its own inverse).
    let flip = |byte: u8| if descending { !byte } else { byte };

    let non_empty_marker = flip(NON_EMPTY_SENTINEL);
    let continuation = flip(BLOCK_CONTINUATION);

    if row[0] != non_empty_marker {
        return 1;
    }

    let mut cursor = 1;
    let mut blocks_read: usize = 0;
    loop {
        // The first `MINI_BLOCK_COUNT` blocks are mini-blocks; all later ones are full size.
        let size = if blocks_read < MINI_BLOCK_COUNT {
            MINI_BLOCK_SIZE
        } else {
            BLOCK_SIZE
        };

        let marker = row[cursor + size];
        if marker != continuation {
            // Final block: the marker holds the count of data bytes it contains.
            let data_len = flip(marker) as usize;
            f(&row[cursor..cursor + data_len]);
            return cursor + size + 1;
        }

        f(&row[cursor..cursor + size]);
        cursor += size + 1;
        blocks_read += 1;
    }
}
/// Returns the number of data bytes in the encoded value at the start of `row`.
fn decoded_len(row: &[u8], options: SortOptions) -> usize {
    let mut total = 0;
    decode_blocks(row, options, |block| {
        total += block.len();
    });
    total
}
/// Decodes one variable-length value from the front of each row into a binary array.
///
/// Each slice in `rows` is advanced past the bytes that were consumed for its value.
///
/// # Panics
///
/// Panics if a row is too short for the encoding it contains, or if the total decoded
/// length does not fit in the offset type `I`.
pub fn decode_binary<I: OffsetSizeTrait>(
    rows: &mut [&[u8]],
    options: SortOptions,
) -> GenericBinaryArray<I> {
    let row_count = rows.len();

    // A row holds a null when its first byte is the null sentinel.
    let mut null_count = 0;
    let validity = MutableBuffer::collect_bool(row_count, |idx| {
        let is_valid = rows[idx][0] != null_sentinel(options);
        if !is_valid {
            null_count += 1;
        }
        is_valid
    });

    // First pass: measure, so the values buffer is allocated once.
    let total_bytes = rows.iter().map(|row| decoded_len(row, options)).sum();
    let mut offsets = BufferBuilder::<I>::new(row_count + 1);
    offsets.append(I::zero());
    let mut values = MutableBuffer::new(total_bytes);

    // Second pass: copy the data out and step each row past its value.
    for row in rows.iter_mut() {
        let consumed = decode_blocks(row, options, |block| values.extend_from_slice(block));
        *row = &row[consumed..];
        let end = I::from_usize(values.len()).expect("offset overflow");
        offsets.append(end);
    }

    if options.descending {
        // Stored bytes are inverted for descending order; restore the originals.
        for byte in values.as_slice_mut() {
            *byte = !*byte;
        }
    }

    let data_type = if I::IS_LARGE {
        DataType::LargeBinary
    } else {
        DataType::Binary
    };

    let builder = ArrayDataBuilder::new(data_type)
        .len(row_count)
        .null_count(null_count)
        .null_bit_buffer(Some(validity.into()))
        .add_buffer(offsets.finish())
        .add_buffer(values.into());

    // The array data is built without validation; the buffers were assembled above with
    // `row_count + 1` offsets that end at `values.len()`.
    unsafe { GenericBinaryArray::from(builder.build_unchecked()) }
}
/// Decodes one variable-length value from the front of each row into a [`BinaryViewArray`].
///
/// Each slice in `rows` is advanced past the bytes that were consumed for its value.
///
/// When `validate_utf8` is `true`, every decoded value is kept in the values buffer and the
/// whole buffer is checked with `std::str::from_utf8` once at the end. When it is `false`,
/// values of at most `MAX_INLINE_VIEW_LEN` bytes are removed from the values buffer again
/// after their view has been created.
///
/// # Panics
///
/// Panics if `validate_utf8` is `true` and the concatenated decoded bytes are not valid
/// UTF-8, or if a row is too short for the encoding it contains.
fn decode_binary_view_inner(
rows: &mut [&[u8]],
options: SortOptions,
validate_utf8: bool,
) -> BinaryViewArray {
let len = rows.len();
let inline_str_max_len = MAX_INLINE_VIEW_LEN as usize;
let mut null_count = 0;
// A row holds a null when its first byte is the null sentinel.
let nulls = MutableBuffer::collect_bool(len, |x| {
let valid = rows[x][0] != null_sentinel(options);
null_count += !valid as usize;
valid
});
// With validation every value stays in the buffer, so reserve room for all of them.
// Without validation only values longer than `inline_str_max_len` stay; the extra
// `inline_str_max_len` bytes leave room for one short value that is appended
// temporarily and truncated away again in the loop below.
let values_capacity = if validate_utf8 {
rows.iter().map(|row| decoded_len(row, options)).sum()
} else {
rows.iter().fold(0, |acc, row| {
let len = decoded_len(row, options);
if len > inline_str_max_len {
acc + len
} else {
acc
}
}) + inline_str_max_len
};
let mut values = MutableBuffer::new(values_capacity);
let mut views = BufferBuilder::<u128>::new(len);
for row in rows {
let start_offset = values.len();
let offset = decode_blocks(row, options, |b| values.extend_from_slice(b));
// The value's length is the amount by which the values buffer grew.
let decoded_len = values.len() - start_offset;
if row[0] == null_sentinel(options) {
// Nulls consume a single byte, append no data, and get an all-zero view.
debug_assert_eq!(offset, 1);
debug_assert_eq!(start_offset, values.len());
views.append(0);
} else {
// SAFETY: `start_offset` was `values.len()` before `decode_blocks` appended to the
// buffer, and the buffer has not shrunk since, so the range start is in bounds.
let val = unsafe { values.get_unchecked_mut(start_offset..) };
if options.descending {
// Stored bytes are inverted for descending order; restore the originals.
val.iter_mut().for_each(|o| *o = !*o);
}
// NOTE(review): presumably the arguments are (data, buffer index, offset within that
// buffer) -- confirm against `make_view`. The `as u32` cast truncates silently
// if `start_offset` exceeds `u32::MAX`.
let view = make_view(val, 0, start_offset as u32);
views.append(view);
// Drop short values from the buffer when not validating; presumably `make_view`
// has stored their bytes inline in the view -- TODO confirm.
if !validate_utf8 && decoded_len <= inline_str_max_len {
values.truncate(start_offset);
}
}
// Step the row past the bytes consumed for this value.
*row = &row[offset..];
}
if validate_utf8 {
// All values, short and long, are in the buffer here, so one check covers them all.
std::str::from_utf8(values.as_slice()).unwrap();
}
let builder = ArrayDataBuilder::new(DataType::BinaryView)
.len(len)
.null_count(null_count)
.null_bit_buffer(Some(nulls.into()))
.add_buffer(views.finish())
.add_buffer(values.into());
// The array data is built without validation from the buffers assembled above.
unsafe { BinaryViewArray::from(builder.build_unchecked()) }
}
/// Decodes one variable-length value from the front of each row into a [`BinaryViewArray`].
///
/// No UTF-8 validation is performed. Each slice in `rows` is advanced past the bytes
/// consumed for its value.
pub fn decode_binary_view(rows: &mut [&[u8]], options: SortOptions) -> BinaryViewArray {
    let validate_utf8 = false;
    decode_binary_view_inner(rows, options, validate_utf8)
}
/// Decodes one variable-length value from the front of each row into a string array.
///
/// The rows are first decoded with `decode_binary`. With `validate_utf8` set, the binary
/// array is converted through `GenericStringArray::from`; otherwise its data is relabelled
/// with the string data type and rebuilt without any checks.
///
/// # Safety
///
/// When `validate_utf8` is `false` the decoded bytes are not checked, so the caller must
/// ensure that every decoded value is valid UTF-8.
pub unsafe fn decode_string<I: OffsetSizeTrait>(
    rows: &mut [&[u8]],
    options: SortOptions,
    validate_utf8: bool,
) -> GenericStringArray<I> {
    let binary = decode_binary::<I>(rows, options);

    if !validate_utf8 {
        let unchecked = binary
            .into_data()
            .into_builder()
            .data_type(GenericStringArray::<I>::DATA_TYPE);
        // Only the data type changes; the buffers are reused as-is without validation.
        return GenericStringArray::from(unsafe { unchecked.build_unchecked() });
    }

    GenericStringArray::from(binary)
}
/// Decodes one variable-length value from the front of each row into a [`StringViewArray`].
///
/// The rows are decoded as a binary view array, which is then reinterpreted as a string
/// view array without a further check.
///
/// # Safety
///
/// When `validate_utf8` is `false` the decoded bytes are not checked, so the caller must
/// ensure that every decoded value is valid UTF-8.
pub unsafe fn decode_string_view(
    rows: &mut [&[u8]],
    options: SortOptions,
    validate_utf8: bool,
) -> StringViewArray {
    let binary_view = decode_binary_view_inner(rows, options, validate_utf8);
    // Reinterpretation is unchecked; see the `# Safety` section for the caller's obligation.
    unsafe { binary_view.to_string_view_unchecked() }
}
/// Advances every row past the two-byte marker written by `encode_null_value`.
///
/// In debug builds the two bytes are checked against the expected marker
/// (`NON_EMPTY_SENTINEL` then `NULL_VALUE_SENTINEL`, both inverted for descending order).
///
/// # Panics
///
/// Panics if a row is shorter than two bytes, and in debug builds if a row does not start
/// with the expected marker.
pub fn decode_null_value(rows: &mut [&[u8]], options: SortOptions) {
    // The expected marker depends only on `options`, so compute it once for all rows.
    let (first, second) = if options.descending {
        (!NON_EMPTY_SENTINEL, !NULL_VALUE_SENTINEL)
    } else {
        (NON_EMPTY_SENTINEL, NULL_VALUE_SENTINEL)
    };

    for row in rows.iter_mut() {
        // Byte 0 is compared against NON_EMPTY_SENTINEL, so the message names that constant.
        debug_assert_eq!(row[0], first, "Expected NON_EMPTY_SENTINEL at byte 0");
        debug_assert_eq!(row[1], second, "Expected NULL_VALUE_SENTINEL at byte 1");
        *row = &row[2..];
    }
}