Skip to main content

liquid_cache/liquid_array/byte_view_array/
serialization.rs

1use arrow::array::types::UInt16Type;
2use bytes::Bytes;
3use fsst::Compressor;
4use std::sync::Arc;
5
6use super::{ArrowByteType, LiquidByteViewArray};
7use crate::liquid_array::ipc::LiquidIPCHeader;
8use crate::liquid_array::raw::BitPackedArray;
9use crate::liquid_array::raw::fsst_buffer::{
10    FsstArray, PrefixKey, RawFsstBuffer, decode_compact_offsets, empty_compact_offsets,
11};
12use crate::liquid_array::{LiquidDataType, SqueezeResult};
13
14// Header for LiquidByteViewArray serialization
15#[repr(C)]
16pub(super) struct ByteViewArrayHeader {
17    pub(super) keys_size: u32,
18    pub(super) compact_offsets_size: u32,
19    pub(super) shared_prefix_size: u32,
20    pub(super) fsst_raw_size: u32,
21    pub(super) fingerprint_size: u32,
22}
23
24impl ByteViewArrayHeader {
25    pub(super) const fn size() -> usize {
26        const _: () =
27            assert!(std::mem::size_of::<ByteViewArrayHeader>() == ByteViewArrayHeader::size());
28        20
29    }
30
31    pub(super) fn to_bytes(&self) -> [u8; Self::size()] {
32        let mut bytes = [0u8; Self::size()];
33        bytes[0..4].copy_from_slice(&self.keys_size.to_le_bytes());
34        bytes[4..8].copy_from_slice(&self.compact_offsets_size.to_le_bytes());
35        bytes[8..12].copy_from_slice(&self.shared_prefix_size.to_le_bytes());
36        bytes[12..16].copy_from_slice(&self.fsst_raw_size.to_le_bytes());
37        bytes[16..20].copy_from_slice(&self.fingerprint_size.to_le_bytes());
38        bytes
39    }
40
41    pub(super) fn from_bytes(bytes: &[u8]) -> Self {
42        if bytes.len() < Self::size() {
43            panic!(
44                "value too small for ByteViewArrayHeader, expected at least {} bytes, got {}",
45                Self::size(),
46                bytes.len()
47            );
48        }
49        let keys_size = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
50        let compact_offsets_size = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
51        let shared_prefix_size = u32::from_le_bytes(bytes[8..12].try_into().unwrap());
52        let fsst_raw_size = u32::from_le_bytes(bytes[12..16].try_into().unwrap());
53        let fingerprint_size = u32::from_le_bytes(bytes[16..20].try_into().unwrap());
54        Self {
55            keys_size,
56            compact_offsets_size,
57            shared_prefix_size,
58            fsst_raw_size,
59            fingerprint_size,
60        }
61    }
62}
63
64pub(super) fn align_up_8(len: usize) -> usize {
65    (len + 7) & !7
66}
67
68fn decode_prefix_keys(bytes: &[u8]) -> Arc<[PrefixKey]> {
69    let entry_size = std::mem::size_of::<PrefixKey>();
70    if !bytes.len().is_multiple_of(entry_size) {
71        panic!("Invalid prefix keys size");
72    }
73    if bytes.is_empty() {
74        return Arc::<[PrefixKey]>::from([]);
75    }
76    let mut keys = Vec::with_capacity(bytes.len() / entry_size);
77    for chunk in bytes.chunks_exact(entry_size) {
78        let mut prefix7 = [0u8; 7];
79        prefix7.copy_from_slice(&chunk[..7]);
80        let len = chunk[7];
81        keys.push(PrefixKey::from_parts(prefix7, len));
82    }
83    keys.into()
84}
85
86impl LiquidByteViewArray<FsstArray> {
87    /*
88    Serialized LiquidByteViewArray Memory Layout:
89
90    +--------------------------------------------------+
91    | LiquidIPCHeader (16 bytes)                       |
92    +--------------------------------------------------+
93    | ByteViewArrayHeader (20 bytes)                   |  // keys_size, compact_offsets_size, shared_prefix_size, fsst_size, fingerprint_size
94    +--------------------------------------------------+
95    | Padding (to 8-byte alignment)                    |
96    +--------------------------------------------------+
97    | RawFsstBuffer bytes                              |
98    +--------------------------------------------------+
99    | Padding (to 8-byte alignment)                    |
100    +--------------------------------------------------+
101    | BitPackedArray Data (dictionary_keys)            |
102    +--------------------------------------------------+
103    | [BitPackedArray Header & Values]                 |
104    +--------------------------------------------------+
105    | Padding (to 8-byte alignment)                    |
106    +--------------------------------------------------+
107    | Compact offsets bytes (header + residuals)       |
108    +--------------------------------------------------+
109    | Padding (to 8-byte alignment)                    |
110    +--------------------------------------------------+
111    | Prefix keys bytes (prefix7 + len)                |
112    +--------------------------------------------------+
113    | Padding (to 8-byte alignment)                    |
114    +--------------------------------------------------+
115    | Shared prefix bytes                              |
116    +--------------------------------------------------+
117    | Padding (to 8-byte alignment)                    |
118    +--------------------------------------------------+
119    | Optional string fingerprints (u32 per entry)     |
120    +--------------------------------------------------+
121    */
122    pub(crate) fn to_bytes_inner(&self) -> SqueezeResult<Vec<u8>> {
123        let header_size = LiquidIPCHeader::size() + ByteViewArrayHeader::size();
124        let mut result = Vec::with_capacity(header_size + 1024);
125        result.resize(header_size, 0);
126
127        // A) Align and serialize RawFsstBuffer first (near the start)
128        while !result.len().is_multiple_of(8) {
129            result.push(0);
130        }
131        let fsst_start = result.len();
132        let fsst_raw_bytes = self.fsst_buffer.raw_to_bytes();
133        result.extend_from_slice(&fsst_raw_bytes);
134        let fsst_raw_size = result.len() - fsst_start;
135
136        // B) Alignment before keys
137        while !result.len().is_multiple_of(8) {
138            result.push(0);
139        }
140
141        // C) Serialize dictionary keys
142        let keys_start = result.len();
143        {
144            use std::num::NonZero;
145            let bit_packed = BitPackedArray::<UInt16Type>::from_primitive(
146                self.dictionary_keys.clone(),
147                NonZero::new(16).unwrap(),
148            );
149            bit_packed.to_bytes(&mut result);
150        }
151        let keys_size = result.len() - keys_start;
152
153        // D) Alignment before compact offsets
154        while !result.len().is_multiple_of(8) {
155            result.push(0);
156        }
157
158        // E) Serialize compact offsets (header + residuals)
159        let offsets_start = result.len();
160        self.fsst_buffer.write_compact_offsets(&mut result);
161        let compact_offsets_size = result.len() - offsets_start;
162
163        // F) Alignment before prefix keys
164        while !result.len().is_multiple_of(8) {
165            result.push(0);
166        }
167
168        // G) Serialize prefix keys (prefix7 + len)
169        for prefix in self.prefix_keys.iter() {
170            result.extend_from_slice(prefix.prefix7());
171            result.push(prefix.len_byte());
172        }
173
174        // H) Alignment before shared prefix
175        while !result.len().is_multiple_of(8) {
176            result.push(0);
177        }
178
179        // I) Serialize shared prefix
180        let prefix_start = result.len();
181        result.extend_from_slice(&self.shared_prefix);
182        let shared_prefix_size = result.len() - prefix_start;
183
184        // J) Alignment before fingerprints
185        while !result.len().is_multiple_of(8) {
186            result.push(0);
187        }
188
189        // K) Serialize string fingerprints (u32 per entry)
190        if let Some(fingerprints) = self.string_fingerprints.as_ref() {
191            for &fingerprint in fingerprints.iter() {
192                result.extend_from_slice(&fingerprint.to_le_bytes());
193            }
194        }
195
196        // Prepare headers
197        let ipc = LiquidIPCHeader::new(
198            LiquidDataType::ByteViewArray as u16,
199            self.original_arrow_type as u16,
200        );
201        let view_header = ByteViewArrayHeader {
202            keys_size: keys_size as u32,
203            compact_offsets_size: compact_offsets_size as u32,
204            shared_prefix_size: shared_prefix_size as u32,
205            fsst_raw_size: fsst_raw_size as u32,
206            fingerprint_size: (self
207                .string_fingerprints
208                .as_ref()
209                .map(|fingerprints| fingerprints.len())
210                .unwrap_or(0)
211                * std::mem::size_of::<u32>()) as u32,
212        };
213
214        // Write headers into reserved space at start
215        let header_slice = &mut result[0..header_size];
216        header_slice[0..LiquidIPCHeader::size()].copy_from_slice(&ipc.to_bytes());
217        header_slice[LiquidIPCHeader::size()..header_size].copy_from_slice(&view_header.to_bytes());
218
219        Ok(result)
220    }
221
222    /// Deserialize a LiquidByteViewArray from bytes.
223    pub fn from_bytes(bytes: Bytes, compressor: Arc<Compressor>) -> LiquidByteViewArray<FsstArray> {
224        // 0) Read IPC header and our view header
225        let ipc = LiquidIPCHeader::from_bytes(&bytes);
226        let original_arrow_type = ArrowByteType::from(ipc.physical_type_id);
227        let header_size = LiquidIPCHeader::size() + ByteViewArrayHeader::size();
228        let view_header =
229            ByteViewArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
230
231        let mut cursor = header_size;
232
233        // A) Align and read FSST raw buffer first
234        cursor = align_up_8(cursor);
235        let fsst_end = cursor + view_header.fsst_raw_size as usize;
236        if fsst_end > bytes.len() {
237            panic!("FSST raw buffer extends beyond input buffer");
238        }
239        let fsst_raw = bytes.slice(cursor..fsst_end);
240        let raw_buffer = RawFsstBuffer::from_bytes(fsst_raw);
241        cursor = fsst_end;
242
243        // B) Align and read keys
244        cursor = align_up_8(cursor);
245        let keys_end = cursor + view_header.keys_size as usize;
246        if keys_end > bytes.len() {
247            panic!("Keys data extends beyond input buffer");
248        }
249        let keys_data = bytes.slice(cursor..keys_end);
250        let bit_packed = BitPackedArray::<UInt16Type>::from_bytes(keys_data);
251        let dictionary_keys = bit_packed.to_primitive();
252        cursor = keys_end;
253
254        // C) Align and read compact offsets
255        cursor = align_up_8(cursor);
256        let offsets_end = cursor + view_header.compact_offsets_size as usize;
257        if offsets_end > bytes.len() {
258            panic!("Compact offsets data extends beyond input buffer");
259        }
260
261        // Deserialize compact offsets.
262        let compact_offsets = if view_header.compact_offsets_size > 0 {
263            let chunk = bytes.slice(cursor..offsets_end);
264            decode_compact_offsets(chunk.as_ref())
265        } else {
266            empty_compact_offsets()
267        };
268        cursor = offsets_end;
269
270        // D) Align and read prefix keys
271        cursor = align_up_8(cursor);
272        let prefix_count = compact_offsets.len().saturating_sub(1);
273        let prefix_keys_size = prefix_count * std::mem::size_of::<PrefixKey>();
274        let prefix_keys_end = cursor + prefix_keys_size;
275        if prefix_keys_end > bytes.len() {
276            panic!("Prefix keys data extends beyond input buffer");
277        }
278        let prefix_keys = if prefix_keys_size > 0 {
279            decode_prefix_keys(&bytes[cursor..prefix_keys_end])
280        } else {
281            Arc::<[PrefixKey]>::from([])
282        };
283        cursor = prefix_keys_end;
284
285        // E) Align and read shared prefix
286        cursor = align_up_8(cursor);
287        let prefix_end = cursor + view_header.shared_prefix_size as usize;
288        if prefix_end > bytes.len() {
289            panic!("Shared prefix data extends beyond input buffer");
290        }
291        let shared_prefix = bytes[cursor..prefix_end].to_vec();
292        cursor = prefix_end;
293
294        // F) String fingerprints
295        cursor = align_up_8(cursor);
296        let fingerprint_end = cursor + view_header.fingerprint_size as usize;
297        if fingerprint_end > bytes.len() {
298            panic!("Fingerprint data extends beyond input buffer");
299        }
300        let string_fingerprints = if view_header.fingerprint_size == 0 {
301            None
302        } else {
303            if !(view_header.fingerprint_size as usize).is_multiple_of(std::mem::size_of::<u32>()) {
304                panic!("Invalid fingerprint data size");
305            }
306            let expected = prefix_count * std::mem::size_of::<u32>();
307            if view_header.fingerprint_size as usize != expected {
308                panic!("Fingerprint data size does not match dictionary size");
309            }
310            let mut fingerprints = Vec::with_capacity(view_header.fingerprint_size as usize / 4);
311            for chunk in bytes[cursor..fingerprint_end].chunks_exact(4) {
312                fingerprints.push(u32::from_le_bytes(chunk.try_into().unwrap()));
313            }
314            Some(Arc::from(fingerprints.into_boxed_slice()))
315        };
316
317        LiquidByteViewArray {
318            dictionary_keys,
319            prefix_keys,
320            fsst_buffer: FsstArray::new(Arc::new(raw_buffer), compact_offsets, compressor),
321            original_arrow_type,
322            shared_prefix,
323            string_fingerprints,
324        }
325    }
326}