liquid_cache/liquid_array/byte_view_array/
mod.rs1use arrow::array::BooleanArray;
4use arrow::array::{
5 Array, ArrayRef, BinaryArray, DictionaryArray, StringArray, UInt16Array, types::UInt16Type,
6};
7use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
8use arrow::compute::cast;
9use arrow_schema::DataType;
10use bytes::Bytes;
11use datafusion::physical_plan::PhysicalExpr;
12use std::any::Any;
13use std::sync::Arc;
14
15#[cfg(test)]
16use std::cell::Cell;
17
18use crate::cache::CacheExpression;
19use crate::liquid_array::byte_array::{ArrowByteType, build_dict_selection};
20use crate::liquid_array::byte_view_array::fingerprint::build_fingerprints;
21use crate::liquid_array::raw::FsstArray;
22use crate::liquid_array::raw::fsst_buffer::{DiskBuffer, FsstBacking, PrefixKey};
23use crate::liquid_array::{
24 LiquidArray, LiquidDataType, LiquidSqueezedArray, LiquidSqueezedArrayRef, SqueezeIoHandler,
25};
26
27mod comparisons;
28mod conversions;
29mod fingerprint;
30mod helpers;
31mod operator;
32mod serialization;
33
34#[cfg(test)]
35mod tests;
36
37pub use helpers::ByteViewArrayMemoryUsage;
38pub use operator::{ByteViewOperator, Comparison, Equality, SubString};
39
#[cfg(test)]
thread_local! {
    /// Test-only, per-thread tally of disk reads.
    ///
    /// NOTE(review): nothing in this file increments it; presumably the disk-backed
    /// read paths in sibling modules do — confirm.
    static DISK_READ_COUNTER: Cell<usize> = const { Cell::new(0) };
    /// Test-only, per-thread tally of comparisons that had to touch full value data.
    ///
    /// NOTE(review): not read or written in this file; presumably used by the
    /// comparison code in a sibling module — confirm.
    static FULL_DATA_COMPARISON_COUNTER: Cell<usize> = const { Cell::new(0) };
}

/// Returns the current thread's disk-read tally.
#[cfg(test)]
fn get_disk_read_counter() -> usize {
    DISK_READ_COUNTER.get()
}

/// Sets the current thread's disk-read tally back to zero.
#[cfg(test)]
fn reset_disk_read_counter() {
    DISK_READ_COUNTER.set(0);
}
55
56#[derive(Clone)]
77pub struct LiquidByteViewArray<B: FsstBacking> {
78 pub(super) dictionary_keys: UInt16Array,
80 pub(super) prefix_keys: Arc<[PrefixKey]>,
82 pub(super) fsst_buffer: B,
84 pub(super) original_arrow_type: ArrowByteType,
86 pub(super) shared_prefix: Vec<u8>,
88 pub(super) string_fingerprints: Option<Arc<[u32]>>,
90}
91
92#[derive(Clone, Copy, Debug)]
93pub(crate) struct ByteViewBuildOptions {
94 pub(super) arrow_type: ArrowByteType,
95 pub(super) build_fingerprints: bool,
96}
97
98impl ByteViewBuildOptions {
99 pub(crate) fn new(arrow_type: ArrowByteType) -> Self {
100 Self {
101 arrow_type,
102 build_fingerprints: false,
103 }
104 }
105
106 pub(crate) fn for_data_type(data_type: &DataType, build_fingerprints: bool) -> Self {
107 Self {
108 arrow_type: ArrowByteType::from_arrow_type(data_type),
109 build_fingerprints,
110 }
111 }
112}
113
114impl<B: FsstBacking> std::fmt::Debug for LiquidByteViewArray<B> {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 f.debug_struct("LiquidByteViewArray")
117 .field("dictionary_keys", &self.dictionary_keys)
118 .field("prefix_keys", &self.prefix_keys)
119 .field("fsst_buffer", &self.fsst_buffer)
120 .field("original_arrow_type", &self.original_arrow_type)
121 .field("shared_prefix", &self.shared_prefix)
122 .field("string_fingerprints", &self.string_fingerprints)
123 .finish()
124 }
125}
126
127impl<B: FsstBacking> LiquidByteViewArray<B> {
128 fn to_dict_arrow_inner(
130 &self,
131 keys_array: UInt16Array,
132 values_buffer: Buffer,
133 offsets_buffer: OffsetBuffer<i32>,
134 ) -> DictionaryArray<UInt16Type> {
135 let values = if self.original_arrow_type == ArrowByteType::Utf8
136 || self.original_arrow_type == ArrowByteType::Utf8View
137 || self.original_arrow_type == ArrowByteType::Dict16Utf8
138 {
139 let string_array =
140 unsafe { StringArray::new_unchecked(offsets_buffer, values_buffer, None) };
141 Arc::new(string_array) as ArrayRef
142 } else {
143 let binary_array =
144 unsafe { BinaryArray::new_unchecked(offsets_buffer, values_buffer, None) };
145 Arc::new(binary_array) as ArrayRef
146 };
147
148 unsafe { DictionaryArray::<UInt16Type>::new_unchecked(keys_array, values) }
149 }
150
151 fn should_decompress_keyed(&self) -> bool {
152 self.dictionary_keys.len() < 2048 || self.dictionary_keys.len() < self.prefix_keys.len()
153 }
154
155 pub fn nulls(&self) -> Option<&NullBuffer> {
157 self.dictionary_keys.nulls()
158 }
159
160 pub fn get_detailed_memory_usage(&self) -> ByteViewArrayMemoryUsage {
162 let fingerprint_bytes = self
163 .string_fingerprints
164 .as_ref()
165 .map(|fingerprints| fingerprints.len() * std::mem::size_of::<u32>())
166 .unwrap_or(0);
167 ByteViewArrayMemoryUsage {
168 dictionary_key: self.dictionary_keys.get_array_memory_size(),
169 prefix_keys: self.prefix_keys.len() * std::mem::size_of::<PrefixKey>(),
170 fsst_buffer: self.fsst_buffer.get_array_memory_size(),
171 shared_prefix: self.shared_prefix.len(),
172 string_fingerprints: fingerprint_bytes,
173 struct_size: std::mem::size_of::<Self>(),
174 }
175 }
176
177 pub fn len(&self) -> usize {
179 self.dictionary_keys.len()
180 }
181
182 pub fn is_empty(&self) -> bool {
184 self.dictionary_keys.is_empty()
185 }
186
187 #[cfg(test)]
189 pub fn get_disk_read_count(&self) -> usize {
190 get_disk_read_counter()
191 }
192
193 #[cfg(test)]
195 pub fn reset_disk_read_count(&self) {
196 reset_disk_read_counter()
197 }
198}
199
200impl LiquidByteViewArray<FsstArray> {
201 pub fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
203 if self.should_decompress_keyed() {
204 self.to_dict_arrow_decompress_keyed()
205 } else {
206 self.to_dict_arrow_decompress_all()
207 }
208 }
209
210 fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
211 let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed();
212 self.to_dict_arrow_inner(self.dictionary_keys.clone(), values_buffer, offsets_buffer)
213 }
214
215 fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
216 let (selected, new_keys) =
217 build_dict_selection(&self.dictionary_keys, self.prefix_keys.len());
218 let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed_selected(&selected);
219 self.to_dict_arrow_inner(new_keys, values_buffer, offsets_buffer)
220 }
221
222 pub fn to_arrow_array(&self) -> ArrayRef {
224 let dict = self.to_dict_arrow();
225 cast(&dict, &self.original_arrow_type.to_arrow_type()).unwrap()
226 }
227
228 pub fn is_fsst_buffer_on_disk(&self) -> bool {
230 false
231 }
232}
233
234impl LiquidByteViewArray<DiskBuffer> {
235 pub fn is_fsst_buffer_on_disk(&self) -> bool {
237 true
238 }
239
240 pub async fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
242 if self.should_decompress_keyed() {
243 self.to_dict_arrow_decompress_keyed().await
244 } else {
245 self.to_dict_arrow_decompress_all().await
246 }
247 }
248
249 async fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
250 let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed().await;
251 self.to_dict_arrow_inner(self.dictionary_keys.clone(), values_buffer, offsets_buffer)
252 }
253
254 async fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
255 let (selected, new_keys) =
256 build_dict_selection(&self.dictionary_keys, self.prefix_keys.len());
257 let (values_buffer, offsets_buffer) =
258 self.fsst_buffer.to_uncompressed_selected(&selected).await;
259 self.to_dict_arrow_inner(new_keys, values_buffer, offsets_buffer)
260 }
261
262 pub async fn to_arrow_array(&self) -> ArrayRef {
264 let dict = self.to_dict_arrow().await;
265 cast(&dict, &self.original_arrow_type.to_arrow_type()).unwrap()
266 }
267}
268
269impl LiquidArray for LiquidByteViewArray<FsstArray> {
270 fn as_any(&self) -> &dyn Any {
271 self
272 }
273
274 fn get_array_memory_size(&self) -> usize {
275 self.get_detailed_memory_usage().total()
276 }
277
278 fn len(&self) -> usize {
279 self.dictionary_keys.len()
280 }
281
282 #[inline]
283 fn to_arrow_array(&self) -> ArrayRef {
284 let dict = self.to_arrow_array();
285 Arc::new(dict)
286 }
287
288 fn to_best_arrow_array(&self) -> ArrayRef {
289 let dict = self.to_dict_arrow();
290 Arc::new(dict)
291 }
292
293 fn try_eval_predicate(
294 &self,
295 expr: &Arc<dyn PhysicalExpr>,
296 filter: &BooleanBuffer,
297 ) -> Option<BooleanArray> {
298 let filtered = helpers::filter_inner(self, filter);
299
300 helpers::try_eval_predicate_in_memory(expr, &filtered)
301 }
302
303 fn to_bytes(&self) -> Vec<u8> {
304 self.to_bytes_inner().expect("InMemoryFsstBuffer")
305 }
306
307 fn original_arrow_data_type(&self) -> DataType {
308 self.original_arrow_type.to_arrow_type()
309 }
310
311 fn data_type(&self) -> LiquidDataType {
312 LiquidDataType::ByteViewArray
313 }
314
315 fn squeeze(
316 &self,
317 io: Arc<dyn SqueezeIoHandler>,
318 squeeze_hint: Option<&CacheExpression>,
319 ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
320 squeeze_hint?;
321
322 let string_fingerprints = if matches!(squeeze_hint, Some(CacheExpression::SubstringSearch))
323 {
324 self.string_fingerprints.clone().or_else(|| {
325 let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed();
326 Some(build_fingerprints(&values_buffer, &offsets_buffer))
327 })
328 } else {
329 None
330 };
331
332 let bytes = match self.to_bytes_inner() {
334 Ok(b) => b,
335 Err(_) => return None,
336 };
337
338 let disk_range = 0u64..(bytes.len() as u64);
340 let compressor = self.fsst_buffer.compressor_arc();
341 let disk = DiskBuffer::new(
342 self.fsst_buffer.uncompressed_bytes(),
343 io,
344 disk_range,
345 compressor,
346 );
347 let hybrid = LiquidByteViewArray::<DiskBuffer> {
348 dictionary_keys: self.dictionary_keys.clone(),
349 prefix_keys: self.prefix_keys.clone(),
350 fsst_buffer: disk,
351 original_arrow_type: self.original_arrow_type,
352 shared_prefix: self.shared_prefix.clone(),
353 string_fingerprints,
354 };
355
356 let bytes = Bytes::from(bytes);
357 Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, bytes))
358 }
359
360 fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
361 let filtered = helpers::filter_inner(self, selection);
362 filtered.to_arrow_array()
363 }
364}
365
366#[async_trait::async_trait]
367impl LiquidSqueezedArray for LiquidByteViewArray<DiskBuffer> {
368 fn as_any(&self) -> &dyn Any {
370 self
371 }
372
373 fn get_array_memory_size(&self) -> usize {
375 self.get_detailed_memory_usage().total()
376 }
377
378 fn len(&self) -> usize {
380 self.dictionary_keys.len()
381 }
382
383 fn is_empty(&self) -> bool {
385 self.len() == 0
386 }
387
388 async fn to_arrow_array(&self) -> ArrayRef {
390 let bytes = self
391 .fsst_buffer
392 .squeeze_io()
393 .read(Some(self.fsst_buffer.disk_range()))
394 .await
395 .expect("read squeezed backing");
396 let hydrated =
397 LiquidByteViewArray::<FsstArray>::from_bytes(bytes, self.fsst_buffer.compressor_arc());
398 LiquidByteViewArray::<FsstArray>::to_arrow_array(&hydrated)
399 }
400
401 fn data_type(&self) -> LiquidDataType {
403 LiquidDataType::ByteViewArray
404 }
405
406 fn original_arrow_data_type(&self) -> DataType {
407 self.original_arrow_type.to_arrow_type()
408 }
409
410 async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
412 let select_any = selection.count_set_bits() > 0;
413 if !select_any {
414 return arrow::array::new_empty_array(&self.original_arrow_data_type());
415 }
416 let filtered = helpers::filter_inner(self, selection);
417 filtered.to_arrow_array().await
418 }
419
420 async fn try_eval_predicate(
426 &self,
427 expr: &Arc<dyn PhysicalExpr>,
428 filter: &BooleanBuffer,
429 ) -> Option<BooleanArray> {
430 let filtered = helpers::filter_inner(self, filter);
432 helpers::try_eval_predicate_on_disk(expr, &filtered).await
433 }
434}