liquid_cache/liquid_array/byte_view_array/
conversions.rs1use arrow::array::{
2 Array, ArrayAccessor, ArrayIter, BinaryArray, BinaryViewArray, DictionaryArray,
3 GenericByteArray, StringArray, StringViewArray, UInt16Array, cast::AsArray, types::UInt16Type,
4};
5use arrow::datatypes::ByteArrayType;
6use arrow_schema::DataType;
7use fsst::Compressor;
8use std::sync::Arc;
9
10use super::{ArrowByteType, ByteViewBuildOptions, LiquidByteViewArray};
11use crate::liquid_array::byte_view_array::fingerprint::StringFingerprint;
12use crate::liquid_array::raw::fsst_buffer::{
13 FsstArray, FsstBacking, PrefixKey, RawFsstBuffer, train_compressor,
14};
15use crate::utils::CheckedDictionaryArray;
16
17impl<B: FsstBacking> LiquidByteViewArray<B> {
18 pub fn from_string_view_array(
20 array: &StringViewArray,
21 compressor: Arc<Compressor>,
22 ) -> LiquidByteViewArray<FsstArray> {
23 Self::from_view_array_inner(
24 array,
25 compressor,
26 ByteViewBuildOptions::new(ArrowByteType::Utf8View),
27 )
28 }
29
30 pub fn from_binary_view_array(
32 array: &BinaryViewArray,
33 compressor: Arc<Compressor>,
34 ) -> LiquidByteViewArray<FsstArray> {
35 Self::from_view_array_inner(
36 array,
37 compressor,
38 ByteViewBuildOptions::new(ArrowByteType::BinaryView),
39 )
40 }
41
42 pub fn from_string_array(
44 array: &StringArray,
45 compressor: Arc<Compressor>,
46 ) -> LiquidByteViewArray<FsstArray> {
47 Self::from_byte_array_inner(
48 array,
49 compressor,
50 ByteViewBuildOptions::new(ArrowByteType::Utf8),
51 )
52 }
53
54 pub fn from_binary_array(
56 array: &BinaryArray,
57 compressor: Arc<Compressor>,
58 ) -> LiquidByteViewArray<FsstArray> {
59 Self::from_byte_array_inner(
60 array,
61 compressor,
62 ByteViewBuildOptions::new(ArrowByteType::Binary),
63 )
64 }
65
66 pub fn train_from_string_view(
68 array: &StringViewArray,
69 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
70 Self::train_from_string_view_inner(
71 array,
72 ByteViewBuildOptions::new(ArrowByteType::Utf8View),
73 )
74 }
75
76 pub fn train_from_binary_view(
78 array: &BinaryViewArray,
79 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
80 Self::train_from_binary_view_inner(
81 array,
82 ByteViewBuildOptions::new(ArrowByteType::BinaryView),
83 )
84 }
85
86 pub fn train_from_arrow<T: ByteArrayType>(
88 array: &GenericByteArray<T>,
89 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
90 Self::train_from_arrow_inner(
91 array,
92 ByteViewBuildOptions::new(ArrowByteType::from_arrow_type(&T::DATA_TYPE)),
93 )
94 }
95
96 pub unsafe fn from_unique_dict_array(
102 array: &DictionaryArray<UInt16Type>,
103 compressor: Arc<Compressor>,
104 ) -> LiquidByteViewArray<FsstArray> {
105 let arrow_type = ArrowByteType::from_arrow_type(array.values().data_type());
106 Self::from_dict_array_inner(
107 unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
108 compressor,
109 ByteViewBuildOptions::new(arrow_type),
110 )
111 }
112
113 pub(crate) unsafe fn from_unique_dict_array_with_options(
114 array: &DictionaryArray<UInt16Type>,
115 compressor: Arc<Compressor>,
116 options: ByteViewBuildOptions,
117 ) -> LiquidByteViewArray<FsstArray> {
118 Self::from_dict_array_inner(
119 unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
120 compressor,
121 options,
122 )
123 }
124
125 pub fn train_from_arrow_dict(
127 array: &DictionaryArray<UInt16Type>,
128 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
129 let options = if array.values().data_type() == &DataType::Utf8 {
130 ByteViewBuildOptions::new(ArrowByteType::Dict16Utf8)
131 } else if array.values().data_type() == &DataType::Binary {
132 ByteViewBuildOptions::new(ArrowByteType::Dict16Binary)
133 } else {
134 panic!("Unsupported dictionary type: {:?}", array.data_type())
135 };
136 Self::train_from_arrow_dict_inner(array, options)
137 }
138
139 pub(crate) fn train_from_string_view_inner(
140 array: &StringViewArray,
141 options: ByteViewBuildOptions,
142 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
143 let compressor = Self::train_compressor(array.iter());
144 (
145 compressor.clone(),
146 Self::from_view_array_inner(array, compressor, options),
147 )
148 }
149
150 pub(crate) fn train_from_binary_view_inner(
151 array: &BinaryViewArray,
152 options: ByteViewBuildOptions,
153 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
154 let compressor = Self::train_compressor_bytes(array.iter());
155 (
156 compressor.clone(),
157 Self::from_view_array_inner(array, compressor, options),
158 )
159 }
160
161 pub(crate) fn train_from_arrow_inner<T: ByteArrayType>(
162 array: &GenericByteArray<T>,
163 options: ByteViewBuildOptions,
164 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
165 let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
166 let value_type = dict.as_ref().values().data_type();
167
168 let compressor = if value_type == &DataType::Utf8 {
169 Self::train_compressor(dict.as_ref().values().as_string::<i32>().iter())
170 } else {
171 Self::train_compressor_bytes(dict.as_ref().values().as_binary::<i32>().iter())
172 };
173 (
174 compressor.clone(),
175 Self::from_dict_array_inner(dict, compressor, options),
176 )
177 }
178
179 pub(crate) fn train_from_arrow_dict_inner(
180 array: &DictionaryArray<UInt16Type>,
181 options: ByteViewBuildOptions,
182 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
183 if array.values().data_type() == &DataType::Utf8 {
184 let values = array.values().as_string::<i32>();
185
186 let compressor = Self::train_compressor(values.iter());
187 (
188 compressor.clone(),
189 Self::from_dict_array_inner(
190 CheckedDictionaryArray::new_checked(array),
191 compressor,
192 options,
193 ),
194 )
195 } else if array.values().data_type() == &DataType::Binary {
196 let values = array.values().as_binary::<i32>();
197 let compressor = Self::train_compressor_bytes(values.iter());
198 (
199 compressor.clone(),
200 Self::from_dict_array_inner(
201 CheckedDictionaryArray::new_checked(array),
202 compressor,
203 options,
204 ),
205 )
206 } else {
207 panic!("Unsupported dictionary type: {:?}", array.data_type())
208 }
209 }
210
211 pub fn train_compressor<'a, T: ArrayAccessor<Item = &'a str>>(
213 array: ArrayIter<T>,
214 ) -> Arc<Compressor> {
215 Arc::new(train_compressor(
216 array.filter_map(|s| s.as_ref().map(|s| s.as_bytes())),
217 ))
218 }
219
220 pub fn train_compressor_bytes<'a, T: ArrayAccessor<Item = &'a [u8]>>(
222 array: ArrayIter<T>,
223 ) -> Arc<Compressor> {
224 Arc::new(train_compressor(
225 array.filter_map(|s| s.as_ref().map(|s| *s)),
226 ))
227 }
228
229 pub(crate) fn from_view_array_inner<T>(
231 array: &T,
232 compressor: Arc<Compressor>,
233 options: ByteViewBuildOptions,
234 ) -> LiquidByteViewArray<FsstArray>
235 where
236 T: Array + 'static,
237 {
238 let dict = if let Some(string_view) = array.as_any().downcast_ref::<StringViewArray>() {
240 CheckedDictionaryArray::from_string_view_array(string_view)
241 } else if let Some(binary_view) = array.as_any().downcast_ref::<BinaryViewArray>() {
242 CheckedDictionaryArray::from_binary_view_array(binary_view)
243 } else {
244 panic!("Unsupported view array type")
245 };
246
247 Self::from_dict_array_inner(dict, compressor, options)
248 }
249
250 pub(crate) fn from_byte_array_inner<T: ByteArrayType>(
251 array: &GenericByteArray<T>,
252 compressor: Arc<Compressor>,
253 options: ByteViewBuildOptions,
254 ) -> LiquidByteViewArray<FsstArray> {
255 let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
256 Self::from_dict_array_inner(dict, compressor, options)
257 }
258
259 fn from_dict_array_inner(
261 dict: CheckedDictionaryArray,
262 compressor: Arc<Compressor>,
263 options: ByteViewBuildOptions,
264 ) -> LiquidByteViewArray<FsstArray> {
265 let (keys, values) = dict.as_ref().clone().into_parts();
266 let arrow_type = options.arrow_type;
267
268 let shared_prefix = if values.is_empty() {
270 Vec::new()
271 } else {
272 let first_value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
274 string_values.value(0).as_bytes()
275 } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
276 binary_values.value(0)
277 } else {
278 panic!("Unsupported dictionary value type")
279 };
280
281 let mut shared_prefix = first_value_bytes.to_vec();
282
283 for i in 1..values.len() {
285 let value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
286 string_values.value(i).as_bytes()
287 } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
288 binary_values.value(i)
289 } else {
290 panic!("Unsupported dictionary value type")
291 };
292
293 let common_len = shared_prefix
294 .iter()
295 .zip(value_bytes.iter())
296 .take_while(|(a, b)| a == b)
297 .count();
298 shared_prefix.truncate(common_len);
299
300 if shared_prefix.is_empty() {
302 break;
303 }
304 }
305
306 shared_prefix
307 };
308
309 let shared_prefix_len = shared_prefix.len();
310
311 let mut prefix_keys = Vec::with_capacity(values.len());
313 let mut fingerprints = options
314 .build_fingerprints
315 .then(|| Vec::with_capacity(values.len()));
316
317 let mut compress_buffer = Vec::with_capacity(1024 * 1024 * 2);
318
319 let (raw_fsst_buffer, byte_offsets) =
321 if let Some(string_values) = values.as_string_opt::<i32>() {
322 RawFsstBuffer::from_byte_slices(
323 string_values.iter().map(|s| s.map(|s| s.as_bytes())),
324 compressor.clone(),
325 &mut compress_buffer,
326 )
327 } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
328 RawFsstBuffer::from_byte_slices(
329 binary_values.iter(),
330 compressor.clone(),
331 &mut compress_buffer,
332 )
333 } else {
334 panic!("Unsupported dictionary value type")
335 };
336
337 for i in 0..values.len() {
338 let value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
339 string_values.value(i).as_bytes()
340 } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
341 binary_values.value(i)
342 } else {
343 panic!("Unsupported dictionary value type")
344 };
345
346 let remaining_bytes = if shared_prefix_len < value_bytes.len() {
347 &value_bytes[shared_prefix_len..]
348 } else {
349 &[]
350 };
351
352 prefix_keys.push(PrefixKey::new(remaining_bytes));
353 if let Some(ref mut fingerprints) = fingerprints {
354 fingerprints.push(StringFingerprint::from_bytes(value_bytes).bits());
355 }
356 }
357
358 assert_eq!(values.len(), byte_offsets.len() - 1);
359
360 let prefix_keys: Arc<[PrefixKey]> = prefix_keys.into();
361
362 let mut array = LiquidByteViewArray::from_parts(
363 keys,
364 prefix_keys,
365 FsstArray::from_byte_offsets(Arc::new(raw_fsst_buffer), &byte_offsets, compressor),
366 arrow_type,
367 shared_prefix,
368 );
369 if let Some(fingerprints) = fingerprints {
370 array.string_fingerprints = Some(Arc::from(fingerprints.into_boxed_slice()));
371 }
372 array
373 }
374
375 pub(super) fn from_parts(
377 dictionary_keys: UInt16Array,
378 prefix_keys: Arc<[PrefixKey]>,
379 fsst_buffer: B,
380 original_arrow_type: ArrowByteType,
381 shared_prefix: Vec<u8>,
382 ) -> Self {
383 Self {
384 dictionary_keys,
385 prefix_keys,
386 fsst_buffer,
387 original_arrow_type,
388 shared_prefix,
389 string_fingerprints: None,
390 }
391 }
392}