liquid_cache/liquid_array/byte_view_array/
helpers.rs1use arrow::array::{
2 BooleanArray, BooleanBufferBuilder, UInt16Array, cast::AsArray, types::UInt16Type,
3};
4use arrow::buffer::BooleanBuffer;
5use datafusion_physical_expr::PhysicalExpr;
6use std::sync::Arc;
7
8use super::LiquidByteViewArray;
9use super::operator::{ByteViewExpression, ByteViewOperator};
10use crate::liquid_array::byte_view_array::operator::UnsupportedExpression;
11use crate::liquid_array::raw::FsstArray;
12use crate::liquid_array::raw::fsst_buffer::{DiskBuffer, FsstBacking};
13
14pub(super) fn build_dict_selection(
15 keys: &UInt16Array,
16 dict_len: usize,
17) -> (Vec<usize>, UInt16Array) {
18 let mut hit_mask = BooleanBufferBuilder::new(dict_len);
19 hit_mask.advance(dict_len);
20 for value in keys.iter().flatten() {
21 hit_mask.set_bit(value as usize, true);
22 }
23 let hit_mask = hit_mask.finish();
24 let selected_cnt = hit_mask.count_set_bits();
25
26 let mut key_map = vec![u16::MAX; dict_len];
27 let mut selected = Vec::with_capacity(selected_cnt);
28 let mut remapped: u16 = 0;
29 for (index, selected_flag) in hit_mask.iter().enumerate() {
30 if selected_flag {
31 key_map[index] = remapped;
32 selected.push(index);
33 remapped += 1;
34 }
35 }
36
37 let new_keys = UInt16Array::from_iter(
38 keys.iter()
39 .map(|value| value.map(|value| key_map[value as usize])),
40 );
41 (selected, new_keys)
42}
43
44pub(super) fn filter_inner<B: FsstBacking>(
45 array: &LiquidByteViewArray<B>,
46 filter: &BooleanBuffer,
47) -> LiquidByteViewArray<B> {
48 let filter = BooleanArray::new(filter.clone(), None);
53 let filtered_keys = arrow::compute::filter(&array.dictionary_keys, &filter).unwrap();
54 let filtered_keys = filtered_keys.as_primitive::<UInt16Type>().clone();
55
56 LiquidByteViewArray {
57 dictionary_keys: filtered_keys,
58 prefix_keys: array.prefix_keys.clone(),
59 fsst_buffer: array.fsst_buffer.clone(),
60 original_arrow_type: array.original_arrow_type,
61 shared_prefix: array.shared_prefix.clone(),
62 string_fingerprints: array.string_fingerprints.clone(),
63 }
64}
65
66pub(super) fn try_eval_predicate_in_memory(
67 expr: &Arc<dyn PhysicalExpr>,
68 array: &LiquidByteViewArray<FsstArray>,
69) -> Option<BooleanArray> {
70 let expr = match ByteViewExpression::try_from(expr) {
71 Ok(expr) => expr,
72 Err(UnsupportedExpression::Constant(v)) => {
73 let bool_array = if v {
74 BooleanBuffer::new_set(array.len())
75 } else {
76 BooleanBuffer::new_unset(array.len())
77 };
78 return Some(BooleanArray::new(bool_array, array.nulls().cloned()));
79 }
80 Err(UnsupportedExpression::Expr) | Err(UnsupportedExpression::Op) => {
81 return None;
82 }
83 };
84 let op = expr.op();
85 let needle = expr.literal();
86 if let ByteViewOperator::SubString(_substring_op) = op
87 && array.string_fingerprints.as_ref().is_none()
88 {
89 return None;
90 }
91 Some(array.compare_with(needle, op))
92}
93
94pub(super) async fn try_eval_predicate_on_disk(
95 expr: &Arc<dyn PhysicalExpr>,
96 array: &LiquidByteViewArray<DiskBuffer>,
97) -> Option<BooleanArray> {
98 let cur_expr = match ByteViewExpression::try_from(expr) {
99 Ok(expr) => expr,
100 Err(UnsupportedExpression::Constant(v)) => {
101 let bool_array = if v {
102 BooleanBuffer::new_set(array.len())
103 } else {
104 BooleanBuffer::new_unset(array.len())
105 };
106 return Some(BooleanArray::new(bool_array, array.nulls().cloned()));
107 }
108 Err(UnsupportedExpression::Expr) | Err(UnsupportedExpression::Op) => {
109 return None;
110 }
111 };
112
113 let op = cur_expr.op();
114 let needle = cur_expr.literal();
115
116 if let ByteViewOperator::SubString(_substring_op) = op
117 && array.string_fingerprints.as_ref().is_none()
118 {
119 return None;
120 }
121 let result = array.compare_with(needle, op).await;
122 Some(result)
123}
124
125use std::fmt::Display;
126
/// Memory-usage breakdown of a byte-view array, one counter per component
/// (presumably in bytes — confirm against the code that fills it in).
///
/// `Default` yields an all-zero value, which combined with `+=` allows accumulating usage
/// across several arrays.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ByteViewArrayMemoryUsage {
    /// Memory attributed to the per-row dictionary keys.
    pub dictionary_key: usize,
    /// Memory attributed to the prefix keys.
    pub prefix_keys: usize,
    /// Memory attributed to the FSST buffer.
    pub fsst_buffer: usize,
    /// Memory attributed to the shared prefix.
    pub shared_prefix: usize,
    /// Memory attributed to the string fingerprints.
    pub string_fingerprints: usize,
    /// Memory attributed to the array struct itself.
    pub struct_size: usize,
}
142
143impl Display for ByteViewArrayMemoryUsage {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 f.debug_struct("ByteViewArrayMemoryUsage")
146 .field("dictionary_key", &self.dictionary_key)
147 .field("prefix_keys", &self.prefix_keys)
148 .field("fsst_buffer", &self.fsst_buffer)
149 .field("shared_prefix", &self.shared_prefix)
150 .field("string_fingerprints", &self.string_fingerprints)
151 .field("struct_size", &self.struct_size)
152 .field("total", &self.total())
153 .finish()
154 }
155}
156
157impl ByteViewArrayMemoryUsage {
158 pub fn total(&self) -> usize {
160 self.dictionary_key
161 + self.prefix_keys
162 + self.fsst_buffer
163 + self.shared_prefix
164 + self.string_fingerprints
165 + self.struct_size
166 }
167}
168
169impl std::ops::AddAssign for ByteViewArrayMemoryUsage {
170 fn add_assign(&mut self, other: Self) {
171 self.dictionary_key += other.dictionary_key;
172 self.prefix_keys += other.prefix_keys;
173 self.fsst_buffer += other.fsst_buffer;
174 self.shared_prefix += other.shared_prefix;
175 self.string_fingerprints += other.string_fingerprints;
176 self.struct_size += other.struct_size;
177 }
178}