use arrow::array::BooleanArray;
use arrow::array::{
Array, ArrayRef, BinaryArray, DictionaryArray, StringArray, UInt16Array, types::UInt16Type,
};
use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
use arrow::compute::cast;
use arrow_schema::DataType;
use bytes::Bytes;
use datafusion::physical_plan::PhysicalExpr;
use std::any::Any;
use std::sync::Arc;
#[cfg(test)]
use std::cell::Cell;
use crate::cache::CacheExpression;
use crate::liquid_array::byte_array::{ArrowByteType, build_dict_selection};
use crate::liquid_array::byte_view_array::fingerprint::build_fingerprints;
use crate::liquid_array::raw::FsstArray;
use crate::liquid_array::raw::fsst_buffer::{DiskBuffer, FsstBacking, PrefixKey};
use crate::liquid_array::{
LiquidArray, LiquidDataType, LiquidSqueezedArray, LiquidSqueezedArrayRef, SqueezeIoHandler,
};
mod comparisons;
mod conversions;
mod fingerprint;
mod helpers;
mod operator;
mod serialization;
#[cfg(test)]
mod tests;
pub use helpers::ByteViewArrayMemoryUsage;
pub use operator::{ByteViewOperator, Comparison, Equality, SubString};
// Test-only, per-thread instrumentation counters.
// DISK_READ_COUNTER is read / cleared through `get_disk_read_counter` and
// `reset_disk_read_counter` below. Neither counter is incremented anywhere in this
// file; presumably the submodules performing disk reads and full-data comparisons
// bump them (NOTE(review): confirm at those call sites).
#[cfg(test)]
thread_local! {
static DISK_READ_COUNTER: Cell<usize> = const { Cell::new(0)};
static FULL_DATA_COMPARISON_COUNTER: Cell<usize> = const { Cell::new(0)};
}
/// Returns the calling thread's current `DISK_READ_COUNTER` value (test builds only).
#[cfg(test)]
fn get_disk_read_counter() -> usize {
    DISK_READ_COUNTER.with(Cell::get)
}
/// Sets the calling thread's `DISK_READ_COUNTER` back to zero (test builds only).
#[cfg(test)]
fn reset_disk_read_counter() {
    DISK_READ_COUNTER.with(|slot| {
        slot.set(0);
    });
}
/// Byte-array (string / binary) column stored as per-row dictionary keys over a
/// compressed set of dictionary values.
///
/// `B` selects where the compressed values live; this file implements an in-memory
/// backing (`FsstArray`) and a disk backing (`DiskBuffer`).
#[derive(Clone)]
pub struct LiquidByteViewArray<B: FsstBacking> {
/// Per-row key into the dictionary values; row-level nulls are read from this array.
pub(super) dictionary_keys: UInt16Array,
/// Its length is passed to `build_dict_selection` as the dictionary size, so this
/// appears to hold one entry per dictionary value.
/// NOTE(review): the prefix encoding itself is not visible in this file.
pub(super) prefix_keys: Arc<[PrefixKey]>,
/// Compressed dictionary values; decompressing yields the values/offsets buffers
/// of the dictionary's values child.
pub(super) fsst_buffer: B,
/// Arrow type the data came from; decoded output is cast back to this type.
pub(super) original_arrow_type: ArrowByteType,
/// Presumably a byte prefix shared by all values — TODO confirm against the builder.
pub(super) shared_prefix: Vec<u8>,
/// Optional `u32` fingerprints built from the decompressed values. `squeeze` builds
/// them for the squeezed copy under a `SubstringSearch` hint when they are absent.
pub(super) string_fingerprints: Option<Arc<[u32]>>,
}
/// Options for constructing a `LiquidByteViewArray`.
/// NOTE(review): the consumer (builder) is not visible in this file.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ByteViewBuildOptions {
/// Byte type of the source Arrow data.
pub(super) arrow_type: ArrowByteType,
/// Whether string fingerprints should be computed at build time (`new` sets `false`).
pub(super) build_fingerprints: bool,
}
impl ByteViewBuildOptions {
    /// Options for `arrow_type` with fingerprint construction turned off.
    pub(crate) fn new(arrow_type: ArrowByteType) -> Self {
        let build_fingerprints = false;
        Self {
            arrow_type,
            build_fingerprints,
        }
    }

    /// Options derived from an Arrow `DataType`, with fingerprint construction
    /// controlled by `build_fingerprints`.
    pub(crate) fn for_data_type(data_type: &DataType, build_fingerprints: bool) -> Self {
        Self {
            build_fingerprints,
            ..Self::new(ArrowByteType::from_arrow_type(data_type))
        }
    }
}
impl<B: FsstBacking> std::fmt::Debug for LiquidByteViewArray<B> {
    /// Prints every field of the array, including the backing buffer.
    ///
    /// Destructuring `self` makes the compiler flag this impl if a field is added
    /// without being listed here.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            dictionary_keys,
            prefix_keys,
            fsst_buffer,
            original_arrow_type,
            shared_prefix,
            string_fingerprints,
        } = self;
        let mut builder = f.debug_struct("LiquidByteViewArray");
        builder
            .field("dictionary_keys", dictionary_keys)
            .field("prefix_keys", prefix_keys)
            .field("fsst_buffer", fsst_buffer)
            .field("original_arrow_type", original_arrow_type)
            .field("shared_prefix", shared_prefix)
            .field("string_fingerprints", string_fingerprints);
        builder.finish()
    }
}
impl<B: FsstBacking> LiquidByteViewArray<B> {
    /// Assembles a `Dictionary<UInt16, _>` array from keys and already-decompressed
    /// value buffers.
    ///
    /// The values child is a `StringArray` when the original type is `Utf8`,
    /// `Utf8View` or `Dict16Utf8`, and a `BinaryArray` otherwise. Nothing is validated.
    fn to_dict_arrow_inner(
        &self,
        keys_array: UInt16Array,
        values_buffer: Buffer,
        offsets_buffer: OffsetBuffer<i32>,
    ) -> DictionaryArray<UInt16Type> {
        let string_like = [
            ArrowByteType::Utf8,
            ArrowByteType::Utf8View,
            ArrowByteType::Dict16Utf8,
        ]
        .contains(&self.original_arrow_type);

        // SAFETY: callers in this file pass buffers obtained by decompressing
        // `fsst_buffer`, so offsets are assumed in bounds and (for string types) the
        // bytes are assumed to be valid UTF-8. Neither is re-checked here.
        let values: ArrayRef = if string_like {
            Arc::new(unsafe { StringArray::new_unchecked(offsets_buffer, values_buffer, None) })
        } else {
            Arc::new(unsafe { BinaryArray::new_unchecked(offsets_buffer, values_buffer, None) })
        };

        // SAFETY: keys are either the original dictionary keys (full decompression) or
        // the keys remapped by `build_dict_selection` (keyed decompression); both are
        // assumed to index into `values`.
        unsafe { DictionaryArray::<UInt16Type>::new_unchecked(keys_array, values) }
    }

    /// Decides between decompressing only the dictionary values referenced by the keys
    /// (`true`) and decompressing the whole buffer (`false`).
    ///
    /// Keyed decompression is chosen for arrays with fewer than 2048 rows, or with
    /// fewer rows than `prefix_keys` entries.
    fn should_decompress_keyed(&self) -> bool {
        const SMALL_ARRAY_ROWS: usize = 2048;
        let rows = self.dictionary_keys.len();
        rows < SMALL_ARRAY_ROWS || rows < self.prefix_keys.len()
    }

    /// Row-level validity, taken directly from the dictionary keys.
    pub fn nulls(&self) -> Option<&NullBuffer> {
        self.dictionary_keys.nulls()
    }

    /// Per-component memory accounting for this array.
    pub fn get_detailed_memory_usage(&self) -> ByteViewArrayMemoryUsage {
        let string_fingerprints = self
            .string_fingerprints
            .as_deref()
            .map_or(0, std::mem::size_of_val);
        ByteViewArrayMemoryUsage {
            struct_size: std::mem::size_of::<Self>(),
            dictionary_key: self.dictionary_keys.get_array_memory_size(),
            prefix_keys: std::mem::size_of_val(&*self.prefix_keys),
            fsst_buffer: self.fsst_buffer.get_array_memory_size(),
            shared_prefix: self.shared_prefix.len(),
            string_fingerprints,
        }
    }

    /// Number of rows (one per dictionary key).
    pub fn len(&self) -> usize {
        self.dictionary_keys.len()
    }

    /// `true` when the array has no rows.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Test hook: the calling thread's disk-read counter.
    #[cfg(test)]
    pub fn get_disk_read_count(&self) -> usize {
        get_disk_read_counter()
    }

    /// Test hook: zeroes the calling thread's disk-read counter.
    #[cfg(test)]
    pub fn reset_disk_read_count(&self) {
        reset_disk_read_counter()
    }
}
impl LiquidByteViewArray<FsstArray> {
    /// Decodes into a `Dictionary<UInt16, _>` array.
    ///
    /// Uses keyed or full decompression according to `should_decompress_keyed`.
    pub fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
        if !self.should_decompress_keyed() {
            return self.to_dict_arrow_decompress_all();
        }
        self.to_dict_arrow_decompress_keyed()
    }

    /// Decompresses every dictionary value and reuses the original keys.
    fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
        let (values, offsets) = self.fsst_buffer.to_uncompressed();
        let keys = self.dictionary_keys.clone();
        self.to_dict_arrow_inner(keys, values, offsets)
    }

    /// Decompresses only the values selected by `build_dict_selection` and uses the
    /// keys it remaps.
    fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
        let dictionary_size = self.prefix_keys.len();
        let (selection, remapped_keys) =
            build_dict_selection(&self.dictionary_keys, dictionary_size);
        let (values, offsets) = self.fsst_buffer.to_uncompressed_selected(&selection);
        self.to_dict_arrow_inner(remapped_keys, values, offsets)
    }

    /// Decodes and casts back to the original Arrow type.
    ///
    /// # Panics
    /// Panics if the Arrow cast fails.
    pub fn to_arrow_array(&self) -> ArrayRef {
        let dictionary = self.to_dict_arrow();
        let target_type = self.original_arrow_type.to_arrow_type();
        cast(&dictionary, &target_type).unwrap()
    }

    /// Always `false`: this backing keeps its compressed values in memory.
    pub fn is_fsst_buffer_on_disk(&self) -> bool {
        false
    }
}
impl LiquidByteViewArray<DiskBuffer> {
    /// Always `true` for the `DiskBuffer` backing.
    pub fn is_fsst_buffer_on_disk(&self) -> bool {
        true
    }

    /// Async counterpart of the in-memory `to_dict_arrow`.
    ///
    /// Uses keyed or full decompression according to `should_decompress_keyed`.
    pub async fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
        if !self.should_decompress_keyed() {
            return self.to_dict_arrow_decompress_all().await;
        }
        self.to_dict_arrow_decompress_keyed().await
    }

    /// Decompresses every dictionary value from the backing and reuses the original keys.
    async fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
        let (values, offsets) = self.fsst_buffer.to_uncompressed().await;
        let keys = self.dictionary_keys.clone();
        self.to_dict_arrow_inner(keys, values, offsets)
    }

    /// Decompresses only the values selected by `build_dict_selection` and uses the
    /// keys it remaps.
    async fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
        let dictionary_size = self.prefix_keys.len();
        let (selection, remapped_keys) =
            build_dict_selection(&self.dictionary_keys, dictionary_size);
        let (values, offsets) = self.fsst_buffer.to_uncompressed_selected(&selection).await;
        self.to_dict_arrow_inner(remapped_keys, values, offsets)
    }

    /// Decodes and casts back to the original Arrow type.
    ///
    /// # Panics
    /// Panics if the Arrow cast fails.
    pub async fn to_arrow_array(&self) -> ArrayRef {
        let dictionary = self.to_dict_arrow().await;
        let target_type = self.original_arrow_type.to_arrow_type();
        cast(&dictionary, &target_type).unwrap()
    }
}
impl LiquidArray for LiquidByteViewArray<FsstArray> {
fn as_any(&self) -> &dyn Any {
self
}
fn get_array_memory_size(&self) -> usize {
self.get_detailed_memory_usage().total()
}
fn len(&self) -> usize {
self.dictionary_keys.len()
}
#[inline]
fn to_arrow_array(&self) -> ArrayRef {
let dict = self.to_arrow_array();
Arc::new(dict)
}
fn to_best_arrow_array(&self) -> ArrayRef {
let dict = self.to_dict_arrow();
Arc::new(dict)
}
fn try_eval_predicate(
&self,
expr: &Arc<dyn PhysicalExpr>,
filter: &BooleanBuffer,
) -> Option<BooleanArray> {
let filtered = helpers::filter_inner(self, filter);
helpers::try_eval_predicate_in_memory(expr, &filtered)
}
fn to_bytes(&self) -> Vec<u8> {
self.to_bytes_inner().expect("InMemoryFsstBuffer")
}
fn original_arrow_data_type(&self) -> DataType {
self.original_arrow_type.to_arrow_type()
}
fn data_type(&self) -> LiquidDataType {
LiquidDataType::ByteViewArray
}
fn squeeze(
&self,
io: Arc<dyn SqueezeIoHandler>,
squeeze_hint: Option<&CacheExpression>,
) -> Option<(LiquidSqueezedArrayRef, Bytes)> {
squeeze_hint?;
let string_fingerprints = if matches!(squeeze_hint, Some(CacheExpression::SubstringSearch))
{
self.string_fingerprints.clone().or_else(|| {
let (values_buffer, offsets_buffer) = self.fsst_buffer.to_uncompressed();
Some(build_fingerprints(&values_buffer, &offsets_buffer))
})
} else {
None
};
let bytes = match self.to_bytes_inner() {
Ok(b) => b,
Err(_) => return None,
};
let disk_range = 0u64..(bytes.len() as u64);
let compressor = self.fsst_buffer.compressor_arc();
let disk = DiskBuffer::new(
self.fsst_buffer.uncompressed_bytes(),
io,
disk_range,
compressor,
);
let hybrid = LiquidByteViewArray::<DiskBuffer> {
dictionary_keys: self.dictionary_keys.clone(),
prefix_keys: self.prefix_keys.clone(),
fsst_buffer: disk,
original_arrow_type: self.original_arrow_type,
shared_prefix: self.shared_prefix.clone(),
string_fingerprints,
};
let bytes = Bytes::from(bytes);
Some((Arc::new(hybrid) as LiquidSqueezedArrayRef, bytes))
}
fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
let filtered = helpers::filter_inner(self, selection);
filtered.to_arrow_array()
}
}
#[async_trait::async_trait]
impl LiquidSqueezedArray for LiquidByteViewArray<DiskBuffer> {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Total reported by `get_detailed_memory_usage`.
    fn get_array_memory_size(&self) -> usize {
        let usage = self.get_detailed_memory_usage();
        usage.total()
    }

    fn len(&self) -> usize {
        self.dictionary_keys.len()
    }

    fn is_empty(&self) -> bool {
        self.dictionary_keys.is_empty()
    }

    /// Reads the bytes at `disk_range()` through the squeeze I/O handler, rebuilds an
    /// in-memory array from them, and decodes it to the original Arrow type.
    ///
    /// # Panics
    /// Panics if the read fails.
    async fn to_arrow_array(&self) -> ArrayRef {
        let io = self.fsst_buffer.squeeze_io();
        let range = self.fsst_buffer.disk_range();
        let serialized = io.read(Some(range)).await.expect("read squeezed backing");
        let compressor = self.fsst_buffer.compressor_arc();
        let hydrated = LiquidByteViewArray::<FsstArray>::from_bytes(serialized, compressor);
        LiquidByteViewArray::<FsstArray>::to_arrow_array(&hydrated)
    }

    fn data_type(&self) -> LiquidDataType {
        LiquidDataType::ByteViewArray
    }

    fn original_arrow_data_type(&self) -> DataType {
        self.original_arrow_type.to_arrow_type()
    }

    /// Keeps the rows whose bit is set in `selection` and decodes them.
    ///
    /// If no bit is set, an empty array of the original type is returned without
    /// touching the backing.
    async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
        if selection.count_set_bits() == 0 {
            return arrow::array::new_empty_array(&self.original_arrow_data_type());
        }
        let selected_rows = helpers::filter_inner(self, selection);
        selected_rows.to_arrow_array().await
    }

    /// Applies `filter`, then evaluates `expr` on the remaining rows.
    async fn try_eval_predicate(
        &self,
        expr: &Arc<dyn PhysicalExpr>,
        filter: &BooleanBuffer,
    ) -> Option<BooleanArray> {
        let selected_rows = helpers::filter_inner(self, filter);
        helpers::try_eval_predicate_on_disk(expr, &selected_rows).await
    }
}