liquid_cache/liquid_array/byte_view_array/
helpers.rs1use arrow::array::{BooleanArray, cast::AsArray, types::UInt16Type};
2use arrow::buffer::BooleanBuffer;
3use datafusion::physical_plan::PhysicalExpr;
4use std::sync::Arc;
5
6use super::LiquidByteViewArray;
7use super::operator::{ByteViewExpression, ByteViewOperator};
8use crate::liquid_array::byte_view_array::operator::UnsupportedExpression;
9use crate::liquid_array::raw::FsstArray;
10use crate::liquid_array::raw::fsst_buffer::{DiskBuffer, FsstBacking};
11
12pub(super) fn filter_inner<B: FsstBacking>(
13 array: &LiquidByteViewArray<B>,
14 filter: &BooleanBuffer,
15) -> LiquidByteViewArray<B> {
16 let filter = BooleanArray::new(filter.clone(), None);
21 let filtered_keys = arrow::compute::filter(&array.dictionary_keys, &filter).unwrap();
22 let filtered_keys = filtered_keys.as_primitive::<UInt16Type>().clone();
23
24 LiquidByteViewArray {
25 dictionary_keys: filtered_keys,
26 prefix_keys: array.prefix_keys.clone(),
27 fsst_buffer: array.fsst_buffer.clone(),
28 original_arrow_type: array.original_arrow_type,
29 shared_prefix: array.shared_prefix.clone(),
30 string_fingerprints: array.string_fingerprints.clone(),
31 }
32}
33
34pub(super) fn try_eval_predicate_in_memory(
35 expr: &Arc<dyn PhysicalExpr>,
36 array: &LiquidByteViewArray<FsstArray>,
37) -> Option<BooleanArray> {
38 let expr = match ByteViewExpression::try_from(expr) {
39 Ok(expr) => expr,
40 Err(UnsupportedExpression::Constant(v)) => {
41 let bool_array = if v {
42 BooleanBuffer::new_set(array.len())
43 } else {
44 BooleanBuffer::new_unset(array.len())
45 };
46 return Some(BooleanArray::new(bool_array, array.nulls().cloned()));
47 }
48 Err(UnsupportedExpression::Expr) | Err(UnsupportedExpression::Op) => {
49 return None;
50 }
51 };
52 let op = expr.op();
53 let needle = expr.literal();
54 if let ByteViewOperator::SubString(_substring_op) = op
55 && array.string_fingerprints.as_ref().is_none()
56 {
57 return None;
58 }
59 Some(array.compare_with(needle, op))
60}
61
62pub(super) async fn try_eval_predicate_on_disk(
63 expr: &Arc<dyn PhysicalExpr>,
64 array: &LiquidByteViewArray<DiskBuffer>,
65) -> Option<BooleanArray> {
66 let cur_expr = match ByteViewExpression::try_from(expr) {
67 Ok(expr) => expr,
68 Err(UnsupportedExpression::Constant(v)) => {
69 let bool_array = if v {
70 BooleanBuffer::new_set(array.len())
71 } else {
72 BooleanBuffer::new_unset(array.len())
73 };
74 return Some(BooleanArray::new(bool_array, array.nulls().cloned()));
75 }
76 Err(UnsupportedExpression::Expr) | Err(UnsupportedExpression::Op) => {
77 return None;
78 }
79 };
80
81 let op = cur_expr.op();
82 let needle = cur_expr.literal();
83
84 if let ByteViewOperator::SubString(_substring_op) = op
85 && array.string_fingerprints.as_ref().is_none()
86 {
87 return None;
88 }
89 let result = array.compare_with(needle, op).await;
90 Some(result)
91}
92
93use std::fmt::Display;
94
95pub struct ByteViewArrayMemoryUsage {
97 pub dictionary_key: usize,
99 pub prefix_keys: usize,
101 pub fsst_buffer: usize,
103 pub shared_prefix: usize,
105 pub string_fingerprints: usize,
107 pub struct_size: usize,
109}
110
111impl Display for ByteViewArrayMemoryUsage {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct("ByteViewArrayMemoryUsage")
114 .field("dictionary_key", &self.dictionary_key)
115 .field("prefix_keys", &self.prefix_keys)
116 .field("fsst_buffer", &self.fsst_buffer)
117 .field("shared_prefix", &self.shared_prefix)
118 .field("string_fingerprints", &self.string_fingerprints)
119 .field("struct_size", &self.struct_size)
120 .field("total", &self.total())
121 .finish()
122 }
123}
124
125impl ByteViewArrayMemoryUsage {
126 pub fn total(&self) -> usize {
128 self.dictionary_key
129 + self.prefix_keys
130 + self.fsst_buffer
131 + self.shared_prefix
132 + self.string_fingerprints
133 + self.struct_size
134 }
135}
136
137impl std::ops::AddAssign for ByteViewArrayMemoryUsage {
138 fn add_assign(&mut self, other: Self) {
139 self.dictionary_key += other.dictionary_key;
140 self.prefix_keys += other.prefix_keys;
141 self.fsst_buffer += other.fsst_buffer;
142 self.shared_prefix += other.shared_prefix;
143 self.string_fingerprints += other.string_fingerprints;
144 self.struct_size += other.struct_size;
145 }
146}