lance_encoding/previous/encoder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::HashMap, env, hash::RandomState, sync::Arc};

use arrow::array::AsArray;
use arrow_array::{ArrayRef, UInt8Array};
use arrow_schema::DataType;
use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
use snafu::location;

use crate::{
    buffer::LanceBuffer,
    data::DataBlock,
    encoder::{ColumnIndexSequence, EncodingOptions, FieldEncoder, FieldEncodingStrategy},
    encodings::{
        logical::r#struct::StructFieldEncoder,
        physical::{
            block::{CompressionConfig, CompressionScheme},
            value::ValueEncoder,
        },
    },
    format::pb,
    previous::encodings::{
        logical::{
            blob::BlobFieldEncoder, list::ListFieldEncoder, primitive::PrimitiveFieldEncoder,
        },
        physical::{
            basic::BasicEncoder,
            binary::BinaryEncoder,
            bitpack::{compute_compressed_bit_width_for_non_neg, BitpackedForNonNegArrayEncoder},
            dictionary::{AlreadyDictionaryEncoder, DictionaryEncoder},
            fixed_size_binary::FixedSizeBinaryEncoder,
            fixed_size_list::FslEncoder,
            fsst::FsstArrayEncoder,
            packed_struct::PackedStructEncoder,
        },
    },
    version::LanceFileVersion,
};

use crate::constants::{
    COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, PACKED_STRUCT_LEGACY_META_KEY,
    PACKED_STRUCT_META_KEY,
};

use lance_core::datatypes::{Field, BLOB_DESC_FIELD, BLOB_META_KEY};
use lance_core::{Error, Result};

/// An encoded array
///
/// Maps to a single Arrow array
///
/// This contains the encoded data as well as a description of the encoding that was applied which
/// can be used to decode the data later.
#[derive(Debug)]
pub struct EncodedArray {
    /// The encoded buffers
    pub data: DataBlock,
    /// A description of the encoding used to encode the array
    pub encoding: pb::ArrayEncoding,
}

impl EncodedArray {
    pub fn new(data: DataBlock, encoding: pb::ArrayEncoding) -> Self {
        Self { data, encoding }
    }

    pub fn into_buffers(self) -> (Vec<LanceBuffer>, pb::ArrayEncoding) {
        let buffers = self.data.into_buffers();
        (buffers, self.encoding)
    }
}

/// Encodes data from one format to another (hopefully more compact or useful) format
///
/// The array encoder must be Send + Sync.  Encoding is always done on its own
/// thread task in the background and there could potentially be multiple encode
/// tasks running for a column at once.
pub trait ArrayEncoder: std::fmt::Debug + Send + Sync {
    /// Encode data
    ///
    /// The result should contain a description of the encoding that was chosen.
    /// This can be used to decode the data later.
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray>;
}
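
// Hedged sketch (not part of the original source): an `ArrayEncoder` is normally driven by the
// field and page encoders, but the contract can be exercised directly, e.g.
//
//     let mut buffer_index = 0u32;
//     let encoded = encoder.encode(data_block, &DataType::Int32, &mut buffer_index)?;
//     let (buffers, description) = encoded.into_buffers();
//
// where `encoder` and `data_block` are supplied by the caller; the returned description is what
// a reader later uses to decode the buffers.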

/// A trait to pick which encoding strategy to use for a single page
/// of data
///
/// Presumably, implementations will make encoding decisions based on
/// array statistics.
pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug {
    fn create_array_encoder(
        &self,
        arrays: &[ArrayRef],
        field: &Field,
    ) -> Result<Box<dyn ArrayEncoder>>;
}

/// The core field encoding strategy is a set of basic encodings that
/// are generally applicable in most scenarios.
#[derive(Debug)]
pub struct CoreFieldEncodingStrategy {
    pub array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    pub version: LanceFileVersion,
}

// For some reason clippy has a false positive here and thinks this impl can be derived,
// but it can't because ArrayEncodingStrategy has no default implementation
#[allow(clippy::derivable_impls)]
impl Default for CoreFieldEncodingStrategy {
    fn default() -> Self {
        Self {
            array_encoding_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
            version: LanceFileVersion::default(),
        }
    }
}

impl CoreFieldEncodingStrategy {
    fn is_primitive_type(data_type: &DataType) -> bool {
        matches!(
            data_type,
            DataType::Boolean
                | DataType::Date32
                | DataType::Date64
                | DataType::Decimal128(_, _)
                | DataType::Decimal256(_, _)
                | DataType::Duration(_)
                | DataType::Float16
                | DataType::Float32
                | DataType::Float64
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Int8
                | DataType::Interval(_)
                | DataType::Null
                | DataType::Time32(_)
                | DataType::Time64(_)
                | DataType::Timestamp(_, _)
                | DataType::UInt16
                | DataType::UInt32
                | DataType::UInt64
                | DataType::UInt8
                | DataType::FixedSizeBinary(_)
                | DataType::FixedSizeList(_, _)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8,
        )
    }
}

impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();
        if Self::is_primitive_type(&data_type) {
            let column_index = column_index.next_column_index(field.id as u32);
            if field.metadata.contains_key(BLOB_META_KEY) {
                let mut packed_meta = HashMap::new();
                packed_meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
                let desc_field =
                    Field::try_from(BLOB_DESC_FIELD.clone().with_metadata(packed_meta)).unwrap();
                let desc_encoder = Box::new(PrimitiveFieldEncoder::try_new(
                    options,
                    self.array_encoding_strategy.clone(),
                    column_index,
                    desc_field,
                )?);
                Ok(Box::new(BlobFieldEncoder::new(desc_encoder)))
            } else {
                Ok(Box::new(PrimitiveFieldEncoder::try_new(
                    options,
                    self.array_encoding_strategy.clone(),
                    column_index,
                    field.clone(),
                )?))
            }
        } else {
            match data_type {
                DataType::List(_child) | DataType::LargeList(_child) => {
                    let list_idx = column_index.next_column_index(field.id as u32);
                    let inner_encoding = encoding_strategy_root.create_field_encoder(
                        encoding_strategy_root,
                        &field.children[0],
                        column_index,
                        options,
                    )?;
                    let offsets_encoder =
                        Arc::new(BasicEncoder::new(Box::new(ValueEncoder::default())));
                    Ok(Box::new(ListFieldEncoder::new(
                        inner_encoding,
                        offsets_encoder,
                        options.cache_bytes_per_column,
                        options.keep_original_array,
                        list_idx,
                    )))
                }
                DataType::Struct(_) => {
                    let field_metadata = &field.metadata;
                    if field_metadata
                        .get(PACKED_STRUCT_LEGACY_META_KEY)
                        .map(|v| v == "true")
                        .unwrap_or(field_metadata.contains_key(PACKED_STRUCT_META_KEY))
                    {
                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
                            options,
                            self.array_encoding_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                        )?))
                    } else {
                        let header_idx = column_index.next_column_index(field.id as u32);
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.create_field_encoder(
                                    encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructFieldEncoder::new(
                            children_encoders,
                            header_idx,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    // A dictionary of a primitive type is, itself, primitive
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
                            options,
                            self.array_encoding_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                        )?))
                    } else {
                        // A dictionary of a logical type is, itself, logical, and we don't support that today.
                        // It could be made possible (e.g. store indices in one column and values in the remaining columns)
                        // but would be a significant amount of work.
                        //
                        // An easier fallback implementation would be to decode-on-write and encode-on-read.
                        Err(Error::NotSupported { source: format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into(), location: location!() })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}

/// The core array encoding strategy is a set of basic encodings that
/// are generally applicable in most scenarios.
#[derive(Debug, Default)]
pub struct CoreArrayEncodingStrategy {
    pub version: LanceFileVersion,
}

const BINARY_DATATYPES: [DataType; 4] = [
    DataType::Binary,
    DataType::LargeBinary,
    DataType::Utf8,
    DataType::LargeUtf8,
];

impl CoreArrayEncodingStrategy {
    fn can_use_fsst(data_type: &DataType, data_size: u64, version: LanceFileVersion) -> bool {
        version >= LanceFileVersion::V2_1
            && matches!(data_type, DataType::Utf8 | DataType::Binary)
            && data_size > 4 * 1024 * 1024
    }

    fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionConfig> {
        let compression = field_meta.get(COMPRESSION_META_KEY)?;
        let compression_scheme = compression.parse::<CompressionScheme>();
        match compression_scheme {
            Ok(compression_scheme) => Some(CompressionConfig::new(
                compression_scheme,
                field_meta
                    .get(COMPRESSION_LEVEL_META_KEY)
                    .and_then(|level| level.parse().ok()),
            )),
            Err(_) => None,
        }
    }
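
    // Hedged illustration (not part of the original source): a writer can request block
    // compression for a string column by attaching the metadata keys that
    // `get_field_compression` reads, e.g.
    //
    //     let field_meta = std::collections::HashMap::from([
    //         (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
    //         (COMPRESSION_LEVEL_META_KEY.to_string(), "5".to_string()),
    //     ]);
    //     let config = CoreArrayEncodingStrategy::get_field_compression(&field_meta);
    //     // => Some(CompressionConfig { scheme: Zstd, level: Some(5) })
    //
    // A missing or unparsable scheme yields `None`; a missing level is passed through as `None`.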

    fn default_binary_encoder(
        arrays: &[ArrayRef],
        data_type: &DataType,
        field_meta: Option<&HashMap<String, String>>,
        data_size: u64,
        version: LanceFileVersion,
    ) -> Result<Box<dyn ArrayEncoder>> {
        let bin_indices_encoder =
            Self::choose_array_encoder(arrays, &DataType::UInt64, data_size, false, version, None)?;

        if let Some(compression) = field_meta.and_then(Self::get_field_compression) {
            if compression.scheme == CompressionScheme::Fsst {
                // User requested FSST
                let raw_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
                Ok(Box::new(FsstArrayEncoder::new(raw_encoder)))
            } else {
                // Generic compression
                Ok(Box::new(BinaryEncoder::new(
                    bin_indices_encoder,
                    Some(compression),
                )))
            }
        } else {
            // No user-specified compression, use FSST if we can
            let bin_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
            if Self::can_use_fsst(data_type, data_size, version) {
                Ok(Box::new(FsstArrayEncoder::new(bin_encoder)))
            } else {
                Ok(bin_encoder)
            }
        }
    }

    fn choose_array_encoder(
        arrays: &[ArrayRef],
        data_type: &DataType,
        data_size: u64,
        use_dict_encoding: bool,
        version: LanceFileVersion,
        field_meta: Option<&HashMap<String, String>>,
    ) -> Result<Box<dyn ArrayEncoder>> {
        match data_type {
            DataType::FixedSizeList(inner, dimension) => {
                Ok(Box::new(BasicEncoder::new(Box::new(FslEncoder::new(
                    Self::choose_array_encoder(
                        arrays,
                        inner.data_type(),
                        data_size,
                        use_dict_encoding,
                        version,
                        None,
                    )?,
                    *dimension as u32,
                )))))
            }
            DataType::Dictionary(key_type, value_type) => {
                let key_encoder =
                    Self::choose_array_encoder(arrays, key_type, data_size, false, version, None)?;
                let value_encoder = Self::choose_array_encoder(
                    arrays, value_type, data_size, false, version, None,
                )?;

                Ok(Box::new(AlreadyDictionaryEncoder::new(
                    key_encoder,
                    value_encoder,
                )))
            }
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
                if use_dict_encoding {
                    let dict_indices_encoder = Self::choose_array_encoder(
                        // We need to pass arrays to this method to figure out what kind of compression to
                        // use but we haven't actually calculated the indices yet.  For now, we just assume
                        // worst case and use the full range.  In the future maybe we can pass in statistics
                        // instead of the actual data
                        &[Arc::new(UInt8Array::from_iter_values(0_u8..255_u8))],
                        &DataType::UInt8,
                        data_size,
                        false,
                        version,
                        None,
                    )?;
                    let dict_items_encoder = Self::choose_array_encoder(
                        arrays,
                        &DataType::Utf8,
                        data_size,
                        false,
                        version,
                        None,
                    )?;

                    Ok(Box::new(DictionaryEncoder::new(
                        dict_indices_encoder,
                        dict_items_encoder,
                    )))
                }
                // The parent datatype should be binary or utf8 to use the fixed size encoding
                // The variable 'data_type' is passed through recursion so comparing with it would be incorrect
                else if BINARY_DATATYPES.contains(arrays[0].data_type()) {
                    if let Some(byte_width) = check_fixed_size_encoding(arrays, version) {
                        // use FixedSizeBinaryEncoder
                        let bytes_encoder = Self::choose_array_encoder(
                            arrays,
                            &DataType::UInt8,
                            data_size,
                            false,
                            version,
                            None,
                        )?;

                        Ok(Box::new(BasicEncoder::new(Box::new(
                            FixedSizeBinaryEncoder::new(bytes_encoder, byte_width as usize),
                        ))))
                    } else {
                        Self::default_binary_encoder(
                            arrays, data_type, field_meta, data_size, version,
                        )
                    }
                } else {
                    Self::default_binary_encoder(arrays, data_type, field_meta, data_size, version)
                }
            }
            DataType::Struct(fields) => {
                let num_fields = fields.len();
                let mut inner_encoders = Vec::new();

                for i in 0..num_fields {
                    let inner_datatype = fields[i].data_type();
                    let inner_encoder = Self::choose_array_encoder(
                        arrays,
                        inner_datatype,
                        data_size,
                        use_dict_encoding,
                        version,
                        None,
                    )?;
                    inner_encoders.push(inner_encoder);
                }

                Ok(Box::new(PackedStructEncoder::new(inner_encoders)))
            }
            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
                        compressed_bit_width as usize,
                        data_type.clone(),
                    )))
                } else {
                    Ok(Box::new(BasicEncoder::new(Box::new(
                        ValueEncoder::default(),
                    ))))
                }
            }

            // TODO: for signed integers, make this a cascaded encoding: a sparse array for the
            // negative and very wide (bit-width) values, followed by a bitpacked array for the
            // narrow (bit-width) values. This needs `BitpackedForNeg` to be merged first. The
            // sparse array could live in the metadata so the bitpacked data keeps using a single
            // page buffer.
            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
                        compressed_bit_width as usize,
                        data_type.clone(),
                    )))
                } else {
                    Ok(Box::new(BasicEncoder::new(Box::new(
                        ValueEncoder::default(),
                    ))))
                }
            }
            _ => Ok(Box::new(BasicEncoder::new(Box::new(
                ValueEncoder::default(),
            )))),
        }
    }
}

fn get_dict_encoding_threshold() -> u64 {
    env::var("LANCE_DICT_ENCODING_THRESHOLD")
        .ok()
        .and_then(|val| val.parse().ok())
        .unwrap_or(100)
}
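
// Hedged illustration (not part of the original source): the cardinality threshold can be tuned
// through the environment before encoding, e.g.
//
//     std::env::set_var("LANCE_DICT_ENCODING_THRESHOLD", "500");
//     assert_eq!(get_dict_encoding_threshold(), 500);
//
// An unset or unparsable value falls back to the default of 100.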

// Check whether we want to use dictionary encoding by applying a threshold on cardinality.
// Returns true if the estimated cardinality is below the threshold, but false if the total
// number of rows is itself below the threshold.
// The default threshold of 100 is just a heuristic for now.
// HyperLogLog is used for cardinality estimation; its error rate is 1.04 / sqrt(2^p), where p
// is the precision, so with p = 12 the error rate is 1.04 / sqrt(2^12) ≈ 1.6%.
fn check_dict_encoding(arrays: &[ArrayRef], threshold: u64) -> bool {
    let num_total_rows = arrays.iter().map(|arr| arr.len()).sum::<usize>();
    if num_total_rows < threshold as usize {
        return false;
    }
    const PRECISION: u8 = 12;

    let mut hll: HyperLogLogPlus<String, RandomState> =
        HyperLogLogPlus::new(PRECISION, RandomState::new()).unwrap();

    for arr in arrays {
        let string_array = arrow_array::cast::as_string_array(arr);
        for value in string_array.iter().flatten() {
            hll.insert(value);
            let estimated_cardinality = hll.count() as u64;
            if estimated_cardinality >= threshold {
                return false;
            }
        }
    }

    true
}

fn check_fixed_size_encoding(arrays: &[ArrayRef], version: LanceFileVersion) -> Option<u64> {
    if version < LanceFileVersion::V2_1 || arrays.is_empty() {
        return None;
    }

    // make sure no array has an empty string
    if !arrays.iter().all(|arr| {
        if let Some(arr) = arr.as_string_opt::<i32>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_binary_opt::<i32>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_string_opt::<i64>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_binary_opt::<i64>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else {
            panic!("wrong dtype");
        }
    }) {
        return None;
    }

    let lengths = arrays
        .iter()
        .flat_map(|arr| {
            if let Some(arr) = arr.as_string_opt::<i32>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_binary_opt::<i32>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_string_opt::<i64>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_binary_opt::<i64>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else {
                panic!("wrong dtype");
            }
        })
        .collect::<Vec<_>>();

    // find first non-zero value in lengths
    let first_non_zero = lengths.iter().position(|&x| x != 0);
    if let Some(first_non_zero) = first_non_zero {
        // make sure all lengths are equal to first_non_zero length or zero
        if !lengths
            .iter()
            .all(|&x| x == 0 || x == lengths[first_non_zero])
        {
            return None;
        }

        // set the byte width
        Some(lengths[first_non_zero])
    } else {
        None
    }
}

impl ArrayEncodingStrategy for CoreArrayEncodingStrategy {
    fn create_array_encoder(
        &self,
        arrays: &[ArrayRef],
        field: &Field,
    ) -> Result<Box<dyn ArrayEncoder>> {
        let data_size = arrays
            .iter()
            .map(|arr| arr.get_buffer_memory_size() as u64)
            .sum::<u64>();
        let data_type = arrays[0].data_type();

        let use_dict_encoding = data_type == &DataType::Utf8
            && check_dict_encoding(arrays, get_dict_encoding_threshold());

        Self::choose_array_encoder(
            arrays,
            data_type,
            data_size,
            use_dict_encoding,
            self.version,
            Some(&field.metadata),
        )
    }
}

#[cfg(test)]
pub mod tests {
    use crate::constants::{COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY};
    use crate::previous::encoder::{
        check_dict_encoding, check_fixed_size_encoding, ArrayEncodingStrategy,
        CoreArrayEncodingStrategy,
    };
    use crate::version::LanceFileVersion;
    use arrow_array::{ArrayRef, StringArray};
    use arrow_schema::Field;
    use std::collections::HashMap;
    use std::sync::Arc;

    fn is_dict_encoding_applicable(arr: Vec<Option<&str>>, threshold: u64) -> bool {
        let arr = StringArray::from(arr);
        let arr = Arc::new(arr) as ArrayRef;
        check_dict_encoding(&[arr], threshold)
    }

    #[test]
    fn test_dict_encoding_should_be_applied_if_cardinality_less_than_threshold() {
        assert!(is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("a"), Some("b")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_larger_than_threshold() {
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("d")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_equal_to_threshold() {
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("a")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_empty_arrays() {
        assert!(!is_dict_encoding_applicable(vec![], 3));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_smaller_than_threshold_arrays() {
        assert!(!is_dict_encoding_applicable(vec![Some("a"), Some("a")], 3));
    }

    fn is_fixed_size_encoding_applicable(
        arrays: Vec<Vec<Option<&str>>>,
        version: LanceFileVersion,
    ) -> bool {
        let mut final_arrays = Vec::new();
        for arr in arrays {
            let arr = StringArray::from(arr);
            let arr = Arc::new(arr) as ArrayRef;
            final_arrays.push(arr);
        }

        check_fixed_size_encoding(&final_arrays, version).is_some()
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable() {
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("abc"), Some("de")]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), None]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), Some("")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), Some("")]],
            LanceFileVersion::V2_1
        ));
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable_multiple_arrays() {
        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), Some("bc")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), None], vec![None, Some("")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![None, None], vec![None, None]],
            LanceFileVersion::V2_1
        ));
    }

    fn verify_array_encoder(
        array: ArrayRef,
        field_meta: Option<HashMap<String, String>>,
        version: LanceFileVersion,
        expected_encoder: &str,
    ) {
        let encoding_strategy = CoreArrayEncodingStrategy { version };
        let mut field = Field::new("test_field", array.data_type().clone(), true);
        if let Some(field_meta) = field_meta {
            field.set_metadata(field_meta);
        }
        let lance_field = lance_core::datatypes::Field::try_from(field).unwrap();
        let encoder_result = encoding_strategy.create_array_encoder(&[array], &lance_field);
        assert!(encoder_result.is_ok());
        let encoder = encoder_result.unwrap();
        assert_eq!(format!("{:?}", encoder).as_str(), expected_encoder);
    }

    #[test]
    fn test_choose_encoder_for_zstd_compressed_string_field() {
        verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
                             Some(HashMap::from([(COMPRESSION_META_KEY.to_string(), "zstd".to_string())])),
                             LanceFileVersion::V2_1,
                             "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: None }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 0 }) }");
    }

    #[test]
    fn test_choose_encoder_for_zstd_compression_level() {
        verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
                             Some(HashMap::from([
                                 (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
                                 (COMPRESSION_LEVEL_META_KEY.to_string(), "22".to_string())
                             ])),
                             LanceFileVersion::V2_1,
                             "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: Some(22) }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 22 }) }");
    }
}