Skip to main content

liquid_cache/liquid_array/byte_view_array/
mod.rs

1//! LiquidByteViewArray
2
3use arrow::array::BooleanArray;
4use arrow::array::{
5    Array, ArrayRef, BinaryArray, DictionaryArray, StringArray, UInt16Array, types::UInt16Type,
6};
7use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
8use arrow::compute::cast;
9use arrow_schema::DataType;
10use bytes::Bytes;
11use std::any::Any;
12use std::sync::Arc;
13
14#[cfg(test)]
15use std::cell::Cell;
16
17use crate::cache::{CacheExpression, LiquidExpr};
18use crate::liquid_array::byte_view_array::fingerprint::build_fingerprints;
19use crate::liquid_array::raw::FsstArray;
20use crate::liquid_array::raw::fsst_buffer::{DiskBuffer, FsstBacking, PrefixKey};
21use crate::liquid_array::{
22    LiquidArray, LiquidDataType, LiquidSqueezedArray, LiquidSqueezedArrayRef, SqueezeIoHandler,
23    SqueezedBacking, eval_predicate_on_array,
24};
25
26mod comparisons;
27mod conversions;
28mod fingerprint;
29mod helpers;
30mod operator;
31mod serialization;
32
33#[cfg(test)]
34mod tests;
35
36pub use helpers::ByteViewArrayMemoryUsage;
37pub use operator::{ByteViewOperator, Comparison, Equality, SubString};
38
// Per-thread counters that exist only in test builds; both start at zero.
#[cfg(test)]
thread_local! {
    // Read through `get_disk_read_counter` and zeroed by `reset_disk_read_counter`.
    static DISK_READ_COUNTER: Cell<usize> = const { Cell::new(0)};
    // NOTE(review): not referenced in this file; presumably updated by the comparison
    // submodule — confirm before removing.
    static FULL_DATA_COMPARISON_COUNTER: Cell<usize> = const { Cell::new(0)};
}
44
/// Returns the current value of this thread's `DISK_READ_COUNTER` (test builds only).
#[cfg(test)]
fn get_disk_read_counter() -> usize {
    DISK_READ_COUNTER.with(Cell::get)
}
49
/// Sets this thread's `DISK_READ_COUNTER` back to zero (test builds only).
#[cfg(test)]
fn reset_disk_read_counter() {
    DISK_READ_COUNTER.with(|disk_reads| {
        disk_reads.set(0);
    });
}
54
/// An array that stores strings using the FSST format with compact offsets:
/// - Dictionary keys with 2-byte keys stored in memory
/// - Compact offsets with variable-size residuals (1, 2, or 4 bytes) stored in memory
/// - Per-value prefix keys (7-byte prefix + len) stored in memory
/// - FSST buffer can be stored in memory or on disk
///
/// The backing store is chosen by the type parameter `B`: this module provides
/// inherent conversions for `LiquidByteViewArray<FsstArray>` (synchronous) and
/// `LiquidByteViewArray<DiskBuffer>` (asynchronous).
///
/// # Initialization
///
/// The recommended way to create a `LiquidByteViewArray` is using the `from_*_array` constructors
/// which build a compact (offset + prefix key) representation directly from Arrow inputs.
///
/// ```rust,ignore
/// let liquid_array = LiquidByteViewArray::from_string_array(&input, compressor);
/// ```
///
/// Data access flow:
/// 1. Use dictionary key to index into compact offsets buffer
/// 2. Reconstruct actual offset from linear regression (predicted + residual)
/// 3. Use prefix keys for quick comparisons to avoid decompression when possible
/// 4. Decompress bytes from FSST buffer to get the full value when needed
#[derive(Clone)]
pub struct LiquidByteViewArray<B: FsstBacking> {
    /// Dictionary keys (u16) - one per array element, using Arrow's UInt16Array for zero-copy.
    /// Its length is the array length and its null buffer is the array's null buffer.
    pub(super) dictionary_keys: UInt16Array,
    /// Per-value prefix keys (prefix7 + len metadata).
    /// NOTE(review): its length is used as the dictionary size when selecting values to
    /// decompress, so this is presumably one entry per dictionary value — confirm.
    pub(super) prefix_keys: Arc<[PrefixKey]>,
    /// FSST-compressed buffer (can be in memory or on disk)
    pub(super) fsst_buffer: B,
    /// Used to convert back to the original arrow type (the target of the final `cast`).
    pub(super) original_arrow_type: ArrowByteType,
    /// Shared prefix across all strings in the array
    pub(super) shared_prefix: Vec<u8>,
    /// Optional per-dictionary string fingerprints (32 bins).
    pub(super) string_fingerprints: Option<Arc<[u32]>>,
}
90
/// Options used when building a byte-view array.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ByteViewBuildOptions {
    /// Arrow type associated with the array being built.
    /// NOTE(review): presumably stored as the array's `original_arrow_type` — confirm.
    pub(super) arrow_type: ArrowByteType,
    /// Whether fingerprints are requested; `ByteViewBuildOptions::new` sets this to `false`.
    pub(super) build_fingerprints: bool,
}
96
97impl ByteViewBuildOptions {
98    pub(crate) fn new(arrow_type: ArrowByteType) -> Self {
99        Self {
100            arrow_type,
101            build_fingerprints: false,
102        }
103    }
104
105    pub(crate) fn for_data_type(data_type: &DataType, build_fingerprints: bool) -> Self {
106        Self {
107            arrow_type: ArrowByteType::from_arrow_type(data_type),
108            build_fingerprints,
109        }
110    }
111}
112
/// The Arrow byte/string types a byte-view array can be converted back to.
///
/// The enum is `#[repr(u16)]` with explicit discriminants; `From<u16>` maps a
/// discriminant back to its variant, so the numeric values must stay stable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub(crate) enum ArrowByteType {
    /// `DataType::Utf8`
    Utf8 = 0,
    /// `DataType::Utf8View`
    Utf8View = 1,
    /// `DataType::Dictionary(UInt16, Binary)`
    Dict16Binary = 2,
    /// `DataType::Dictionary(UInt16, Utf8)`
    Dict16Utf8 = 3,
    /// `DataType::Binary`
    Binary = 4,
    /// `DataType::BinaryView`
    BinaryView = 5,
}
123
124impl From<u16> for ArrowByteType {
125    fn from(value: u16) -> Self {
126        match value {
127            0 => ArrowByteType::Utf8,
128            1 => ArrowByteType::Utf8View,
129            2 => ArrowByteType::Dict16Binary,
130            3 => ArrowByteType::Dict16Utf8,
131            4 => ArrowByteType::Binary,
132            5 => ArrowByteType::BinaryView,
133            _ => panic!("Invalid arrow byte type: {value}"),
134        }
135    }
136}
137
138impl ArrowByteType {
139    pub fn to_arrow_type(self) -> DataType {
140        match self {
141            ArrowByteType::Utf8 => DataType::Utf8,
142            ArrowByteType::Utf8View => DataType::Utf8View,
143            ArrowByteType::Dict16Binary => {
144                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary))
145            }
146            ArrowByteType::Dict16Utf8 => {
147                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
148            }
149            ArrowByteType::Binary => DataType::Binary,
150            ArrowByteType::BinaryView => DataType::BinaryView,
151        }
152    }
153
154    pub fn from_arrow_type(ty: &DataType) -> Self {
155        match ty {
156            DataType::Utf8 => ArrowByteType::Utf8,
157            DataType::Utf8View => ArrowByteType::Utf8View,
158            DataType::Binary => ArrowByteType::Binary,
159            DataType::BinaryView => ArrowByteType::BinaryView,
160            DataType::Dictionary(_, _) => {
161                if ty
162                    == &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary))
163                {
164                    ArrowByteType::Dict16Binary
165                } else if ty
166                    == &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
167                {
168                    ArrowByteType::Dict16Utf8
169                } else {
170                    panic!("Unsupported arrow type: {ty:?}")
171                }
172            }
173            _ => panic!("Unsupported arrow type: {ty:?}"),
174        }
175    }
176}
177
178impl<B: FsstBacking> std::fmt::Debug for LiquidByteViewArray<B> {
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        f.debug_struct("LiquidByteViewArray")
181            .field("dictionary_keys", &self.dictionary_keys)
182            .field("prefix_keys", &self.prefix_keys)
183            .field("fsst_buffer", &self.fsst_buffer)
184            .field("original_arrow_type", &self.original_arrow_type)
185            .field("shared_prefix", &self.shared_prefix)
186            .field("string_fingerprints", &self.string_fingerprints)
187            .finish()
188    }
189}
190
191impl<B: FsstBacking> LiquidByteViewArray<B> {
192    /// Convert to Arrow DictionaryArray
193    fn to_dict_arrow_inner(
194        &self,
195        keys_array: UInt16Array,
196        values_buffer: Buffer,
197        offsets_buffer: OffsetBuffer<i32>,
198    ) -> DictionaryArray<UInt16Type> {
199        let values = if self.original_arrow_type == ArrowByteType::Utf8
200            || self.original_arrow_type == ArrowByteType::Utf8View
201            || self.original_arrow_type == ArrowByteType::Dict16Utf8
202        {
203            let string_array =
204                unsafe { StringArray::new_unchecked(offsets_buffer, values_buffer, None) };
205            Arc::new(string_array) as ArrayRef
206        } else {
207            let binary_array =
208                unsafe { BinaryArray::new_unchecked(offsets_buffer, values_buffer, None) };
209            Arc::new(binary_array) as ArrayRef
210        };
211
212        unsafe { DictionaryArray::<UInt16Type>::new_unchecked(keys_array, values) }
213    }
214
215    fn should_decompress_keyed(&self) -> bool {
216        self.dictionary_keys.len() < 2048 || self.dictionary_keys.len() < self.prefix_keys.len()
217    }
218
219    /// Get the nulls buffer
220    pub fn nulls(&self) -> Option<&NullBuffer> {
221        self.dictionary_keys.nulls()
222    }
223
224    /// Get detailed memory usage of the byte view array
225    pub fn get_detailed_memory_usage(&self) -> ByteViewArrayMemoryUsage {
226        let fingerprint_bytes = self
227            .string_fingerprints
228            .as_ref()
229            .map(|fingerprints| fingerprints.len() * std::mem::size_of::<u32>())
230            .unwrap_or(0);
231        ByteViewArrayMemoryUsage {
232            dictionary_key: self.dictionary_keys.get_array_memory_size(),
233            prefix_keys: self.prefix_keys.len() * std::mem::size_of::<PrefixKey>(),
234            fsst_buffer: self.fsst_buffer.get_array_memory_size(),
235            shared_prefix: self.shared_prefix.len(),
236            string_fingerprints: fingerprint_bytes,
237            struct_size: std::mem::size_of::<Self>(),
238        }
239    }
240
241    /// Get the length of the array
242    pub fn len(&self) -> usize {
243        self.dictionary_keys.len()
244    }
245
246    /// Is the array empty?
247    pub fn is_empty(&self) -> bool {
248        self.dictionary_keys.is_empty()
249    }
250
251    /// Get disk read count for testing
252    #[cfg(test)]
253    pub fn get_disk_read_count(&self) -> usize {
254        get_disk_read_counter()
255    }
256
257    /// Reset disk read count for testing
258    #[cfg(test)]
259    pub fn reset_disk_read_count(&self) {
260        reset_disk_read_counter()
261    }
262}
263
264impl LiquidByteViewArray<FsstArray> {
265    /// Convert to Arrow DictionaryArray
266    pub fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
267        if self.should_decompress_keyed() {
268            self.to_dict_arrow_decompress_keyed()
269        } else {
270            self.to_dict_arrow_decompress_all()
271        }
272    }
273
274    fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
275        let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed();
276        self.to_dict_arrow_inner(self.dictionary_keys.clone(), values_buffer, offsets_buffer)
277    }
278
279    fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
280        let (selected, new_keys) =
281            helpers::build_dict_selection(&self.dictionary_keys, self.prefix_keys.len());
282        let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed_selected(&selected);
283        self.to_dict_arrow_inner(new_keys, values_buffer, offsets_buffer)
284    }
285
286    /// Convert to Arrow array with original type
287    pub fn to_arrow_array(&self) -> ArrayRef {
288        let dict = self.to_dict_arrow();
289        cast(&dict, &self.original_arrow_type.to_arrow_type()).unwrap()
290    }
291
292    /// Check if the FSST buffer is currently stored on disk
293    pub fn is_fsst_buffer_on_disk(&self) -> bool {
294        false
295    }
296}
297
298impl LiquidByteViewArray<DiskBuffer> {
299    /// Check if the FSST buffer is currently stored on disk
300    pub fn is_fsst_buffer_on_disk(&self) -> bool {
301        true
302    }
303
304    /// Convert to Arrow DictionaryArray
305    pub async fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
306        if self.should_decompress_keyed() {
307            self.to_dict_arrow_decompress_keyed().await
308        } else {
309            self.to_dict_arrow_decompress_all().await
310        }
311    }
312
313    async fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
314        let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed().await;
315        self.to_dict_arrow_inner(self.dictionary_keys.clone(), values_buffer, offsets_buffer)
316    }
317
318    async fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
319        let (selected, new_keys) =
320            helpers::build_dict_selection(&self.dictionary_keys, self.prefix_keys.len());
321        let (values_buffer, offsets_buffer) =
322            self.fsst_buffer.to_uncompressed_selected(&selected).await;
323        self.to_dict_arrow_inner(new_keys, values_buffer, offsets_buffer)
324    }
325
326    /// Convert to Arrow array with original type
327    pub async fn to_arrow_array(&self) -> ArrayRef {
328        let dict = self.to_dict_arrow().await;
329        cast(&dict, &self.original_arrow_type.to_arrow_type()).unwrap()
330    }
331}
332
333impl LiquidArray for LiquidByteViewArray<FsstArray> {
334    fn as_any(&self) -> &dyn Any {
335        self
336    }
337
338    fn get_array_memory_size(&self) -> usize {
339        self.get_detailed_memory_usage().total()
340    }
341
342    fn len(&self) -> usize {
343        self.dictionary_keys.len()
344    }
345
346    #[inline]
347    fn to_arrow_array(&self) -> ArrayRef {
348        let dict = self.to_arrow_array();
349        Arc::new(dict)
350    }
351
352    fn to_best_arrow_array(&self) -> ArrayRef {
353        let dict = self.to_dict_arrow();
354        Arc::new(dict)
355    }
356
357    fn try_eval_predicate(&self, expr: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
358        let filtered = helpers::filter_inner(self, filter);
359
360        helpers::try_eval_predicate_in_memory(expr.physical_expr(), &filtered)
361            .unwrap_or_else(|| eval_predicate_on_array(filtered.to_arrow_array(), expr))
362    }
363
364    fn to_bytes(&self) -> Vec<u8> {
365        self.to_bytes_inner().expect("InMemoryFsstBuffer")
366    }
367
368    fn original_arrow_data_type(&self) -> DataType {
369        self.original_arrow_type.to_arrow_type()
370    }
371
372    fn data_type(&self) -> LiquidDataType {
373        LiquidDataType::ByteViewArray
374    }
375
376    fn squeeze(
377        &self,
378        io: Arc<dyn SqueezeIoHandler>,
379        squeeze_hint: Option<&CacheExpression>,
380    ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
381        squeeze_hint?;
382
383        let string_fingerprints = if matches!(squeeze_hint, Some(CacheExpression::SubstringSearch))
384        {
385            self.string_fingerprints.clone().or_else(|| {
386                let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed();
387                Some(build_fingerprints(&values_buffer, &offsets_buffer))
388            })
389        } else {
390            None
391        };
392
393        // Serialize full IPC bytes first
394        let bytes = match self.to_bytes_inner() {
395            Ok(b) => b,
396            Err(_) => return None,
397        };
398
399        // Build the hybrid (disk-backed FSST) view
400        let disk_range = 0u64..(bytes.len() as u64);
401        let compressor = self.fsst_buffer.compressor_arc();
402        let disk = DiskBuffer::new(
403            self.fsst_buffer.uncompressed_bytes(),
404            io,
405            disk_range,
406            compressor,
407        );
408        let hybrid = LiquidByteViewArray::<DiskBuffer> {
409            dictionary_keys: self.dictionary_keys.clone(),
410            prefix_keys: self.prefix_keys.clone(),
411            fsst_buffer: disk,
412            original_arrow_type: self.original_arrow_type,
413            shared_prefix: self.shared_prefix.clone(),
414            string_fingerprints,
415        };
416
417        let bytes = Bytes::from(bytes);
418        Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, bytes))
419    }
420
421    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
422        let filtered = helpers::filter_inner(self, selection);
423        filtered.to_arrow_array()
424    }
425}
426
427#[async_trait::async_trait]
428impl LiquidSqueezedArray for LiquidByteViewArray<DiskBuffer> {
429    /// Get the underlying any type.
430    fn as_any(&self) -> &dyn Any {
431        self
432    }
433
434    /// Get the memory size of the Liquid array.
435    fn get_array_memory_size(&self) -> usize {
436        self.get_detailed_memory_usage().total()
437    }
438
439    /// Get the length of the Liquid array.
440    fn len(&self) -> usize {
441        self.dictionary_keys.len()
442    }
443
444    /// Check if the Liquid array is empty.
445    fn is_empty(&self) -> bool {
446        self.len() == 0
447    }
448
449    /// Convert the Liquid array to an Arrow array.
450    async fn to_arrow_array(&self) -> ArrayRef {
451        let bytes = self
452            .fsst_buffer
453            .squeeze_io()
454            .read(Some(self.fsst_buffer.disk_range()))
455            .await
456            .expect("read squeezed backing");
457        let hydrated =
458            LiquidByteViewArray::<FsstArray>::from_bytes(bytes, self.fsst_buffer.compressor_arc());
459        LiquidByteViewArray::<FsstArray>::to_arrow_array(&hydrated)
460    }
461
462    /// Get the logical data type of the Liquid array.
463    fn data_type(&self) -> LiquidDataType {
464        LiquidDataType::ByteViewArray
465    }
466
467    fn original_arrow_data_type(&self) -> DataType {
468        self.original_arrow_type.to_arrow_type()
469    }
470
471    fn disk_backing(&self) -> SqueezedBacking {
472        SqueezedBacking::Liquid(self.fsst_buffer.disk_range_len())
473    }
474
475    /// Filter the Liquid array with a boolean array and return an **arrow array**.
476    async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
477        let select_any = selection.count_set_bits() > 0;
478        if !select_any {
479            return arrow::array::new_empty_array(&self.original_arrow_data_type());
480        }
481        let filtered = helpers::filter_inner(self, selection);
482        filtered.to_arrow_array().await
483    }
484
485    /// Try to evaluate a predicate on the Liquid array with a filter.
486    /// Returns `Ok(None)` if the predicate is not supported.
487    ///
488    /// Note that the filter is a boolean buffer, not a boolean array, i.e., filter can't be nullable.
489    /// The returned boolean mask is nullable if the the original array is nullable.
490    async fn try_eval_predicate(&self, expr: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
491        // Reuse generic filter path first to reduce input rows if any
492        let filtered = helpers::filter_inner(self, filter);
493        if let Some(mask) =
494            helpers::try_eval_predicate_on_disk(expr.physical_expr(), &filtered).await
495        {
496            mask
497        } else {
498            eval_predicate_on_array(filtered.to_arrow_array().await, expr)
499        }
500    }
501}