liquid_cache/utils/
mod.rs1use std::num::NonZero;
4
5use arrow::{
6 array::{
7 ArrayAccessor, ArrayIter, BinaryViewArray, DictionaryArray, GenericByteArray,
8 GenericByteDictionaryBuilder, PrimitiveArray, PrimitiveDictionaryBuilder, StringViewArray,
9 },
10 datatypes::{BinaryType, ByteArrayType, DecimalType, UInt16Type, Utf8Type},
11};
12use arrow_schema::DataType;
13use datafusion_common::ScalarValue;
14pub(crate) mod byte_cache;
15mod variant_schema;
16mod variant_utils;
17
18pub use variant_schema::VariantSchema;
19pub use variant_utils::typed_struct_contains_path;
20
/// Returns how many bits are needed to store `max_value` in plain binary.
///
/// A value of zero still occupies one bit, so the result always lies in `1..=64`.
pub(crate) fn get_bit_width(max_value: u64) -> NonZero<u8> {
    // Significant bits = total width minus the unused high-order zeros; floor at 1 for zero.
    let significant_bits = (u64::BITS - max_value.leading_zeros()).max(1);
    NonZero::new(significant_bits as u8).expect("bit width is always at least one")
}
33
34pub(crate) fn get_bytes_needle(value: &ScalarValue) -> Option<Vec<u8>> {
35 match value {
36 ScalarValue::Utf8(Some(v)) => Some(v.as_bytes().to_vec()),
37 ScalarValue::Utf8View(Some(v)) => Some(v.as_bytes().to_vec()),
38 ScalarValue::LargeUtf8(Some(v)) => Some(v.as_bytes().to_vec()),
39 ScalarValue::Binary(Some(v)) => Some(v.clone()),
40 ScalarValue::BinaryView(Some(v)) => Some(v.clone()),
41 ScalarValue::FixedSizeBinary(_, Some(v)) => Some(v.clone()),
42 ScalarValue::LargeBinary(Some(v)) => Some(v.clone()),
43 ScalarValue::Dictionary(_, value) => get_bytes_needle(value.as_ref()),
44 _ => None,
45 }
46}
47
48pub(crate) struct CheckedDictionaryArray {
53 val: DictionaryArray<UInt16Type>,
54}
55
56impl CheckedDictionaryArray {
57 pub fn new_checked(array: &DictionaryArray<UInt16Type>) -> Self {
58 gc_dictionary_array(array)
59 }
60
61 pub fn from_byte_array<T: ByteArrayType>(array: &GenericByteArray<T>) -> Self {
62 let iter = array.iter();
63 byte_array_to_dict_array::<T, _>(iter)
64 }
65
66 pub fn from_string_view_array(array: &StringViewArray) -> Self {
67 let iter = array.iter();
68 byte_array_to_dict_array::<Utf8Type, _>(iter)
69 }
70
71 pub fn from_binary_view_array(array: &BinaryViewArray) -> Self {
72 let iter = array.iter();
73 byte_array_to_dict_array::<BinaryType, _>(iter)
74 }
75
76 pub fn from_decimal_array<T: DecimalType>(array: &PrimitiveArray<T>) -> Self {
77 decimal_array_to_dict_array(array)
78 }
79
80 pub unsafe fn new_unchecked_i_know_what_i_am_doing(
83 array: &DictionaryArray<UInt16Type>,
84 ) -> Self {
85 #[cfg(debug_assertions)]
86 {
87 let gc_ed = gc_dictionary_array(array).val;
88 assert_eq!(
89 gc_ed.values().len(),
90 array.values().len(),
91 "the input dictionary values are not unique"
92 );
93 }
94 Self { val: array.clone() }
95 }
96
97 pub fn into_inner(self) -> DictionaryArray<UInt16Type> {
98 self.val
99 }
100
101 pub fn as_ref(&self) -> &DictionaryArray<UInt16Type> {
102 &self.val
103 }
104
105 pub fn bit_width_for_key(&self) -> NonZero<u8> {
106 let distinct_count = self.as_ref().values().len();
107 get_bit_width(distinct_count as u64)
108 }
109}
110
111fn gc_dictionary_array(array: &DictionaryArray<UInt16Type>) -> CheckedDictionaryArray {
112 let value_type = array.values().data_type();
113 if let DataType::Binary = value_type {
114 let typed = array
115 .downcast_dict::<GenericByteArray<BinaryType>>()
116 .unwrap();
117 let iter = typed.into_iter();
118 byte_array_to_dict_array::<BinaryType, _>(iter)
119 } else if let DataType::Utf8 = value_type {
120 let typed = array.downcast_dict::<GenericByteArray<Utf8Type>>().unwrap();
121 let iter = typed.into_iter();
122 byte_array_to_dict_array::<Utf8Type, _>(iter)
123 } else {
124 unreachable!("Unsupported dictionary type: {:?}", value_type);
125 }
126}
127
128fn decimal_array_to_dict_array<T: DecimalType>(
129 array: &PrimitiveArray<T>,
130) -> CheckedDictionaryArray {
131 let iter = array.iter();
132 let mut builder =
133 PrimitiveDictionaryBuilder::<UInt16Type, T>::with_capacity(array.len(), array.len());
134 for s in iter {
135 builder.append_option(s);
136 }
137 let dict = builder.finish();
138 CheckedDictionaryArray { val: dict }
139}
140
141fn byte_array_to_dict_array<'a, T: ByteArrayType, I: ArrayAccessor<Item = &'a T::Native>>(
142 input: ArrayIter<I>,
143) -> CheckedDictionaryArray {
144 let mut builder = GenericByteDictionaryBuilder::<UInt16Type, T>::with_capacity(
145 input.size_hint().0,
146 input.size_hint().0,
147 input.size_hint().0,
148 );
149 for s in input {
150 builder.append_option(s);
151 }
152 let dict = builder.finish();
153 CheckedDictionaryArray { val: dict }
154}
155
/// Yields to the shuttle scheduler when running shuttle tests; a no-op in every other build.
pub(crate) fn yield_now_if_shuttle() {
    #[cfg(all(feature = "shuttle", test))]
    {
        shuttle::thread::yield_now();
    }
}
160
/// Installs a `tracing` subscriber for shuttle runs.
///
/// `try_init` returns an error when a global subscriber is already installed (for example
/// by an earlier test in the same process). That error is intentionally ignored.
#[cfg(all(feature = "shuttle", test))]
fn init_shuttle_tracing() {
    _ = tracing_subscriber::fmt()
        .with_ansi(true)
        .with_thread_names(false)
        .with_target(false)
        .try_init();
}

/// Runs `test` under shuttle's randomized concurrency scheduling.
///
/// One `PctScheduler::new(10, 1_000)` is registered per available core, capped at 4, and
/// the schedulers are run through a `PortfolioRunner`.
#[cfg(all(feature = "shuttle", test))]
pub(crate) fn shuttle_test(test: impl Fn() + Send + Sync + 'static) {
    init_shuttle_tracing();

    let mut runner = shuttle::PortfolioRunner::new(true, Default::default());

    // `available_parallelism` can return `Err` on some platforms or sandboxes. Degrade to a
    // single scheduler there instead of panicking the whole test.
    let scheduler_count = std::thread::available_parallelism()
        .map_or(1, |cores| cores.get())
        .min(4);

    for _ in 0..scheduler_count {
        runner.add(shuttle::scheduler::PctScheduler::new(10, 1_000));
    }
    runner.run(test);
}

/// Replays one recorded shuttle `schedule` for `test`, to reproduce a specific failure.
// Presumably only invoked by hand while debugging a failing schedule, so it is usually
// unreferenced; `allow(unused)` keeps that from warning.
#[allow(unused)]
#[cfg(all(feature = "shuttle", test))]
pub(crate) fn shuttle_replay(test: impl Fn() + Send + Sync + 'static, schedule: &str) {
    init_shuttle_tracing();
    shuttle::replay(test, schedule);
}
189
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{BinaryArray, DictionaryArray};
    use std::sync::Arc;

    /// Builds a `Binary` dictionary whose `i`-th key points at `values[i]`.
    ///
    /// Every input value, duplicates included, is therefore referenced exactly once. Keys
    /// are derived from the input length rather than hard-coded, so any length works.
    fn create_test_dictionary(values: Vec<&[u8]>) -> DictionaryArray<UInt16Type> {
        let keys: Vec<u16> = (0..values.len())
            .map(|i| u16::try_from(i).expect("test dictionaries fit in u16 keys"))
            .collect();
        let binary_array = BinaryArray::from_iter_values(values);
        DictionaryArray::new(keys.into(), Arc::new(binary_array))
    }

    #[test]
    fn test_get_bit_width() {
        assert_eq!(get_bit_width(0).get(), 1);
        assert_eq!(get_bit_width(1).get(), 1);
        assert_eq!(get_bit_width(4).get(), 3);
        assert_eq!(get_bit_width(u64::MAX).get(), 64);
    }

    #[test]
    fn test_gc_behavior() {
        let dup_dict = create_test_dictionary(vec![b"a", b"a", b"b", b"b"]);
        let checked = CheckedDictionaryArray::new_checked(&dup_dict);
        let dict_values = checked.as_ref().values();
        assert_eq!(dict_values.len(), 2);
        let binary_values = dict_values
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(binary_values.value(0), b"a");
        assert_eq!(binary_values.value(1), b"b");

        let unique_dict = create_test_dictionary(vec![b"a", b"b", b"c", b"d"]);
        let checked_unique = CheckedDictionaryArray::new_checked(&unique_dict);
        assert_eq!(checked_unique.as_ref().values().len(), 4);
    }

    #[test]
    fn test_from_string_view_array_dedups_into_utf8() {
        let view = StringViewArray::from(vec!["x", "y", "x"]);
        let checked = CheckedDictionaryArray::from_string_view_array(&view);
        let dict = checked.as_ref();
        assert_eq!(dict.values().len(), 2);
        assert_eq!(dict.values().data_type(), &DataType::Utf8);
        assert_eq!(dict.keys().values().to_vec(), vec![0u16, 1, 0]);
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "the input dictionary values are not unique")]
    fn test_unchecked_duplicates_panic() {
        let dup_dict = create_test_dictionary(vec![b"a", b"a", b"b", b"b"]);
        unsafe {
            CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(&dup_dict);
        }
    }
}