Skip to main content

liquid_cache/utils/
mod.rs

1//! Utility functions for the storage module.
2
3use std::num::NonZero;
4
use arrow::{
    array::{
        ArrayAccessor, ArrayIter, BinaryViewArray, DictionaryArray, GenericByteArray,
        GenericByteDictionaryBuilder, PrimitiveArray, PrimitiveBuilder, PrimitiveDictionaryBuilder,
        StringViewArray,
    },
    datatypes::{BinaryType, ByteArrayType, DecimalType, UInt16Type, Utf8Type},
};
12use arrow_schema::DataType;
13use datafusion_common::ScalarValue;
14pub(crate) mod byte_cache;
15mod variant_schema;
16mod variant_utils;
17
18pub use variant_schema::VariantSchema;
19pub use variant_utils::typed_struct_contains_path;
20
/// Returns the number of bits needed to represent `max_value`, never less than 1.
///
/// For a non-zero `max_value` this is `64 - max_value.leading_zeros()`; for `0` it is `1`.
pub(crate) fn get_bit_width(max_value: u64) -> NonZero<u8> {
    // Count of significant bits; this is 0 only when `max_value == 0`.
    let significant_bits = (u64::BITS - max_value.leading_zeros()) as u8;
    // todo: a zero max value should really map to width 0 (constant encoding), but that is
    // not implemented yet, so clamp to a minimum width of 1.
    NonZero::new(significant_bits.max(1)).unwrap()
}
33
34pub(crate) fn get_bytes_needle(value: &ScalarValue) -> Option<Vec<u8>> {
35    match value {
36        ScalarValue::Utf8(Some(v)) => Some(v.as_bytes().to_vec()),
37        ScalarValue::Utf8View(Some(v)) => Some(v.as_bytes().to_vec()),
38        ScalarValue::LargeUtf8(Some(v)) => Some(v.as_bytes().to_vec()),
39        ScalarValue::Binary(Some(v)) => Some(v.clone()),
40        ScalarValue::BinaryView(Some(v)) => Some(v.clone()),
41        ScalarValue::FixedSizeBinary(_, Some(v)) => Some(v.clone()),
42        ScalarValue::LargeBinary(Some(v)) => Some(v.clone()),
43        ScalarValue::Dictionary(_, value) => get_bytes_needle(value.as_ref()),
44        _ => None,
45    }
46}
47
48/// A wrapper around `DictionaryArray<UInt16Type>` that ensures the values are unique.
49/// This is because we leverage the fact that the values are unique in the dictionary to short cut the
50/// comparison process, i.e., return the index on first match.
51/// If the values are not unique, we are screwed.
52pub(crate) struct CheckedDictionaryArray {
53    val: DictionaryArray<UInt16Type>,
54}
55
56impl CheckedDictionaryArray {
57    pub fn new_checked(array: &DictionaryArray<UInt16Type>) -> Self {
58        gc_dictionary_array(array)
59    }
60
61    pub fn from_byte_array<T: ByteArrayType>(array: &GenericByteArray<T>) -> Self {
62        let iter = array.iter();
63        byte_array_to_dict_array::<T, _>(iter)
64    }
65
66    pub fn from_string_view_array(array: &StringViewArray) -> Self {
67        let iter = array.iter();
68        byte_array_to_dict_array::<Utf8Type, _>(iter)
69    }
70
71    pub fn from_binary_view_array(array: &BinaryViewArray) -> Self {
72        let iter = array.iter();
73        byte_array_to_dict_array::<BinaryType, _>(iter)
74    }
75
76    pub fn from_decimal_array<T: DecimalType>(array: &PrimitiveArray<T>) -> Self {
77        decimal_array_to_dict_array(array)
78    }
79
80    /// # Safety
81    /// The caller must ensure that the values in the dictionary are unique.
82    pub unsafe fn new_unchecked_i_know_what_i_am_doing(
83        array: &DictionaryArray<UInt16Type>,
84    ) -> Self {
85        #[cfg(debug_assertions)]
86        {
87            let gc_ed = gc_dictionary_array(array).val;
88            assert_eq!(
89                gc_ed.values().len(),
90                array.values().len(),
91                "the input dictionary values are not unique"
92            );
93        }
94        Self { val: array.clone() }
95    }
96
97    pub fn into_inner(self) -> DictionaryArray<UInt16Type> {
98        self.val
99    }
100
101    pub fn as_ref(&self) -> &DictionaryArray<UInt16Type> {
102        &self.val
103    }
104
105    pub fn bit_width_for_key(&self) -> NonZero<u8> {
106        let distinct_count = self.as_ref().values().len();
107        get_bit_width(distinct_count as u64)
108    }
109}
110
111fn gc_dictionary_array(array: &DictionaryArray<UInt16Type>) -> CheckedDictionaryArray {
112    let value_type = array.values().data_type();
113    if let DataType::Binary = value_type {
114        let typed = array
115            .downcast_dict::<GenericByteArray<BinaryType>>()
116            .unwrap();
117        let iter = typed.into_iter();
118        byte_array_to_dict_array::<BinaryType, _>(iter)
119    } else if let DataType::Utf8 = value_type {
120        let typed = array.downcast_dict::<GenericByteArray<Utf8Type>>().unwrap();
121        let iter = typed.into_iter();
122        byte_array_to_dict_array::<Utf8Type, _>(iter)
123    } else {
124        unreachable!("Unsupported dictionary type: {:?}", value_type);
125    }
126}
127
128fn decimal_array_to_dict_array<T: DecimalType>(
129    array: &PrimitiveArray<T>,
130) -> CheckedDictionaryArray {
131    let iter = array.iter();
132    let mut builder =
133        PrimitiveDictionaryBuilder::<UInt16Type, T>::with_capacity(array.len(), array.len());
134    for s in iter {
135        builder.append_option(s);
136    }
137    let dict = builder.finish();
138    CheckedDictionaryArray { val: dict }
139}
140
141fn byte_array_to_dict_array<'a, T: ByteArrayType, I: ArrayAccessor<Item = &'a T::Native>>(
142    input: ArrayIter<I>,
143) -> CheckedDictionaryArray {
144    let mut builder = GenericByteDictionaryBuilder::<UInt16Type, T>::with_capacity(
145        input.size_hint().0,
146        input.size_hint().0,
147        input.size_hint().0,
148    );
149    for s in input {
150        builder.append_option(s);
151    }
152    let dict = builder.finish();
153    CheckedDictionaryArray { val: dict }
154}
155
/// Yields to the shuttle scheduler in shuttle test builds; does nothing in every other build.
pub(crate) fn yield_now_if_shuttle() {
    #[cfg(all(feature = "shuttle", test))]
    {
        shuttle::thread::yield_now();
    }
}

/// Installs the tracing subscriber shared by the shuttle helpers.
///
/// The result of `try_init` is deliberately discarded, e.g. when an earlier test already
/// installed a global subscriber.
#[cfg(all(feature = "shuttle", test))]
fn init_shuttle_tracing() {
    let _ = tracing_subscriber::fmt()
        .with_ansi(true)
        .with_thread_names(false)
        .with_target(false)
        .try_init();
}

/// Runs `test` under a shuttle `PortfolioRunner`.
///
/// One `PctScheduler` is added per available core, capped at 4.
#[cfg(all(feature = "shuttle", test))]
pub(crate) fn shuttle_test(test: impl Fn() + Send + Sync + 'static) {
    init_shuttle_tracing();

    let mut runner = shuttle::PortfolioRunner::new(true, Default::default());

    let scheduler_count = std::thread::available_parallelism()
        .unwrap()
        .get()
        .min(4);

    for _ in 0..scheduler_count {
        runner.add(shuttle::scheduler::PctScheduler::new(10, 1_000));
    }
    runner.run(test);
}

/// Replays `test` under shuttle following the recorded `schedule` string.
#[allow(unused)]
#[cfg(all(feature = "shuttle", test))]
pub(crate) fn shuttle_replay(test: impl Fn() + Send + Sync + 'static, schedule: &str) {
    init_shuttle_tracing();
    shuttle::replay(test, schedule);
}
189
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{BinaryArray, DictionaryArray};
    use std::sync::Arc;

    /// Builds a dictionary with keys `[0, 1, 2, 3]`, so key `i` points at `values[i]`.
    fn create_test_dictionary(values: Vec<&[u8]>) -> DictionaryArray<UInt16Type> {
        let keys = vec![0u16, 1, 2, 3];
        let dictionary_values = BinaryArray::from_iter_values(values);
        DictionaryArray::new(keys.into(), Arc::new(dictionary_values))
    }

    #[test]
    fn test_gc_behavior() {
        // Duplicated values collapse to one entry each, in first-seen order.
        let dup_dict = create_test_dictionary(vec![b"a", b"a", b"b", b"b"]);
        let checked = CheckedDictionaryArray::new_checked(&dup_dict);
        let deduped = checked.as_ref().values();
        assert_eq!(deduped.len(), 2);
        let deduped_binary = deduped.as_any().downcast_ref::<BinaryArray>().unwrap();
        assert_eq!(deduped_binary.value(0), b"a");
        assert_eq!(deduped_binary.value(1), b"b");

        // A dictionary whose values are already unique keeps all of them.
        let unique_dict = create_test_dictionary(vec![b"a", b"b", b"c", b"d"]);
        let checked_unique = CheckedDictionaryArray::new_checked(&unique_dict);
        assert_eq!(checked_unique.as_ref().values().len(), 4);
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "the input dictionary values are not unique")]
    fn test_unchecked_duplicates_panic() {
        let dup_dict = create_test_dictionary(vec![b"a", b"a", b"b", b"b"]);
        // The debug-only uniqueness check must reject duplicated dictionary values.
        unsafe {
            CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(&dup_dict);
        }
    }
}