Skip to main content

liquid_cache/utils/
mod.rs

1//! Utility functions for the storage module.
2
3use std::num::NonZero;
4
5use arrow::{
6    array::{
7        ArrayAccessor, ArrayIter, BinaryViewArray, DictionaryArray, GenericByteArray,
8        GenericByteDictionaryBuilder, PrimitiveArray, PrimitiveDictionaryBuilder, StringViewArray,
9    },
10    datatypes::{BinaryType, ByteArrayType, DecimalType, UInt16Type, Utf8Type},
11};
12use arrow_schema::DataType;
13pub(crate) mod byte_cache;
14mod variant_schema;
15mod variant_utils;
16
17pub use variant_schema::VariantSchema;
18pub use variant_utils::typed_struct_contains_path;
19
/// Number of bits needed to represent `max_value` as an unsigned binary integer.
///
/// `0` is special-cased to a width of 1. Any other value yields
/// `floor(log2(max_value)) + 1`, which equals `64 - max_value.leading_zeros()`.
pub(crate) fn get_bit_width(max_value: u64) -> NonZero<u8> {
    let width = match max_value {
        // TODO: zero should really map to width 0 and use a constant encoding,
        // but that encoding is not implemented yet, so fall back to one bit.
        0 => 1,
        // `ilog2` of a non-zero u64 is at most 63, so `+ 1` lies in 1..=64 and fits in a u8.
        value => value.ilog2() as u8 + 1,
    };
    // Neither arm produces 0, so this cannot fail.
    NonZero::new(width).unwrap()
}
32
33/// A wrapper around `DictionaryArray<UInt16Type>` that ensures the values are unique.
34/// This is because we leverage the fact that the values are unique in the dictionary to short cut the
35/// comparison process, i.e., return the index on first match.
36/// If the values are not unique, we are screwed.
37pub(crate) struct CheckedDictionaryArray {
38    val: DictionaryArray<UInt16Type>,
39}
40
41impl CheckedDictionaryArray {
42    pub fn new_checked(array: &DictionaryArray<UInt16Type>) -> Self {
43        gc_dictionary_array(array)
44    }
45
46    pub fn from_byte_array<T: ByteArrayType>(array: &GenericByteArray<T>) -> Self {
47        let iter = array.iter();
48        byte_array_to_dict_array::<T, _>(iter)
49    }
50
51    pub fn from_string_view_array(array: &StringViewArray) -> Self {
52        let iter = array.iter();
53        byte_array_to_dict_array::<Utf8Type, _>(iter)
54    }
55
56    pub fn from_binary_view_array(array: &BinaryViewArray) -> Self {
57        let iter = array.iter();
58        byte_array_to_dict_array::<BinaryType, _>(iter)
59    }
60
61    pub fn from_decimal_array<T: DecimalType>(array: &PrimitiveArray<T>) -> Self {
62        decimal_array_to_dict_array(array)
63    }
64
65    /// # Safety
66    /// The caller must ensure that the values in the dictionary are unique.
67    pub unsafe fn new_unchecked_i_know_what_i_am_doing(
68        array: &DictionaryArray<UInt16Type>,
69    ) -> Self {
70        #[cfg(debug_assertions)]
71        {
72            let gc_ed = gc_dictionary_array(array).val;
73            assert_eq!(
74                gc_ed.values().len(),
75                array.values().len(),
76                "the input dictionary values are not unique"
77            );
78        }
79        Self { val: array.clone() }
80    }
81
82    pub fn into_inner(self) -> DictionaryArray<UInt16Type> {
83        self.val
84    }
85
86    pub fn as_ref(&self) -> &DictionaryArray<UInt16Type> {
87        &self.val
88    }
89
90    pub fn bit_width_for_key(&self) -> NonZero<u8> {
91        let distinct_count = self.as_ref().values().len();
92        get_bit_width(distinct_count as u64)
93    }
94}
95
96fn gc_dictionary_array(array: &DictionaryArray<UInt16Type>) -> CheckedDictionaryArray {
97    let value_type = array.values().data_type();
98    if let DataType::Binary = value_type {
99        let typed = array
100            .downcast_dict::<GenericByteArray<BinaryType>>()
101            .unwrap();
102        let iter = typed.into_iter();
103        byte_array_to_dict_array::<BinaryType, _>(iter)
104    } else if let DataType::Utf8 = value_type {
105        let typed = array.downcast_dict::<GenericByteArray<Utf8Type>>().unwrap();
106        let iter = typed.into_iter();
107        byte_array_to_dict_array::<Utf8Type, _>(iter)
108    } else {
109        unreachable!("Unsupported dictionary type: {:?}", value_type);
110    }
111}
112
113fn decimal_array_to_dict_array<T: DecimalType>(
114    array: &PrimitiveArray<T>,
115) -> CheckedDictionaryArray {
116    let iter = array.iter();
117    let mut builder =
118        PrimitiveDictionaryBuilder::<UInt16Type, T>::with_capacity(array.len(), array.len());
119    for s in iter {
120        builder.append_option(s);
121    }
122    let dict = builder.finish();
123    CheckedDictionaryArray { val: dict }
124}
125
126fn byte_array_to_dict_array<'a, T: ByteArrayType, I: ArrayAccessor<Item = &'a T::Native>>(
127    input: ArrayIter<I>,
128) -> CheckedDictionaryArray {
129    let mut builder = GenericByteDictionaryBuilder::<UInt16Type, T>::with_capacity(
130        input.size_hint().0,
131        input.size_hint().0,
132        input.size_hint().0,
133    );
134    for s in input {
135        builder.append_option(s);
136    }
137    let dict = builder.finish();
138    CheckedDictionaryArray { val: dict }
139}
140
/// Yields to the shuttle scheduler in shuttle-enabled test builds.
///
/// In every other build configuration the body is compiled out and this is a no-op.
pub(crate) fn yield_now_if_shuttle() {
    #[cfg(all(feature = "shuttle", test))]
    {
        shuttle::thread::yield_now();
    }
}
145
/// Runs `test` under a shuttle `PortfolioRunner`.
///
/// First installs a tracing subscriber, ignoring the error if one is already installed.
/// Then adds one `PctScheduler::new(10, 1_000)` per available core, capped at 4.
#[cfg(all(feature = "shuttle", test))]
pub(crate) fn shuttle_test(test: impl Fn() + Send + Sync + 'static) {
    _ = tracing_subscriber::fmt()
        .with_ansi(true)
        .with_thread_names(false)
        .with_target(false)
        .try_init();

    let mut runner = shuttle::PortfolioRunner::new(true, Default::default());

    // `available_parallelism` can fail on some platforms or sandboxes. Fall back to a single
    // scheduler instead of aborting the test run.
    let scheduler_count = std::thread::available_parallelism()
        .map_or(1, |cores| cores.get())
        .min(4);

    for _ in 0..scheduler_count {
        runner.add(shuttle::scheduler::PctScheduler::new(10, 1_000));
    }
    runner.run(test);
}
163
/// Replays the shuttle `schedule` string for `test` via `shuttle::replay`.
///
/// Installs the same tracing subscriber configuration as `shuttle_test` first.
// NOTE(review): presumably `unused` is allowed because this is only called by hand while
// debugging a failing schedule — confirm.
#[allow(unused)]
#[cfg(all(feature = "shuttle", test))]
pub(crate) fn shuttle_replay(test: impl Fn() + Send + Sync + 'static, schedule: &str) {
    let subscriber = tracing_subscriber::fmt()
        .with_ansi(true)
        .with_thread_names(false)
        .with_target(false);
    // A global subscriber may already be installed; that error is deliberately ignored.
    let _ = subscriber.try_init();
    shuttle::replay(test, schedule);
}
174
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{BinaryArray, DictionaryArray};
    use std::sync::Arc;

    /// Builds a dictionary over `values` whose keys `0, 1, 2, 3` map one-to-one onto them.
    fn create_test_dictionary(values: Vec<&[u8]>) -> DictionaryArray<UInt16Type> {
        let keys = vec![0u16, 1, 2, 3];
        let dictionary_values = Arc::new(BinaryArray::from_iter_values(values));
        DictionaryArray::new(keys.into(), dictionary_values)
    }

    /// Reads dictionary value `index` of a binary-valued dictionary.
    fn dictionary_value(dict: &DictionaryArray<UInt16Type>, index: usize) -> &[u8] {
        dict.values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap()
            .value(index)
    }

    #[test]
    fn test_gc_behavior() {
        // Duplicates collapse to one entry each, in first-seen order.
        let duplicated = create_test_dictionary(vec![b"a", b"a", b"b", b"b"]);
        let checked = CheckedDictionaryArray::new_checked(&duplicated);
        let dict = checked.as_ref();
        assert_eq!(dict.values().len(), 2);
        assert_eq!(dictionary_value(dict, 0), b"a");
        assert_eq!(dictionary_value(dict, 1), b"b");

        // A dictionary that is already unique keeps all of its values.
        let distinct = create_test_dictionary(vec![b"a", b"b", b"c", b"d"]);
        let checked_distinct = CheckedDictionaryArray::new_checked(&distinct);
        assert_eq!(checked_distinct.as_ref().values().len(), 4);
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "the input dictionary values are not unique")]
    fn test_unchecked_duplicates_panic() {
        let duplicated = create_test_dictionary(vec![b"a", b"a", b"b", b"b"]);
        // SAFETY: deliberately violates the uniqueness contract; the debug-only check is
        // expected to panic before the wrapper is ever used.
        unsafe {
            CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(&duplicated);
        }
    }
}