liquid_cache/liquid_array/byte_view_array/
conversions.rs1use arrow::array::{
2 Array, ArrayAccessor, ArrayIter, BinaryArray, BinaryViewArray, DictionaryArray,
3 GenericByteArray, StringArray, StringViewArray, UInt16Array, cast::AsArray, types::UInt16Type,
4};
5use arrow::datatypes::ByteArrayType;
6use arrow_schema::DataType;
7use fsst::Compressor;
8use std::sync::Arc;
9
10use super::{ByteViewBuildOptions, LiquidByteViewArray};
11use crate::liquid_array::byte_array::ArrowByteType;
12use crate::liquid_array::byte_view_array::fingerprint::StringFingerprint;
13use crate::liquid_array::raw::fsst_buffer::{
14 FsstArray, FsstBacking, PrefixKey, RawFsstBuffer, train_compressor,
15};
16use crate::utils::CheckedDictionaryArray;
17
18impl<B: FsstBacking> LiquidByteViewArray<B> {
19 pub fn from_string_view_array(
21 array: &StringViewArray,
22 compressor: Arc<Compressor>,
23 ) -> LiquidByteViewArray<FsstArray> {
24 Self::from_view_array_inner(
25 array,
26 compressor,
27 ByteViewBuildOptions::new(ArrowByteType::Utf8View),
28 )
29 }
30
31 pub fn from_binary_view_array(
33 array: &BinaryViewArray,
34 compressor: Arc<Compressor>,
35 ) -> LiquidByteViewArray<FsstArray> {
36 Self::from_view_array_inner(
37 array,
38 compressor,
39 ByteViewBuildOptions::new(ArrowByteType::BinaryView),
40 )
41 }
42
43 pub fn from_string_array(
45 array: &StringArray,
46 compressor: Arc<Compressor>,
47 ) -> LiquidByteViewArray<FsstArray> {
48 Self::from_byte_array_inner(
49 array,
50 compressor,
51 ByteViewBuildOptions::new(ArrowByteType::Utf8),
52 )
53 }
54
55 pub fn from_binary_array(
57 array: &BinaryArray,
58 compressor: Arc<Compressor>,
59 ) -> LiquidByteViewArray<FsstArray> {
60 Self::from_byte_array_inner(
61 array,
62 compressor,
63 ByteViewBuildOptions::new(ArrowByteType::Binary),
64 )
65 }
66
67 pub fn train_from_string_view(
69 array: &StringViewArray,
70 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
71 Self::train_from_string_view_inner(
72 array,
73 ByteViewBuildOptions::new(ArrowByteType::Utf8View),
74 )
75 }
76
77 pub fn train_from_binary_view(
79 array: &BinaryViewArray,
80 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
81 Self::train_from_binary_view_inner(
82 array,
83 ByteViewBuildOptions::new(ArrowByteType::BinaryView),
84 )
85 }
86
87 pub fn train_from_arrow<T: ByteArrayType>(
89 array: &GenericByteArray<T>,
90 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
91 Self::train_from_arrow_inner(
92 array,
93 ByteViewBuildOptions::new(ArrowByteType::from_arrow_type(&T::DATA_TYPE)),
94 )
95 }
96
97 pub unsafe fn from_unique_dict_array(
103 array: &DictionaryArray<UInt16Type>,
104 compressor: Arc<Compressor>,
105 ) -> LiquidByteViewArray<FsstArray> {
106 let arrow_type = ArrowByteType::from_arrow_type(array.values().data_type());
107 Self::from_dict_array_inner(
108 unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
109 compressor,
110 ByteViewBuildOptions::new(arrow_type),
111 )
112 }
113
114 pub(crate) unsafe fn from_unique_dict_array_with_options(
115 array: &DictionaryArray<UInt16Type>,
116 compressor: Arc<Compressor>,
117 options: ByteViewBuildOptions,
118 ) -> LiquidByteViewArray<FsstArray> {
119 Self::from_dict_array_inner(
120 unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
121 compressor,
122 options,
123 )
124 }
125
126 pub fn train_from_arrow_dict(
128 array: &DictionaryArray<UInt16Type>,
129 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
130 let options = if array.values().data_type() == &DataType::Utf8 {
131 ByteViewBuildOptions::new(ArrowByteType::Dict16Utf8)
132 } else if array.values().data_type() == &DataType::Binary {
133 ByteViewBuildOptions::new(ArrowByteType::Dict16Binary)
134 } else {
135 panic!("Unsupported dictionary type: {:?}", array.data_type())
136 };
137 Self::train_from_arrow_dict_inner(array, options)
138 }
139
140 pub(crate) fn train_from_string_view_inner(
141 array: &StringViewArray,
142 options: ByteViewBuildOptions,
143 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
144 let compressor = Self::train_compressor(array.iter());
145 (
146 compressor.clone(),
147 Self::from_view_array_inner(array, compressor, options),
148 )
149 }
150
151 pub(crate) fn train_from_binary_view_inner(
152 array: &BinaryViewArray,
153 options: ByteViewBuildOptions,
154 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
155 let compressor = Self::train_compressor_bytes(array.iter());
156 (
157 compressor.clone(),
158 Self::from_view_array_inner(array, compressor, options),
159 )
160 }
161
162 pub(crate) fn train_from_arrow_inner<T: ByteArrayType>(
163 array: &GenericByteArray<T>,
164 options: ByteViewBuildOptions,
165 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
166 let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
167 let value_type = dict.as_ref().values().data_type();
168
169 let compressor = if value_type == &DataType::Utf8 {
170 Self::train_compressor(dict.as_ref().values().as_string::<i32>().iter())
171 } else {
172 Self::train_compressor_bytes(dict.as_ref().values().as_binary::<i32>().iter())
173 };
174 (
175 compressor.clone(),
176 Self::from_dict_array_inner(dict, compressor, options),
177 )
178 }
179
180 pub(crate) fn train_from_arrow_dict_inner(
181 array: &DictionaryArray<UInt16Type>,
182 options: ByteViewBuildOptions,
183 ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
184 if array.values().data_type() == &DataType::Utf8 {
185 let values = array.values().as_string::<i32>();
186
187 let compressor = Self::train_compressor(values.iter());
188 (
189 compressor.clone(),
190 Self::from_dict_array_inner(
191 CheckedDictionaryArray::new_checked(array),
192 compressor,
193 options,
194 ),
195 )
196 } else if array.values().data_type() == &DataType::Binary {
197 let values = array.values().as_binary::<i32>();
198 let compressor = Self::train_compressor_bytes(values.iter());
199 (
200 compressor.clone(),
201 Self::from_dict_array_inner(
202 CheckedDictionaryArray::new_checked(array),
203 compressor,
204 options,
205 ),
206 )
207 } else {
208 panic!("Unsupported dictionary type: {:?}", array.data_type())
209 }
210 }
211
212 pub fn train_compressor<'a, T: ArrayAccessor<Item = &'a str>>(
214 array: ArrayIter<T>,
215 ) -> Arc<Compressor> {
216 Arc::new(train_compressor(
217 array.filter_map(|s| s.as_ref().map(|s| s.as_bytes())),
218 ))
219 }
220
221 pub fn train_compressor_bytes<'a, T: ArrayAccessor<Item = &'a [u8]>>(
223 array: ArrayIter<T>,
224 ) -> Arc<Compressor> {
225 Arc::new(train_compressor(
226 array.filter_map(|s| s.as_ref().map(|s| *s)),
227 ))
228 }
229
230 pub(crate) fn from_view_array_inner<T>(
232 array: &T,
233 compressor: Arc<Compressor>,
234 options: ByteViewBuildOptions,
235 ) -> LiquidByteViewArray<FsstArray>
236 where
237 T: Array + 'static,
238 {
239 let dict = if let Some(string_view) = array.as_any().downcast_ref::<StringViewArray>() {
241 CheckedDictionaryArray::from_string_view_array(string_view)
242 } else if let Some(binary_view) = array.as_any().downcast_ref::<BinaryViewArray>() {
243 CheckedDictionaryArray::from_binary_view_array(binary_view)
244 } else {
245 panic!("Unsupported view array type")
246 };
247
248 Self::from_dict_array_inner(dict, compressor, options)
249 }
250
251 pub(crate) fn from_byte_array_inner<T: ByteArrayType>(
252 array: &GenericByteArray<T>,
253 compressor: Arc<Compressor>,
254 options: ByteViewBuildOptions,
255 ) -> LiquidByteViewArray<FsstArray> {
256 let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
257 Self::from_dict_array_inner(dict, compressor, options)
258 }
259
260 fn from_dict_array_inner(
262 dict: CheckedDictionaryArray,
263 compressor: Arc<Compressor>,
264 options: ByteViewBuildOptions,
265 ) -> LiquidByteViewArray<FsstArray> {
266 let (keys, values) = dict.as_ref().clone().into_parts();
267 let arrow_type = options.arrow_type;
268
269 let shared_prefix = if values.is_empty() {
271 Vec::new()
272 } else {
273 let first_value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
275 string_values.value(0).as_bytes()
276 } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
277 binary_values.value(0)
278 } else {
279 panic!("Unsupported dictionary value type")
280 };
281
282 let mut shared_prefix = first_value_bytes.to_vec();
283
284 for i in 1..values.len() {
286 let value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
287 string_values.value(i).as_bytes()
288 } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
289 binary_values.value(i)
290 } else {
291 panic!("Unsupported dictionary value type")
292 };
293
294 let common_len = shared_prefix
295 .iter()
296 .zip(value_bytes.iter())
297 .take_while(|(a, b)| a == b)
298 .count();
299 shared_prefix.truncate(common_len);
300
301 if shared_prefix.is_empty() {
303 break;
304 }
305 }
306
307 shared_prefix
308 };
309
310 let shared_prefix_len = shared_prefix.len();
311
312 let mut prefix_keys = Vec::with_capacity(values.len());
314 let mut fingerprints = options
315 .build_fingerprints
316 .then(|| Vec::with_capacity(values.len()));
317
318 let mut compress_buffer = Vec::with_capacity(1024 * 1024 * 2);
319
320 let (raw_fsst_buffer, byte_offsets) =
322 if let Some(string_values) = values.as_string_opt::<i32>() {
323 RawFsstBuffer::from_byte_slices(
324 string_values.iter().map(|s| s.map(|s| s.as_bytes())),
325 compressor.clone(),
326 &mut compress_buffer,
327 )
328 } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
329 RawFsstBuffer::from_byte_slices(
330 binary_values.iter(),
331 compressor.clone(),
332 &mut compress_buffer,
333 )
334 } else {
335 panic!("Unsupported dictionary value type")
336 };
337
338 for i in 0..values.len() {
339 let value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
340 string_values.value(i).as_bytes()
341 } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
342 binary_values.value(i)
343 } else {
344 panic!("Unsupported dictionary value type")
345 };
346
347 let remaining_bytes = if shared_prefix_len < value_bytes.len() {
348 &value_bytes[shared_prefix_len..]
349 } else {
350 &[]
351 };
352
353 prefix_keys.push(PrefixKey::new(remaining_bytes));
354 if let Some(ref mut fingerprints) = fingerprints {
355 fingerprints.push(StringFingerprint::from_bytes(value_bytes).bits());
356 }
357 }
358
359 assert_eq!(values.len(), byte_offsets.len() - 1);
360
361 let prefix_keys: Arc<[PrefixKey]> = prefix_keys.into();
362
363 let mut array = LiquidByteViewArray::from_parts(
364 keys,
365 prefix_keys,
366 FsstArray::from_byte_offsets(Arc::new(raw_fsst_buffer), &byte_offsets, compressor),
367 arrow_type,
368 shared_prefix,
369 );
370 if let Some(fingerprints) = fingerprints {
371 array.string_fingerprints = Some(Arc::from(fingerprints.into_boxed_slice()));
372 }
373 array
374 }
375
376 pub(super) fn from_parts(
378 dictionary_keys: UInt16Array,
379 prefix_keys: Arc<[PrefixKey]>,
380 fsst_buffer: B,
381 original_arrow_type: ArrowByteType,
382 shared_prefix: Vec<u8>,
383 ) -> Self {
384 Self {
385 dictionary_keys,
386 prefix_keys,
387 fsst_buffer,
388 original_arrow_type,
389 shared_prefix,
390 string_fingerprints: None,
391 }
392 }
393}