use arrow::array::{
Array, ArrayAccessor, ArrayIter, BinaryArray, BinaryViewArray, DictionaryArray,
GenericByteArray, StringArray, StringViewArray, UInt16Array, cast::AsArray, types::UInt16Type,
};
use arrow::datatypes::ByteArrayType;
use arrow_schema::DataType;
use fsst::Compressor;
use std::sync::Arc;
use super::{ByteViewBuildOptions, LiquidByteViewArray};
use crate::liquid_array::byte_array::ArrowByteType;
use crate::liquid_array::byte_view_array::fingerprint::StringFingerprint;
use crate::liquid_array::raw::fsst_buffer::{
FsstArray, FsstBacking, PrefixKey, RawFsstBuffer, train_compressor,
};
use crate::utils::CheckedDictionaryArray;
impl<B: FsstBacking> LiquidByteViewArray<B> {
pub fn from_string_view_array(
array: &StringViewArray,
compressor: Arc<Compressor>,
) -> LiquidByteViewArray<FsstArray> {
Self::from_view_array_inner(
array,
compressor,
ByteViewBuildOptions::new(ArrowByteType::Utf8View),
)
}
pub fn from_binary_view_array(
array: &BinaryViewArray,
compressor: Arc<Compressor>,
) -> LiquidByteViewArray<FsstArray> {
Self::from_view_array_inner(
array,
compressor,
ByteViewBuildOptions::new(ArrowByteType::BinaryView),
)
}
pub fn from_string_array(
array: &StringArray,
compressor: Arc<Compressor>,
) -> LiquidByteViewArray<FsstArray> {
Self::from_byte_array_inner(
array,
compressor,
ByteViewBuildOptions::new(ArrowByteType::Utf8),
)
}
pub fn from_binary_array(
array: &BinaryArray,
compressor: Arc<Compressor>,
) -> LiquidByteViewArray<FsstArray> {
Self::from_byte_array_inner(
array,
compressor,
ByteViewBuildOptions::new(ArrowByteType::Binary),
)
}
pub fn train_from_string_view(
array: &StringViewArray,
) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
Self::train_from_string_view_inner(
array,
ByteViewBuildOptions::new(ArrowByteType::Utf8View),
)
}
pub fn train_from_binary_view(
array: &BinaryViewArray,
) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
Self::train_from_binary_view_inner(
array,
ByteViewBuildOptions::new(ArrowByteType::BinaryView),
)
}
pub fn train_from_arrow<T: ByteArrayType>(
array: &GenericByteArray<T>,
) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
Self::train_from_arrow_inner(
array,
ByteViewBuildOptions::new(ArrowByteType::from_arrow_type(&T::DATA_TYPE)),
)
}
pub unsafe fn from_unique_dict_array(
array: &DictionaryArray<UInt16Type>,
compressor: Arc<Compressor>,
) -> LiquidByteViewArray<FsstArray> {
let arrow_type = ArrowByteType::from_arrow_type(array.values().data_type());
Self::from_dict_array_inner(
unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
compressor,
ByteViewBuildOptions::new(arrow_type),
)
}
pub(crate) unsafe fn from_unique_dict_array_with_options(
array: &DictionaryArray<UInt16Type>,
compressor: Arc<Compressor>,
options: ByteViewBuildOptions,
) -> LiquidByteViewArray<FsstArray> {
Self::from_dict_array_inner(
unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
compressor,
options,
)
}
pub fn train_from_arrow_dict(
array: &DictionaryArray<UInt16Type>,
) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
let options = if array.values().data_type() == &DataType::Utf8 {
ByteViewBuildOptions::new(ArrowByteType::Dict16Utf8)
} else if array.values().data_type() == &DataType::Binary {
ByteViewBuildOptions::new(ArrowByteType::Dict16Binary)
} else {
panic!("Unsupported dictionary type: {:?}", array.data_type())
};
Self::train_from_arrow_dict_inner(array, options)
}
pub(crate) fn train_from_string_view_inner(
array: &StringViewArray,
options: ByteViewBuildOptions,
) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
let compressor = Self::train_compressor(array.iter());
(
compressor.clone(),
Self::from_view_array_inner(array, compressor, options),
)
}
pub(crate) fn train_from_binary_view_inner(
array: &BinaryViewArray,
options: ByteViewBuildOptions,
) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
let compressor = Self::train_compressor_bytes(array.iter());
(
compressor.clone(),
Self::from_view_array_inner(array, compressor, options),
)
}
pub(crate) fn train_from_arrow_inner<T: ByteArrayType>(
array: &GenericByteArray<T>,
options: ByteViewBuildOptions,
) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
let value_type = dict.as_ref().values().data_type();
let compressor = if value_type == &DataType::Utf8 {
Self::train_compressor(dict.as_ref().values().as_string::<i32>().iter())
} else {
Self::train_compressor_bytes(dict.as_ref().values().as_binary::<i32>().iter())
};
(
compressor.clone(),
Self::from_dict_array_inner(dict, compressor, options),
)
}
pub(crate) fn train_from_arrow_dict_inner(
array: &DictionaryArray<UInt16Type>,
options: ByteViewBuildOptions,
) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
if array.values().data_type() == &DataType::Utf8 {
let values = array.values().as_string::<i32>();
let compressor = Self::train_compressor(values.iter());
(
compressor.clone(),
Self::from_dict_array_inner(
CheckedDictionaryArray::new_checked(array),
compressor,
options,
),
)
} else if array.values().data_type() == &DataType::Binary {
let values = array.values().as_binary::<i32>();
let compressor = Self::train_compressor_bytes(values.iter());
(
compressor.clone(),
Self::from_dict_array_inner(
CheckedDictionaryArray::new_checked(array),
compressor,
options,
),
)
} else {
panic!("Unsupported dictionary type: {:?}", array.data_type())
}
}
pub fn train_compressor<'a, T: ArrayAccessor<Item = &'a str>>(
array: ArrayIter<T>,
) -> Arc<Compressor> {
Arc::new(train_compressor(
array.filter_map(|s| s.as_ref().map(|s| s.as_bytes())),
))
}
pub fn train_compressor_bytes<'a, T: ArrayAccessor<Item = &'a [u8]>>(
array: ArrayIter<T>,
) -> Arc<Compressor> {
Arc::new(train_compressor(
array.filter_map(|s| s.as_ref().map(|s| *s)),
))
}
pub(crate) fn from_view_array_inner<T>(
array: &T,
compressor: Arc<Compressor>,
options: ByteViewBuildOptions,
) -> LiquidByteViewArray<FsstArray>
where
T: Array + 'static,
{
let dict = if let Some(string_view) = array.as_any().downcast_ref::<StringViewArray>() {
CheckedDictionaryArray::from_string_view_array(string_view)
} else if let Some(binary_view) = array.as_any().downcast_ref::<BinaryViewArray>() {
CheckedDictionaryArray::from_binary_view_array(binary_view)
} else {
panic!("Unsupported view array type")
};
Self::from_dict_array_inner(dict, compressor, options)
}
pub(crate) fn from_byte_array_inner<T: ByteArrayType>(
array: &GenericByteArray<T>,
compressor: Arc<Compressor>,
options: ByteViewBuildOptions,
) -> LiquidByteViewArray<FsstArray> {
let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
Self::from_dict_array_inner(dict, compressor, options)
}
fn from_dict_array_inner(
dict: CheckedDictionaryArray,
compressor: Arc<Compressor>,
options: ByteViewBuildOptions,
) -> LiquidByteViewArray<FsstArray> {
let (keys, values) = dict.as_ref().clone().into_parts();
let arrow_type = options.arrow_type;
let shared_prefix = if values.is_empty() {
Vec::new()
} else {
let first_value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
string_values.value(0).as_bytes()
} else if let Some(binary_values) = values.as_binary_opt::<i32>() {
binary_values.value(0)
} else {
panic!("Unsupported dictionary value type")
};
let mut shared_prefix = first_value_bytes.to_vec();
for i in 1..values.len() {
let value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
string_values.value(i).as_bytes()
} else if let Some(binary_values) = values.as_binary_opt::<i32>() {
binary_values.value(i)
} else {
panic!("Unsupported dictionary value type")
};
let common_len = shared_prefix
.iter()
.zip(value_bytes.iter())
.take_while(|(a, b)| a == b)
.count();
shared_prefix.truncate(common_len);
if shared_prefix.is_empty() {
break;
}
}
shared_prefix
};
let shared_prefix_len = shared_prefix.len();
let mut prefix_keys = Vec::with_capacity(values.len());
let mut fingerprints = options
.build_fingerprints
.then(|| Vec::with_capacity(values.len()));
let mut compress_buffer = Vec::with_capacity(1024 * 1024 * 2);
let (raw_fsst_buffer, byte_offsets) =
if let Some(string_values) = values.as_string_opt::<i32>() {
RawFsstBuffer::from_byte_slices(
string_values.iter().map(|s| s.map(|s| s.as_bytes())),
compressor.clone(),
&mut compress_buffer,
)
} else if let Some(binary_values) = values.as_binary_opt::<i32>() {
RawFsstBuffer::from_byte_slices(
binary_values.iter(),
compressor.clone(),
&mut compress_buffer,
)
} else {
panic!("Unsupported dictionary value type")
};
for i in 0..values.len() {
let value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
string_values.value(i).as_bytes()
} else if let Some(binary_values) = values.as_binary_opt::<i32>() {
binary_values.value(i)
} else {
panic!("Unsupported dictionary value type")
};
let remaining_bytes = if shared_prefix_len < value_bytes.len() {
&value_bytes[shared_prefix_len..]
} else {
&[]
};
prefix_keys.push(PrefixKey::new(remaining_bytes));
if let Some(ref mut fingerprints) = fingerprints {
fingerprints.push(StringFingerprint::from_bytes(value_bytes).bits());
}
}
assert_eq!(values.len(), byte_offsets.len() - 1);
let prefix_keys: Arc<[PrefixKey]> = prefix_keys.into();
let mut array = LiquidByteViewArray::from_parts(
keys,
prefix_keys,
FsstArray::from_byte_offsets(Arc::new(raw_fsst_buffer), &byte_offsets, compressor),
arrow_type,
shared_prefix,
);
if let Some(fingerprints) = fingerprints {
array.string_fingerprints = Some(Arc::from(fingerprints.into_boxed_slice()));
}
array
}
pub(super) fn from_parts(
dictionary_keys: UInt16Array,
prefix_keys: Arc<[PrefixKey]>,
fsst_buffer: B,
original_arrow_type: ArrowByteType,
shared_prefix: Vec<u8>,
) -> Self {
Self {
dictionary_keys,
prefix_keys,
fsst_buffer,
original_arrow_type,
shared_prefix,
string_fingerprints: None,
}
}
}