Skip to main content

liquid_cache/liquid_array/byte_view_array/
helpers.rs

1use arrow::array::{BooleanArray, cast::AsArray, types::UInt16Type};
2use arrow::buffer::BooleanBuffer;
3use datafusion::physical_plan::PhysicalExpr;
4use std::sync::Arc;
5
6use super::LiquidByteViewArray;
7use super::operator::{ByteViewExpression, ByteViewOperator};
8use crate::liquid_array::byte_view_array::operator::UnsupportedExpression;
9use crate::liquid_array::raw::FsstArray;
10use crate::liquid_array::raw::fsst_buffer::{DiskBuffer, FsstBacking};
11
12pub(super) fn filter_inner<B: FsstBacking>(
13    array: &LiquidByteViewArray<B>,
14    filter: &BooleanBuffer,
15) -> LiquidByteViewArray<B> {
16    // Only filter the dictionary keys, not the offsets!
17    // Offset views reference unique values in FSST buffer and should remain unchanged
18
19    // Filter the dictionary keys using Arrow's built-in filter functionality
20    let filter = BooleanArray::new(filter.clone(), None);
21    let filtered_keys = arrow::compute::filter(&array.dictionary_keys, &filter).unwrap();
22    let filtered_keys = filtered_keys.as_primitive::<UInt16Type>().clone();
23
24    LiquidByteViewArray {
25        dictionary_keys: filtered_keys,
26        prefix_keys: array.prefix_keys.clone(),
27        fsst_buffer: array.fsst_buffer.clone(),
28        original_arrow_type: array.original_arrow_type,
29        shared_prefix: array.shared_prefix.clone(),
30        string_fingerprints: array.string_fingerprints.clone(),
31    }
32}
33
34pub(super) fn try_eval_predicate_in_memory(
35    expr: &Arc<dyn PhysicalExpr>,
36    array: &LiquidByteViewArray<FsstArray>,
37) -> Option<BooleanArray> {
38    let expr = match ByteViewExpression::try_from(expr) {
39        Ok(expr) => expr,
40        Err(UnsupportedExpression::Constant(v)) => {
41            let bool_array = if v {
42                BooleanBuffer::new_set(array.len())
43            } else {
44                BooleanBuffer::new_unset(array.len())
45            };
46            return Some(BooleanArray::new(bool_array, array.nulls().cloned()));
47        }
48        Err(UnsupportedExpression::Expr) | Err(UnsupportedExpression::Op) => {
49            return None;
50        }
51    };
52    let op = expr.op();
53    let needle = expr.literal();
54    if let ByteViewOperator::SubString(_substring_op) = op
55        && array.string_fingerprints.as_ref().is_none()
56    {
57        return None;
58    }
59    Some(array.compare_with(needle, op))
60}
61
62pub(super) async fn try_eval_predicate_on_disk(
63    expr: &Arc<dyn PhysicalExpr>,
64    array: &LiquidByteViewArray<DiskBuffer>,
65) -> Option<BooleanArray> {
66    let cur_expr = match ByteViewExpression::try_from(expr) {
67        Ok(expr) => expr,
68        Err(UnsupportedExpression::Constant(v)) => {
69            let bool_array = if v {
70                BooleanBuffer::new_set(array.len())
71            } else {
72                BooleanBuffer::new_unset(array.len())
73            };
74            return Some(BooleanArray::new(bool_array, array.nulls().cloned()));
75        }
76        Err(UnsupportedExpression::Expr) | Err(UnsupportedExpression::Op) => {
77            return None;
78        }
79    };
80
81    let op = cur_expr.op();
82    let needle = cur_expr.literal();
83
84    if let ByteViewOperator::SubString(_substring_op) = op
85        && array.string_fingerprints.as_ref().is_none()
86    {
87        return None;
88    }
89    let result = array.compare_with(needle, op).await;
90    Some(result)
91}
92
93use std::fmt::Display;
94
/// Detailed memory usage of the byte view array.
///
/// Each field is one component of the footprint; [`ByteViewArrayMemoryUsage::total`]
/// adds them up. `Default` yields an all-zero value, which is the natural
/// starting point when accumulating several usages with `+=`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ByteViewArrayMemoryUsage {
    /// Memory usage of the dictionary key
    pub dictionary_key: usize,
    /// Memory usage of the prefix keys
    pub prefix_keys: usize,
    /// Memory usage of the raw FSST buffer
    pub fsst_buffer: usize,
    /// Memory usage of the shared prefix
    pub shared_prefix: usize,
    /// Memory usage of the string fingerprints
    pub string_fingerprints: usize,
    /// Memory usage of the struct size
    pub struct_size: usize,
}
110
111impl Display for ByteViewArrayMemoryUsage {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("ByteViewArrayMemoryUsage")
114            .field("dictionary_key", &self.dictionary_key)
115            .field("prefix_keys", &self.prefix_keys)
116            .field("fsst_buffer", &self.fsst_buffer)
117            .field("shared_prefix", &self.shared_prefix)
118            .field("string_fingerprints", &self.string_fingerprints)
119            .field("struct_size", &self.struct_size)
120            .field("total", &self.total())
121            .finish()
122    }
123}
124
125impl ByteViewArrayMemoryUsage {
126    /// Get the total memory usage of the byte view array
127    pub fn total(&self) -> usize {
128        self.dictionary_key
129            + self.prefix_keys
130            + self.fsst_buffer
131            + self.shared_prefix
132            + self.string_fingerprints
133            + self.struct_size
134    }
135}
136
137impl std::ops::AddAssign for ByteViewArrayMemoryUsage {
138    fn add_assign(&mut self, other: Self) {
139        self.dictionary_key += other.dictionary_key;
140        self.prefix_keys += other.prefix_keys;
141        self.fsst_buffer += other.fsst_buffer;
142        self.shared_prefix += other.shared_prefix;
143        self.string_fingerprints += other.string_fingerprints;
144        self.struct_size += other.struct_size;
145    }
146}