liquid_cache/utils/
mod.rs1use std::num::NonZero;
4
5use arrow::{
6 array::{
7 ArrayAccessor, ArrayIter, BinaryViewArray, DictionaryArray, GenericByteArray,
8 GenericByteDictionaryBuilder, PrimitiveArray, PrimitiveDictionaryBuilder, StringViewArray,
9 },
10 datatypes::{BinaryType, ByteArrayType, DecimalType, UInt16Type, Utf8Type},
11};
12use arrow_schema::DataType;
13pub(crate) mod byte_cache;
14mod variant_schema;
15mod variant_utils;
16
17pub use variant_schema::VariantSchema;
18pub use variant_utils::typed_struct_contains_path;
19
/// Returns the number of bits needed to represent `max_value`.
///
/// The result is the position of the highest set bit plus one, so it lies in
/// `1..=64`. Zero is special-cased to a width of one bit so the return value
/// can always be a `NonZero<u8>`.
pub(crate) fn get_bit_width(max_value: u64) -> NonZero<u8> {
    let bits = match max_value {
        0 => 1,
        // `leading_zeros` is at most 63 here, so the difference is in 1..=64.
        value => (u64::BITS - value.leading_zeros()) as u8,
    };
    // Never fails: both match arms produce a value of at least one.
    NonZero::new(bits).unwrap()
}
32
33pub(crate) struct CheckedDictionaryArray {
38 val: DictionaryArray<UInt16Type>,
39}
40
41impl CheckedDictionaryArray {
42 pub fn new_checked(array: &DictionaryArray<UInt16Type>) -> Self {
43 gc_dictionary_array(array)
44 }
45
46 pub fn from_byte_array<T: ByteArrayType>(array: &GenericByteArray<T>) -> Self {
47 let iter = array.iter();
48 byte_array_to_dict_array::<T, _>(iter)
49 }
50
51 pub fn from_string_view_array(array: &StringViewArray) -> Self {
52 let iter = array.iter();
53 byte_array_to_dict_array::<Utf8Type, _>(iter)
54 }
55
56 pub fn from_binary_view_array(array: &BinaryViewArray) -> Self {
57 let iter = array.iter();
58 byte_array_to_dict_array::<BinaryType, _>(iter)
59 }
60
61 pub fn from_decimal_array<T: DecimalType>(array: &PrimitiveArray<T>) -> Self {
62 decimal_array_to_dict_array(array)
63 }
64
65 pub unsafe fn new_unchecked_i_know_what_i_am_doing(
68 array: &DictionaryArray<UInt16Type>,
69 ) -> Self {
70 #[cfg(debug_assertions)]
71 {
72 let gc_ed = gc_dictionary_array(array).val;
73 assert_eq!(
74 gc_ed.values().len(),
75 array.values().len(),
76 "the input dictionary values are not unique"
77 );
78 }
79 Self { val: array.clone() }
80 }
81
82 pub fn into_inner(self) -> DictionaryArray<UInt16Type> {
83 self.val
84 }
85
86 pub fn as_ref(&self) -> &DictionaryArray<UInt16Type> {
87 &self.val
88 }
89
90 pub fn bit_width_for_key(&self) -> NonZero<u8> {
91 let distinct_count = self.as_ref().values().len();
92 get_bit_width(distinct_count as u64)
93 }
94}
95
96fn gc_dictionary_array(array: &DictionaryArray<UInt16Type>) -> CheckedDictionaryArray {
97 let value_type = array.values().data_type();
98 if let DataType::Binary = value_type {
99 let typed = array
100 .downcast_dict::<GenericByteArray<BinaryType>>()
101 .unwrap();
102 let iter = typed.into_iter();
103 byte_array_to_dict_array::<BinaryType, _>(iter)
104 } else if let DataType::Utf8 = value_type {
105 let typed = array.downcast_dict::<GenericByteArray<Utf8Type>>().unwrap();
106 let iter = typed.into_iter();
107 byte_array_to_dict_array::<Utf8Type, _>(iter)
108 } else {
109 unreachable!("Unsupported dictionary type: {:?}", value_type);
110 }
111}
112
113fn decimal_array_to_dict_array<T: DecimalType>(
114 array: &PrimitiveArray<T>,
115) -> CheckedDictionaryArray {
116 let iter = array.iter();
117 let mut builder =
118 PrimitiveDictionaryBuilder::<UInt16Type, T>::with_capacity(array.len(), array.len());
119 for s in iter {
120 builder.append_option(s);
121 }
122 let dict = builder.finish();
123 CheckedDictionaryArray { val: dict }
124}
125
126fn byte_array_to_dict_array<'a, T: ByteArrayType, I: ArrayAccessor<Item = &'a T::Native>>(
127 input: ArrayIter<I>,
128) -> CheckedDictionaryArray {
129 let mut builder = GenericByteDictionaryBuilder::<UInt16Type, T>::with_capacity(
130 input.size_hint().0,
131 input.size_hint().0,
132 input.size_hint().0,
133 );
134 for s in input {
135 builder.append_option(s);
136 }
137 let dict = builder.finish();
138 CheckedDictionaryArray { val: dict }
139}
140
/// Yields the current shuttle thread when compiled for tests with the
/// `shuttle` feature enabled; otherwise does nothing.
pub(crate) fn yield_now_if_shuttle() {
    #[cfg(all(feature = "shuttle", test))]
    {
        shuttle::thread::yield_now();
    }
}
145
/// Runs `test` under a shuttle `PortfolioRunner`.
///
/// One `PctScheduler::new(10, 1_000)` is added per available core, capped at
/// four schedulers. A tracing subscriber is installed first; failure to
/// install it is ignored.
#[cfg(all(feature = "shuttle", test))]
pub(crate) fn shuttle_test(test: impl Fn() + Send + Sync + 'static) {
    // `try_init` fails if a subscriber is already set; that is fine here.
    let _ = tracing_subscriber::fmt()
        .with_ansi(true)
        .with_thread_names(false)
        .with_target(false)
        .try_init();

    let mut portfolio = shuttle::PortfolioRunner::new(true, Default::default());

    let scheduler_count = std::thread::available_parallelism().unwrap().get().min(4);

    (0..scheduler_count).for_each(|_| {
        portfolio.add(shuttle::scheduler::PctScheduler::new(10, 1_000));
    });
    portfolio.run(test);
}
163
/// Replays `test` under shuttle using the given `schedule` string.
///
/// A tracing subscriber is installed first; failure to install it is ignored.
// Not referenced by any code in this file, hence the `allow`.
#[allow(unused)]
#[cfg(all(feature = "shuttle", test))]
pub(crate) fn shuttle_replay(test: impl Fn() + Send + Sync + 'static, schedule: &str) {
    let subscriber = tracing_subscriber::fmt()
        .with_ansi(true)
        .with_thread_names(false)
        .with_target(false);
    // `try_init` fails if a subscriber is already set; that is fine here.
    let _ = subscriber.try_init();
    shuttle::replay(test, schedule);
}
174
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{BinaryArray, DictionaryArray};
    use std::sync::Arc;

    /// Builds a binary dictionary whose keys are `0..values.len()`, i.e. each
    /// key references the value at the same position.
    ///
    /// The keys are derived from the number of values so the helper is valid
    /// for any input length, not only for exactly four values.
    fn create_test_dictionary(values: Vec<&[u8]>) -> DictionaryArray<UInt16Type> {
        let key_count =
            u16::try_from(values.len()).expect("test dictionary must fit in u16 keys");
        let keys: Vec<u16> = (0..key_count).collect();
        let binary_array = BinaryArray::from_iter_values(values);
        DictionaryArray::new(keys.into(), Arc::new(binary_array))
    }

    #[test]
    fn test_get_bit_width() {
        assert_eq!(get_bit_width(0).get(), 1);
        assert_eq!(get_bit_width(1).get(), 1);
        assert_eq!(get_bit_width(2).get(), 2);
        assert_eq!(get_bit_width(255).get(), 8);
        assert_eq!(get_bit_width(256).get(), 9);
        assert_eq!(get_bit_width(u64::MAX).get(), 64);
    }

    #[test]
    fn test_gc_behavior() {
        // Duplicated dictionary values must be collapsed by the checked constructor.
        let dup_dict = create_test_dictionary(vec![b"a", b"a", b"b", b"b"]);
        let checked = CheckedDictionaryArray::new_checked(&dup_dict);
        let dict_values = checked.as_ref().values();
        assert_eq!(dict_values.len(), 2);

        let binary_values = dict_values
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(binary_values.value(0), b"a");
        assert_eq!(binary_values.value(1), b"b");

        // An already-unique dictionary keeps all of its values.
        let unique_dict = create_test_dictionary(vec![b"a", b"b", b"c", b"d"]);
        let checked_unique = CheckedDictionaryArray::new_checked(&unique_dict);
        assert_eq!(checked_unique.as_ref().values().len(), 4);
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "the input dictionary values are not unique")]
    fn test_unchecked_duplicates_panic() {
        let dup_dict = create_test_dictionary(vec![b"a", b"a", b"b", b"b"]);
        // SAFETY: the uniqueness contract is violated on purpose; the debug
        // assertion is expected to panic before the value is used.
        unsafe {
            CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(&dup_dict);
        }
    }
}