Skip to main content

liquid_cache/liquid_array/byte_view_array/
serialization.rs

1use arrow::array::types::UInt16Type;
2use bytes::Bytes;
3use fsst::Compressor;
4use std::sync::Arc;
5
6use super::LiquidByteViewArray;
7use crate::liquid_array::byte_array::ArrowByteType;
8use crate::liquid_array::ipc::LiquidIPCHeader;
9use crate::liquid_array::raw::BitPackedArray;
10use crate::liquid_array::raw::fsst_buffer::{
11    FsstArray, PrefixKey, RawFsstBuffer, decode_compact_offsets, empty_compact_offsets,
12};
13use crate::liquid_array::{LiquidDataType, SqueezeResult};
14
15// Header for LiquidByteViewArray serialization
16#[repr(C)]
17pub(super) struct ByteViewArrayHeader {
18    pub(super) keys_size: u32,
19    pub(super) compact_offsets_size: u32,
20    pub(super) shared_prefix_size: u32,
21    pub(super) fsst_raw_size: u32,
22    pub(super) fingerprint_size: u32,
23}
24
25impl ByteViewArrayHeader {
26    pub(super) const fn size() -> usize {
27        const _: () =
28            assert!(std::mem::size_of::<ByteViewArrayHeader>() == ByteViewArrayHeader::size());
29        20
30    }
31
32    pub(super) fn to_bytes(&self) -> [u8; Self::size()] {
33        let mut bytes = [0u8; Self::size()];
34        bytes[0..4].copy_from_slice(&self.keys_size.to_le_bytes());
35        bytes[4..8].copy_from_slice(&self.compact_offsets_size.to_le_bytes());
36        bytes[8..12].copy_from_slice(&self.shared_prefix_size.to_le_bytes());
37        bytes[12..16].copy_from_slice(&self.fsst_raw_size.to_le_bytes());
38        bytes[16..20].copy_from_slice(&self.fingerprint_size.to_le_bytes());
39        bytes
40    }
41
42    pub(super) fn from_bytes(bytes: &[u8]) -> Self {
43        if bytes.len() < Self::size() {
44            panic!(
45                "value too small for ByteViewArrayHeader, expected at least {} bytes, got {}",
46                Self::size(),
47                bytes.len()
48            );
49        }
50        let keys_size = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
51        let compact_offsets_size = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
52        let shared_prefix_size = u32::from_le_bytes(bytes[8..12].try_into().unwrap());
53        let fsst_raw_size = u32::from_le_bytes(bytes[12..16].try_into().unwrap());
54        let fingerprint_size = u32::from_le_bytes(bytes[16..20].try_into().unwrap());
55        Self {
56            keys_size,
57            compact_offsets_size,
58            shared_prefix_size,
59            fsst_raw_size,
60            fingerprint_size,
61        }
62    }
63}
64
65pub(super) fn align_up_8(len: usize) -> usize {
66    (len + 7) & !7
67}
68
69fn decode_prefix_keys(bytes: &[u8]) -> Arc<[PrefixKey]> {
70    let entry_size = std::mem::size_of::<PrefixKey>();
71    if !bytes.len().is_multiple_of(entry_size) {
72        panic!("Invalid prefix keys size");
73    }
74    if bytes.is_empty() {
75        return Arc::<[PrefixKey]>::from([]);
76    }
77    let mut keys = Vec::with_capacity(bytes.len() / entry_size);
78    for chunk in bytes.chunks_exact(entry_size) {
79        let mut prefix7 = [0u8; 7];
80        prefix7.copy_from_slice(&chunk[..7]);
81        let len = chunk[7];
82        keys.push(PrefixKey::from_parts(prefix7, len));
83    }
84    keys.into()
85}
86
87impl LiquidByteViewArray<FsstArray> {
88    /*
89    Serialized LiquidByteViewArray Memory Layout:
90
91    +--------------------------------------------------+
92    | LiquidIPCHeader (16 bytes)                       |
93    +--------------------------------------------------+
94    | ByteViewArrayHeader (20 bytes)                   |  // keys_size, compact_offsets_size, shared_prefix_size, fsst_size, fingerprint_size
95    +--------------------------------------------------+
96    | Padding (to 8-byte alignment)                    |
97    +--------------------------------------------------+
98    | RawFsstBuffer bytes                              |
99    +--------------------------------------------------+
100    | Padding (to 8-byte alignment)                    |
101    +--------------------------------------------------+
102    | BitPackedArray Data (dictionary_keys)            |
103    +--------------------------------------------------+
104    | [BitPackedArray Header & Values]                 |
105    +--------------------------------------------------+
106    | Padding (to 8-byte alignment)                    |
107    +--------------------------------------------------+
108    | Compact offsets bytes (header + residuals)       |
109    +--------------------------------------------------+
110    | Padding (to 8-byte alignment)                    |
111    +--------------------------------------------------+
112    | Prefix keys bytes (prefix7 + len)                |
113    +--------------------------------------------------+
114    | Padding (to 8-byte alignment)                    |
115    +--------------------------------------------------+
116    | Shared prefix bytes                              |
117    +--------------------------------------------------+
118    | Padding (to 8-byte alignment)                    |
119    +--------------------------------------------------+
120    | Optional string fingerprints (u32 per entry)     |
121    +--------------------------------------------------+
122    */
123    pub(crate) fn to_bytes_inner(&self) -> SqueezeResult<Vec<u8>> {
124        let header_size = LiquidIPCHeader::size() + ByteViewArrayHeader::size();
125        let mut result = Vec::with_capacity(header_size + 1024);
126        result.resize(header_size, 0);
127
128        // A) Align and serialize RawFsstBuffer first (near the start)
129        while !result.len().is_multiple_of(8) {
130            result.push(0);
131        }
132        let fsst_start = result.len();
133        let fsst_raw_bytes = self.fsst_buffer.raw_to_bytes();
134        result.extend_from_slice(&fsst_raw_bytes);
135        let fsst_raw_size = result.len() - fsst_start;
136
137        // B) Alignment before keys
138        while !result.len().is_multiple_of(8) {
139            result.push(0);
140        }
141
142        // C) Serialize dictionary keys
143        let keys_start = result.len();
144        {
145            use std::num::NonZero;
146            let bit_packed = BitPackedArray::<UInt16Type>::from_primitive(
147                self.dictionary_keys.clone(),
148                NonZero::new(16).unwrap(),
149            );
150            bit_packed.to_bytes(&mut result);
151        }
152        let keys_size = result.len() - keys_start;
153
154        // D) Alignment before compact offsets
155        while !result.len().is_multiple_of(8) {
156            result.push(0);
157        }
158
159        // E) Serialize compact offsets (header + residuals)
160        let offsets_start = result.len();
161        self.fsst_buffer.write_compact_offsets(&mut result);
162        let compact_offsets_size = result.len() - offsets_start;
163
164        // F) Alignment before prefix keys
165        while !result.len().is_multiple_of(8) {
166            result.push(0);
167        }
168
169        // G) Serialize prefix keys (prefix7 + len)
170        for prefix in self.prefix_keys.iter() {
171            result.extend_from_slice(prefix.prefix7());
172            result.push(prefix.len_byte());
173        }
174
175        // H) Alignment before shared prefix
176        while !result.len().is_multiple_of(8) {
177            result.push(0);
178        }
179
180        // I) Serialize shared prefix
181        let prefix_start = result.len();
182        result.extend_from_slice(&self.shared_prefix);
183        let shared_prefix_size = result.len() - prefix_start;
184
185        // J) Alignment before fingerprints
186        while !result.len().is_multiple_of(8) {
187            result.push(0);
188        }
189
190        // K) Serialize string fingerprints (u32 per entry)
191        if let Some(fingerprints) = self.string_fingerprints.as_ref() {
192            for &fingerprint in fingerprints.iter() {
193                result.extend_from_slice(&fingerprint.to_le_bytes());
194            }
195        }
196
197        // Prepare headers
198        let ipc = LiquidIPCHeader::new(
199            LiquidDataType::ByteViewArray as u16,
200            self.original_arrow_type as u16,
201        );
202        let view_header = ByteViewArrayHeader {
203            keys_size: keys_size as u32,
204            compact_offsets_size: compact_offsets_size as u32,
205            shared_prefix_size: shared_prefix_size as u32,
206            fsst_raw_size: fsst_raw_size as u32,
207            fingerprint_size: (self
208                .string_fingerprints
209                .as_ref()
210                .map(|fingerprints| fingerprints.len())
211                .unwrap_or(0)
212                * std::mem::size_of::<u32>()) as u32,
213        };
214
215        // Write headers into reserved space at start
216        let header_slice = &mut result[0..header_size];
217        header_slice[0..LiquidIPCHeader::size()].copy_from_slice(&ipc.to_bytes());
218        header_slice[LiquidIPCHeader::size()..header_size].copy_from_slice(&view_header.to_bytes());
219
220        Ok(result)
221    }
222
223    /// Deserialize a LiquidByteViewArray from bytes.
224    pub fn from_bytes(bytes: Bytes, compressor: Arc<Compressor>) -> LiquidByteViewArray<FsstArray> {
225        // 0) Read IPC header and our view header
226        let ipc = LiquidIPCHeader::from_bytes(&bytes);
227        let original_arrow_type = ArrowByteType::from(ipc.physical_type_id);
228        let header_size = LiquidIPCHeader::size() + ByteViewArrayHeader::size();
229        let view_header =
230            ByteViewArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
231
232        let mut cursor = header_size;
233
234        // A) Align and read FSST raw buffer first
235        cursor = align_up_8(cursor);
236        let fsst_end = cursor + view_header.fsst_raw_size as usize;
237        if fsst_end > bytes.len() {
238            panic!("FSST raw buffer extends beyond input buffer");
239        }
240        let fsst_raw = bytes.slice(cursor..fsst_end);
241        let raw_buffer = RawFsstBuffer::from_bytes(fsst_raw);
242        cursor = fsst_end;
243
244        // B) Align and read keys
245        cursor = align_up_8(cursor);
246        let keys_end = cursor + view_header.keys_size as usize;
247        if keys_end > bytes.len() {
248            panic!("Keys data extends beyond input buffer");
249        }
250        let keys_data = bytes.slice(cursor..keys_end);
251        let bit_packed = BitPackedArray::<UInt16Type>::from_bytes(keys_data);
252        let dictionary_keys = bit_packed.to_primitive();
253        cursor = keys_end;
254
255        // C) Align and read compact offsets
256        cursor = align_up_8(cursor);
257        let offsets_end = cursor + view_header.compact_offsets_size as usize;
258        if offsets_end > bytes.len() {
259            panic!("Compact offsets data extends beyond input buffer");
260        }
261
262        // Deserialize compact offsets.
263        let compact_offsets = if view_header.compact_offsets_size > 0 {
264            let chunk = bytes.slice(cursor..offsets_end);
265            decode_compact_offsets(chunk.as_ref())
266        } else {
267            empty_compact_offsets()
268        };
269        cursor = offsets_end;
270
271        // D) Align and read prefix keys
272        cursor = align_up_8(cursor);
273        let prefix_count = compact_offsets.len().saturating_sub(1);
274        let prefix_keys_size = prefix_count * std::mem::size_of::<PrefixKey>();
275        let prefix_keys_end = cursor + prefix_keys_size;
276        if prefix_keys_end > bytes.len() {
277            panic!("Prefix keys data extends beyond input buffer");
278        }
279        let prefix_keys = if prefix_keys_size > 0 {
280            decode_prefix_keys(&bytes[cursor..prefix_keys_end])
281        } else {
282            Arc::<[PrefixKey]>::from([])
283        };
284        cursor = prefix_keys_end;
285
286        // E) Align and read shared prefix
287        cursor = align_up_8(cursor);
288        let prefix_end = cursor + view_header.shared_prefix_size as usize;
289        if prefix_end > bytes.len() {
290            panic!("Shared prefix data extends beyond input buffer");
291        }
292        let shared_prefix = bytes[cursor..prefix_end].to_vec();
293        cursor = prefix_end;
294
295        // F) String fingerprints
296        cursor = align_up_8(cursor);
297        let fingerprint_end = cursor + view_header.fingerprint_size as usize;
298        if fingerprint_end > bytes.len() {
299            panic!("Fingerprint data extends beyond input buffer");
300        }
301        let string_fingerprints = if view_header.fingerprint_size == 0 {
302            None
303        } else {
304            if !(view_header.fingerprint_size as usize).is_multiple_of(std::mem::size_of::<u32>()) {
305                panic!("Invalid fingerprint data size");
306            }
307            let expected = prefix_count * std::mem::size_of::<u32>();
308            if view_header.fingerprint_size as usize != expected {
309                panic!("Fingerprint data size does not match dictionary size");
310            }
311            let mut fingerprints = Vec::with_capacity(view_header.fingerprint_size as usize / 4);
312            for chunk in bytes[cursor..fingerprint_end].chunks_exact(4) {
313                fingerprints.push(u32::from_le_bytes(chunk.try_into().unwrap()));
314            }
315            Some(Arc::from(fingerprints.into_boxed_slice()))
316        };
317
318        LiquidByteViewArray {
319            dictionary_keys,
320            prefix_keys,
321            fsst_buffer: FsstArray::new(Arc::new(raw_buffer), compact_offsets, compressor),
322            original_arrow_type,
323            shared_prefix,
324            string_fingerprints,
325        }
326    }
327}