liquid_cache/liquid_array/byte_view_array/
mod.rs1use arrow::array::BooleanArray;
4use arrow::array::{
5 Array, ArrayRef, BinaryArray, DictionaryArray, StringArray, UInt16Array, types::UInt16Type,
6};
7use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
8use arrow::compute::cast;
9use arrow_schema::DataType;
10use bytes::Bytes;
11use std::any::Any;
12use std::sync::Arc;
13
14#[cfg(test)]
15use std::cell::Cell;
16
17use crate::cache::{CacheExpression, LiquidExpr};
18use crate::liquid_array::byte_view_array::fingerprint::build_fingerprints;
19use crate::liquid_array::raw::FsstArray;
20use crate::liquid_array::raw::fsst_buffer::{DiskBuffer, FsstBacking, PrefixKey};
21use crate::liquid_array::{
22 LiquidArray, LiquidDataType, LiquidSqueezedArray, LiquidSqueezedArrayRef, SqueezeIoHandler,
23 SqueezedBacking, eval_predicate_on_array,
24};
25
26mod comparisons;
27mod conversions;
28mod fingerprint;
29mod helpers;
30mod operator;
31mod serialization;
32
33#[cfg(test)]
34mod tests;
35
36pub use helpers::ByteViewArrayMemoryUsage;
37pub use operator::{ByteViewOperator, Comparison, Equality, SubString};
38
// Test-only instrumentation. These are thread-local, so each test thread
// observes (and resets) its own independent counts.
#[cfg(test)]
thread_local! {
    /// Counter exposed through `get_disk_read_counter` / `reset_disk_read_counter`.
    /// NOTE(review): presumably incremented by disk-backed reads elsewhere in the crate — confirm.
    static DISK_READ_COUNTER: Cell<usize> = const { Cell::new(0) };
    /// Not read in this module; presumably bumped by the comparison code when it
    /// falls back to comparing full values — confirm.
    static FULL_DATA_COMPARISON_COUNTER: Cell<usize> = const { Cell::new(0) };
}
44
/// Returns the current thread's value of the test-only disk-read counter.
#[cfg(test)]
fn get_disk_read_counter() -> usize {
    DISK_READ_COUNTER.with(Cell::get)
}
49
/// Sets the current thread's test-only disk-read counter back to zero.
#[cfg(test)]
fn reset_disk_read_counter() {
    DISK_READ_COUNTER.with(|cell| {
        cell.set(0);
    });
}
54
55#[derive(Clone)]
76pub struct LiquidByteViewArray<B: FsstBacking> {
77 pub(super) dictionary_keys: UInt16Array,
79 pub(super) prefix_keys: Arc<[PrefixKey]>,
81 pub(super) fsst_buffer: B,
83 pub(super) original_arrow_type: ArrowByteType,
85 pub(super) shared_prefix: Vec<u8>,
87 pub(super) string_fingerprints: Option<Arc<[u32]>>,
89}
90
91#[derive(Clone, Copy, Debug)]
92pub(crate) struct ByteViewBuildOptions {
93 pub(super) arrow_type: ArrowByteType,
94 pub(super) build_fingerprints: bool,
95}
96
97impl ByteViewBuildOptions {
98 pub(crate) fn new(arrow_type: ArrowByteType) -> Self {
99 Self {
100 arrow_type,
101 build_fingerprints: false,
102 }
103 }
104
105 pub(crate) fn for_data_type(data_type: &DataType, build_fingerprints: bool) -> Self {
106 Self {
107 arrow_type: ArrowByteType::from_arrow_type(data_type),
108 build_fingerprints,
109 }
110 }
111}
112
/// The Arrow byte-array types a `LiquidByteViewArray` can originate from.
///
/// The explicit `u16` discriminants are what `From<u16>` decodes.
/// NOTE(review): presumably part of the serialized format — keep them stable.
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(u16)]
pub(crate) enum ArrowByteType {
    Utf8 = 0,
    Utf8View = 1,
    Dict16Binary = 2,
    Dict16Utf8 = 3,
    Binary = 4,
    BinaryView = 5,
}

impl From<u16> for ArrowByteType {
    /// Decodes a discriminant produced by `ArrowByteType as u16`.
    ///
    /// # Panics
    /// Panics when `value` is not one of `0..=5`.
    fn from(value: u16) -> Self {
        match value {
            0 => Self::Utf8,
            1 => Self::Utf8View,
            2 => Self::Dict16Binary,
            3 => Self::Dict16Utf8,
            4 => Self::Binary,
            5 => Self::BinaryView,
            other => panic!("Invalid arrow byte type: {other}"),
        }
    }
}
137
138impl ArrowByteType {
139 pub fn to_arrow_type(self) -> DataType {
140 match self {
141 ArrowByteType::Utf8 => DataType::Utf8,
142 ArrowByteType::Utf8View => DataType::Utf8View,
143 ArrowByteType::Dict16Binary => {
144 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary))
145 }
146 ArrowByteType::Dict16Utf8 => {
147 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
148 }
149 ArrowByteType::Binary => DataType::Binary,
150 ArrowByteType::BinaryView => DataType::BinaryView,
151 }
152 }
153
154 pub fn from_arrow_type(ty: &DataType) -> Self {
155 match ty {
156 DataType::Utf8 => ArrowByteType::Utf8,
157 DataType::Utf8View => ArrowByteType::Utf8View,
158 DataType::Binary => ArrowByteType::Binary,
159 DataType::BinaryView => ArrowByteType::BinaryView,
160 DataType::Dictionary(_, _) => {
161 if ty
162 == &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary))
163 {
164 ArrowByteType::Dict16Binary
165 } else if ty
166 == &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
167 {
168 ArrowByteType::Dict16Utf8
169 } else {
170 panic!("Unsupported arrow type: {ty:?}")
171 }
172 }
173 _ => panic!("Unsupported arrow type: {ty:?}"),
174 }
175 }
176}
177
178impl<B: FsstBacking> std::fmt::Debug for LiquidByteViewArray<B> {
179 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180 f.debug_struct("LiquidByteViewArray")
181 .field("dictionary_keys", &self.dictionary_keys)
182 .field("prefix_keys", &self.prefix_keys)
183 .field("fsst_buffer", &self.fsst_buffer)
184 .field("original_arrow_type", &self.original_arrow_type)
185 .field("shared_prefix", &self.shared_prefix)
186 .field("string_fingerprints", &self.string_fingerprints)
187 .finish()
188 }
189}
190
191impl<B: FsstBacking> LiquidByteViewArray<B> {
192 fn to_dict_arrow_inner(
194 &self,
195 keys_array: UInt16Array,
196 values_buffer: Buffer,
197 offsets_buffer: OffsetBuffer<i32>,
198 ) -> DictionaryArray<UInt16Type> {
199 let values = if self.original_arrow_type == ArrowByteType::Utf8
200 || self.original_arrow_type == ArrowByteType::Utf8View
201 || self.original_arrow_type == ArrowByteType::Dict16Utf8
202 {
203 let string_array =
204 unsafe { StringArray::new_unchecked(offsets_buffer, values_buffer, None) };
205 Arc::new(string_array) as ArrayRef
206 } else {
207 let binary_array =
208 unsafe { BinaryArray::new_unchecked(offsets_buffer, values_buffer, None) };
209 Arc::new(binary_array) as ArrayRef
210 };
211
212 unsafe { DictionaryArray::<UInt16Type>::new_unchecked(keys_array, values) }
213 }
214
215 fn should_decompress_keyed(&self) -> bool {
216 self.dictionary_keys.len() < 2048 || self.dictionary_keys.len() < self.prefix_keys.len()
217 }
218
219 pub fn nulls(&self) -> Option<&NullBuffer> {
221 self.dictionary_keys.nulls()
222 }
223
224 pub fn get_detailed_memory_usage(&self) -> ByteViewArrayMemoryUsage {
226 let fingerprint_bytes = self
227 .string_fingerprints
228 .as_ref()
229 .map(|fingerprints| fingerprints.len() * std::mem::size_of::<u32>())
230 .unwrap_or(0);
231 ByteViewArrayMemoryUsage {
232 dictionary_key: self.dictionary_keys.get_array_memory_size(),
233 prefix_keys: self.prefix_keys.len() * std::mem::size_of::<PrefixKey>(),
234 fsst_buffer: self.fsst_buffer.get_array_memory_size(),
235 shared_prefix: self.shared_prefix.len(),
236 string_fingerprints: fingerprint_bytes,
237 struct_size: std::mem::size_of::<Self>(),
238 }
239 }
240
241 pub fn len(&self) -> usize {
243 self.dictionary_keys.len()
244 }
245
246 pub fn is_empty(&self) -> bool {
248 self.dictionary_keys.is_empty()
249 }
250
251 #[cfg(test)]
253 pub fn get_disk_read_count(&self) -> usize {
254 get_disk_read_counter()
255 }
256
257 #[cfg(test)]
259 pub fn reset_disk_read_count(&self) {
260 reset_disk_read_counter()
261 }
262}
263
264impl LiquidByteViewArray<FsstArray> {
265 pub fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
267 if self.should_decompress_keyed() {
268 self.to_dict_arrow_decompress_keyed()
269 } else {
270 self.to_dict_arrow_decompress_all()
271 }
272 }
273
274 fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
275 let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed();
276 self.to_dict_arrow_inner(self.dictionary_keys.clone(), values_buffer, offsets_buffer)
277 }
278
279 fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
280 let (selected, new_keys) =
281 helpers::build_dict_selection(&self.dictionary_keys, self.prefix_keys.len());
282 let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed_selected(&selected);
283 self.to_dict_arrow_inner(new_keys, values_buffer, offsets_buffer)
284 }
285
286 pub fn to_arrow_array(&self) -> ArrayRef {
288 let dict = self.to_dict_arrow();
289 cast(&dict, &self.original_arrow_type.to_arrow_type()).unwrap()
290 }
291
292 pub fn is_fsst_buffer_on_disk(&self) -> bool {
294 false
295 }
296}
297
298impl LiquidByteViewArray<DiskBuffer> {
299 pub fn is_fsst_buffer_on_disk(&self) -> bool {
301 true
302 }
303
304 pub async fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
306 if self.should_decompress_keyed() {
307 self.to_dict_arrow_decompress_keyed().await
308 } else {
309 self.to_dict_arrow_decompress_all().await
310 }
311 }
312
313 async fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
314 let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed().await;
315 self.to_dict_arrow_inner(self.dictionary_keys.clone(), values_buffer, offsets_buffer)
316 }
317
318 async fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
319 let (selected, new_keys) =
320 helpers::build_dict_selection(&self.dictionary_keys, self.prefix_keys.len());
321 let (values_buffer, offsets_buffer) =
322 self.fsst_buffer.to_uncompressed_selected(&selected).await;
323 self.to_dict_arrow_inner(new_keys, values_buffer, offsets_buffer)
324 }
325
326 pub async fn to_arrow_array(&self) -> ArrayRef {
328 let dict = self.to_dict_arrow().await;
329 cast(&dict, &self.original_arrow_type.to_arrow_type()).unwrap()
330 }
331}
332
333impl LiquidArray for LiquidByteViewArray<FsstArray> {
334 fn as_any(&self) -> &dyn Any {
335 self
336 }
337
338 fn get_array_memory_size(&self) -> usize {
339 self.get_detailed_memory_usage().total()
340 }
341
342 fn len(&self) -> usize {
343 self.dictionary_keys.len()
344 }
345
346 #[inline]
347 fn to_arrow_array(&self) -> ArrayRef {
348 let dict = self.to_arrow_array();
349 Arc::new(dict)
350 }
351
352 fn to_best_arrow_array(&self) -> ArrayRef {
353 let dict = self.to_dict_arrow();
354 Arc::new(dict)
355 }
356
357 fn try_eval_predicate(&self, expr: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
358 let filtered = helpers::filter_inner(self, filter);
359
360 helpers::try_eval_predicate_in_memory(expr.physical_expr(), &filtered)
361 .unwrap_or_else(|| eval_predicate_on_array(filtered.to_arrow_array(), expr))
362 }
363
364 fn to_bytes(&self) -> Vec<u8> {
365 self.to_bytes_inner().expect("InMemoryFsstBuffer")
366 }
367
368 fn original_arrow_data_type(&self) -> DataType {
369 self.original_arrow_type.to_arrow_type()
370 }
371
372 fn data_type(&self) -> LiquidDataType {
373 LiquidDataType::ByteViewArray
374 }
375
376 fn squeeze(
377 &self,
378 io: Arc<dyn SqueezeIoHandler>,
379 squeeze_hint: Option<&CacheExpression>,
380 ) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
381 squeeze_hint?;
382
383 let string_fingerprints = if matches!(squeeze_hint, Some(CacheExpression::SubstringSearch))
384 {
385 self.string_fingerprints.clone().or_else(|| {
386 let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed();
387 Some(build_fingerprints(&values_buffer, &offsets_buffer))
388 })
389 } else {
390 None
391 };
392
393 let bytes = match self.to_bytes_inner() {
395 Ok(b) => b,
396 Err(_) => return None,
397 };
398
399 let disk_range = 0u64..(bytes.len() as u64);
401 let compressor = self.fsst_buffer.compressor_arc();
402 let disk = DiskBuffer::new(
403 self.fsst_buffer.uncompressed_bytes(),
404 io,
405 disk_range,
406 compressor,
407 );
408 let hybrid = LiquidByteViewArray::<DiskBuffer> {
409 dictionary_keys: self.dictionary_keys.clone(),
410 prefix_keys: self.prefix_keys.clone(),
411 fsst_buffer: disk,
412 original_arrow_type: self.original_arrow_type,
413 shared_prefix: self.shared_prefix.clone(),
414 string_fingerprints,
415 };
416
417 let bytes = Bytes::from(bytes);
418 Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, bytes))
419 }
420
421 fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
422 let filtered = helpers::filter_inner(self, selection);
423 filtered.to_arrow_array()
424 }
425}
426
427#[async_trait::async_trait]
428impl LiquidSqueezedArray for LiquidByteViewArray<DiskBuffer> {
429 fn as_any(&self) -> &dyn Any {
431 self
432 }
433
434 fn get_array_memory_size(&self) -> usize {
436 self.get_detailed_memory_usage().total()
437 }
438
439 fn len(&self) -> usize {
441 self.dictionary_keys.len()
442 }
443
444 fn is_empty(&self) -> bool {
446 self.len() == 0
447 }
448
449 async fn to_arrow_array(&self) -> ArrayRef {
451 let bytes = self
452 .fsst_buffer
453 .squeeze_io()
454 .read(Some(self.fsst_buffer.disk_range()))
455 .await
456 .expect("read squeezed backing");
457 let hydrated =
458 LiquidByteViewArray::<FsstArray>::from_bytes(bytes, self.fsst_buffer.compressor_arc());
459 LiquidByteViewArray::<FsstArray>::to_arrow_array(&hydrated)
460 }
461
462 fn data_type(&self) -> LiquidDataType {
464 LiquidDataType::ByteViewArray
465 }
466
467 fn original_arrow_data_type(&self) -> DataType {
468 self.original_arrow_type.to_arrow_type()
469 }
470
471 fn disk_backing(&self) -> SqueezedBacking {
472 SqueezedBacking::Liquid(self.fsst_buffer.disk_range_len())
473 }
474
475 async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
477 let select_any = selection.count_set_bits() > 0;
478 if !select_any {
479 return arrow::array::new_empty_array(&self.original_arrow_data_type());
480 }
481 let filtered = helpers::filter_inner(self, selection);
482 filtered.to_arrow_array().await
483 }
484
485 async fn try_eval_predicate(&self, expr: &LiquidExpr, filter: &BooleanBuffer) -> BooleanArray {
491 let filtered = helpers::filter_inner(self, filter);
493 if let Some(mask) =
494 helpers::try_eval_predicate_on_disk(expr.physical_expr(), &filtered).await
495 {
496 mask
497 } else {
498 eval_predicate_on_array(filtered.to_arrow_array().await, expr)
499 }
500 }
501}