liquid_cache/liquid_array/byte_view_array/
serialization.rs1use arrow::array::types::UInt16Type;
2use bytes::Bytes;
3use fsst::Compressor;
4use std::sync::Arc;
5
6use super::LiquidByteViewArray;
7use crate::liquid_array::byte_array::ArrowByteType;
8use crate::liquid_array::ipc::LiquidIPCHeader;
9use crate::liquid_array::raw::BitPackedArray;
10use crate::liquid_array::raw::fsst_buffer::{
11 FsstArray, PrefixKey, RawFsstBuffer, decode_compact_offsets, empty_compact_offsets,
12};
13use crate::liquid_array::{LiquidDataType, SqueezeResult};
14
15#[repr(C)]
17pub(super) struct ByteViewArrayHeader {
18 pub(super) keys_size: u32,
19 pub(super) compact_offsets_size: u32,
20 pub(super) shared_prefix_size: u32,
21 pub(super) fsst_raw_size: u32,
22 pub(super) fingerprint_size: u32,
23}
24
25impl ByteViewArrayHeader {
26 pub(super) const fn size() -> usize {
27 const _: () =
28 assert!(std::mem::size_of::<ByteViewArrayHeader>() == ByteViewArrayHeader::size());
29 20
30 }
31
32 pub(super) fn to_bytes(&self) -> [u8; Self::size()] {
33 let mut bytes = [0u8; Self::size()];
34 bytes[0..4].copy_from_slice(&self.keys_size.to_le_bytes());
35 bytes[4..8].copy_from_slice(&self.compact_offsets_size.to_le_bytes());
36 bytes[8..12].copy_from_slice(&self.shared_prefix_size.to_le_bytes());
37 bytes[12..16].copy_from_slice(&self.fsst_raw_size.to_le_bytes());
38 bytes[16..20].copy_from_slice(&self.fingerprint_size.to_le_bytes());
39 bytes
40 }
41
42 pub(super) fn from_bytes(bytes: &[u8]) -> Self {
43 if bytes.len() < Self::size() {
44 panic!(
45 "value too small for ByteViewArrayHeader, expected at least {} bytes, got {}",
46 Self::size(),
47 bytes.len()
48 );
49 }
50 let keys_size = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
51 let compact_offsets_size = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
52 let shared_prefix_size = u32::from_le_bytes(bytes[8..12].try_into().unwrap());
53 let fsst_raw_size = u32::from_le_bytes(bytes[12..16].try_into().unwrap());
54 let fingerprint_size = u32::from_le_bytes(bytes[16..20].try_into().unwrap());
55 Self {
56 keys_size,
57 compact_offsets_size,
58 shared_prefix_size,
59 fsst_raw_size,
60 fingerprint_size,
61 }
62 }
63}
64
65pub(super) fn align_up_8(len: usize) -> usize {
66 (len + 7) & !7
67}
68
69fn decode_prefix_keys(bytes: &[u8]) -> Arc<[PrefixKey]> {
70 let entry_size = std::mem::size_of::<PrefixKey>();
71 if !bytes.len().is_multiple_of(entry_size) {
72 panic!("Invalid prefix keys size");
73 }
74 if bytes.is_empty() {
75 return Arc::<[PrefixKey]>::from([]);
76 }
77 let mut keys = Vec::with_capacity(bytes.len() / entry_size);
78 for chunk in bytes.chunks_exact(entry_size) {
79 let mut prefix7 = [0u8; 7];
80 prefix7.copy_from_slice(&chunk[..7]);
81 let len = chunk[7];
82 keys.push(PrefixKey::from_parts(prefix7, len));
83 }
84 keys.into()
85}
86
87impl LiquidByteViewArray<FsstArray> {
88 pub(crate) fn to_bytes_inner(&self) -> SqueezeResult<Vec<u8>> {
124 let header_size = LiquidIPCHeader::size() + ByteViewArrayHeader::size();
125 let mut result = Vec::with_capacity(header_size + 1024);
126 result.resize(header_size, 0);
127
128 while !result.len().is_multiple_of(8) {
130 result.push(0);
131 }
132 let fsst_start = result.len();
133 let fsst_raw_bytes = self.fsst_buffer.raw_to_bytes();
134 result.extend_from_slice(&fsst_raw_bytes);
135 let fsst_raw_size = result.len() - fsst_start;
136
137 while !result.len().is_multiple_of(8) {
139 result.push(0);
140 }
141
142 let keys_start = result.len();
144 {
145 use std::num::NonZero;
146 let bit_packed = BitPackedArray::<UInt16Type>::from_primitive(
147 self.dictionary_keys.clone(),
148 NonZero::new(16).unwrap(),
149 );
150 bit_packed.to_bytes(&mut result);
151 }
152 let keys_size = result.len() - keys_start;
153
154 while !result.len().is_multiple_of(8) {
156 result.push(0);
157 }
158
159 let offsets_start = result.len();
161 self.fsst_buffer.write_compact_offsets(&mut result);
162 let compact_offsets_size = result.len() - offsets_start;
163
164 while !result.len().is_multiple_of(8) {
166 result.push(0);
167 }
168
169 for prefix in self.prefix_keys.iter() {
171 result.extend_from_slice(prefix.prefix7());
172 result.push(prefix.len_byte());
173 }
174
175 while !result.len().is_multiple_of(8) {
177 result.push(0);
178 }
179
180 let prefix_start = result.len();
182 result.extend_from_slice(&self.shared_prefix);
183 let shared_prefix_size = result.len() - prefix_start;
184
185 while !result.len().is_multiple_of(8) {
187 result.push(0);
188 }
189
190 if let Some(fingerprints) = self.string_fingerprints.as_ref() {
192 for &fingerprint in fingerprints.iter() {
193 result.extend_from_slice(&fingerprint.to_le_bytes());
194 }
195 }
196
197 let ipc = LiquidIPCHeader::new(
199 LiquidDataType::ByteViewArray as u16,
200 self.original_arrow_type as u16,
201 );
202 let view_header = ByteViewArrayHeader {
203 keys_size: keys_size as u32,
204 compact_offsets_size: compact_offsets_size as u32,
205 shared_prefix_size: shared_prefix_size as u32,
206 fsst_raw_size: fsst_raw_size as u32,
207 fingerprint_size: (self
208 .string_fingerprints
209 .as_ref()
210 .map(|fingerprints| fingerprints.len())
211 .unwrap_or(0)
212 * std::mem::size_of::<u32>()) as u32,
213 };
214
215 let header_slice = &mut result[0..header_size];
217 header_slice[0..LiquidIPCHeader::size()].copy_from_slice(&ipc.to_bytes());
218 header_slice[LiquidIPCHeader::size()..header_size].copy_from_slice(&view_header.to_bytes());
219
220 Ok(result)
221 }
222
223 pub fn from_bytes(bytes: Bytes, compressor: Arc<Compressor>) -> LiquidByteViewArray<FsstArray> {
225 let ipc = LiquidIPCHeader::from_bytes(&bytes);
227 let original_arrow_type = ArrowByteType::from(ipc.physical_type_id);
228 let header_size = LiquidIPCHeader::size() + ByteViewArrayHeader::size();
229 let view_header =
230 ByteViewArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
231
232 let mut cursor = header_size;
233
234 cursor = align_up_8(cursor);
236 let fsst_end = cursor + view_header.fsst_raw_size as usize;
237 if fsst_end > bytes.len() {
238 panic!("FSST raw buffer extends beyond input buffer");
239 }
240 let fsst_raw = bytes.slice(cursor..fsst_end);
241 let raw_buffer = RawFsstBuffer::from_bytes(fsst_raw);
242 cursor = fsst_end;
243
244 cursor = align_up_8(cursor);
246 let keys_end = cursor + view_header.keys_size as usize;
247 if keys_end > bytes.len() {
248 panic!("Keys data extends beyond input buffer");
249 }
250 let keys_data = bytes.slice(cursor..keys_end);
251 let bit_packed = BitPackedArray::<UInt16Type>::from_bytes(keys_data);
252 let dictionary_keys = bit_packed.to_primitive();
253 cursor = keys_end;
254
255 cursor = align_up_8(cursor);
257 let offsets_end = cursor + view_header.compact_offsets_size as usize;
258 if offsets_end > bytes.len() {
259 panic!("Compact offsets data extends beyond input buffer");
260 }
261
262 let compact_offsets = if view_header.compact_offsets_size > 0 {
264 let chunk = bytes.slice(cursor..offsets_end);
265 decode_compact_offsets(chunk.as_ref())
266 } else {
267 empty_compact_offsets()
268 };
269 cursor = offsets_end;
270
271 cursor = align_up_8(cursor);
273 let prefix_count = compact_offsets.len().saturating_sub(1);
274 let prefix_keys_size = prefix_count * std::mem::size_of::<PrefixKey>();
275 let prefix_keys_end = cursor + prefix_keys_size;
276 if prefix_keys_end > bytes.len() {
277 panic!("Prefix keys data extends beyond input buffer");
278 }
279 let prefix_keys = if prefix_keys_size > 0 {
280 decode_prefix_keys(&bytes[cursor..prefix_keys_end])
281 } else {
282 Arc::<[PrefixKey]>::from([])
283 };
284 cursor = prefix_keys_end;
285
286 cursor = align_up_8(cursor);
288 let prefix_end = cursor + view_header.shared_prefix_size as usize;
289 if prefix_end > bytes.len() {
290 panic!("Shared prefix data extends beyond input buffer");
291 }
292 let shared_prefix = bytes[cursor..prefix_end].to_vec();
293 cursor = prefix_end;
294
295 cursor = align_up_8(cursor);
297 let fingerprint_end = cursor + view_header.fingerprint_size as usize;
298 if fingerprint_end > bytes.len() {
299 panic!("Fingerprint data extends beyond input buffer");
300 }
301 let string_fingerprints = if view_header.fingerprint_size == 0 {
302 None
303 } else {
304 if !(view_header.fingerprint_size as usize).is_multiple_of(std::mem::size_of::<u32>()) {
305 panic!("Invalid fingerprint data size");
306 }
307 let expected = prefix_count * std::mem::size_of::<u32>();
308 if view_header.fingerprint_size as usize != expected {
309 panic!("Fingerprint data size does not match dictionary size");
310 }
311 let mut fingerprints = Vec::with_capacity(view_header.fingerprint_size as usize / 4);
312 for chunk in bytes[cursor..fingerprint_end].chunks_exact(4) {
313 fingerprints.push(u32::from_le_bytes(chunk.try_into().unwrap()));
314 }
315 Some(Arc::from(fingerprints.into_boxed_slice()))
316 };
317
318 LiquidByteViewArray {
319 dictionary_keys,
320 prefix_keys,
321 fsst_buffer: FsstArray::new(Arc::new(raw_buffer), compact_offsets, compressor),
322 original_arrow_type,
323 shared_prefix,
324 string_fingerprints,
325 }
326 }
327}