liquid_cache/liquid_array/byte_view_array/
serialization.rs1use arrow::array::types::UInt16Type;
2use bytes::Bytes;
3use fsst::Compressor;
4use std::sync::Arc;
5
6use super::{ArrowByteType, LiquidByteViewArray};
7use crate::liquid_array::ipc::LiquidIPCHeader;
8use crate::liquid_array::raw::BitPackedArray;
9use crate::liquid_array::raw::fsst_buffer::{
10 FsstArray, PrefixKey, RawFsstBuffer, decode_compact_offsets, empty_compact_offsets,
11};
12use crate::liquid_array::{LiquidDataType, SqueezeResult};
13
14#[repr(C)]
16pub(super) struct ByteViewArrayHeader {
17 pub(super) keys_size: u32,
18 pub(super) compact_offsets_size: u32,
19 pub(super) shared_prefix_size: u32,
20 pub(super) fsst_raw_size: u32,
21 pub(super) fingerprint_size: u32,
22}
23
24impl ByteViewArrayHeader {
25 pub(super) const fn size() -> usize {
26 const _: () =
27 assert!(std::mem::size_of::<ByteViewArrayHeader>() == ByteViewArrayHeader::size());
28 20
29 }
30
31 pub(super) fn to_bytes(&self) -> [u8; Self::size()] {
32 let mut bytes = [0u8; Self::size()];
33 bytes[0..4].copy_from_slice(&self.keys_size.to_le_bytes());
34 bytes[4..8].copy_from_slice(&self.compact_offsets_size.to_le_bytes());
35 bytes[8..12].copy_from_slice(&self.shared_prefix_size.to_le_bytes());
36 bytes[12..16].copy_from_slice(&self.fsst_raw_size.to_le_bytes());
37 bytes[16..20].copy_from_slice(&self.fingerprint_size.to_le_bytes());
38 bytes
39 }
40
41 pub(super) fn from_bytes(bytes: &[u8]) -> Self {
42 if bytes.len() < Self::size() {
43 panic!(
44 "value too small for ByteViewArrayHeader, expected at least {} bytes, got {}",
45 Self::size(),
46 bytes.len()
47 );
48 }
49 let keys_size = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
50 let compact_offsets_size = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
51 let shared_prefix_size = u32::from_le_bytes(bytes[8..12].try_into().unwrap());
52 let fsst_raw_size = u32::from_le_bytes(bytes[12..16].try_into().unwrap());
53 let fingerprint_size = u32::from_le_bytes(bytes[16..20].try_into().unwrap());
54 Self {
55 keys_size,
56 compact_offsets_size,
57 shared_prefix_size,
58 fsst_raw_size,
59 fingerprint_size,
60 }
61 }
62}
63
64pub(super) fn align_up_8(len: usize) -> usize {
65 (len + 7) & !7
66}
67
68fn decode_prefix_keys(bytes: &[u8]) -> Arc<[PrefixKey]> {
69 let entry_size = std::mem::size_of::<PrefixKey>();
70 if !bytes.len().is_multiple_of(entry_size) {
71 panic!("Invalid prefix keys size");
72 }
73 if bytes.is_empty() {
74 return Arc::<[PrefixKey]>::from([]);
75 }
76 let mut keys = Vec::with_capacity(bytes.len() / entry_size);
77 for chunk in bytes.chunks_exact(entry_size) {
78 let mut prefix7 = [0u8; 7];
79 prefix7.copy_from_slice(&chunk[..7]);
80 let len = chunk[7];
81 keys.push(PrefixKey::from_parts(prefix7, len));
82 }
83 keys.into()
84}
85
86impl LiquidByteViewArray<FsstArray> {
87 pub(crate) fn to_bytes_inner(&self) -> SqueezeResult<Vec<u8>> {
123 let header_size = LiquidIPCHeader::size() + ByteViewArrayHeader::size();
124 let mut result = Vec::with_capacity(header_size + 1024);
125 result.resize(header_size, 0);
126
127 while !result.len().is_multiple_of(8) {
129 result.push(0);
130 }
131 let fsst_start = result.len();
132 let fsst_raw_bytes = self.fsst_buffer.raw_to_bytes();
133 result.extend_from_slice(&fsst_raw_bytes);
134 let fsst_raw_size = result.len() - fsst_start;
135
136 while !result.len().is_multiple_of(8) {
138 result.push(0);
139 }
140
141 let keys_start = result.len();
143 {
144 use std::num::NonZero;
145 let bit_packed = BitPackedArray::<UInt16Type>::from_primitive(
146 self.dictionary_keys.clone(),
147 NonZero::new(16).unwrap(),
148 );
149 bit_packed.to_bytes(&mut result);
150 }
151 let keys_size = result.len() - keys_start;
152
153 while !result.len().is_multiple_of(8) {
155 result.push(0);
156 }
157
158 let offsets_start = result.len();
160 self.fsst_buffer.write_compact_offsets(&mut result);
161 let compact_offsets_size = result.len() - offsets_start;
162
163 while !result.len().is_multiple_of(8) {
165 result.push(0);
166 }
167
168 for prefix in self.prefix_keys.iter() {
170 result.extend_from_slice(prefix.prefix7());
171 result.push(prefix.len_byte());
172 }
173
174 while !result.len().is_multiple_of(8) {
176 result.push(0);
177 }
178
179 let prefix_start = result.len();
181 result.extend_from_slice(&self.shared_prefix);
182 let shared_prefix_size = result.len() - prefix_start;
183
184 while !result.len().is_multiple_of(8) {
186 result.push(0);
187 }
188
189 if let Some(fingerprints) = self.string_fingerprints.as_ref() {
191 for &fingerprint in fingerprints.iter() {
192 result.extend_from_slice(&fingerprint.to_le_bytes());
193 }
194 }
195
196 let ipc = LiquidIPCHeader::new(
198 LiquidDataType::ByteViewArray as u16,
199 self.original_arrow_type as u16,
200 );
201 let view_header = ByteViewArrayHeader {
202 keys_size: keys_size as u32,
203 compact_offsets_size: compact_offsets_size as u32,
204 shared_prefix_size: shared_prefix_size as u32,
205 fsst_raw_size: fsst_raw_size as u32,
206 fingerprint_size: (self
207 .string_fingerprints
208 .as_ref()
209 .map(|fingerprints| fingerprints.len())
210 .unwrap_or(0)
211 * std::mem::size_of::<u32>()) as u32,
212 };
213
214 let header_slice = &mut result[0..header_size];
216 header_slice[0..LiquidIPCHeader::size()].copy_from_slice(&ipc.to_bytes());
217 header_slice[LiquidIPCHeader::size()..header_size].copy_from_slice(&view_header.to_bytes());
218
219 Ok(result)
220 }
221
222 pub fn from_bytes(bytes: Bytes, compressor: Arc<Compressor>) -> LiquidByteViewArray<FsstArray> {
224 let ipc = LiquidIPCHeader::from_bytes(&bytes);
226 let original_arrow_type = ArrowByteType::from(ipc.physical_type_id);
227 let header_size = LiquidIPCHeader::size() + ByteViewArrayHeader::size();
228 let view_header =
229 ByteViewArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
230
231 let mut cursor = header_size;
232
233 cursor = align_up_8(cursor);
235 let fsst_end = cursor + view_header.fsst_raw_size as usize;
236 if fsst_end > bytes.len() {
237 panic!("FSST raw buffer extends beyond input buffer");
238 }
239 let fsst_raw = bytes.slice(cursor..fsst_end);
240 let raw_buffer = RawFsstBuffer::from_bytes(fsst_raw);
241 cursor = fsst_end;
242
243 cursor = align_up_8(cursor);
245 let keys_end = cursor + view_header.keys_size as usize;
246 if keys_end > bytes.len() {
247 panic!("Keys data extends beyond input buffer");
248 }
249 let keys_data = bytes.slice(cursor..keys_end);
250 let bit_packed = BitPackedArray::<UInt16Type>::from_bytes(keys_data);
251 let dictionary_keys = bit_packed.to_primitive();
252 cursor = keys_end;
253
254 cursor = align_up_8(cursor);
256 let offsets_end = cursor + view_header.compact_offsets_size as usize;
257 if offsets_end > bytes.len() {
258 panic!("Compact offsets data extends beyond input buffer");
259 }
260
261 let compact_offsets = if view_header.compact_offsets_size > 0 {
263 let chunk = bytes.slice(cursor..offsets_end);
264 decode_compact_offsets(chunk.as_ref())
265 } else {
266 empty_compact_offsets()
267 };
268 cursor = offsets_end;
269
270 cursor = align_up_8(cursor);
272 let prefix_count = compact_offsets.len().saturating_sub(1);
273 let prefix_keys_size = prefix_count * std::mem::size_of::<PrefixKey>();
274 let prefix_keys_end = cursor + prefix_keys_size;
275 if prefix_keys_end > bytes.len() {
276 panic!("Prefix keys data extends beyond input buffer");
277 }
278 let prefix_keys = if prefix_keys_size > 0 {
279 decode_prefix_keys(&bytes[cursor..prefix_keys_end])
280 } else {
281 Arc::<[PrefixKey]>::from([])
282 };
283 cursor = prefix_keys_end;
284
285 cursor = align_up_8(cursor);
287 let prefix_end = cursor + view_header.shared_prefix_size as usize;
288 if prefix_end > bytes.len() {
289 panic!("Shared prefix data extends beyond input buffer");
290 }
291 let shared_prefix = bytes[cursor..prefix_end].to_vec();
292 cursor = prefix_end;
293
294 cursor = align_up_8(cursor);
296 let fingerprint_end = cursor + view_header.fingerprint_size as usize;
297 if fingerprint_end > bytes.len() {
298 panic!("Fingerprint data extends beyond input buffer");
299 }
300 let string_fingerprints = if view_header.fingerprint_size == 0 {
301 None
302 } else {
303 if !(view_header.fingerprint_size as usize).is_multiple_of(std::mem::size_of::<u32>()) {
304 panic!("Invalid fingerprint data size");
305 }
306 let expected = prefix_count * std::mem::size_of::<u32>();
307 if view_header.fingerprint_size as usize != expected {
308 panic!("Fingerprint data size does not match dictionary size");
309 }
310 let mut fingerprints = Vec::with_capacity(view_header.fingerprint_size as usize / 4);
311 for chunk in bytes[cursor..fingerprint_end].chunks_exact(4) {
312 fingerprints.push(u32::from_le_bytes(chunk.try_into().unwrap()));
313 }
314 Some(Arc::from(fingerprints.into_boxed_slice()))
315 };
316
317 LiquidByteViewArray {
318 dictionary_keys,
319 prefix_keys,
320 fsst_buffer: FsstArray::new(Arc::new(raw_buffer), compact_offsets, compressor),
321 original_arrow_type,
322 shared_prefix,
323 string_fingerprints,
324 }
325 }
326}