Skip to main content

liquid_cache/liquid_array/raw/
fsst_buffer.rs

1use arrow::{
2    array::{
3        ArrayDataBuilder, Decimal128Array, Decimal256Array, GenericByteArray, OffsetBufferBuilder,
4    },
5    buffer::{Buffer, OffsetBuffer},
6    datatypes::ByteArrayType,
7};
8use bytes;
9use fsst::{Compressor, Decompressor, Symbol};
10use std::io::Result;
11use std::io::{Error, ErrorKind};
12use std::ops::Range;
13use std::sync::Arc;
14
15use crate::liquid_array::fix_len_byte_array::ArrowFixedLenByteArrayType;
16use crate::liquid_array::{LiquidByteViewArray, SqueezeIoHandler};
17
/// Home of the sealing marker trait.
///
/// The module itself is private, so code outside the enclosing module cannot name
/// `Sealed` and therefore cannot implement any trait that lists it as a supertrait.
mod sealed {
    /// Method-less marker trait used as a supertrait bound.
    pub trait Sealed {}
}
21
22/// Raw FSST buffer that stores compressed data using Arrow Buffer.
23/// Offsets are managed externally as a `u32` slice (including the final sentinel offset).
24#[derive(Clone)]
25pub(crate) struct RawFsstBuffer {
26    values: Buffer,
27    uncompressed_bytes: usize,
28}
29
30impl std::fmt::Debug for RawFsstBuffer {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("RawFsstBuffer")
33            .field("values_len", &self.values.len())
34            .field("uncompressed_bytes", &self.uncompressed_bytes)
35            .finish()
36    }
37}
38
39impl RawFsstBuffer {
40    pub(crate) fn from_parts(values: Buffer, uncompressed_bytes: usize) -> Self {
41        Self {
42            values,
43            uncompressed_bytes,
44        }
45    }
46
47    /// Create RawFsstBuffer from an iterator of byte slices.
48    /// Returns the buffer and a vector of byte offsets (including the final sentinel).
49    pub(crate) fn from_byte_slices<I, T>(
50        iter: I,
51        compressor: Arc<Compressor>,
52        compress_buffer: &mut Vec<u8>,
53    ) -> (Self, Vec<u32>)
54    where
55        I: Iterator<Item = Option<T>>,
56        T: AsRef<[u8]>,
57    {
58        let mut values_buffer = Vec::new();
59        let mut offsets = Vec::new();
60        let mut uncompressed_len = 0;
61
62        offsets.push(0u32);
63        for item in iter {
64            if let Some(bytes) = item {
65                let bytes = bytes.as_ref();
66                uncompressed_len += bytes.len();
67
68                compress_buffer.clear();
69                // `fsst::Compressor::compress_into` requires capacity for the worst-case expansion
70                // (all bytes escaped) which is `2 * plaintext_len`.
71                compress_buffer.reserve(bytes.len().saturating_mul(2));
72                unsafe {
73                    compressor.compress_into(bytes, compress_buffer);
74                }
75
76                values_buffer.extend_from_slice(compress_buffer);
77            }
78            offsets.push(values_buffer.len() as u32);
79        }
80
81        values_buffer.shrink_to_fit();
82        let values_buffer = Buffer::from(values_buffer);
83        let raw_buffer = Self::from_parts(values_buffer, uncompressed_len);
84
85        (raw_buffer, offsets)
86    }
87
88    pub(crate) fn to_uncompressed(
89        &self,
90        decompressor: &Decompressor<'_>,
91        offsets: &[u32],
92    ) -> (Buffer, OffsetBuffer<i32>) {
93        let mut value_buffer: Vec<u8> = Vec::with_capacity(self.uncompressed_bytes + 8);
94        let num_values = offsets.len().saturating_sub(1);
95        let mut out_offsets: OffsetBufferBuilder<i32> = OffsetBufferBuilder::new(num_values);
96
97        for i in 0..num_values {
98            let start_offset = offsets[i];
99            let end_offset = offsets[i + 1];
100
101            if start_offset != end_offset {
102                let compressed_slice = self.get_compressed_slice(start_offset, end_offset);
103                let decompressed_len = decompressor
104                    .decompress_into(compressed_slice, value_buffer.spare_capacity_mut());
105
106                let new_len = value_buffer.len() + decompressed_len;
107                debug_assert!(new_len <= value_buffer.capacity());
108                unsafe {
109                    value_buffer.set_len(new_len);
110                }
111                out_offsets.push_length(decompressed_len);
112            } else {
113                out_offsets.push_length(0);
114            }
115        }
116
117        let buffer = Buffer::from(value_buffer);
118        (buffer, out_offsets.finish())
119    }
120
121    /// Get compressed data slice using byte offsets.
122    pub(crate) fn get_compressed_slice(&self, start_offset: u32, end_offset: u32) -> &[u8] {
123        let start = start_offset as usize;
124        let end = end_offset as usize;
125        debug_assert!(end <= self.values.len(), "Offset out of bounds");
126        debug_assert!(start <= end, "Invalid offset range");
127        &self.values.as_slice()[start..end]
128    }
129
130    pub(crate) fn values_len(&self) -> usize {
131        self.values.len()
132    }
133
134    pub(crate) fn get_memory_size(&self) -> usize {
135        self.values.len() + std::mem::size_of::<Self>()
136    }
137
138    pub(crate) fn to_bytes(&self) -> Vec<u8> {
139        let mut buffer = Vec::with_capacity(self.values.len() + 12);
140        buffer.extend_from_slice(&(self.uncompressed_bytes as u64).to_le_bytes());
141        buffer.extend_from_slice(&(self.values.len() as u32).to_le_bytes());
142        buffer.extend_from_slice(self.values.as_slice());
143        buffer
144    }
145
146    pub(crate) fn uncompressed_bytes(&self) -> usize {
147        self.uncompressed_bytes
148    }
149
150    pub(crate) fn from_bytes(bytes: bytes::Bytes) -> Self {
151        let uncompressed_bytes = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as usize;
152        let values_len = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
153        let values = bytes.slice(12..12 + values_len);
154        let values = Buffer::from(values);
155        Self::from_parts(values, uncompressed_bytes)
156    }
157}
158
/// PrefixKey stores a small suffix fingerprint (prefix bytes + length metadata).
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub(crate) struct PrefixKey {
    /// Leading suffix bytes; positions past the end of a shorter suffix stay zero.
    prefix7: [u8; 7],
    /// Suffix length in bytes (after shared prefix), or 255 if >= 255 / unknown.
    len: u8,
}

impl PrefixKey {
    /// Number of suffix bytes embedded in the key.
    pub(crate) const fn prefix_len() -> usize {
        7
    }

    /// Builds a key from the full suffix bytes (the part after the shared prefix).
    ///
    /// Copies at most `prefix_len()` leading bytes and records the suffix length,
    /// capped at 255 for suffixes of 255 bytes or more.
    pub(crate) fn new(suffix_bytes: &[u8]) -> Self {
        let mut prefix7 = [0u8; 7];
        // `zip` stops at the shorter side, so this copies min(7, suffix length) bytes.
        for (slot, &byte) in prefix7.iter_mut().zip(suffix_bytes) {
            *slot = byte;
        }
        // Lengths that do not fit in a `u8` collapse to the 255 marker.
        let len = u8::try_from(suffix_bytes.len()).unwrap_or(u8::MAX);
        Self { prefix7, len }
    }

    /// Rebuilds a key from its stored fields (used by deserialization only).
    pub(crate) fn from_parts(prefix7: [u8; 7], len: u8) -> Self {
        Self { prefix7, len }
    }

    /// The embedded prefix bytes.
    #[inline]
    pub(crate) fn prefix7(&self) -> &[u8; 7] {
        &self.prefix7
    }

    /// The raw length byte, where 255 means "255 or more / unknown".
    #[inline]
    pub(crate) fn len_byte(&self) -> u8 {
        self.len
    }

    /// The exact suffix length, or `None` when the length byte is the 255 marker.
    #[cfg(test)]
    pub(crate) fn known_suffix_len(&self) -> Option<usize> {
        match self.len {
            u8::MAX => None,
            exact => Some(usize::from(exact)),
        }
    }
}

// Compile-time guard: the key must stay exactly 8 bytes.
const _: () = assert!(
    std::mem::size_of::<PrefixKey>() == 8,
    "PrefixKey must be 8 bytes"
);
217
/// Parameters of the linear model `offset ≈ slope * index + intercept`, plus the
/// width used to store each residual.
#[derive(Debug, Clone, Copy)]
struct CompactOffsetHeader {
    /// Rounded slope of the fitted line.
    slope: i32,
    /// Rounded intercept of the fitted line.
    intercept: i32,
    /// Bytes per stored residual: 1, 2, or 4.
    offset_bytes: u8,
}
224
/// Residuals of the offset model, stored at the narrowest width that fits them all.
#[derive(Debug, Clone)]
enum OffsetResiduals {
    /// One byte per residual.
    One(Arc<[i8]>),
    /// Two bytes per residual.
    Two(Arc<[i16]>),
    /// Four bytes per residual.
    Four(Arc<[i32]>),
}

impl OffsetResiduals {
    /// Number of residuals stored.
    fn len(&self) -> usize {
        match self {
            Self::One(narrow) => narrow.len(),
            Self::Two(medium) => medium.len(),
            Self::Four(wide) => wide.len(),
        }
    }

    /// Width in bytes of a single stored residual.
    #[cfg(test)]
    fn bytes_per(&self) -> usize {
        match self {
            Self::One(_) => std::mem::size_of::<i8>(),
            Self::Two(_) => std::mem::size_of::<i16>(),
            Self::Four(_) => std::mem::size_of::<i32>(),
        }
    }

    /// Residual at `index`, sign-extended to `i32`. Panics if `index` is out of range.
    fn get_i32(&self, index: usize) -> i32 {
        match self {
            Self::One(narrow) => i32::from(narrow[index]),
            Self::Two(medium) => i32::from(medium[index]),
            Self::Four(wide) => wide[index],
        }
    }
}
258
259/// Compact offset index for FSST dictionary values (includes the final sentinel offset).
260#[derive(Debug, Clone)]
261pub(crate) struct CompactOffsets {
262    header: CompactOffsetHeader,
263    residuals: OffsetResiduals,
264}
265
/// Fits `offset ≈ slope * index + intercept` to `offsets` by ordinary least squares,
/// using the position in the slice as the x value.
///
/// Returns `(slope, intercept)`, each rounded to the nearest integer. For zero or one
/// offsets the slope is 0 and the intercept is the single offset (or 0 when empty).
fn fit_line(offsets: &[u32]) -> (i32, i32) {
    let n = offsets.len();
    if n <= 1 {
        return (0, offsets.first().copied().unwrap_or(0) as i32);
    }

    let n_f64 = n as f64;

    // The closed forms below are evaluated in `u128`. In `usize` the cubic term
    // n*(n-1)*(2n-1) overflows once n passes roughly 2.1 million on 64-bit targets
    // (and roughly 1.3 thousand on 32-bit targets), which panics in debug builds and
    // yields a wrong slope in release builds.
    let n_wide = n as u128;

    // Sum of indices: 0 + 1 + 2 + ... + (n-1) = n*(n-1)/2
    let sum_x = (n_wide * (n_wide - 1) / 2) as f64;

    // Sum of index squared: 0² + 1² + 2² + ... + (n-1)² = n*(n-1)*(2n-1)/6
    let sum_x_sq = (n_wide * (n_wide - 1) * (2 * n_wide - 1) / 6) as f64;

    // Sum of offsets
    let sum_y: f64 = offsets.iter().map(|&o| o as f64).sum();

    // Sum of (index * offset)
    let sum_xy: f64 = offsets
        .iter()
        .enumerate()
        .map(|(i, &o)| i as f64 * o as f64)
        .sum();

    // Least squares formulas. The denominator is non-zero because n >= 2 here.
    let slope = (n_f64 * sum_xy - sum_x * sum_y) / (n_f64 * sum_x_sq - sum_x * sum_x);
    let intercept = (sum_y - slope * sum_x) / n_f64;

    (slope.round() as i32, intercept.round() as i32)
}
297
298impl CompactOffsets {
299    pub(crate) fn from_offsets(offsets: &[u32]) -> Self {
300        if offsets.is_empty() {
301            return Self {
302                header: CompactOffsetHeader {
303                    slope: 0,
304                    intercept: 0,
305                    offset_bytes: 1,
306                },
307                residuals: OffsetResiduals::One(Arc::new([])),
308            };
309        }
310
311        let (slope, intercept) = fit_line(offsets);
312
313        let mut offset_residuals: Vec<i32> = Vec::with_capacity(offsets.len());
314        let mut min_residual = i32::MAX;
315        let mut max_residual = i32::MIN;
316        for (index, &offset) in offsets.iter().enumerate() {
317            let predicted = slope * index as i32 + intercept;
318            let residual = offset as i32 - predicted;
319            offset_residuals.push(residual);
320            min_residual = min_residual.min(residual);
321            max_residual = max_residual.max(residual);
322        }
323
324        let offset_bytes = if min_residual >= i8::MIN as i32 && max_residual <= i8::MAX as i32 {
325            1
326        } else if min_residual >= i16::MIN as i32 && max_residual <= i16::MAX as i32 {
327            2
328        } else {
329            4
330        };
331
332        let residuals = match offset_bytes {
333            1 => OffsetResiduals::One(
334                offset_residuals
335                    .iter()
336                    .map(|&r| r as i8)
337                    .collect::<Vec<_>>()
338                    .into(),
339            ),
340            2 => OffsetResiduals::Two(
341                offset_residuals
342                    .iter()
343                    .map(|&r| r as i16)
344                    .collect::<Vec<_>>()
345                    .into(),
346            ),
347            4 => OffsetResiduals::Four(offset_residuals.into()),
348            _ => unreachable!("offset_bytes must be 1, 2, or 4"),
349        };
350
351        Self {
352            header: CompactOffsetHeader {
353                slope,
354                intercept,
355                offset_bytes,
356            },
357            residuals,
358        }
359    }
360
361    pub(crate) fn len(&self) -> usize {
362        self.residuals.len()
363    }
364
365    pub(crate) fn get_offset(&self, index: usize) -> u32 {
366        let predicted = self.header.slope * index as i32 + self.header.intercept;
367        (predicted + self.residuals.get_i32(index)) as u32
368    }
369
370    pub(crate) fn offsets(&self) -> Vec<u32> {
371        (0..self.len()).map(|i| self.get_offset(i)).collect()
372    }
373
374    pub(crate) fn memory_usage(&self) -> usize {
375        let header_size = std::mem::size_of::<CompactOffsetHeader>();
376        let residuals_size = match &self.residuals {
377            OffsetResiduals::One(values) => values.len() * std::mem::size_of::<i8>(),
378            OffsetResiduals::Two(values) => values.len() * std::mem::size_of::<i16>(),
379            OffsetResiduals::Four(values) => values.len() * std::mem::size_of::<i32>(),
380        };
381        header_size + residuals_size
382    }
383}
384
385pub(crate) fn empty_compact_offsets() -> CompactOffsets {
386    CompactOffsets::from_offsets(&[])
387}
388
389const SYMBOL_SIZE_BYTES: usize = std::mem::size_of::<Symbol>();
390
391pub(crate) fn train_compressor<'a, I>(iter: I) -> Compressor
392where
393    I: Iterator<Item = &'a [u8]>,
394{
395    let strings: Vec<&[u8]> = iter.collect();
396    fsst::Compressor::train(&strings)
397}
398
399/// In-memory FSST dictionary buffer that bundles compressed bytes, compact offsets, and the
400/// compressor needed to (de)compress values.
401#[derive(Clone)]
402pub struct FsstArray {
403    compressor: Arc<Compressor>,
404    raw: Arc<RawFsstBuffer>,
405    compact_offsets: CompactOffsets,
406}
407
408impl std::fmt::Debug for FsstArray {
409    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410        f.debug_struct("FsstBuffer")
411            .field("raw", &self.raw)
412            .field("compact_offsets", &"<CompactOffsets>")
413            .field("compressor", &"<Compressor>")
414            .finish()
415    }
416}
417
418impl FsstArray {
419    pub(crate) fn new(
420        raw: Arc<RawFsstBuffer>,
421        compact_offsets: CompactOffsets,
422        compressor: Arc<Compressor>,
423    ) -> Self {
424        Self {
425            compressor,
426            raw,
427            compact_offsets,
428        }
429    }
430
431    pub(crate) fn from_byte_offsets(
432        raw: Arc<RawFsstBuffer>,
433        byte_offsets: &[u32],
434        compressor: Arc<Compressor>,
435    ) -> Self {
436        Self::new(raw, CompactOffsets::from_offsets(byte_offsets), compressor)
437    }
438
439    pub(crate) fn raw_to_bytes(&self) -> Vec<u8> {
440        self.raw.to_bytes()
441    }
442
443    pub(crate) fn write_compact_offsets(&self, out: &mut Vec<u8>) {
444        self.compact_offsets.write_residuals(out)
445    }
446
447    /// Trains a compressor on a sequence of strings.
448    pub fn train_compressor<'a>(input: impl Iterator<Item = &'a [u8]>) -> Compressor {
449        train_compressor(input)
450    }
451
452    /// Creates a new FSST buffer from a GenericByteArray and a compressor.
453    pub fn from_byte_array_with_compressor<T: ByteArrayType>(
454        input: &GenericByteArray<T>,
455        compressor: Arc<Compressor>,
456    ) -> Self {
457        let iter = input.iter();
458        let mut compress_buffer = Vec::with_capacity(2 * 1024 * 1024);
459        let (raw, offsets) =
460            RawFsstBuffer::from_byte_slices(iter, compressor.clone(), &mut compress_buffer);
461        Self::from_byte_offsets(Arc::new(raw), &offsets, compressor)
462    }
463
464    /// Creates a new FSST buffer from a Decimal128Array and a compressor.
465    pub fn from_decimal128_array_with_compressor(
466        array: &Decimal128Array,
467        compressor: Arc<Compressor>,
468    ) -> Self {
469        let iter = array.iter().map(|v| v.map(|v| v.to_le_bytes()));
470        let mut compress_buffer = Vec::with_capacity(64);
471        let (raw, offsets) =
472            RawFsstBuffer::from_byte_slices(iter, compressor.clone(), &mut compress_buffer);
473        Self::from_byte_offsets(Arc::new(raw), &offsets, compressor)
474    }
475
476    /// Creates a new FSST buffer from a Decimal256Array and a compressor.
477    pub fn from_decimal256_array_with_compressor(
478        array: &Decimal256Array,
479        compressor: Arc<Compressor>,
480    ) -> Self {
481        let iter = array.iter().map(|v| v.map(|v| v.to_le_bytes()));
482        let mut compress_buffer = Vec::with_capacity(128);
483        let (raw, offsets) =
484            RawFsstBuffer::from_byte_slices(iter, compressor.clone(), &mut compress_buffer);
485        Self::from_byte_offsets(Arc::new(raw), &offsets, compressor)
486    }
487
488    /// Returns the uncompressed byte size of this buffer.
489    pub fn uncompressed_bytes(&self) -> usize {
490        <Self as FsstBacking>::uncompressed_bytes(self)
491    }
492
493    /// Returns the in-memory size of this buffer.
494    pub fn get_array_memory_size(&self) -> usize {
495        <Self as FsstBacking>::get_array_memory_size(self)
496    }
497
498    /// Returns the number of values in this buffer.
499    #[allow(clippy::len_without_is_empty)]
500    pub fn len(&self) -> usize {
501        self.compact_offsets.len().saturating_sub(1)
502    }
503
504    /// Returns a decompressor for this buffer.
505    pub fn decompressor(&self) -> Decompressor<'_> {
506        self.compressor.decompressor()
507    }
508
509    /// Returns a reference to the compressor.
510    pub fn compressor(&self) -> &Compressor {
511        &self.compressor
512    }
513
514    /// Returns a clone of the shared compressor.
515    pub fn compressor_arc(&self) -> Arc<Compressor> {
516        self.compressor.clone()
517    }
518
519    /// Serializes this FSST buffer (raw bytes + compact offsets) to `out`.
520    pub fn to_bytes(&self, out: &mut Vec<u8>) {
521        out.extend_from_slice(&self.raw.to_bytes());
522        self.compact_offsets.write_residuals(out);
523    }
524
525    /// Deserializes a FSST buffer from the `to_bytes()` format.
526    pub fn from_bytes(bytes: bytes::Bytes, compressor: Arc<Compressor>) -> Self {
527        if bytes.len() < 12 {
528            panic!("Input buffer too small for RawFsstBuffer header");
529        }
530
531        let raw_values_len = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
532        let raw_len = 12 + raw_values_len;
533        if raw_len > bytes.len() {
534            panic!("RawFsstBuffer extends beyond input buffer");
535        }
536
537        let raw = RawFsstBuffer::from_bytes(bytes.slice(0..raw_len));
538        let compact = decode_compact_offsets(&bytes[raw_len..]);
539
540        if compact.len() > 0 {
541            let last = compact.get_offset(compact.len().saturating_sub(1)) as usize;
542            debug_assert_eq!(
543                last,
544                raw.values_len(),
545                "offsets must end at raw values length"
546            );
547        }
548
549        Self::new(Arc::new(raw), compact, compressor)
550    }
551
552    /// Decompress all values into an Arrow byte array.
553    pub fn to_arrow_byte_array<T: ByteArrayType<Offset = i32>>(&self) -> GenericByteArray<T> {
554        let (value_buffer, offsets) = self.to_uncompressed();
555        unsafe { GenericByteArray::<T>::new_unchecked(offsets, value_buffer, None) }
556    }
557
558    fn decompress_as_fixed_size_binary(&self, value_width: usize) -> Vec<u8> {
559        let decompressor = self.compressor.decompressor();
560        let mut value_buffer: Vec<u8> = Vec::with_capacity(self.len() * value_width + 8);
561
562        for i in 0..self.len() {
563            let compressed = self.get_compressed_slice(i);
564            let required = decompressor.max_decompression_capacity(compressed) + 8;
565            value_buffer.reserve(required);
566            let len = decompressor.decompress_into(compressed, value_buffer.spare_capacity_mut());
567            debug_assert!(len == value_width);
568            let new_len = value_buffer.len() + len;
569            unsafe {
570                value_buffer.set_len(new_len);
571            }
572        }
573        value_buffer
574    }
575
576    fn to_decimal_array_inner(&self, data_type: &ArrowFixedLenByteArrayType) -> Buffer {
577        let value_width = data_type.value_width();
578        Buffer::from(self.decompress_as_fixed_size_binary(value_width))
579    }
580
581    /// Converts this FSST buffer to a Decimal128Array.
582    pub fn to_decimal128_array(&self, data_type: &ArrowFixedLenByteArrayType) -> Decimal128Array {
583        let value_buffer = self.to_decimal_array_inner(data_type);
584        let array_builder = ArrayDataBuilder::new(data_type.into())
585            .len(self.len())
586            .add_buffer(value_buffer);
587        let array_data = unsafe { array_builder.build_unchecked() };
588        Decimal128Array::from(array_data)
589    }
590
591    /// Converts this FSST buffer to a Decimal256Array.
592    pub fn to_decimal256_array(&self, data_type: &ArrowFixedLenByteArrayType) -> Decimal256Array {
593        let value_buffer = self.to_decimal_array_inner(data_type);
594        let array_builder = ArrayDataBuilder::new(data_type.into())
595            .len(self.len())
596            .add_buffer(value_buffer);
597        let array_data = unsafe { array_builder.build_unchecked() };
598        Decimal256Array::from(array_data)
599    }
600
601    #[cfg(test)]
602    pub(crate) fn offsets_len(&self) -> usize {
603        self.compact_offsets.len()
604    }
605
606    #[cfg(test)]
607    pub(crate) fn offset_bytes(&self) -> u8 {
608        self.compact_offsets.header.offset_bytes
609    }
610
611    #[cfg(test)]
612    pub(crate) fn offsets(&self) -> Vec<u32> {
613        self.compact_offsets.offsets()
614    }
615}
616/// FSST backing store for `LiquidByteViewArray` (in-memory or disk-only handle).
617pub trait FsstBacking: std::fmt::Debug + Clone + sealed::Sealed {
618    /// Get the uncompressed bytes of the FSST buffer (used for sizing / squeeze bookkeeping).
619    fn uncompressed_bytes(&self) -> usize;
620
621    /// Get the in-memory size of the FSST backing (raw bytes + any in-memory indices).
622    fn get_array_memory_size(&self) -> usize;
623}
624
625impl sealed::Sealed for FsstArray {}
626impl sealed::Sealed for DiskBuffer {}
627
628impl FsstArray {
629    pub(crate) fn to_uncompressed(&self) -> (Buffer, OffsetBuffer<i32>) {
630        let offsets = self.compact_offsets.offsets();
631        self.raw
632            .to_uncompressed(&self.compressor.decompressor(), &offsets)
633    }
634
635    pub(crate) fn get_compressed_slice(&self, dict_index: usize) -> &[u8] {
636        let start_offset = self.compact_offsets.get_offset(dict_index);
637        let end_offset = self.compact_offsets.get_offset(dict_index + 1);
638        self.raw.get_compressed_slice(start_offset, end_offset)
639    }
640
641    /// Decompress the selected values into a buffer.
642    pub fn to_uncompressed_selected(&self, selected: &[usize]) -> (Buffer, OffsetBuffer<i32>) {
643        let decompressor = self.compressor.decompressor();
644        let mut value_buffer: Vec<u8> = Vec::with_capacity(self.uncompressed_bytes() + 8);
645        let mut out_offsets: OffsetBufferBuilder<i32> = OffsetBufferBuilder::new(selected.len());
646
647        for &dict_index in selected {
648            let start_offset = self.compact_offsets.get_offset(dict_index);
649            let end_offset = self.compact_offsets.get_offset(dict_index + 1);
650
651            let compressed_value = self.raw.get_compressed_slice(start_offset, end_offset);
652            let decompressed_len =
653                decompressor.decompress_into(compressed_value, value_buffer.spare_capacity_mut());
654            let new_len = value_buffer.len() + decompressed_len;
655            debug_assert!(new_len <= value_buffer.capacity());
656            unsafe {
657                value_buffer.set_len(new_len);
658            }
659            out_offsets.push_length(decompressed_len);
660        }
661
662        (Buffer::from(value_buffer), out_offsets.finish())
663    }
664}
665
666impl FsstBacking for FsstArray {
667    fn uncompressed_bytes(&self) -> usize {
668        self.raw.uncompressed_bytes()
669    }
670
671    fn get_array_memory_size(&self) -> usize {
672        self.raw.get_memory_size()
673            + self.compact_offsets.memory_usage()
674            + std::mem::size_of::<Self>()
675    }
676}
677
678/// Disk buffer for FSST buffer.
679#[derive(Clone)]
680pub struct DiskBuffer {
681    uncompressed_bytes: usize,
682    io: Arc<dyn SqueezeIoHandler>,
683    disk_range: Range<u64>,
684    compressor: Arc<Compressor>,
685}
686
687impl std::fmt::Debug for DiskBuffer {
688    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
689        f.debug_struct("DiskBuffer")
690            .field("uncompressed_bytes", &self.uncompressed_bytes)
691            .field("disk_range", &self.disk_range)
692            .field("io", &self.io)
693            .field("compressor", &"<Compressor>")
694            .finish()
695    }
696}
697
698impl DiskBuffer {
699    pub(crate) fn new(
700        uncompressed_bytes: usize,
701        io: Arc<dyn SqueezeIoHandler>,
702        disk_range: Range<u64>,
703        compressor: Arc<Compressor>,
704    ) -> Self {
705        Self {
706            uncompressed_bytes,
707            io,
708            disk_range,
709            compressor,
710        }
711    }
712
713    pub(crate) fn squeeze_io(&self) -> &Arc<dyn SqueezeIoHandler> {
714        &self.io
715    }
716
717    pub(crate) fn disk_range(&self) -> Range<u64> {
718        self.disk_range.clone()
719    }
720
721    pub(crate) fn disk_range_len(&self) -> usize {
722        (self.disk_range.end - self.disk_range.start) as usize
723    }
724
725    pub(crate) fn compressor_arc(&self) -> Arc<Compressor> {
726        self.compressor.clone()
727    }
728}
729
730impl DiskBuffer {
731    pub(crate) async fn to_uncompressed(&self) -> (Buffer, OffsetBuffer<i32>) {
732        let bytes = self.io.read(Some(self.disk_range.clone())).await.unwrap();
733        let byte_view =
734            LiquidByteViewArray::<FsstArray>::from_bytes(bytes, self.compressor.clone());
735        byte_view.fsst_buffer.to_uncompressed()
736    }
737
738    pub(crate) async fn to_uncompressed_selected(
739        &self,
740        selected: &[usize],
741    ) -> (Buffer, OffsetBuffer<i32>) {
742        let bytes = self.io.read(Some(self.disk_range.clone())).await.unwrap();
743        let byte_view =
744            LiquidByteViewArray::<FsstArray>::from_bytes(bytes, self.compressor.clone());
745        let total_count = byte_view.prefix_keys.len();
746        self.io
747            .tracing_decompress_count(selected.len(), total_count);
748        byte_view.fsst_buffer.to_uncompressed_selected(selected)
749    }
750}
751
752impl FsstBacking for DiskBuffer {
753    fn uncompressed_bytes(&self) -> usize {
754        self.uncompressed_bytes
755    }
756
757    fn get_array_memory_size(&self) -> usize {
758        0
759    }
760}
761
762impl CompactOffsets {
763    fn write_residuals(&self, out: &mut Vec<u8>) {
764        out.extend_from_slice(&self.header.slope.to_le_bytes());
765        out.extend_from_slice(&self.header.intercept.to_le_bytes());
766        out.push(self.header.offset_bytes);
767
768        match &self.residuals {
769            OffsetResiduals::One(residuals) => {
770                out.extend(residuals.iter().map(|r| *r as u8));
771            }
772            OffsetResiduals::Two(residuals) => {
773                for r in residuals.iter() {
774                    out.extend_from_slice(&r.to_le_bytes());
775                }
776            }
777            OffsetResiduals::Four(residuals) => {
778                for r in residuals.iter() {
779                    out.extend_from_slice(&r.to_le_bytes());
780                }
781            }
782        }
783    }
784}
785
786pub(crate) fn decode_compact_offsets(bytes: &[u8]) -> CompactOffsets {
787    if bytes.len() < 9 {
788        panic!("CompactOffsets requires at least 9 bytes for header");
789    }
790
791    let slope = i32::from_le_bytes(bytes[0..4].try_into().unwrap());
792    let intercept = i32::from_le_bytes(bytes[4..8].try_into().unwrap());
793    let offset_bytes = bytes[8] as usize;
794    if !matches!(offset_bytes, 1 | 2 | 4) {
795        panic!("Invalid offset_bytes value: {}", offset_bytes);
796    }
797
798    let header = CompactOffsetHeader {
799        slope,
800        intercept,
801        offset_bytes: offset_bytes as u8,
802    };
803
804    let payload = &bytes[9..];
805    if !payload.len().is_multiple_of(offset_bytes) {
806        panic!("Invalid payload size for CompactOffsets");
807    }
808    let count = payload.len() / offset_bytes;
809
810    match offset_bytes {
811        1 => {
812            let residuals: Arc<[i8]> = payload.iter().map(|b| *b as i8).collect::<Vec<_>>().into();
813            CompactOffsets {
814                header,
815                residuals: OffsetResiduals::One(residuals),
816            }
817        }
818        2 => {
819            let mut residuals = Vec::with_capacity(count);
820            for i in 0..count {
821                let base = i * 2;
822                residuals.push(i16::from_le_bytes(
823                    payload[base..base + 2].try_into().unwrap(),
824                ));
825            }
826            CompactOffsets {
827                header,
828                residuals: OffsetResiduals::Two(residuals.into()),
829            }
830        }
831        4 => {
832            let mut residuals = Vec::with_capacity(count);
833            for i in 0..count {
834                let base = i * 4;
835                residuals.push(i32::from_le_bytes(
836                    payload[base..base + 4].try_into().unwrap(),
837                ));
838            }
839            CompactOffsets {
840                header,
841                residuals: OffsetResiduals::Four(residuals.into()),
842            }
843        }
844        _ => unreachable!("validated offset_bytes"),
845    }
846}
847
848/// Saves symbol table from the compressor to a buffer.
849///
850/// Format:
851/// 1. The first byte is the length of the symbol table as a u8.
852/// 2. The next bytes are the lengths of each symbol as u8.
853/// 3. The next bytes are the symbols as u64.
854pub fn save_symbol_table(compressor: Arc<Compressor>, buffer: &mut Vec<u8>) -> Result<()> {
855    let symbols = compressor.symbol_table();
856    let symbols_lengths = compressor.symbol_lengths();
857
858    if symbols.len() != symbols_lengths.len() {
859        return Err(Error::new(
860            ErrorKind::InvalidInput,
861            "Symbol table and symbol lengths have different lengths",
862        ));
863    }
864
865    if symbols.len() > u8::MAX as usize {
866        return Err(Error::new(
867            ErrorKind::InvalidInput,
868            "Symbol table too large",
869        ));
870    }
871
872    buffer.push(symbols.len() as u8);
873
874    for &len in symbols_lengths.iter() {
875        buffer.push(len);
876    }
877
878    for sym in symbols.iter() {
879        buffer.extend_from_slice(&sym.to_u64().to_le_bytes());
880    }
881
882    Ok(())
883}
884
885/// Loads symbol table from a buffer saved by `save_symbol_table`.
886pub fn load_symbol_table(data: bytes::Bytes) -> Result<Compressor> {
887    if data.is_empty() {
888        return Err(Error::new(ErrorKind::InvalidInput, "Empty symbol table"));
889    }
890
891    let symbol_count = data[0] as usize;
892    let lengths_start = 1;
893    let lengths_end = lengths_start + symbol_count;
894    if lengths_end > data.len() {
895        return Err(Error::new(
896            ErrorKind::InvalidInput,
897            "Buffer too small for symbol lengths",
898        ));
899    }
900
901    let lengths = &data[lengths_start..lengths_end];
902    let symbols_start = lengths_end;
903    let symbols_end = symbols_start + symbol_count * SYMBOL_SIZE_BYTES;
904    if symbols_end > data.len() {
905        return Err(Error::new(
906            ErrorKind::InvalidInput,
907            "Buffer too small for symbols",
908        ));
909    }
910
911    let mut symbols = Vec::with_capacity(symbol_count);
912    for i in 0..symbol_count {
913        let base = symbols_start + i * SYMBOL_SIZE_BYTES;
914        let bytes: [u8; SYMBOL_SIZE_BYTES] =
915            data[base..base + SYMBOL_SIZE_BYTES].try_into().unwrap();
916        symbols.push(Symbol::from_slice(&bytes));
917    }
918
919    Ok(fsst::Compressor::rebuild_from(symbols, lengths))
920}
921
922#[cfg(test)]
923mod tests {
924    use super::*;
925    use arrow::{
926        array::{Array, Decimal128Builder, StringBuilder},
927        datatypes::DataType,
928    };
929
930    #[test]
931    fn test_compact_offset_view_round_trip() {
932        // Test 1: Small offsets (should use OneByte variant)
933        let small_offsets = vec![100u32, 105, 110, 115];
934        test_round_trip(&small_offsets, "small offsets");
935
936        // Test 2: Medium offsets (should use TwoBytes variant)
937        let medium_offsets = vec![1000u32, 2000, 3000, 3500];
938        test_round_trip(&medium_offsets, "medium offsets");
939
940        // Test 3: Large offsets (should use FourBytes variant)
941        let large_offsets = vec![100000u32, 200000, 300000, 310000];
942        test_round_trip(&large_offsets, "large offsets");
943
944        // Test 4: Mixed scenario with varying prefix lengths
945        let mixed_offsets = vec![1000u32, 1010, 1020, 1030, 1040, 1050];
946        test_round_trip(&mixed_offsets, "mixed scenarios");
947
948        // Test 5: Edge case - empty values (single sentinel offset)
949        let empty_offsets: Vec<u32> = vec![0];
950        test_round_trip(&empty_offsets, "empty values");
951
952        // Test 6: Single value (one prefix, two offsets)
953        let single_offset = vec![42u32, 50];
954        test_round_trip(&single_offset, "single offset");
955    }
956
957    fn test_round_trip(offsets: &[u32], test_name: &str) {
958        let compact_offsets = CompactOffsets::from_offsets(offsets);
959
960        assert_eq!(
961            offsets.len(),
962            compact_offsets.len(),
963            "Length mismatch in {}",
964            test_name
965        );
966        for (i, offset) in offsets.iter().enumerate() {
967            assert_eq!(
968                compact_offsets.get_offset(i),
969                *offset,
970                "Offset mismatch at index {} in {}",
971                i,
972                test_name
973            );
974        }
975
976        let mut bytes = Vec::new();
977        compact_offsets.write_residuals(&mut bytes);
978        let reconstructed = decode_compact_offsets(&bytes);
979
980        assert_eq!(
981            offsets.len(),
982            reconstructed.len(),
983            "Reconstructed length mismatch in {}",
984            test_name
985        );
986        for (i, o) in offsets.iter().enumerate() {
987            assert_eq!(*o, reconstructed.get_offset(i));
988        }
989    }
990
991    #[test]
992    fn test_compact_offset_view_memory_efficiency() {
993        // test that compaction actually saves memory
994        let offsets = vec![1000u32, 1010, 1020, 1030, 1040];
995
996        let original_size = offsets.len() * std::mem::size_of::<u32>();
997        let compact_offsets = CompactOffsets::from_offsets(&offsets);
998        let compact_size = compact_offsets.memory_usage();
999
1000        // for this test case, we should see some savings due to using smaller residuals
1001        assert!(
1002            compact_size <= original_size,
1003            "Compact representation should not be larger"
1004        );
1005    }
1006
1007    #[test]
1008    fn test_compact_offset_view_struct_methods() {
1009        let key = PrefixKey::from_parts([1, 2, 3, 4, 5, 6, 7], 15);
1010        assert_eq!(key.prefix7(), &[1, 2, 3, 4, 5, 6, 7]);
1011        assert_eq!(key.len_byte(), 15);
1012        assert_eq!(key.known_suffix_len(), Some(15));
1013
1014        let unknown = PrefixKey::from_parts([7, 6, 5, 4, 3, 2, 1], 255);
1015        assert_eq!(unknown.len_byte(), 255);
1016        assert_eq!(unknown.known_suffix_len(), None);
1017
1018        let r1 = OffsetResiduals::One(vec![-42i8, 7].into());
1019        assert_eq!(r1.bytes_per(), 1);
1020        assert_eq!(r1.get_i32(0), -42);
1021
1022        let r2 = OffsetResiduals::Two(vec![12345i16].into());
1023        assert_eq!(r2.bytes_per(), 2);
1024        assert_eq!(r2.get_i32(0), 12345);
1025
1026        let r4 = OffsetResiduals::Four(vec![-1000000i32].into());
1027        assert_eq!(r4.bytes_per(), 4);
1028        assert_eq!(r4.get_i32(0), -1000000);
1029
1030        assert_eq!(PrefixKey::prefix_len(), 7);
1031    }
1032
1033    #[test]
1034    fn test_compact_offset_view_group_from_bytes_errors() {
1035        // Test with insufficient bytes for header
1036        let short_bytes = vec![1, 2, 3]; // only 3 bytes, need at least 9
1037        let result = std::panic::catch_unwind(|| decode_compact_offsets(&short_bytes));
1038        assert!(result.is_err(), "Should panic with insufficient bytes");
1039
1040        // Test with invalid offset_bytes value
1041        let mut invalid_header = vec![0; 9];
1042        invalid_header[8] = 3; // invalid offset_bytes (should be 1, 2, or 4)
1043        let result = std::panic::catch_unwind(|| decode_compact_offsets(&invalid_header));
1044        assert!(result.is_err(), "Should panic with invalid offset_bytes");
1045
1046        // Test with misaligned residual data for TwoBytes variant
1047        let mut misaligned_two_bytes = vec![0; 9 + 1]; // header + incomplete residual
1048        misaligned_two_bytes[8] = 2; // offset_bytes = 2
1049        let result = std::panic::catch_unwind(|| decode_compact_offsets(&misaligned_two_bytes));
1050        assert!(
1051            result.is_err(),
1052            "Should panic with misaligned TwoBytes residuals"
1053        );
1054
1055        // Test with misaligned residual data for FourBytes variant
1056        let mut misaligned_four_bytes = vec![0; 9 + 2]; // header + incomplete residual
1057        misaligned_four_bytes[8] = 4; // offset_bytes = 4
1058        let result = std::panic::catch_unwind(|| decode_compact_offsets(&misaligned_four_bytes));
1059        assert!(
1060            result.is_err(),
1061            "Should panic with misaligned FourBytes residuals"
1062        );
1063    }
1064
1065    #[test]
1066    fn test_compact_offset_view_group_from_bytes_valid() {
1067        // Test OneByte variant roundtrip
1068        let offsets = vec![100u32, 101, 105];
1069        let original = CompactOffsets::from_offsets(&offsets);
1070
1071        let mut bytes = Vec::new();
1072        original.write_residuals(&mut bytes);
1073        let reconstructed = decode_compact_offsets(&bytes);
1074
1075        // Verify they match
1076        assert_eq!(offsets.len(), reconstructed.len());
1077        for (i, o) in offsets.iter().enumerate() {
1078            assert_eq!(*o, reconstructed.get_offset(i));
1079        }
1080    }
1081
1082    #[test]
1083    fn test_fsst_buffer_bytes_roundtrip() {
1084        let mut builder = StringBuilder::new();
1085        for i in 0..1000 {
1086            builder.append_value(format!("test string value {i}"));
1087        }
1088        let original = builder.finish();
1089
1090        let compressor =
1091            FsstArray::train_compressor(original.iter().flat_map(|s| s.map(|s| s.as_bytes())));
1092        let compressor_arc = Arc::new(compressor);
1093        let original_fsst =
1094            FsstArray::from_byte_array_with_compressor(&original, compressor_arc.clone());
1095
1096        let mut buffer = Vec::new();
1097        original_fsst.to_bytes(&mut buffer);
1098
1099        let bytes = bytes::Bytes::from(buffer);
1100        let deserialized = FsstArray::from_bytes(bytes, compressor_arc);
1101
1102        let original_strings = original_fsst.to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
1103        let deserialized_strings = deserialized.to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
1104        assert_eq!(original_strings.len(), deserialized_strings.len());
1105        for (orig, deser) in original_strings.iter().zip(deserialized_strings.iter()) {
1106            assert_eq!(orig, deser);
1107        }
1108    }
1109
1110    #[test]
1111    fn test_decimal_compression_smoke() {
1112        let mut builder = Decimal128Builder::new().with_data_type(DataType::Decimal128(10, 2));
1113        for i in 0..4096 {
1114            builder.append_value(i128::from_le_bytes([(i % 16) as u8; 16]));
1115        }
1116        let original = builder.finish();
1117        let original_size = original.get_array_memory_size();
1118
1119        let values = original
1120            .iter()
1121            .filter_map(|v| v.map(|v| v.to_le_bytes()))
1122            .collect::<Vec<_>>();
1123        let compressor = FsstArray::train_compressor(values.iter().map(|b| b.as_slice()));
1124        let compressor_arc = Arc::new(compressor);
1125
1126        let fsst = FsstArray::from_decimal128_array_with_compressor(&original, compressor_arc);
1127        let compressed_size = fsst.get_array_memory_size();
1128        assert!(compressed_size < original_size);
1129    }
1130
1131    #[test]
1132    fn test_save_and_load_symbol_table_roundtrip() {
1133        let mut builder = StringBuilder::new();
1134        for i in 0..1000 {
1135            builder.append_value(format!("hello world {i}"));
1136        }
1137        let original = builder.finish();
1138
1139        let compressor =
1140            FsstArray::train_compressor(original.iter().flat_map(|s| s.map(|s| s.as_bytes())));
1141        let compressor_arc = Arc::new(compressor);
1142
1143        let mut bytes = Vec::new();
1144        save_symbol_table(compressor_arc.clone(), &mut bytes).unwrap();
1145        let reloaded = load_symbol_table(bytes::Bytes::from(bytes)).unwrap();
1146
1147        let fsst_original = FsstArray::from_byte_array_with_compressor(&original, compressor_arc);
1148        let fsst_reloaded =
1149            FsstArray::from_byte_array_with_compressor(&original, Arc::new(reloaded));
1150
1151        let a = fsst_original.to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
1152        let b = fsst_reloaded.to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
1153        assert_eq!(a, b);
1154    }
1155}