Skip to main content

liquid_cache/liquid_array/raw/
fsst_buffer.rs

1use arrow::{
2    array::{
3        ArrayDataBuilder, Decimal128Array, Decimal256Array, GenericByteArray, OffsetBufferBuilder,
4    },
5    buffer::{Buffer, OffsetBuffer},
6    datatypes::ByteArrayType,
7};
8use bytes;
9use fsst::{Compressor, Decompressor, Symbol};
10use std::io::Result;
11use std::io::{Error, ErrorKind};
12use std::ops::Range;
13use std::sync::Arc;
14
15use crate::liquid_array::fix_len_byte_array::ArrowFixedLenByteArrayType;
16use crate::liquid_array::{LiquidByteViewArray, SqueezeIoHandler};
17
/// Private module that seals the [`FsstBacking`] trait so it cannot be
/// implemented outside this crate.
mod sealed {
    /// Marker trait; implemented only by the backing types in this file.
    pub trait Sealed {}
}
21
22/// Raw FSST buffer that stores compressed data using Arrow Buffer.
23/// Offsets are managed externally as a `u32` slice (including the final sentinel offset).
24#[derive(Clone)]
25pub(crate) struct RawFsstBuffer {
26    values: Buffer,
27    uncompressed_bytes: usize,
28}
29
30impl std::fmt::Debug for RawFsstBuffer {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("RawFsstBuffer")
33            .field("values_len", &self.values.len())
34            .field("uncompressed_bytes", &self.uncompressed_bytes)
35            .finish()
36    }
37}
38
39impl RawFsstBuffer {
40    pub(crate) fn from_parts(values: Buffer, uncompressed_bytes: usize) -> Self {
41        Self {
42            values,
43            uncompressed_bytes,
44        }
45    }
46
47    /// Create RawFsstBuffer from an iterator of byte slices.
48    /// Returns the buffer and a vector of byte offsets (including the final sentinel).
49    pub(crate) fn from_byte_slices<I, T>(
50        iter: I,
51        compressor: Arc<Compressor>,
52        compress_buffer: &mut Vec<u8>,
53    ) -> (Self, Vec<u32>)
54    where
55        I: Iterator<Item = Option<T>>,
56        T: AsRef<[u8]>,
57    {
58        let mut values_buffer = Vec::new();
59        let mut offsets = Vec::new();
60        let mut uncompressed_len = 0;
61
62        offsets.push(0u32);
63        for item in iter {
64            if let Some(bytes) = item {
65                let bytes = bytes.as_ref();
66                uncompressed_len += bytes.len();
67
68                compress_buffer.clear();
69                // `fsst::Compressor::compress_into` requires capacity for the worst-case expansion
70                // (all bytes escaped) which is `2 * plaintext_len`.
71                compress_buffer.reserve(bytes.len().saturating_mul(2));
72                unsafe {
73                    compressor.compress_into(bytes, compress_buffer);
74                }
75
76                values_buffer.extend_from_slice(compress_buffer);
77            }
78            offsets.push(values_buffer.len() as u32);
79        }
80
81        values_buffer.shrink_to_fit();
82        let values_buffer = Buffer::from(values_buffer);
83        let raw_buffer = Self::from_parts(values_buffer, uncompressed_len);
84
85        (raw_buffer, offsets)
86    }
87
88    pub(crate) fn to_uncompressed(
89        &self,
90        decompressor: &Decompressor<'_>,
91        offsets: &[u32],
92    ) -> (Buffer, OffsetBuffer<i32>) {
93        let mut value_buffer: Vec<u8> = Vec::with_capacity(self.uncompressed_bytes + 8);
94        let num_values = offsets.len().saturating_sub(1);
95        let mut out_offsets: OffsetBufferBuilder<i32> = OffsetBufferBuilder::new(num_values);
96
97        for i in 0..num_values {
98            let start_offset = offsets[i];
99            let end_offset = offsets[i + 1];
100
101            if start_offset != end_offset {
102                let compressed_slice = self.get_compressed_slice(start_offset, end_offset);
103                let decompressed_len = decompressor
104                    .decompress_into(compressed_slice, value_buffer.spare_capacity_mut());
105
106                let new_len = value_buffer.len() + decompressed_len;
107                debug_assert!(new_len <= value_buffer.capacity());
108                unsafe {
109                    value_buffer.set_len(new_len);
110                }
111                out_offsets.push_length(decompressed_len);
112            } else {
113                out_offsets.push_length(0);
114            }
115        }
116
117        let buffer = Buffer::from(value_buffer);
118        (buffer, out_offsets.finish())
119    }
120
121    /// Get compressed data slice using byte offsets.
122    pub(crate) fn get_compressed_slice(&self, start_offset: u32, end_offset: u32) -> &[u8] {
123        let start = start_offset as usize;
124        let end = end_offset as usize;
125        debug_assert!(end <= self.values.len(), "Offset out of bounds");
126        debug_assert!(start <= end, "Invalid offset range");
127        &self.values.as_slice()[start..end]
128    }
129
130    pub(crate) fn values_len(&self) -> usize {
131        self.values.len()
132    }
133
134    pub(crate) fn get_memory_size(&self) -> usize {
135        self.values.len() + std::mem::size_of::<Self>()
136    }
137
138    pub(crate) fn to_bytes(&self) -> Vec<u8> {
139        let mut buffer = Vec::with_capacity(self.values.len() + 12);
140        buffer.extend_from_slice(&(self.uncompressed_bytes as u64).to_le_bytes());
141        buffer.extend_from_slice(&(self.values.len() as u32).to_le_bytes());
142        buffer.extend_from_slice(self.values.as_slice());
143        buffer
144    }
145
146    pub(crate) fn uncompressed_bytes(&self) -> usize {
147        self.uncompressed_bytes
148    }
149
150    pub(crate) fn from_bytes(bytes: bytes::Bytes) -> Self {
151        let uncompressed_bytes = u64::from_le_bytes(bytes[0..8].try_into().unwrap()) as usize;
152        let values_len = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
153        let values = bytes.slice(12..12 + values_len);
154        let values = Buffer::from(values);
155        Self::from_parts(values, uncompressed_bytes)
156    }
157}
158
/// PrefixKey stores a small suffix fingerprint (prefix bytes + length metadata).
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub(crate) struct PrefixKey {
    /// First (up to) 7 bytes of the suffix, zero-padded on the right.
    prefix7: [u8; 7],
    /// Suffix length in bytes (after shared prefix), or 255 if >= 255 / unknown.
    len: u8,
}

impl PrefixKey {
    /// Number of suffix bytes embedded inline in `prefix7`.
    pub(crate) const fn prefix_len() -> usize {
        7
    }

    /// Construct from the full suffix bytes (after shared prefix).
    /// Embeds up to `prefix_len()` bytes into `prefix7` and stores length (or 255 if >=255).
    pub(crate) fn new(suffix_bytes: &[u8]) -> Self {
        let mut prefix7 = [0u8; 7];
        let take = suffix_bytes.len().min(Self::prefix_len());
        prefix7[..take].copy_from_slice(&suffix_bytes[..take]);
        // Lengths of 255 or more collapse into the sentinel value 255.
        let len = u8::try_from(suffix_bytes.len()).unwrap_or(u8::MAX);
        Self { prefix7, len }
    }

    /// Construct directly from stored parts (used by deserialization only)
    pub(crate) fn from_parts(prefix7: [u8; 7], len: u8) -> Self {
        Self { prefix7, len }
    }

    /// The embedded (zero-padded) prefix bytes.
    #[inline]
    pub(crate) fn prefix7(&self) -> &[u8; 7] {
        &self.prefix7
    }

    /// Raw length byte; 255 means "255 or longer / unknown".
    #[inline]
    pub(crate) fn len_byte(&self) -> u8 {
        self.len
    }

    /// Exact suffix length when representable; `None` for the 255 sentinel.
    #[cfg(test)]
    pub(crate) fn known_suffix_len(&self) -> Option<usize> {
        (self.len != u8::MAX).then(|| self.len as usize)
    }
}

// Compile-time guarantee that PrefixKey packs into exactly 8 bytes.
const _: () = assert!(std::mem::size_of::<PrefixKey>() == 8, "PrefixKey must be 8 bytes");
217
/// Linear-model header for the compact offset encoding:
/// predicted(i) = slope * i + intercept.
#[derive(Debug, Clone, Copy)]
struct CompactOffsetHeader {
    slope: i32,
    intercept: i32,
    offset_bytes: u8, // 1, 2, or 4 bytes per residual
}

/// Residuals from the linear model, stored at the narrowest width that fits.
#[derive(Debug, Clone)]
enum OffsetResiduals {
    One(Arc<[i8]>),
    Two(Arc<[i16]>),
    Four(Arc<[i32]>),
}

impl OffsetResiduals {
    /// Number of stored residuals (== number of offsets, sentinel included).
    fn len(&self) -> usize {
        match self {
            OffsetResiduals::One(v) => v.len(),
            OffsetResiduals::Two(v) => v.len(),
            OffsetResiduals::Four(v) => v.len(),
        }
    }

    /// Storage width in bytes of a single residual.
    #[cfg(test)]
    fn bytes_per(&self) -> usize {
        match self {
            OffsetResiduals::One(_) => 1,
            OffsetResiduals::Two(_) => 2,
            OffsetResiduals::Four(_) => 4,
        }
    }

    /// Residual at `index`, sign-extended to i32. Panics if out of bounds.
    fn get_i32(&self, index: usize) -> i32 {
        match self {
            OffsetResiduals::One(v) => i32::from(v[index]),
            OffsetResiduals::Two(v) => i32::from(v[index]),
            OffsetResiduals::Four(v) => v[index],
        }
    }
}
258
259/// Compact offset index for FSST dictionary values (includes the final sentinel offset).
260#[derive(Debug, Clone)]
261pub(crate) struct CompactOffsets {
262    header: CompactOffsetHeader,
263    residuals: OffsetResiduals,
264}
265
// Proper least-squares linear regression over (index, offset) pairs.
//
// Returns `(slope, intercept)` rounded to the nearest integers. Degenerate
// inputs (empty or single element) yield slope 0 and the first offset (or 0)
// as the intercept.
fn fit_line(offsets: &[u32]) -> (i32, i32) {
    let n = offsets.len();
    if n <= 1 {
        return (0, offsets.first().copied().unwrap_or(0) as i32);
    }

    let n_f64 = n as f64;

    // Closed forms evaluated in f64: the integer versions (`n*(n-1)/2` and
    // `n*(n-1)*(2n-1)/6`) overflow `usize` once `n` reaches a few million
    // offsets on 64-bit targets (and far sooner on 32-bit), panicking in
    // debug builds and wrapping to garbage in release.
    // Sum of indices: 0 + 1 + 2 + ... + (n-1) = n*(n-1)/2
    let sum_x = n_f64 * (n_f64 - 1.0) / 2.0;
    // Sum of squared indices: 0² + 1² + ... + (n-1)² = n*(n-1)*(2n-1)/6
    let sum_x_sq = n_f64 * (n_f64 - 1.0) * (2.0 * n_f64 - 1.0) / 6.0;

    // Sum of offsets
    let sum_y: f64 = offsets.iter().map(|&o| o as f64).sum();

    // Sum of (index * offset)
    let sum_xy: f64 = offsets
        .iter()
        .enumerate()
        .map(|(i, &o)| i as f64 * o as f64)
        .sum();

    // Least squares formulas; the denominator is strictly positive for n >= 2.
    let slope = (n_f64 * sum_xy - sum_x * sum_y) / (n_f64 * sum_x_sq - sum_x * sum_x);
    let intercept = (sum_y - slope * sum_x) / n_f64;

    (slope.round() as i32, intercept.round() as i32)
}
297
298impl CompactOffsets {
299    pub(crate) fn from_offsets(offsets: &[u32]) -> Self {
300        if offsets.is_empty() {
301            return Self {
302                header: CompactOffsetHeader {
303                    slope: 0,
304                    intercept: 0,
305                    offset_bytes: 1,
306                },
307                residuals: OffsetResiduals::One(Arc::new([])),
308            };
309        }
310
311        let (slope, intercept) = fit_line(offsets);
312
313        let mut offset_residuals: Vec<i32> = Vec::with_capacity(offsets.len());
314        let mut min_residual = i32::MAX;
315        let mut max_residual = i32::MIN;
316        for (index, &offset) in offsets.iter().enumerate() {
317            let predicted = slope * index as i32 + intercept;
318            let residual = offset as i32 - predicted;
319            offset_residuals.push(residual);
320            min_residual = min_residual.min(residual);
321            max_residual = max_residual.max(residual);
322        }
323
324        let offset_bytes = if min_residual >= i8::MIN as i32 && max_residual <= i8::MAX as i32 {
325            1
326        } else if min_residual >= i16::MIN as i32 && max_residual <= i16::MAX as i32 {
327            2
328        } else {
329            4
330        };
331
332        let residuals = match offset_bytes {
333            1 => OffsetResiduals::One(
334                offset_residuals
335                    .iter()
336                    .map(|&r| r as i8)
337                    .collect::<Vec<_>>()
338                    .into(),
339            ),
340            2 => OffsetResiduals::Two(
341                offset_residuals
342                    .iter()
343                    .map(|&r| r as i16)
344                    .collect::<Vec<_>>()
345                    .into(),
346            ),
347            4 => OffsetResiduals::Four(offset_residuals.into()),
348            _ => unreachable!("offset_bytes must be 1, 2, or 4"),
349        };
350
351        Self {
352            header: CompactOffsetHeader {
353                slope,
354                intercept,
355                offset_bytes,
356            },
357            residuals,
358        }
359    }
360
361    pub(crate) fn len(&self) -> usize {
362        self.residuals.len()
363    }
364
365    pub(crate) fn get_offset(&self, index: usize) -> u32 {
366        let predicted = self.header.slope * index as i32 + self.header.intercept;
367        (predicted + self.residuals.get_i32(index)) as u32
368    }
369
370    pub(crate) fn offsets(&self) -> Vec<u32> {
371        (0..self.len()).map(|i| self.get_offset(i)).collect()
372    }
373
374    pub(crate) fn memory_usage(&self) -> usize {
375        let header_size = std::mem::size_of::<CompactOffsetHeader>();
376        let residuals_size = match &self.residuals {
377            OffsetResiduals::One(values) => values.len() * std::mem::size_of::<i8>(),
378            OffsetResiduals::Two(values) => values.len() * std::mem::size_of::<i16>(),
379            OffsetResiduals::Four(values) => values.len() * std::mem::size_of::<i32>(),
380        };
381        header_size + residuals_size
382    }
383}
384
385pub(crate) fn empty_compact_offsets() -> CompactOffsets {
386    CompactOffsets::from_offsets(&[])
387}
388
389const SYMBOL_SIZE_BYTES: usize = std::mem::size_of::<Symbol>();
390
391pub(crate) fn train_compressor<'a, I>(iter: I) -> Compressor
392where
393    I: Iterator<Item = &'a [u8]>,
394{
395    let strings: Vec<&[u8]> = iter.collect();
396    fsst::Compressor::train(&strings)
397}
398
399/// In-memory FSST dictionary buffer that bundles compressed bytes, compact offsets, and the
400/// compressor needed to (de)compress values.
401#[derive(Clone)]
402pub struct FsstArray {
403    compressor: Arc<Compressor>,
404    raw: Arc<RawFsstBuffer>,
405    compact_offsets: CompactOffsets,
406}
407
408impl std::fmt::Debug for FsstArray {
409    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410        f.debug_struct("FsstBuffer")
411            .field("raw", &self.raw)
412            .field("compact_offsets", &"<CompactOffsets>")
413            .field("compressor", &"<Compressor>")
414            .finish()
415    }
416}
417
418impl FsstArray {
419    pub(crate) fn new(
420        raw: Arc<RawFsstBuffer>,
421        compact_offsets: CompactOffsets,
422        compressor: Arc<Compressor>,
423    ) -> Self {
424        Self {
425            compressor,
426            raw,
427            compact_offsets,
428        }
429    }
430
431    pub(crate) fn from_byte_offsets(
432        raw: Arc<RawFsstBuffer>,
433        byte_offsets: &[u32],
434        compressor: Arc<Compressor>,
435    ) -> Self {
436        Self::new(raw, CompactOffsets::from_offsets(byte_offsets), compressor)
437    }
438
439    pub(crate) fn raw_to_bytes(&self) -> Vec<u8> {
440        self.raw.to_bytes()
441    }
442
443    pub(crate) fn write_compact_offsets(&self, out: &mut Vec<u8>) {
444        self.compact_offsets.write_residuals(out)
445    }
446
447    /// Trains a compressor on a sequence of strings.
448    pub fn train_compressor<'a>(input: impl Iterator<Item = &'a [u8]>) -> Compressor {
449        train_compressor(input)
450    }
451
452    /// Creates a new FSST buffer from a GenericByteArray and a compressor.
453    pub fn from_byte_array_with_compressor<T: ByteArrayType>(
454        input: &GenericByteArray<T>,
455        compressor: Arc<Compressor>,
456    ) -> Self {
457        let iter = input.iter();
458        let mut compress_buffer = Vec::with_capacity(2 * 1024 * 1024);
459        let (raw, offsets) =
460            RawFsstBuffer::from_byte_slices(iter, compressor.clone(), &mut compress_buffer);
461        Self::from_byte_offsets(Arc::new(raw), &offsets, compressor)
462    }
463
464    /// Creates a new FSST buffer from a Decimal128Array and a compressor.
465    pub fn from_decimal128_array_with_compressor(
466        array: &Decimal128Array,
467        compressor: Arc<Compressor>,
468    ) -> Self {
469        let iter = array.iter().map(|v| v.map(|v| v.to_le_bytes()));
470        let mut compress_buffer = Vec::with_capacity(64);
471        let (raw, offsets) =
472            RawFsstBuffer::from_byte_slices(iter, compressor.clone(), &mut compress_buffer);
473        Self::from_byte_offsets(Arc::new(raw), &offsets, compressor)
474    }
475
476    /// Creates a new FSST buffer from a Decimal256Array and a compressor.
477    pub fn from_decimal256_array_with_compressor(
478        array: &Decimal256Array,
479        compressor: Arc<Compressor>,
480    ) -> Self {
481        let iter = array.iter().map(|v| v.map(|v| v.to_le_bytes()));
482        let mut compress_buffer = Vec::with_capacity(128);
483        let (raw, offsets) =
484            RawFsstBuffer::from_byte_slices(iter, compressor.clone(), &mut compress_buffer);
485        Self::from_byte_offsets(Arc::new(raw), &offsets, compressor)
486    }
487
488    /// Returns the uncompressed byte size of this buffer.
489    pub fn uncompressed_bytes(&self) -> usize {
490        <Self as FsstBacking>::uncompressed_bytes(self)
491    }
492
493    /// Returns the in-memory size of this buffer.
494    pub fn get_array_memory_size(&self) -> usize {
495        <Self as FsstBacking>::get_array_memory_size(self)
496    }
497
498    /// Returns the number of values in this buffer.
499    #[allow(clippy::len_without_is_empty)]
500    pub fn len(&self) -> usize {
501        self.compact_offsets.len().saturating_sub(1)
502    }
503
504    /// Returns a decompressor for this buffer.
505    pub fn decompressor(&self) -> Decompressor<'_> {
506        self.compressor.decompressor()
507    }
508
509    /// Returns a reference to the compressor.
510    pub fn compressor(&self) -> &Compressor {
511        &self.compressor
512    }
513
514    /// Returns a clone of the shared compressor.
515    pub fn compressor_arc(&self) -> Arc<Compressor> {
516        self.compressor.clone()
517    }
518
519    /// Serializes this FSST buffer (raw bytes + compact offsets) to `out`.
520    pub fn to_bytes(&self, out: &mut Vec<u8>) {
521        out.extend_from_slice(&self.raw.to_bytes());
522        self.compact_offsets.write_residuals(out);
523    }
524
525    /// Deserializes a FSST buffer from the `to_bytes()` format.
526    pub fn from_bytes(bytes: bytes::Bytes, compressor: Arc<Compressor>) -> Self {
527        if bytes.len() < 12 {
528            panic!("Input buffer too small for RawFsstBuffer header");
529        }
530
531        let raw_values_len = u32::from_le_bytes(bytes[8..12].try_into().unwrap()) as usize;
532        let raw_len = 12 + raw_values_len;
533        if raw_len > bytes.len() {
534            panic!("RawFsstBuffer extends beyond input buffer");
535        }
536
537        let raw = RawFsstBuffer::from_bytes(bytes.slice(0..raw_len));
538        let compact = decode_compact_offsets(&bytes[raw_len..]);
539
540        if compact.len() > 0 {
541            let last = compact.get_offset(compact.len().saturating_sub(1)) as usize;
542            debug_assert_eq!(
543                last,
544                raw.values_len(),
545                "offsets must end at raw values length"
546            );
547        }
548
549        Self::new(Arc::new(raw), compact, compressor)
550    }
551
552    /// Decompress all values into an Arrow byte array.
553    pub fn to_arrow_byte_array<T: ByteArrayType<Offset = i32>>(&self) -> GenericByteArray<T> {
554        let (value_buffer, offsets) = self.to_uncompressed();
555        unsafe { GenericByteArray::<T>::new_unchecked(offsets, value_buffer, None) }
556    }
557
558    fn decompress_as_fixed_size_binary(&self, value_width: usize) -> Vec<u8> {
559        let decompressor = self.compressor.decompressor();
560        let mut value_buffer: Vec<u8> = Vec::with_capacity(self.len() * value_width + 8);
561
562        for i in 0..self.len() {
563            let compressed = self.get_compressed_slice(i);
564            let required = decompressor.max_decompression_capacity(compressed) + 8;
565            value_buffer.reserve(required);
566            let len = decompressor.decompress_into(compressed, value_buffer.spare_capacity_mut());
567            debug_assert!(len == value_width);
568            let new_len = value_buffer.len() + len;
569            unsafe {
570                value_buffer.set_len(new_len);
571            }
572        }
573        value_buffer
574    }
575
576    fn to_decimal_array_inner(&self, data_type: &ArrowFixedLenByteArrayType) -> Buffer {
577        let value_width = data_type.value_width();
578        Buffer::from(self.decompress_as_fixed_size_binary(value_width))
579    }
580
581    /// Converts this FSST buffer to a Decimal128Array.
582    pub fn to_decimal128_array(&self, data_type: &ArrowFixedLenByteArrayType) -> Decimal128Array {
583        let value_buffer = self.to_decimal_array_inner(data_type);
584        let array_builder = ArrayDataBuilder::new(data_type.into())
585            .len(self.len())
586            .add_buffer(value_buffer);
587        let array_data = unsafe { array_builder.build_unchecked() };
588        Decimal128Array::from(array_data)
589    }
590
591    /// Converts this FSST buffer to a Decimal256Array.
592    pub fn to_decimal256_array(&self, data_type: &ArrowFixedLenByteArrayType) -> Decimal256Array {
593        let value_buffer = self.to_decimal_array_inner(data_type);
594        let array_builder = ArrayDataBuilder::new(data_type.into())
595            .len(self.len())
596            .add_buffer(value_buffer);
597        let array_data = unsafe { array_builder.build_unchecked() };
598        Decimal256Array::from(array_data)
599    }
600
601    #[cfg(test)]
602    pub(crate) fn offsets_len(&self) -> usize {
603        self.compact_offsets.len()
604    }
605
606    #[cfg(test)]
607    pub(crate) fn offset_bytes(&self) -> u8 {
608        self.compact_offsets.header.offset_bytes
609    }
610
611    #[cfg(test)]
612    pub(crate) fn offsets(&self) -> Vec<u32> {
613        self.compact_offsets.offsets()
614    }
615}
616/// FSST backing store for `LiquidByteViewArray` (in-memory or disk-only handle).
617pub trait FsstBacking: std::fmt::Debug + Clone + sealed::Sealed {
618    /// Get the uncompressed bytes of the FSST buffer (used for sizing / squeeze bookkeeping).
619    fn uncompressed_bytes(&self) -> usize;
620
621    /// Get the in-memory size of the FSST backing (raw bytes + any in-memory indices).
622    fn get_array_memory_size(&self) -> usize;
623}
624
625impl sealed::Sealed for FsstArray {}
626impl sealed::Sealed for DiskBuffer {}
627
628impl FsstArray {
629    pub(crate) fn to_uncompressed(&self) -> (Buffer, OffsetBuffer<i32>) {
630        let offsets = self.compact_offsets.offsets();
631        self.raw
632            .to_uncompressed(&self.compressor.decompressor(), &offsets)
633    }
634
635    pub(crate) fn get_compressed_slice(&self, dict_index: usize) -> &[u8] {
636        let start_offset = self.compact_offsets.get_offset(dict_index);
637        let end_offset = self.compact_offsets.get_offset(dict_index + 1);
638        self.raw.get_compressed_slice(start_offset, end_offset)
639    }
640
641    /// Decompress the selected values into a buffer.
642    pub fn to_uncompressed_selected(&self, selected: &[usize]) -> (Buffer, OffsetBuffer<i32>) {
643        let decompressor = self.compressor.decompressor();
644        let mut value_buffer: Vec<u8> = Vec::with_capacity(self.uncompressed_bytes() + 8);
645        let mut out_offsets: OffsetBufferBuilder<i32> = OffsetBufferBuilder::new(selected.len());
646
647        for &dict_index in selected {
648            let start_offset = self.compact_offsets.get_offset(dict_index);
649            let end_offset = self.compact_offsets.get_offset(dict_index + 1);
650
651            let compressed_value = self.raw.get_compressed_slice(start_offset, end_offset);
652            let decompressed_len =
653                decompressor.decompress_into(compressed_value, value_buffer.spare_capacity_mut());
654            let new_len = value_buffer.len() + decompressed_len;
655            debug_assert!(new_len <= value_buffer.capacity());
656            unsafe {
657                value_buffer.set_len(new_len);
658            }
659            out_offsets.push_length(decompressed_len);
660        }
661
662        (Buffer::from(value_buffer), out_offsets.finish())
663    }
664}
665
666impl FsstBacking for FsstArray {
667    fn uncompressed_bytes(&self) -> usize {
668        self.raw.uncompressed_bytes()
669    }
670
671    fn get_array_memory_size(&self) -> usize {
672        self.raw.get_memory_size()
673            + self.compact_offsets.memory_usage()
674            + std::mem::size_of::<Self>()
675    }
676}
677
678/// Disk buffer for FSST buffer.
679#[derive(Clone)]
680pub struct DiskBuffer {
681    uncompressed_bytes: usize,
682    io: Arc<dyn SqueezeIoHandler>,
683    disk_range: Range<u64>,
684    compressor: Arc<Compressor>,
685}
686
687impl std::fmt::Debug for DiskBuffer {
688    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
689        f.debug_struct("DiskBuffer")
690            .field("uncompressed_bytes", &self.uncompressed_bytes)
691            .field("disk_range", &self.disk_range)
692            .field("io", &self.io)
693            .field("compressor", &"<Compressor>")
694            .finish()
695    }
696}
697
698impl DiskBuffer {
699    pub(crate) fn new(
700        uncompressed_bytes: usize,
701        io: Arc<dyn SqueezeIoHandler>,
702        disk_range: Range<u64>,
703        compressor: Arc<Compressor>,
704    ) -> Self {
705        Self {
706            uncompressed_bytes,
707            io,
708            disk_range,
709            compressor,
710        }
711    }
712
713    pub(crate) fn squeeze_io(&self) -> &Arc<dyn SqueezeIoHandler> {
714        &self.io
715    }
716
717    pub(crate) fn disk_range(&self) -> Range<u64> {
718        self.disk_range.clone()
719    }
720
721    pub(crate) fn compressor_arc(&self) -> Arc<Compressor> {
722        self.compressor.clone()
723    }
724}
725
726impl DiskBuffer {
727    pub(crate) async fn to_uncompressed(&self) -> (Buffer, OffsetBuffer<i32>) {
728        let bytes = self.io.read(Some(self.disk_range.clone())).await.unwrap();
729        let byte_view =
730            LiquidByteViewArray::<FsstArray>::from_bytes(bytes, self.compressor.clone());
731        byte_view.fsst_buffer.to_uncompressed()
732    }
733
734    pub(crate) async fn to_uncompressed_selected(
735        &self,
736        selected: &[usize],
737    ) -> (Buffer, OffsetBuffer<i32>) {
738        let bytes = self.io.read(Some(self.disk_range.clone())).await.unwrap();
739        let byte_view =
740            LiquidByteViewArray::<FsstArray>::from_bytes(bytes, self.compressor.clone());
741        let total_count = byte_view.prefix_keys.len();
742        self.io
743            .tracing_decompress_count(selected.len(), total_count);
744        byte_view.fsst_buffer.to_uncompressed_selected(selected)
745    }
746}
747
748impl FsstBacking for DiskBuffer {
749    fn uncompressed_bytes(&self) -> usize {
750        self.uncompressed_bytes
751    }
752
753    fn get_array_memory_size(&self) -> usize {
754        0
755    }
756}
757
758impl CompactOffsets {
759    fn write_residuals(&self, out: &mut Vec<u8>) {
760        out.extend_from_slice(&self.header.slope.to_le_bytes());
761        out.extend_from_slice(&self.header.intercept.to_le_bytes());
762        out.push(self.header.offset_bytes);
763
764        match &self.residuals {
765            OffsetResiduals::One(residuals) => {
766                out.extend(residuals.iter().map(|r| *r as u8));
767            }
768            OffsetResiduals::Two(residuals) => {
769                for r in residuals.iter() {
770                    out.extend_from_slice(&r.to_le_bytes());
771                }
772            }
773            OffsetResiduals::Four(residuals) => {
774                for r in residuals.iter() {
775                    out.extend_from_slice(&r.to_le_bytes());
776                }
777            }
778        }
779    }
780}
781
782pub(crate) fn decode_compact_offsets(bytes: &[u8]) -> CompactOffsets {
783    if bytes.len() < 9 {
784        panic!("CompactOffsets requires at least 9 bytes for header");
785    }
786
787    let slope = i32::from_le_bytes(bytes[0..4].try_into().unwrap());
788    let intercept = i32::from_le_bytes(bytes[4..8].try_into().unwrap());
789    let offset_bytes = bytes[8] as usize;
790    if !matches!(offset_bytes, 1 | 2 | 4) {
791        panic!("Invalid offset_bytes value: {}", offset_bytes);
792    }
793
794    let header = CompactOffsetHeader {
795        slope,
796        intercept,
797        offset_bytes: offset_bytes as u8,
798    };
799
800    let payload = &bytes[9..];
801    if !payload.len().is_multiple_of(offset_bytes) {
802        panic!("Invalid payload size for CompactOffsets");
803    }
804    let count = payload.len() / offset_bytes;
805
806    match offset_bytes {
807        1 => {
808            let residuals: Arc<[i8]> = payload.iter().map(|b| *b as i8).collect::<Vec<_>>().into();
809            CompactOffsets {
810                header,
811                residuals: OffsetResiduals::One(residuals),
812            }
813        }
814        2 => {
815            let mut residuals = Vec::with_capacity(count);
816            for i in 0..count {
817                let base = i * 2;
818                residuals.push(i16::from_le_bytes(
819                    payload[base..base + 2].try_into().unwrap(),
820                ));
821            }
822            CompactOffsets {
823                header,
824                residuals: OffsetResiduals::Two(residuals.into()),
825            }
826        }
827        4 => {
828            let mut residuals = Vec::with_capacity(count);
829            for i in 0..count {
830                let base = i * 4;
831                residuals.push(i32::from_le_bytes(
832                    payload[base..base + 4].try_into().unwrap(),
833                ));
834            }
835            CompactOffsets {
836                header,
837                residuals: OffsetResiduals::Four(residuals.into()),
838            }
839        }
840        _ => unreachable!("validated offset_bytes"),
841    }
842}
843
844/// Saves symbol table from the compressor to a buffer.
845///
846/// Format:
847/// 1. The first byte is the length of the symbol table as a u8.
848/// 2. The next bytes are the lengths of each symbol as u8.
849/// 3. The next bytes are the symbols as u64.
850pub fn save_symbol_table(compressor: Arc<Compressor>, buffer: &mut Vec<u8>) -> Result<()> {
851    let symbols = compressor.symbol_table();
852    let symbols_lengths = compressor.symbol_lengths();
853
854    if symbols.len() != symbols_lengths.len() {
855        return Err(Error::new(
856            ErrorKind::InvalidInput,
857            "Symbol table and symbol lengths have different lengths",
858        ));
859    }
860
861    if symbols.len() > u8::MAX as usize {
862        return Err(Error::new(
863            ErrorKind::InvalidInput,
864            "Symbol table too large",
865        ));
866    }
867
868    buffer.push(symbols.len() as u8);
869
870    for &len in symbols_lengths.iter() {
871        buffer.push(len);
872    }
873
874    for sym in symbols.iter() {
875        buffer.extend_from_slice(&sym.to_u64().to_le_bytes());
876    }
877
878    Ok(())
879}
880
881/// Loads symbol table from a buffer saved by `save_symbol_table`.
882pub fn load_symbol_table(data: bytes::Bytes) -> Result<Compressor> {
883    if data.is_empty() {
884        return Err(Error::new(ErrorKind::InvalidInput, "Empty symbol table"));
885    }
886
887    let symbol_count = data[0] as usize;
888    let lengths_start = 1;
889    let lengths_end = lengths_start + symbol_count;
890    if lengths_end > data.len() {
891        return Err(Error::new(
892            ErrorKind::InvalidInput,
893            "Buffer too small for symbol lengths",
894        ));
895    }
896
897    let lengths = &data[lengths_start..lengths_end];
898    let symbols_start = lengths_end;
899    let symbols_end = symbols_start + symbol_count * SYMBOL_SIZE_BYTES;
900    if symbols_end > data.len() {
901        return Err(Error::new(
902            ErrorKind::InvalidInput,
903            "Buffer too small for symbols",
904        ));
905    }
906
907    let mut symbols = Vec::with_capacity(symbol_count);
908    for i in 0..symbol_count {
909        let base = symbols_start + i * SYMBOL_SIZE_BYTES;
910        let bytes: [u8; SYMBOL_SIZE_BYTES] =
911            data[base..base + SYMBOL_SIZE_BYTES].try_into().unwrap();
912        symbols.push(Symbol::from_slice(&bytes));
913    }
914
915    Ok(fsst::Compressor::rebuild_from(symbols, lengths))
916}
917
/// Unit tests for compact offset encoding, FSST buffer serialization, and
/// symbol-table save/load round-trips.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::{
        array::{Array, Decimal128Builder, StringBuilder},
        datatypes::DataType,
    };

    /// Round-trips `CompactOffsets` across all residual widths and the
    /// empty/single-value edge cases.
    #[test]
    fn test_compact_offset_view_round_trip() {
        // Test 1: Small offsets (should use OneByte variant)
        let small_offsets = vec![100u32, 105, 110, 115];
        test_round_trip(&small_offsets, "small offsets");

        // Test 2: Medium offsets (should use TwoBytes variant)
        let medium_offsets = vec![1000u32, 2000, 3000, 3500];
        test_round_trip(&medium_offsets, "medium offsets");

        // Test 3: Large offsets (should use FourBytes variant)
        let large_offsets = vec![100000u32, 200000, 300000, 310000];
        test_round_trip(&large_offsets, "large offsets");

        // Test 4: Mixed scenario with varying prefix lengths
        let mixed_offsets = vec![1000u32, 1010, 1020, 1030, 1040, 1050];
        test_round_trip(&mixed_offsets, "mixed scenarios");

        // Test 5: Edge case - empty values (single sentinel offset)
        let empty_offsets: Vec<u32> = vec![0];
        test_round_trip(&empty_offsets, "empty values");

        // Test 6: Single value (one prefix, two offsets)
        let single_offset = vec![42u32, 50];
        test_round_trip(&single_offset, "single offset");
    }

    /// Helper: checks that `offsets` survive both the in-memory
    /// `CompactOffsets` encoding and a full serialize/deserialize cycle
    /// through `write_residuals`/`decode_compact_offsets`.
    fn test_round_trip(offsets: &[u32], test_name: &str) {
        let compact_offsets = CompactOffsets::from_offsets(offsets);

        // In-memory representation must expose the same length and values.
        assert_eq!(
            offsets.len(),
            compact_offsets.len(),
            "Length mismatch in {}",
            test_name
        );
        for (i, offset) in offsets.iter().enumerate() {
            assert_eq!(
                compact_offsets.get_offset(i),
                *offset,
                "Offset mismatch at index {} in {}",
                i,
                test_name
            );
        }

        // Serialize, then decode from the raw bytes and compare again.
        let mut bytes = Vec::new();
        compact_offsets.write_residuals(&mut bytes);
        let reconstructed = decode_compact_offsets(&bytes);

        assert_eq!(
            offsets.len(),
            reconstructed.len(),
            "Reconstructed length mismatch in {}",
            test_name
        );
        for (i, o) in offsets.iter().enumerate() {
            assert_eq!(*o, reconstructed.get_offset(i));
        }
    }

    /// The compact encoding should never use more memory than the plain
    /// `u32` offsets it replaces.
    #[test]
    fn test_compact_offset_view_memory_efficiency() {
        // test that compaction actually saves memory
        let offsets = vec![1000u32, 1010, 1020, 1030, 1040];

        let original_size = offsets.len() * std::mem::size_of::<u32>();
        let compact_offsets = CompactOffsets::from_offsets(&offsets);
        let compact_size = compact_offsets.memory_usage();

        // for this test case, we should see some savings due to using smaller residuals
        assert!(
            compact_size <= original_size,
            "Compact representation should not be larger"
        );
    }

    /// Smoke-tests the accessor methods on `PrefixKey` and the three
    /// `OffsetResiduals` width variants.
    #[test]
    fn test_compact_offset_view_struct_methods() {
        let key = PrefixKey::from_parts([1, 2, 3, 4, 5, 6, 7], 15);
        assert_eq!(key.prefix7(), &[1, 2, 3, 4, 5, 6, 7]);
        assert_eq!(key.len_byte(), 15);
        assert_eq!(key.known_suffix_len(), Some(15));

        // A length byte of 255 is the "unknown suffix length" sentinel.
        let unknown = PrefixKey::from_parts([7, 6, 5, 4, 3, 2, 1], 255);
        assert_eq!(unknown.len_byte(), 255);
        assert_eq!(unknown.known_suffix_len(), None);

        // Each residual variant reports its byte width and sign-extends
        // stored values to i32.
        let r1 = OffsetResiduals::One(vec![-42i8, 7].into());
        assert_eq!(r1.bytes_per(), 1);
        assert_eq!(r1.get_i32(0), -42);

        let r2 = OffsetResiduals::Two(vec![12345i16].into());
        assert_eq!(r2.bytes_per(), 2);
        assert_eq!(r2.get_i32(0), 12345);

        let r4 = OffsetResiduals::Four(vec![-1000000i32].into());
        assert_eq!(r4.bytes_per(), 4);
        assert_eq!(r4.get_i32(0), -1000000);

        assert_eq!(PrefixKey::prefix_len(), 7);
    }

    /// `decode_compact_offsets` is expected to panic (not silently
    /// misbehave) on truncated or malformed input; each case is caught via
    /// `catch_unwind`.
    #[test]
    fn test_compact_offset_view_group_from_bytes_errors() {
        // Test with insufficient bytes for header
        let short_bytes = vec![1, 2, 3]; // only 3 bytes, need at least 9
        let result = std::panic::catch_unwind(|| decode_compact_offsets(&short_bytes));
        assert!(result.is_err(), "Should panic with insufficient bytes");

        // Test with invalid offset_bytes value
        let mut invalid_header = vec![0; 9];
        invalid_header[8] = 3; // invalid offset_bytes (should be 1, 2, or 4)
        let result = std::panic::catch_unwind(|| decode_compact_offsets(&invalid_header));
        assert!(result.is_err(), "Should panic with invalid offset_bytes");

        // Test with misaligned residual data for TwoBytes variant
        let mut misaligned_two_bytes = vec![0; 9 + 1]; // header + incomplete residual
        misaligned_two_bytes[8] = 2; // offset_bytes = 2
        let result = std::panic::catch_unwind(|| decode_compact_offsets(&misaligned_two_bytes));
        assert!(
            result.is_err(),
            "Should panic with misaligned TwoBytes residuals"
        );

        // Test with misaligned residual data for FourBytes variant
        let mut misaligned_four_bytes = vec![0; 9 + 2]; // header + incomplete residual
        misaligned_four_bytes[8] = 4; // offset_bytes = 4
        let result = std::panic::catch_unwind(|| decode_compact_offsets(&misaligned_four_bytes));
        assert!(
            result.is_err(),
            "Should panic with misaligned FourBytes residuals"
        );
    }

    /// Happy-path decode: a serialized one-byte-residual group decodes back
    /// to the original offsets.
    #[test]
    fn test_compact_offset_view_group_from_bytes_valid() {
        // Test OneByte variant roundtrip
        let offsets = vec![100u32, 101, 105];
        let original = CompactOffsets::from_offsets(&offsets);

        let mut bytes = Vec::new();
        original.write_residuals(&mut bytes);
        let reconstructed = decode_compact_offsets(&bytes);

        // Verify they match
        assert_eq!(offsets.len(), reconstructed.len());
        for (i, o) in offsets.iter().enumerate() {
            assert_eq!(*o, reconstructed.get_offset(i));
        }
    }

    /// Full serialize/deserialize round-trip of an `FsstArray` built from
    /// UTF-8 strings, comparing decompressed values element by element.
    #[test]
    fn test_fsst_buffer_bytes_roundtrip() {
        let mut builder = StringBuilder::new();
        for i in 0..1000 {
            builder.append_value(format!("test string value {i}"));
        }
        let original = builder.finish();

        // Train the compressor on the same corpus it will compress.
        let compressor =
            FsstArray::train_compressor(original.iter().flat_map(|s| s.map(|s| s.as_bytes())));
        let compressor_arc = Arc::new(compressor);
        let original_fsst =
            FsstArray::from_byte_array_with_compressor(&original, compressor_arc.clone());

        let mut buffer = Vec::new();
        original_fsst.to_bytes(&mut buffer);

        let bytes = bytes::Bytes::from(buffer);
        let deserialized = FsstArray::from_bytes(bytes, compressor_arc);

        let original_strings = original_fsst.to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
        let deserialized_strings = deserialized.to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
        assert_eq!(original_strings.len(), deserialized_strings.len());
        for (orig, deser) in original_strings.iter().zip(deserialized_strings.iter()) {
            assert_eq!(orig, deser);
        }
    }

    /// FSST compression of highly repetitive Decimal128 data should shrink
    /// the array's memory footprint.
    #[test]
    fn test_decimal_compression_smoke() {
        let mut builder = Decimal128Builder::new().with_data_type(DataType::Decimal128(10, 2));
        for i in 0..4096 {
            // Repetitive byte patterns (only 16 distinct values) so FSST
            // compresses well.
            builder.append_value(i128::from_le_bytes([(i % 16) as u8; 16]));
        }
        let original = builder.finish();
        let original_size = original.get_array_memory_size();

        let values = original
            .iter()
            .filter_map(|v| v.map(|v| v.to_le_bytes()))
            .collect::<Vec<_>>();
        let compressor = FsstArray::train_compressor(values.iter().map(|b| b.as_slice()));
        let compressor_arc = Arc::new(compressor);

        let fsst = FsstArray::from_decimal128_array_with_compressor(&original, compressor_arc);
        let compressed_size = fsst.get_array_memory_size();
        assert!(compressed_size < original_size);
    }

    /// A compressor rebuilt via `save_symbol_table`/`load_symbol_table`
    /// must produce output identical to the original compressor's.
    #[test]
    fn test_save_and_load_symbol_table_roundtrip() {
        let mut builder = StringBuilder::new();
        for i in 0..1000 {
            builder.append_value(format!("hello world {i}"));
        }
        let original = builder.finish();

        let compressor =
            FsstArray::train_compressor(original.iter().flat_map(|s| s.map(|s| s.as_bytes())));
        let compressor_arc = Arc::new(compressor);

        // Serialize the symbol table, then rebuild a compressor from it.
        let mut bytes = Vec::new();
        save_symbol_table(compressor_arc.clone(), &mut bytes).unwrap();
        let reloaded = load_symbol_table(bytes::Bytes::from(bytes)).unwrap();

        // Compress the same input with both compressors and compare the
        // decompressed results.
        let fsst_original = FsstArray::from_byte_array_with_compressor(&original, compressor_arc);
        let fsst_reloaded =
            FsstArray::from_byte_array_with_compressor(&original, Arc::new(reloaded));

        let a = fsst_original.to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
        let b = fsst_reloaded.to_arrow_byte_array::<arrow::datatypes::Utf8Type>();
        assert_eq!(a, b);
    }
}