Skip to main content

liquid_cache/liquid_array/byte_view_array/
mod.rs

1//! LiquidByteViewArray
2
3use arrow::array::BooleanArray;
4use arrow::array::{
5    Array, ArrayRef, BinaryArray, DictionaryArray, StringArray, UInt16Array, types::UInt16Type,
6};
7use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
8use arrow::compute::cast;
9use arrow_schema::DataType;
10use bytes::Bytes;
11use datafusion::physical_plan::PhysicalExpr;
12use std::any::Any;
13use std::sync::Arc;
14
15#[cfg(test)]
16use std::cell::Cell;
17
18use crate::cache::CacheExpression;
19use crate::liquid_array::byte_array::{ArrowByteType, build_dict_selection};
20use crate::liquid_array::byte_view_array::fingerprint::build_fingerprints;
21use crate::liquid_array::raw::FsstArray;
22use crate::liquid_array::raw::fsst_buffer::{DiskBuffer, FsstBacking, PrefixKey};
23use crate::liquid_array::{
24    LiquidArray, LiquidDataType, LiquidSqueezedArray, LiquidSqueezedArrayRef, SqueezeIoHandler,
25};
26
27mod comparisons;
28mod conversions;
29mod fingerprint;
30mod helpers;
31mod operator;
32mod serialization;
33
34#[cfg(test)]
35mod tests;
36
37pub use helpers::ByteViewArrayMemoryUsage;
38pub use operator::{ByteViewOperator, Comparison, Equality, SubString};
39
// Test-only counters. They are thread-local, so each test thread sees its own values,
// and both start at zero.
#[cfg(test)]
thread_local! {
    // Value reported to tests through `get_disk_read_counter` and cleared by
    // `reset_disk_read_counter`. It is not incremented in this file.
    // NOTE(review): presumably bumped by the disk-backed read paths in the submodules — confirm.
    static DISK_READ_COUNTER: Cell<usize> = const { Cell::new(0)};
    // NOTE(review): by its name this tracks comparisons that needed the full data;
    // it is neither read nor written in this file — confirm against the submodules.
    static FULL_DATA_COMPARISON_COUNTER: Cell<usize> = const { Cell::new(0)};
}
45
/// Returns the calling thread's current `DISK_READ_COUNTER` value (test builds only).
#[cfg(test)]
fn get_disk_read_counter() -> usize {
    DISK_READ_COUNTER.with(Cell::get)
}
50
/// Sets the calling thread's `DISK_READ_COUNTER` back to zero (test builds only).
#[cfg(test)]
fn reset_disk_read_counter() {
    DISK_READ_COUNTER.with(|cell| {
        cell.set(0);
    });
}
55
/// An array that stores strings using the FSST format with compact offsets:
/// - Dictionary keys with 2-byte keys stored in memory
/// - Compact offsets with variable-size residuals (1, 2, or 4 bytes) stored in memory
/// - Per-value prefix keys (7-byte prefix + len) stored in memory
/// - FSST buffer can be stored in memory or on disk
///
/// The type parameter `B` selects where the FSST buffer lives. This module provides
/// implementations for `B = FsstArray` (reported as not on disk, synchronous conversions)
/// and `B = DiskBuffer` (reported as on disk, `async` conversions).
///
/// # Initialization
///
/// The recommended way to create a `LiquidByteViewArray` is using the `from_*_array` constructors
/// which build a compact (offset + prefix key) representation directly from Arrow inputs.
///
/// ```rust,ignore
/// let liquid_array = LiquidByteViewArray::from_string_array(&input, compressor);
/// ```
///
/// Data access flow:
/// 1. Use dictionary key to index into compact offsets buffer
/// 2. Reconstruct actual offset from linear regression (predicted + residual)
/// 3. Use prefix keys for quick comparisons to avoid decompression when possible
/// 4. Decompress bytes from FSST buffer to get the full value when needed
#[derive(Clone)]
pub struct LiquidByteViewArray<B: FsstBacking> {
    /// Dictionary keys (u16) - one per array element, using Arrow's UInt16Array for zero-copy.
    /// The array's length, emptiness and null buffer are all taken from this field.
    pub(super) dictionary_keys: UInt16Array,
    /// Per-value prefix keys (prefix7 + len metadata).
    /// Its length is used as the dictionary size when selecting which values to decompress.
    pub(super) prefix_keys: Arc<[PrefixKey]>,
    /// FSST-compressed buffer (can be in memory or on disk)
    pub(super) fsst_buffer: B,
    /// Used to convert back to the original arrow type
    pub(super) original_arrow_type: ArrowByteType,
    /// Shared prefix across all strings in the array
    pub(super) shared_prefix: Vec<u8>,
    /// Optional per-dictionary string fingerprints (32 bins).
    /// `None` when no fingerprints were built; each fingerprint is one `u32`.
    pub(super) string_fingerprints: Option<Arc<[u32]>>,
}
91
/// Options describing how a byte-view array should be built.
///
/// NOTE(review): the consumers of these options are not in this file — presumably the
/// `from_*_array` constructors in the submodules; confirm there.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ByteViewBuildOptions {
    /// Arrow byte type associated with the array; `for_data_type` derives it from a `DataType`.
    pub(super) arrow_type: ArrowByteType,
    /// Whether fingerprints are requested; `new` leaves this `false`.
    pub(super) build_fingerprints: bool,
}
97
98impl ByteViewBuildOptions {
99    pub(crate) fn new(arrow_type: ArrowByteType) -> Self {
100        Self {
101            arrow_type,
102            build_fingerprints: false,
103        }
104    }
105
106    pub(crate) fn for_data_type(data_type: &DataType, build_fingerprints: bool) -> Self {
107        Self {
108            arrow_type: ArrowByteType::from_arrow_type(data_type),
109            build_fingerprints,
110        }
111    }
112}
113
114impl<B: FsstBacking> std::fmt::Debug for LiquidByteViewArray<B> {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        f.debug_struct("LiquidByteViewArray")
117            .field("dictionary_keys", &self.dictionary_keys)
118            .field("prefix_keys", &self.prefix_keys)
119            .field("fsst_buffer", &self.fsst_buffer)
120            .field("original_arrow_type", &self.original_arrow_type)
121            .field("shared_prefix", &self.shared_prefix)
122            .field("string_fingerprints", &self.string_fingerprints)
123            .finish()
124    }
125}
126
127impl<B: FsstBacking> LiquidByteViewArray<B> {
128    /// Convert to Arrow DictionaryArray
129    fn to_dict_arrow_inner(
130        &self,
131        keys_array: UInt16Array,
132        values_buffer: Buffer,
133        offsets_buffer: OffsetBuffer<i32>,
134    ) -> DictionaryArray<UInt16Type> {
135        let values = if self.original_arrow_type == ArrowByteType::Utf8
136            || self.original_arrow_type == ArrowByteType::Utf8View
137            || self.original_arrow_type == ArrowByteType::Dict16Utf8
138        {
139            let string_array =
140                unsafe { StringArray::new_unchecked(offsets_buffer, values_buffer, None) };
141            Arc::new(string_array) as ArrayRef
142        } else {
143            let binary_array =
144                unsafe { BinaryArray::new_unchecked(offsets_buffer, values_buffer, None) };
145            Arc::new(binary_array) as ArrayRef
146        };
147
148        unsafe { DictionaryArray::<UInt16Type>::new_unchecked(keys_array, values) }
149    }
150
151    fn should_decompress_keyed(&self) -> bool {
152        self.dictionary_keys.len() < 2048 || self.dictionary_keys.len() < self.prefix_keys.len()
153    }
154
155    /// Get the nulls buffer
156    pub fn nulls(&self) -> Option<&NullBuffer> {
157        self.dictionary_keys.nulls()
158    }
159
160    /// Get detailed memory usage of the byte view array
161    pub fn get_detailed_memory_usage(&self) -> ByteViewArrayMemoryUsage {
162        let fingerprint_bytes = self
163            .string_fingerprints
164            .as_ref()
165            .map(|fingerprints| fingerprints.len() * std::mem::size_of::<u32>())
166            .unwrap_or(0);
167        ByteViewArrayMemoryUsage {
168            dictionary_key: self.dictionary_keys.get_array_memory_size(),
169            prefix_keys: self.prefix_keys.len() * std::mem::size_of::<PrefixKey>(),
170            fsst_buffer: self.fsst_buffer.get_array_memory_size(),
171            shared_prefix: self.shared_prefix.len(),
172            string_fingerprints: fingerprint_bytes,
173            struct_size: std::mem::size_of::<Self>(),
174        }
175    }
176
177    /// Get the length of the array
178    pub fn len(&self) -> usize {
179        self.dictionary_keys.len()
180    }
181
182    /// Is the array empty?
183    pub fn is_empty(&self) -> bool {
184        self.dictionary_keys.is_empty()
185    }
186
187    /// Get disk read count for testing
188    #[cfg(test)]
189    pub fn get_disk_read_count(&self) -> usize {
190        get_disk_read_counter()
191    }
192
193    /// Reset disk read count for testing
194    #[cfg(test)]
195    pub fn reset_disk_read_count(&self) {
196        reset_disk_read_counter()
197    }
198}
199
200impl LiquidByteViewArray<FsstArray> {
201    /// Convert to Arrow DictionaryArray
202    pub fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
203        if self.should_decompress_keyed() {
204            self.to_dict_arrow_decompress_keyed()
205        } else {
206            self.to_dict_arrow_decompress_all()
207        }
208    }
209
210    fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
211        let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed();
212        self.to_dict_arrow_inner(self.dictionary_keys.clone(), values_buffer, offsets_buffer)
213    }
214
215    fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
216        let (selected, new_keys) =
217            build_dict_selection(&self.dictionary_keys, self.prefix_keys.len());
218        let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed_selected(&selected);
219        self.to_dict_arrow_inner(new_keys, values_buffer, offsets_buffer)
220    }
221
222    /// Convert to Arrow array with original type
223    pub fn to_arrow_array(&self) -> ArrayRef {
224        let dict = self.to_dict_arrow();
225        cast(&dict, &self.original_arrow_type.to_arrow_type()).unwrap()
226    }
227
228    /// Check if the FSST buffer is currently stored on disk
229    pub fn is_fsst_buffer_on_disk(&self) -> bool {
230        false
231    }
232}
233
234impl LiquidByteViewArray<DiskBuffer> {
235    /// Check if the FSST buffer is currently stored on disk
236    pub fn is_fsst_buffer_on_disk(&self) -> bool {
237        true
238    }
239
240    /// Convert to Arrow DictionaryArray
241    pub async fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
242        if self.should_decompress_keyed() {
243            self.to_dict_arrow_decompress_keyed().await
244        } else {
245            self.to_dict_arrow_decompress_all().await
246        }
247    }
248
249    async fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
250        let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed().await;
251        self.to_dict_arrow_inner(self.dictionary_keys.clone(), values_buffer, offsets_buffer)
252    }
253
254    async fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
255        let (selected, new_keys) =
256            build_dict_selection(&self.dictionary_keys, self.prefix_keys.len());
257        let (values_buffer, offsets_buffer) =
258            self.fsst_buffer.to_uncompressed_selected(&selected).await;
259        self.to_dict_arrow_inner(new_keys, values_buffer, offsets_buffer)
260    }
261
262    /// Convert to Arrow array with original type
263    pub async fn to_arrow_array(&self) -> ArrayRef {
264        let dict = self.to_dict_arrow().await;
265        cast(&dict, &self.original_arrow_type.to_arrow_type()).unwrap()
266    }
267}
268
269impl LiquidArray for LiquidByteViewArray<FsstArray> {
270    fn as_any(&self) -> &dyn Any {
271        self
272    }
273
274    fn get_array_memory_size(&self) -> usize {
275        self.get_detailed_memory_usage().total()
276    }
277
278    fn len(&self) -> usize {
279        self.dictionary_keys.len()
280    }
281
282    #[inline]
283    fn to_arrow_array(&self) -> ArrayRef {
284        let dict = self.to_arrow_array();
285        Arc::new(dict)
286    }
287
288    fn to_best_arrow_array(&self) -> ArrayRef {
289        let dict = self.to_dict_arrow();
290        Arc::new(dict)
291    }
292
293    fn try_eval_predicate(
294        &self,
295        expr: &Arc<dyn PhysicalExpr>,
296        filter: &BooleanBuffer,
297    ) -> Option<BooleanArray> {
298        let filtered = helpers::filter_inner(self, filter);
299
300        helpers::try_eval_predicate_in_memory(expr, &filtered)
301    }
302
303    fn to_bytes(&self) -> Vec<u8> {
304        self.to_bytes_inner().expect("InMemoryFsstBuffer")
305    }
306
307    fn original_arrow_data_type(&self) -> DataType {
308        self.original_arrow_type.to_arrow_type()
309    }
310
311    fn data_type(&self) -> LiquidDataType {
312        LiquidDataType::ByteViewArray
313    }
314
315    fn squeeze(
316        &self,
317        io: Arc<dyn SqueezeIoHandler>,
318        squeeze_hint: Option<&CacheExpression>,
319    ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
320        squeeze_hint?;
321
322        let string_fingerprints = if matches!(squeeze_hint, Some(CacheExpression::SubstringSearch))
323        {
324            self.string_fingerprints.clone().or_else(|| {
325                let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed();
326                Some(build_fingerprints(&values_buffer, &offsets_buffer))
327            })
328        } else {
329            None
330        };
331
332        // Serialize full IPC bytes first
333        let bytes = match self.to_bytes_inner() {
334            Ok(b) => b,
335            Err(_) => return None,
336        };
337
338        // Build the hybrid (disk-backed FSST) view
339        let disk_range = 0u64..(bytes.len() as u64);
340        let compressor = self.fsst_buffer.compressor_arc();
341        let disk = DiskBuffer::new(
342            self.fsst_buffer.uncompressed_bytes(),
343            io,
344            disk_range,
345            compressor,
346        );
347        let hybrid = LiquidByteViewArray::<DiskBuffer> {
348            dictionary_keys: self.dictionary_keys.clone(),
349            prefix_keys: self.prefix_keys.clone(),
350            fsst_buffer: disk,
351            original_arrow_type: self.original_arrow_type,
352            shared_prefix: self.shared_prefix.clone(),
353            string_fingerprints,
354        };
355
356        let bytes = Bytes::from(bytes);
357        Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, bytes))
358    }
359
360    fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
361        let filtered = helpers::filter_inner(self, selection);
362        filtered.to_arrow_array()
363    }
364}
365
366#[async_trait::async_trait]
367impl LiquidSqueezedArray for LiquidByteViewArray<DiskBuffer> {
368    /// Get the underlying any type.
369    fn as_any(&self) -> &dyn Any {
370        self
371    }
372
373    /// Get the memory size of the Liquid array.
374    fn get_array_memory_size(&self) -> usize {
375        self.get_detailed_memory_usage().total()
376    }
377
378    /// Get the length of the Liquid array.
379    fn len(&self) -> usize {
380        self.dictionary_keys.len()
381    }
382
383    /// Check if the Liquid array is empty.
384    fn is_empty(&self) -> bool {
385        self.len() == 0
386    }
387
388    /// Convert the Liquid array to an Arrow array.
389    async fn to_arrow_array(&self) -> ArrayRef {
390        let bytes = self
391            .fsst_buffer
392            .squeeze_io()
393            .read(Some(self.fsst_buffer.disk_range()))
394            .await
395            .expect("read squeezed backing");
396        let hydrated =
397            LiquidByteViewArray::<FsstArray>::from_bytes(bytes, self.fsst_buffer.compressor_arc());
398        LiquidByteViewArray::<FsstArray>::to_arrow_array(&hydrated)
399    }
400
401    /// Get the logical data type of the Liquid array.
402    fn data_type(&self) -> LiquidDataType {
403        LiquidDataType::ByteViewArray
404    }
405
406    fn original_arrow_data_type(&self) -> DataType {
407        self.original_arrow_type.to_arrow_type()
408    }
409
410    /// Filter the Liquid array with a boolean array and return an **arrow array**.
411    async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
412        let select_any = selection.count_set_bits() > 0;
413        if !select_any {
414            return arrow::array::new_empty_array(&self.original_arrow_data_type());
415        }
416        let filtered = helpers::filter_inner(self, selection);
417        filtered.to_arrow_array().await
418    }
419
420    /// Try to evaluate a predicate on the Liquid array with a filter.
421    /// Returns `Ok(None)` if the predicate is not supported.
422    ///
423    /// Note that the filter is a boolean buffer, not a boolean array, i.e., filter can't be nullable.
424    /// The returned boolean mask is nullable if the the original array is nullable.
425    async fn try_eval_predicate(
426        &self,
427        expr: &Arc<dyn PhysicalExpr>,
428        filter: &BooleanBuffer,
429    ) -> Option<BooleanArray> {
430        // Reuse generic filter path first to reduce input rows if any
431        let filtered = helpers::filter_inner(self, filter);
432        helpers::try_eval_predicate_on_disk(expr, &filtered).await
433    }
434}