Skip to main content

liquid_cache/liquid_array/byte_view_array/
helpers.rs

1use arrow::array::{
2    BooleanArray, BooleanBufferBuilder, UInt16Array, cast::AsArray, types::UInt16Type,
3};
4use arrow::buffer::BooleanBuffer;
5use datafusion_physical_expr::PhysicalExpr;
6use std::sync::Arc;
7
8use super::LiquidByteViewArray;
9use super::operator::{ByteViewExpression, ByteViewOperator};
10use crate::liquid_array::byte_view_array::operator::UnsupportedExpression;
11use crate::liquid_array::raw::FsstArray;
12use crate::liquid_array::raw::fsst_buffer::{DiskBuffer, FsstBacking};
13
14pub(super) fn build_dict_selection(
15    keys: &UInt16Array,
16    dict_len: usize,
17) -> (Vec<usize>, UInt16Array) {
18    let mut hit_mask = BooleanBufferBuilder::new(dict_len);
19    hit_mask.advance(dict_len);
20    for value in keys.iter().flatten() {
21        hit_mask.set_bit(value as usize, true);
22    }
23    let hit_mask = hit_mask.finish();
24    let selected_cnt = hit_mask.count_set_bits();
25
26    let mut key_map = vec![u16::MAX; dict_len];
27    let mut selected = Vec::with_capacity(selected_cnt);
28    let mut remapped: u16 = 0;
29    for (index, selected_flag) in hit_mask.iter().enumerate() {
30        if selected_flag {
31            key_map[index] = remapped;
32            selected.push(index);
33            remapped += 1;
34        }
35    }
36
37    let new_keys = UInt16Array::from_iter(
38        keys.iter()
39            .map(|value| value.map(|value| key_map[value as usize])),
40    );
41    (selected, new_keys)
42}
43
44pub(super) fn filter_inner<B: FsstBacking>(
45    array: &LiquidByteViewArray<B>,
46    filter: &BooleanBuffer,
47) -> LiquidByteViewArray<B> {
48    // Only filter the dictionary keys, not the offsets!
49    // Offset views reference unique values in FSST buffer and should remain unchanged
50
51    // Filter the dictionary keys using Arrow's built-in filter functionality
52    let filter = BooleanArray::new(filter.clone(), None);
53    let filtered_keys = arrow::compute::filter(&array.dictionary_keys, &filter).unwrap();
54    let filtered_keys = filtered_keys.as_primitive::<UInt16Type>().clone();
55
56    LiquidByteViewArray {
57        dictionary_keys: filtered_keys,
58        prefix_keys: array.prefix_keys.clone(),
59        fsst_buffer: array.fsst_buffer.clone(),
60        original_arrow_type: array.original_arrow_type,
61        shared_prefix: array.shared_prefix.clone(),
62        string_fingerprints: array.string_fingerprints.clone(),
63    }
64}
65
66pub(super) fn try_eval_predicate_in_memory(
67    expr: &Arc<dyn PhysicalExpr>,
68    array: &LiquidByteViewArray<FsstArray>,
69) -> Option<BooleanArray> {
70    let expr = match ByteViewExpression::try_from(expr) {
71        Ok(expr) => expr,
72        Err(UnsupportedExpression::Constant(v)) => {
73            let bool_array = if v {
74                BooleanBuffer::new_set(array.len())
75            } else {
76                BooleanBuffer::new_unset(array.len())
77            };
78            return Some(BooleanArray::new(bool_array, array.nulls().cloned()));
79        }
80        Err(UnsupportedExpression::Expr) | Err(UnsupportedExpression::Op) => {
81            return None;
82        }
83    };
84    let op = expr.op();
85    let needle = expr.literal();
86    if let ByteViewOperator::SubString(_substring_op) = op
87        && array.string_fingerprints.as_ref().is_none()
88    {
89        return None;
90    }
91    Some(array.compare_with(needle, op))
92}
93
94pub(super) async fn try_eval_predicate_on_disk(
95    expr: &Arc<dyn PhysicalExpr>,
96    array: &LiquidByteViewArray<DiskBuffer>,
97) -> Option<BooleanArray> {
98    let cur_expr = match ByteViewExpression::try_from(expr) {
99        Ok(expr) => expr,
100        Err(UnsupportedExpression::Constant(v)) => {
101            let bool_array = if v {
102                BooleanBuffer::new_set(array.len())
103            } else {
104                BooleanBuffer::new_unset(array.len())
105            };
106            return Some(BooleanArray::new(bool_array, array.nulls().cloned()));
107        }
108        Err(UnsupportedExpression::Expr) | Err(UnsupportedExpression::Op) => {
109            return None;
110        }
111    };
112
113    let op = cur_expr.op();
114    let needle = cur_expr.literal();
115
116    if let ByteViewOperator::SubString(_substring_op) = op
117        && array.string_fingerprints.as_ref().is_none()
118    {
119        return None;
120    }
121    let result = array.compare_with(needle, op).await;
122    Some(result)
123}
124
125use std::fmt::Display;
126
/// Detailed memory usage of the byte view array, broken down by component.
///
/// The default value has every component set to zero, which makes it a convenient starting
/// point for accumulating several reports with `+=`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ByteViewArrayMemoryUsage {
    /// Memory usage of the dictionary key
    pub dictionary_key: usize,
    /// Memory usage of the prefix keys
    pub prefix_keys: usize,
    /// Memory usage of the raw FSST buffer
    pub fsst_buffer: usize,
    /// Memory usage of the shared prefix
    pub shared_prefix: usize,
    /// Memory usage of the string fingerprints
    pub string_fingerprints: usize,
    /// Memory usage of the struct size
    pub struct_size: usize,
}

/// Renders every component followed by the computed `total`, in struct-like form.
impl Display for ByteViewArrayMemoryUsage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ByteViewArrayMemoryUsage")
            .field("dictionary_key", &self.dictionary_key)
            .field("prefix_keys", &self.prefix_keys)
            .field("fsst_buffer", &self.fsst_buffer)
            .field("shared_prefix", &self.shared_prefix)
            .field("string_fingerprints", &self.string_fingerprints)
            .field("struct_size", &self.struct_size)
            .field("total", &self.total())
            .finish()
    }
}

impl ByteViewArrayMemoryUsage {
    /// Get the total memory usage of the byte view array
    ///
    /// This is the sum of all component fields.
    pub fn total(&self) -> usize {
        // Exhaustive destructuring: adding a field to the struct without accounting for it here
        // becomes a compile error instead of a silently wrong total.
        let Self {
            dictionary_key,
            prefix_keys,
            fsst_buffer,
            shared_prefix,
            string_fingerprints,
            struct_size,
        } = *self;
        dictionary_key
            + prefix_keys
            + fsst_buffer
            + shared_prefix
            + string_fingerprints
            + struct_size
    }
}

/// Component-wise accumulation: each field of `other` is added to the matching field of `self`.
impl std::ops::AddAssign for ByteViewArrayMemoryUsage {
    fn add_assign(&mut self, other: Self) {
        // Exhaustive destructuring keeps this in sync with the struct definition.
        let Self {
            dictionary_key,
            prefix_keys,
            fsst_buffer,
            shared_prefix,
            string_fingerprints,
            struct_size,
        } = other;
        self.dictionary_key += dictionary_key;
        self.prefix_keys += prefix_keys;
        self.fsst_buffer += fsst_buffer;
        self.shared_prefix += shared_prefix;
        self.string_fingerprints += string_fingerprints;
        self.struct_size += struct_size;
    }
}