use arrow::array::types::UInt16Type;
use bytes::Bytes;
use fsst::Compressor;
use std::sync::Arc;
use super::LiquidByteViewArray;
use crate::liquid_array::byte_array::ArrowByteType;
use crate::liquid_array::ipc::LiquidIPCHeader;
use crate::liquid_array::raw::BitPackedArray;
use crate::liquid_array::raw::fsst_buffer::{
FsstArray, PrefixKey, RawFsstBuffer, decode_compact_offsets, empty_compact_offsets,
};
use crate::liquid_array::{LiquidDataType, SqueezeResult};
/// Fixed-size header that immediately follows the `LiquidIPCHeader` in a serialized
/// byte-view array.
///
/// Every field is the length in bytes of one payload section. On the wire the fields are
/// stored as consecutive little-endian `u32`s in declaration order, 20 bytes in total.
#[repr(C)]
pub(super) struct ByteViewArrayHeader {
    /// Length of the bit-packed dictionary keys section.
    pub(super) keys_size: u32,
    /// Length of the compact offsets section.
    pub(super) compact_offsets_size: u32,
    /// Length of the shared prefix section.
    pub(super) shared_prefix_size: u32,
    /// Length of the raw FSST buffer section.
    pub(super) fsst_raw_size: u32,
    /// Length of the string fingerprint section.
    pub(super) fingerprint_size: u32,
}
impl ByteViewArrayHeader {
    /// Number of bytes the header occupies when serialized.
    pub(super) const fn size() -> usize {
        // Compile-time guard: the `repr(C)` in-memory size must agree with the wire size.
        const _: () =
            assert!(std::mem::size_of::<ByteViewArrayHeader>() == ByteViewArrayHeader::size());
        20
    }
    /// Encodes the five section sizes as little-endian `u32`s, in declaration order.
    pub(super) fn to_bytes(&self) -> [u8; Self::size()] {
        let words = [
            self.keys_size,
            self.compact_offsets_size,
            self.shared_prefix_size,
            self.fsst_raw_size,
            self.fingerprint_size,
        ];
        let mut encoded = [0u8; Self::size()];
        for (slot, word) in encoded.chunks_exact_mut(4).zip(words) {
            slot.copy_from_slice(&word.to_le_bytes());
        }
        encoded
    }
    /// Decodes a header from the first [`Self::size`] bytes of `bytes`.
    ///
    /// Bytes beyond the header are ignored.
    ///
    /// # Panics
    ///
    /// Panics if `bytes` is shorter than [`Self::size`].
    pub(super) fn from_bytes(bytes: &[u8]) -> Self {
        assert!(
            bytes.len() >= Self::size(),
            "value too small for ByteViewArrayHeader, expected at least {} bytes, got {}",
            Self::size(),
            bytes.len()
        );
        // Reads the `index`-th little-endian u32 word of the header.
        let word_at = |index: usize| -> u32 {
            let start = index * 4;
            let mut word = [0u8; 4];
            word.copy_from_slice(&bytes[start..start + 4]);
            u32::from_le_bytes(word)
        };
        Self {
            keys_size: word_at(0),
            compact_offsets_size: word_at(1),
            shared_prefix_size: word_at(2),
            fsst_raw_size: word_at(3),
            fingerprint_size: word_at(4),
        }
    }
}
/// Rounds `len` up to the nearest multiple of 8 (values already aligned are unchanged).
pub(super) fn align_up_8(len: usize) -> usize {
    len.next_multiple_of(8)
}
/// Decodes a packed run of prefix-key records.
///
/// Each record is `size_of::<PrefixKey>()` bytes wide. The first 7 bytes of a record are
/// the prefix and byte 7 is the length byte, the same shape the serializer writes.
/// NOTE(review): this relies on `size_of::<PrefixKey>() == 8` to line up with the writer's
/// 7 + 1 byte records; confirm against the `PrefixKey` definition.
///
/// # Panics
///
/// Panics if `bytes.len()` is not a multiple of the record width.
fn decode_prefix_keys(bytes: &[u8]) -> Arc<[PrefixKey]> {
    let stride = std::mem::size_of::<PrefixKey>();
    assert!(bytes.len().is_multiple_of(stride), "Invalid prefix keys size");
    if bytes.is_empty() {
        return Arc::<[PrefixKey]>::from([]);
    }
    bytes
        .chunks_exact(stride)
        .map(|record| {
            let mut prefix7 = [0u8; 7];
            prefix7.copy_from_slice(&record[..7]);
            PrefixKey::from_parts(prefix7, record[7])
        })
        .collect()
}
impl LiquidByteViewArray<FsstArray> {
    /// Serializes this array into the byte-view IPC format read by [`Self::from_bytes`].
    ///
    /// Layout, in order. Each payload section starts on an 8-byte boundary and the gaps
    /// are zero padding.
    ///
    /// 1. `LiquidIPCHeader`, immediately followed by [`ByteViewArrayHeader`];
    /// 2. the raw FSST buffer (`fsst_raw_size` bytes);
    /// 3. the dictionary keys, bit-packed at a fixed width of 16 (`keys_size` bytes);
    /// 4. the FSST compact offsets (`compact_offsets_size` bytes);
    /// 5. the prefix keys, 8 bytes each (7 prefix bytes followed by 1 length byte). Their
    ///    count is not stored in the header; the reader derives it from the compact offsets;
    /// 6. the shared prefix (`shared_prefix_size` bytes);
    /// 7. the string fingerprints as little-endian values (`fingerprint_size` bytes, which
    ///    is 0 when the array carries no fingerprints).
    ///
    /// No padding is written after the last section.
    ///
    /// # Panics
    ///
    /// Panics if any recorded section is longer than `u32::MAX` bytes. The header stores
    /// section sizes as `u32`, and a truncated size would silently produce a buffer that
    /// cannot be decoded.
    pub(crate) fn to_bytes_inner(&self) -> SqueezeResult<Vec<u8>> {
        use std::num::NonZero;

        // Zero-pads `buf` to the next 8-byte boundary. Uses the same `align_up_8` rule the
        // reader uses to skip padding, so writer and reader cannot disagree.
        fn pad_to_8(buf: &mut Vec<u8>) {
            buf.resize(align_up_8(buf.len()), 0);
        }

        // Converts a section length into the `u32` stored in the header, refusing to truncate.
        fn section_size(len: usize, section: &str) -> u32 {
            u32::try_from(len).unwrap_or_else(|_| {
                panic!("{section} section is {len} bytes, which exceeds the u32 header limit")
            })
        }

        let header_size = LiquidIPCHeader::size() + ByteViewArrayHeader::size();
        let mut result = Vec::with_capacity(header_size + 1024);
        // Placeholder for both headers; overwritten once every section size is known.
        result.resize(header_size, 0);

        // FSST raw buffer.
        pad_to_8(&mut result);
        let fsst_start = result.len();
        let fsst_raw_bytes = self.fsst_buffer.raw_to_bytes();
        result.extend_from_slice(&fsst_raw_bytes);
        let fsst_raw_size = result.len() - fsst_start;

        // Dictionary keys, bit-packed at a fixed 16-bit width.
        pad_to_8(&mut result);
        let keys_start = result.len();
        let bit_packed = BitPackedArray::<UInt16Type>::from_primitive(
            self.dictionary_keys.clone(),
            NonZero::new(16).unwrap(),
        );
        bit_packed.to_bytes(&mut result);
        let keys_size = result.len() - keys_start;

        // Compact offsets.
        pad_to_8(&mut result);
        let offsets_start = result.len();
        self.fsst_buffer.write_compact_offsets(&mut result);
        let compact_offsets_size = result.len() - offsets_start;

        // Prefix keys: fixed 8-byte records, count implied by the compact offsets.
        pad_to_8(&mut result);
        for prefix in self.prefix_keys.iter() {
            result.extend_from_slice(prefix.prefix7());
            result.push(prefix.len_byte());
        }

        // Shared prefix.
        pad_to_8(&mut result);
        let prefix_start = result.len();
        result.extend_from_slice(&self.shared_prefix);
        let shared_prefix_size = result.len() - prefix_start;

        // String fingerprints (optional). The size is measured like every other section.
        pad_to_8(&mut result);
        let fingerprint_start = result.len();
        if let Some(fingerprints) = self.string_fingerprints.as_ref() {
            for &fingerprint in fingerprints.iter() {
                result.extend_from_slice(&fingerprint.to_le_bytes());
            }
        }
        let fingerprint_size = result.len() - fingerprint_start;

        let ipc = LiquidIPCHeader::new(
            LiquidDataType::ByteViewArray as u16,
            self.original_arrow_type as u16,
        );
        let view_header = ByteViewArrayHeader {
            keys_size: section_size(keys_size, "keys"),
            compact_offsets_size: section_size(compact_offsets_size, "compact offsets"),
            shared_prefix_size: section_size(shared_prefix_size, "shared prefix"),
            fsst_raw_size: section_size(fsst_raw_size, "FSST raw buffer"),
            fingerprint_size: section_size(fingerprint_size, "fingerprint"),
        };
        let ipc_size = LiquidIPCHeader::size();
        result[..ipc_size].copy_from_slice(&ipc.to_bytes());
        result[ipc_size..header_size].copy_from_slice(&view_header.to_bytes());
        Ok(result)
    }

    /// Reconstructs an array from bytes produced by [`Self::to_bytes_inner`].
    ///
    /// `compressor` is attached to the rebuilt FSST array. The prefix-key count is not
    /// stored; it is taken as one less than the number of decoded compact offsets (the same
    /// count the fingerprint section is validated against). A fingerprint section of size 0
    /// decodes as `None`, so an array serialized with `Some` but empty fingerprints comes
    /// back with `None`.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is truncated, if any section extends beyond the input, if the
    /// prefix key section is malformed, or if the fingerprint section's size is not a
    /// multiple of 4 or does not match the dictionary size.
    pub fn from_bytes(bytes: Bytes, compressor: Arc<Compressor>) -> LiquidByteViewArray<FsstArray> {
        let ipc = LiquidIPCHeader::from_bytes(&bytes);
        let original_arrow_type = ArrowByteType::from(ipc.physical_type_id);
        let header_size = LiquidIPCHeader::size() + ByteViewArrayHeader::size();
        // Pass the open-ended tail so that a truncated input is reported by
        // `ByteViewArrayHeader::from_bytes`'s own length check rather than a slice-index panic.
        let view_header = ByteViewArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..]);

        let total_len = bytes.len();
        // End offset of a `size`-byte section starting at `start`. Panics (with overflow
        // checking) if the section runs past the end of the input.
        let section_end = |start: usize, size: usize, what: &str| -> usize {
            start
                .checked_add(size)
                .filter(|&end| end <= total_len)
                .unwrap_or_else(|| panic!("{what} extends beyond input buffer"))
        };

        // FSST raw buffer.
        let mut cursor = align_up_8(header_size);
        let fsst_end = section_end(cursor, view_header.fsst_raw_size as usize, "FSST raw buffer");
        let raw_buffer = RawFsstBuffer::from_bytes(bytes.slice(cursor..fsst_end));

        // Dictionary keys.
        cursor = align_up_8(fsst_end);
        let keys_end = section_end(cursor, view_header.keys_size as usize, "Keys data");
        let bit_packed = BitPackedArray::<UInt16Type>::from_bytes(bytes.slice(cursor..keys_end));
        let dictionary_keys = bit_packed.to_primitive();

        // Compact offsets.
        cursor = align_up_8(keys_end);
        let offsets_end = section_end(
            cursor,
            view_header.compact_offsets_size as usize,
            "Compact offsets data",
        );
        let compact_offsets = if view_header.compact_offsets_size > 0 {
            let chunk = bytes.slice(cursor..offsets_end);
            decode_compact_offsets(chunk.as_ref())
        } else {
            empty_compact_offsets()
        };

        // Prefix keys: the count is implied by the compact offsets.
        cursor = align_up_8(offsets_end);
        let prefix_count = compact_offsets.len().saturating_sub(1);
        let prefix_keys_size = prefix_count * std::mem::size_of::<PrefixKey>();
        let prefix_keys_end = section_end(cursor, prefix_keys_size, "Prefix keys data");
        let prefix_keys = if prefix_keys_size > 0 {
            decode_prefix_keys(&bytes[cursor..prefix_keys_end])
        } else {
            Arc::<[PrefixKey]>::from([])
        };

        // Shared prefix.
        cursor = align_up_8(prefix_keys_end);
        let prefix_end = section_end(
            cursor,
            view_header.shared_prefix_size as usize,
            "Shared prefix data",
        );
        let shared_prefix = bytes[cursor..prefix_end].to_vec();

        // String fingerprints (optional).
        cursor = align_up_8(prefix_end);
        let fingerprint_size = view_header.fingerprint_size as usize;
        let fingerprint_end = section_end(cursor, fingerprint_size, "Fingerprint data");
        let string_fingerprints = if fingerprint_size == 0 {
            None
        } else {
            let word = std::mem::size_of::<u32>();
            if !fingerprint_size.is_multiple_of(word) {
                panic!("Invalid fingerprint data size");
            }
            if fingerprint_size != prefix_count * word {
                panic!("Fingerprint data size does not match dictionary size");
            }
            let fingerprints: Vec<u32> = bytes[cursor..fingerprint_end]
                .chunks_exact(word)
                .map(|chunk| u32::from_le_bytes(chunk.try_into().unwrap()))
                .collect();
            Some(Arc::from(fingerprints.into_boxed_slice()))
        };

        LiquidByteViewArray {
            dictionary_keys,
            prefix_keys,
            fsst_buffer: FsstArray::new(Arc::new(raw_buffer), compact_offsets, compressor),
            original_arrow_type,
            shared_prefix,
            string_fingerprints,
        }
    }
}