lance_encoding/v2/
encoder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::HashMap, env, hash::RandomState, sync::Arc};
5
6use arrow::array::AsArray;
7use arrow_array::{ArrayRef, UInt8Array};
8use arrow_schema::DataType;
9use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
10use snafu::location;
11
12use crate::{
13    buffer::LanceBuffer,
14    data::DataBlock,
15    encoder::{ColumnIndexSequence, EncodingOptions, FieldEncoder, FieldEncodingStrategy},
16    encodings::{
17        logical::r#struct::StructFieldEncoder,
18        physical::{
19            block::{CompressionConfig, CompressionScheme},
20            value::ValueEncoder,
21        },
22    },
23    format::pb,
24    v2::encodings::{
25        logical::{
26            blob::BlobFieldEncoder, list::ListFieldEncoder, primitive::PrimitiveFieldEncoder,
27        },
28        physical::{
29            basic::BasicEncoder,
30            binary::BinaryEncoder,
31            bitpack::{compute_compressed_bit_width_for_non_neg, BitpackedForNonNegArrayEncoder},
32            dictionary::{AlreadyDictionaryEncoder, DictionaryEncoder},
33            fixed_size_binary::FixedSizeBinaryEncoder,
34            fixed_size_list::FslEncoder,
35            fsst::FsstArrayEncoder,
36            packed_struct::PackedStructEncoder,
37        },
38    },
39    version::LanceFileVersion,
40};
41
42use lance_core::datatypes::{
43    Field, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY,
44    PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
45};
46use lance_core::{Error, Result};
47
48/// An encoded array
49///
50/// Maps to a single Arrow array
51///
52/// This contains the encoded data as well as a description of the encoding that was applied which
53/// can be used to decode the data later.
54#[derive(Debug)]
55pub struct EncodedArray {
56    /// The encoded buffers
57    pub data: DataBlock,
58    /// A description of the encoding used to encode the array
59    pub encoding: pb::ArrayEncoding,
60}
61
62impl EncodedArray {
63    pub fn new(data: DataBlock, encoding: pb::ArrayEncoding) -> Self {
64        Self { data, encoding }
65    }
66
67    pub fn into_buffers(self) -> (Vec<LanceBuffer>, pb::ArrayEncoding) {
68        let buffers = self.data.into_buffers();
69        (buffers, self.encoding)
70    }
71}
72
73/// Encodes data from one format to another (hopefully more compact or useful) format
74///
75/// The array encoder must be Send + Sync.  Encoding is always done on its own
76/// thread task in the background and there could potentially be multiple encode
77/// tasks running for a column at once.
78pub trait ArrayEncoder: std::fmt::Debug + Send + Sync {
79    /// Encode data
80    ///
81    /// The result should contain a description of the encoding that was chosen.
82    /// This can be used to decode the data later.
83    fn encode(
84        &self,
85        data: DataBlock,
86        data_type: &DataType,
87        buffer_index: &mut u32,
88    ) -> Result<EncodedArray>;
89}
90
91/// A trait to pick which encoding strategy to use for a single page
92/// of data
93///
94/// Presumably, implementations will make encoding decisions based on
95/// array statistics.
96pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug {
97    fn create_array_encoder(
98        &self,
99        arrays: &[ArrayRef],
100        field: &Field,
101    ) -> Result<Box<dyn ArrayEncoder>>;
102}
103
104/// The core field encoding strategy is a set of basic encodings that
105/// are generally applicable in most scenarios.
106#[derive(Debug)]
107pub struct CoreFieldEncodingStrategy {
108    pub array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
109    pub version: LanceFileVersion,
110}
111
112// For some reason clippy has a false negative and thinks this can be derived but
113// it can't because ArrayEncodingStrategy has no default implementation
114#[allow(clippy::derivable_impls)]
115impl Default for CoreFieldEncodingStrategy {
116    fn default() -> Self {
117        Self {
118            array_encoding_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
119            version: LanceFileVersion::default(),
120        }
121    }
122}
123
124impl CoreFieldEncodingStrategy {
125    fn is_primitive_type(data_type: &DataType) -> bool {
126        matches!(
127            data_type,
128            DataType::Boolean
129                | DataType::Date32
130                | DataType::Date64
131                | DataType::Decimal128(_, _)
132                | DataType::Decimal256(_, _)
133                | DataType::Duration(_)
134                | DataType::Float16
135                | DataType::Float32
136                | DataType::Float64
137                | DataType::Int16
138                | DataType::Int32
139                | DataType::Int64
140                | DataType::Int8
141                | DataType::Interval(_)
142                | DataType::Null
143                | DataType::Time32(_)
144                | DataType::Time64(_)
145                | DataType::Timestamp(_, _)
146                | DataType::UInt16
147                | DataType::UInt32
148                | DataType::UInt64
149                | DataType::UInt8
150                | DataType::FixedSizeBinary(_)
151                | DataType::FixedSizeList(_, _)
152                | DataType::Binary
153                | DataType::LargeBinary
154                | DataType::Utf8
155                | DataType::LargeUtf8,
156        )
157    }
158}
159
160impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
161    fn create_field_encoder(
162        &self,
163        encoding_strategy_root: &dyn FieldEncodingStrategy,
164        field: &Field,
165        column_index: &mut ColumnIndexSequence,
166        options: &EncodingOptions,
167    ) -> Result<Box<dyn FieldEncoder>> {
168        let data_type = field.data_type();
169        if Self::is_primitive_type(&data_type) {
170            let column_index = column_index.next_column_index(field.id as u32);
171            if field.metadata.contains_key(BLOB_META_KEY) {
172                let mut packed_meta = HashMap::new();
173                packed_meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
174                let desc_field =
175                    Field::try_from(BLOB_DESC_FIELD.clone().with_metadata(packed_meta)).unwrap();
176                let desc_encoder = Box::new(PrimitiveFieldEncoder::try_new(
177                    options,
178                    self.array_encoding_strategy.clone(),
179                    column_index,
180                    desc_field,
181                )?);
182                Ok(Box::new(BlobFieldEncoder::new(desc_encoder)))
183            } else {
184                Ok(Box::new(PrimitiveFieldEncoder::try_new(
185                    options,
186                    self.array_encoding_strategy.clone(),
187                    column_index,
188                    field.clone(),
189                )?))
190            }
191        } else {
192            match data_type {
193                DataType::List(_child) | DataType::LargeList(_child) => {
194                    let list_idx = column_index.next_column_index(field.id as u32);
195                    let inner_encoding = encoding_strategy_root.create_field_encoder(
196                        encoding_strategy_root,
197                        &field.children[0],
198                        column_index,
199                        options,
200                    )?;
201                    let offsets_encoder =
202                        Arc::new(BasicEncoder::new(Box::new(ValueEncoder::default())));
203                    Ok(Box::new(ListFieldEncoder::new(
204                        inner_encoding,
205                        offsets_encoder,
206                        options.cache_bytes_per_column,
207                        options.keep_original_array,
208                        list_idx,
209                    )))
210                }
211                DataType::Struct(_) => {
212                    let field_metadata = &field.metadata;
213                    if field_metadata
214                        .get(PACKED_STRUCT_LEGACY_META_KEY)
215                        .map(|v| v == "true")
216                        .unwrap_or(field_metadata.contains_key(PACKED_STRUCT_META_KEY))
217                    {
218                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
219                            options,
220                            self.array_encoding_strategy.clone(),
221                            column_index.next_column_index(field.id as u32),
222                            field.clone(),
223                        )?))
224                    } else {
225                        let header_idx = column_index.next_column_index(field.id as u32);
226                        let children_encoders = field
227                            .children
228                            .iter()
229                            .map(|field| {
230                                self.create_field_encoder(
231                                    encoding_strategy_root,
232                                    field,
233                                    column_index,
234                                    options,
235                                )
236                            })
237                            .collect::<Result<Vec<_>>>()?;
238                        Ok(Box::new(StructFieldEncoder::new(
239                            children_encoders,
240                            header_idx,
241                        )))
242                    }
243                }
244                DataType::Dictionary(_, value_type) => {
245                    // A dictionary of primitive is, itself, primitive
246                    if Self::is_primitive_type(&value_type) {
247                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
248                            options,
249                            self.array_encoding_strategy.clone(),
250                            column_index.next_column_index(field.id as u32),
251                            field.clone(),
252                        )?))
253                    } else {
254                        // A dictionary of logical is, itself, logical and we don't support that today
255                        // It could be possible (e.g. store indices in one column and values in remaining columns)
256                        // but would be a significant amount of work
257                        //
258                        // An easier fallback implementation would be to decode-on-write and encode-on-read
259                        Err(Error::NotSupported { source: format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into(), location: location!() })
260                    }
261                }
262                _ => todo!("Implement encoding for field {}", field),
263            }
264        }
265    }
266}
267
268/// The core array encoding strategy is a set of basic encodings that
269/// are generally applicable in most scenarios.
270#[derive(Debug, Default)]
271pub struct CoreArrayEncodingStrategy {
272    pub version: LanceFileVersion,
273}
274
275const BINARY_DATATYPES: [DataType; 4] = [
276    DataType::Binary,
277    DataType::LargeBinary,
278    DataType::Utf8,
279    DataType::LargeUtf8,
280];
281
282impl CoreArrayEncodingStrategy {
283    fn can_use_fsst(data_type: &DataType, data_size: u64, version: LanceFileVersion) -> bool {
284        version >= LanceFileVersion::V2_1
285            && matches!(data_type, DataType::Utf8 | DataType::Binary)
286            && data_size > 4 * 1024 * 1024
287    }
288
289    fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionConfig> {
290        let compression = field_meta.get(COMPRESSION_META_KEY)?;
291        let compression_scheme = compression.parse::<CompressionScheme>();
292        match compression_scheme {
293            Ok(compression_scheme) => Some(CompressionConfig::new(
294                compression_scheme,
295                field_meta
296                    .get(COMPRESSION_LEVEL_META_KEY)
297                    .and_then(|level| level.parse().ok()),
298            )),
299            Err(_) => None,
300        }
301    }
302
303    fn default_binary_encoder(
304        arrays: &[ArrayRef],
305        data_type: &DataType,
306        field_meta: Option<&HashMap<String, String>>,
307        data_size: u64,
308        version: LanceFileVersion,
309    ) -> Result<Box<dyn ArrayEncoder>> {
310        let bin_indices_encoder =
311            Self::choose_array_encoder(arrays, &DataType::UInt64, data_size, false, version, None)?;
312
313        if let Some(compression) = field_meta.and_then(Self::get_field_compression) {
314            if compression.scheme == CompressionScheme::Fsst {
315                // User requested FSST
316                let raw_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
317                Ok(Box::new(FsstArrayEncoder::new(raw_encoder)))
318            } else {
319                // Generic compression
320                Ok(Box::new(BinaryEncoder::new(
321                    bin_indices_encoder,
322                    Some(compression),
323                )))
324            }
325        } else {
326            // No user-specified compression, use FSST if we can
327            let bin_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
328            if Self::can_use_fsst(data_type, data_size, version) {
329                Ok(Box::new(FsstArrayEncoder::new(bin_encoder)))
330            } else {
331                Ok(bin_encoder)
332            }
333        }
334    }
335
336    fn choose_array_encoder(
337        arrays: &[ArrayRef],
338        data_type: &DataType,
339        data_size: u64,
340        use_dict_encoding: bool,
341        version: LanceFileVersion,
342        field_meta: Option<&HashMap<String, String>>,
343    ) -> Result<Box<dyn ArrayEncoder>> {
344        match data_type {
345            DataType::FixedSizeList(inner, dimension) => {
346                Ok(Box::new(BasicEncoder::new(Box::new(FslEncoder::new(
347                    Self::choose_array_encoder(
348                        arrays,
349                        inner.data_type(),
350                        data_size,
351                        use_dict_encoding,
352                        version,
353                        None,
354                    )?,
355                    *dimension as u32,
356                )))))
357            }
358            DataType::Dictionary(key_type, value_type) => {
359                let key_encoder =
360                    Self::choose_array_encoder(arrays, key_type, data_size, false, version, None)?;
361                let value_encoder = Self::choose_array_encoder(
362                    arrays, value_type, data_size, false, version, None,
363                )?;
364
365                Ok(Box::new(AlreadyDictionaryEncoder::new(
366                    key_encoder,
367                    value_encoder,
368                )))
369            }
370            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
371                if use_dict_encoding {
372                    let dict_indices_encoder = Self::choose_array_encoder(
373                        // We need to pass arrays to this method to figure out what kind of compression to
374                        // use but we haven't actually calculated the indices yet.  For now, we just assume
375                        // worst case and use the full range.  In the future maybe we can pass in statistics
376                        // instead of the actual data
377                        &[Arc::new(UInt8Array::from_iter_values(0_u8..255_u8))],
378                        &DataType::UInt8,
379                        data_size,
380                        false,
381                        version,
382                        None,
383                    )?;
384                    let dict_items_encoder = Self::choose_array_encoder(
385                        arrays,
386                        &DataType::Utf8,
387                        data_size,
388                        false,
389                        version,
390                        None,
391                    )?;
392
393                    Ok(Box::new(DictionaryEncoder::new(
394                        dict_indices_encoder,
395                        dict_items_encoder,
396                    )))
397                }
398                // The parent datatype should be binary or utf8 to use the fixed size encoding
399                // The variable 'data_type' is passed through recursion so comparing with it would be incorrect
400                else if BINARY_DATATYPES.contains(arrays[0].data_type()) {
401                    if let Some(byte_width) = check_fixed_size_encoding(arrays, version) {
402                        // use FixedSizeBinaryEncoder
403                        let bytes_encoder = Self::choose_array_encoder(
404                            arrays,
405                            &DataType::UInt8,
406                            data_size,
407                            false,
408                            version,
409                            None,
410                        )?;
411
412                        Ok(Box::new(BasicEncoder::new(Box::new(
413                            FixedSizeBinaryEncoder::new(bytes_encoder, byte_width as usize),
414                        ))))
415                    } else {
416                        Self::default_binary_encoder(
417                            arrays, data_type, field_meta, data_size, version,
418                        )
419                    }
420                } else {
421                    Self::default_binary_encoder(arrays, data_type, field_meta, data_size, version)
422                }
423            }
424            DataType::Struct(fields) => {
425                let num_fields = fields.len();
426                let mut inner_encoders = Vec::new();
427
428                for i in 0..num_fields {
429                    let inner_datatype = fields[i].data_type();
430                    let inner_encoder = Self::choose_array_encoder(
431                        arrays,
432                        inner_datatype,
433                        data_size,
434                        use_dict_encoding,
435                        version,
436                        None,
437                    )?;
438                    inner_encoders.push(inner_encoder);
439                }
440
441                Ok(Box::new(PackedStructEncoder::new(inner_encoders)))
442            }
443            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
444                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
445                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
446                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
447                        compressed_bit_width as usize,
448                        data_type.clone(),
449                    )))
450                } else {
451                    Ok(Box::new(BasicEncoder::new(Box::new(
452                        ValueEncoder::default(),
453                    ))))
454                }
455            }
456
457            // TODO: for signed integers, I intend to make it a cascaded encoding, a sparse array for the negative values and very wide(bit-width) values,
458            // then a bitpacked array for the narrow(bit-width) values, I need `BitpackedForNeg` to be merged first, I am
459            // thinking about putting this sparse array in the metadata so bitpacking remain using one page buffer only.
460            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
461                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
462                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
463                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
464                        compressed_bit_width as usize,
465                        data_type.clone(),
466                    )))
467                } else {
468                    Ok(Box::new(BasicEncoder::new(Box::new(
469                        ValueEncoder::default(),
470                    ))))
471                }
472            }
473            _ => Ok(Box::new(BasicEncoder::new(Box::new(
474                ValueEncoder::default(),
475            )))),
476        }
477    }
478}
479
/// Returns the cardinality threshold used to decide on dictionary encoding.
///
/// The value is read from the `LANCE_DICT_ENCODING_THRESHOLD` environment
/// variable.  Surrounding whitespace is ignored.  If the variable is unset,
/// not valid unicode, or does not parse as a `u64`, the default of 100 is used.
fn get_dict_encoding_threshold() -> u64 {
    const DEFAULT_DICT_ENCODING_THRESHOLD: u64 = 100;

    env::var("LANCE_DICT_ENCODING_THRESHOLD")
        .ok()
        // Trim first so that values such as "50\n" are not silently ignored
        .and_then(|val| val.trim().parse().ok())
        .unwrap_or(DEFAULT_DICT_ENCODING_THRESHOLD)
}
486
487// check whether we want to use dictionary encoding or not
488// by applying a threshold on cardinality
489// returns true if cardinality < threshold but false if the total number of rows is less than the threshold
490// The choice to use 100 is just a heuristic for now
491// hyperloglog is used for cardinality estimation
492// error rate = 1.04 / sqrt(2^p), where p is the precision
493// and error rate is 1.04 / sqrt(2^12) = 1.56%
494fn check_dict_encoding(arrays: &[ArrayRef], threshold: u64) -> bool {
495    let num_total_rows = arrays.iter().map(|arr| arr.len()).sum::<usize>();
496    if num_total_rows < threshold as usize {
497        return false;
498    }
499    const PRECISION: u8 = 12;
500
501    let mut hll: HyperLogLogPlus<String, RandomState> =
502        HyperLogLogPlus::new(PRECISION, RandomState::new()).unwrap();
503
504    for arr in arrays {
505        let string_array = arrow_array::cast::as_string_array(arr);
506        for value in string_array.iter().flatten() {
507            hll.insert(value);
508            let estimated_cardinality = hll.count() as u64;
509            if estimated_cardinality >= threshold {
510                return false;
511            }
512        }
513    }
514
515    true
516}
517
518fn check_fixed_size_encoding(arrays: &[ArrayRef], version: LanceFileVersion) -> Option<u64> {
519    if version < LanceFileVersion::V2_1 || arrays.is_empty() {
520        return None;
521    }
522
523    // make sure no array has an empty string
524    if !arrays.iter().all(|arr| {
525        if let Some(arr) = arr.as_string_opt::<i32>() {
526            arr.iter().flatten().all(|s| !s.is_empty())
527        } else if let Some(arr) = arr.as_binary_opt::<i32>() {
528            arr.iter().flatten().all(|s| !s.is_empty())
529        } else if let Some(arr) = arr.as_string_opt::<i64>() {
530            arr.iter().flatten().all(|s| !s.is_empty())
531        } else if let Some(arr) = arr.as_binary_opt::<i64>() {
532            arr.iter().flatten().all(|s| !s.is_empty())
533        } else {
534            panic!("wrong dtype");
535        }
536    }) {
537        return None;
538    }
539
540    let lengths = arrays
541        .iter()
542        .flat_map(|arr| {
543            if let Some(arr) = arr.as_string_opt::<i32>() {
544                let offsets = arr.offsets().inner();
545                offsets
546                    .windows(2)
547                    .map(|w| (w[1] - w[0]) as u64)
548                    .collect::<Vec<_>>()
549            } else if let Some(arr) = arr.as_binary_opt::<i32>() {
550                let offsets = arr.offsets().inner();
551                offsets
552                    .windows(2)
553                    .map(|w| (w[1] - w[0]) as u64)
554                    .collect::<Vec<_>>()
555            } else if let Some(arr) = arr.as_string_opt::<i64>() {
556                let offsets = arr.offsets().inner();
557                offsets
558                    .windows(2)
559                    .map(|w| (w[1] - w[0]) as u64)
560                    .collect::<Vec<_>>()
561            } else if let Some(arr) = arr.as_binary_opt::<i64>() {
562                let offsets = arr.offsets().inner();
563                offsets
564                    .windows(2)
565                    .map(|w| (w[1] - w[0]) as u64)
566                    .collect::<Vec<_>>()
567            } else {
568                panic!("wrong dtype");
569            }
570        })
571        .collect::<Vec<_>>();
572
573    // find first non-zero value in lengths
574    let first_non_zero = lengths.iter().position(|&x| x != 0);
575    if let Some(first_non_zero) = first_non_zero {
576        // make sure all lengths are equal to first_non_zero length or zero
577        if !lengths
578            .iter()
579            .all(|&x| x == 0 || x == lengths[first_non_zero])
580        {
581            return None;
582        }
583
584        // set the byte width
585        Some(lengths[first_non_zero])
586    } else {
587        None
588    }
589}
590
591impl ArrayEncodingStrategy for CoreArrayEncodingStrategy {
592    fn create_array_encoder(
593        &self,
594        arrays: &[ArrayRef],
595        field: &Field,
596    ) -> Result<Box<dyn ArrayEncoder>> {
597        let data_size = arrays
598            .iter()
599            .map(|arr| arr.get_buffer_memory_size() as u64)
600            .sum::<u64>();
601        let data_type = arrays[0].data_type();
602
603        let use_dict_encoding = data_type == &DataType::Utf8
604            && check_dict_encoding(arrays, get_dict_encoding_threshold());
605
606        Self::choose_array_encoder(
607            arrays,
608            data_type,
609            data_size,
610            use_dict_encoding,
611            self.version,
612            Some(&field.metadata),
613        )
614    }
615}
616
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::{ArrayRef, StringArray};
    use arrow_schema::Field;
    use lance_core::datatypes::{COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY};

    use crate::v2::encoder::{
        check_dict_encoding, check_fixed_size_encoding, ArrayEncodingStrategy,
        CoreArrayEncodingStrategy,
    };
    use crate::version::LanceFileVersion;

    /// Runs the dictionary-encoding check on a single string array.
    fn is_dict_encoding_applicable(arr: Vec<Option<&str>>, threshold: u64) -> bool {
        let strings: ArrayRef = Arc::new(StringArray::from(arr));
        check_dict_encoding(&[strings], threshold)
    }

    #[test]
    fn test_dict_encoding_should_be_applied_if_cardinality_less_than_threshold() {
        let values = vec![Some("a"), Some("b"), Some("a"), Some("b")];
        assert!(is_dict_encoding_applicable(values, 3));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_larger_than_threshold() {
        let values = vec![Some("a"), Some("b"), Some("c"), Some("d")];
        assert!(!is_dict_encoding_applicable(values, 3));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_equal_to_threshold() {
        let values = vec![Some("a"), Some("b"), Some("c"), Some("a")];
        assert!(!is_dict_encoding_applicable(values, 3));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_empty_arrays() {
        assert!(!is_dict_encoding_applicable(vec![], 3));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_smaller_than_threshold_arrays() {
        assert!(!is_dict_encoding_applicable(vec![Some("a"), Some("a")], 3));
    }

    /// Builds one string array per inner vector and reports whether the
    /// fixed-size check finds a common byte width.
    fn is_fixed_size_encoding_applicable(
        arrays: Vec<Vec<Option<&str>>>,
        version: LanceFileVersion,
    ) -> bool {
        let string_arrays: Vec<ArrayRef> = arrays
            .into_iter()
            .map(|values| Arc::new(StringArray::from(values)) as ArrayRef)
            .collect();

        check_fixed_size_encoding(&string_arrays, version).is_some()
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable() {
        let cases: Vec<(Vec<Vec<Option<&str>>>, bool)> = vec![
            // no values at all
            (vec![vec![]], false),
            // every value is one byte wide
            (vec![vec![Some("a"), Some("b")]], true),
            // differing widths
            (vec![vec![Some("abc"), Some("de")]], false),
            // nulls are tolerated
            (vec![vec![Some("pqr"), None]], true),
            // empty strings are not
            (vec![vec![Some("pqr"), Some("")]], false),
            (vec![vec![Some(""), Some("")]], false),
        ];

        for (input, expected) in cases {
            assert_eq!(
                is_fixed_size_encoding_applicable(input, LanceFileVersion::V2_1),
                expected
            );
        }
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable_multiple_arrays() {
        let cases: Vec<(Vec<Vec<Option<&str>>>, bool)> = vec![
            // same width across both arrays
            (
                vec![vec![Some("a"), Some("b")], vec![Some("c"), Some("d")]],
                true,
            ),
            // widths differ between the arrays
            (
                vec![vec![Some("ab"), Some("bc")], vec![Some("c"), Some("d")]],
                false,
            ),
            (vec![vec![Some("ab"), None], vec![None, Some("d")]], false),
            // nulls are tolerated when the widths agree
            (vec![vec![Some("a"), None], vec![None, Some("d")]], true),
            // empty strings are rejected
            (vec![vec![Some(""), None], vec![None, Some("")]], false),
            // only nulls
            (vec![vec![None, None], vec![None, None]], false),
        ];

        for (input, expected) in cases {
            assert_eq!(
                is_fixed_size_encoding_applicable(input, LanceFileVersion::V2_1),
                expected
            );
        }
    }

    /// Creates an encoder for `array` with the given field metadata and file
    /// version, then compares its `Debug` output with `expected_encoder`.
    fn verify_array_encoder(
        array: ArrayRef,
        field_meta: Option<HashMap<String, String>>,
        version: LanceFileVersion,
        expected_encoder: &str,
    ) {
        let encoding_strategy = CoreArrayEncodingStrategy { version };

        let arrow_field = Field::new("test_field", array.data_type().clone(), true);
        let arrow_field = match field_meta {
            Some(metadata) => arrow_field.with_metadata(metadata),
            None => arrow_field,
        };
        let lance_field = lance_core::datatypes::Field::try_from(arrow_field).unwrap();

        let encoder_result = encoding_strategy.create_array_encoder(&[array], &lance_field);
        assert!(encoder_result.is_ok());
        let encoder = encoder_result.unwrap();
        assert_eq!(format!("{:?}", encoder).as_str(), expected_encoder);
    }

    #[test]
    fn test_choose_encoder_for_zstd_compressed_string_field() {
        let field_meta = HashMap::from([(COMPRESSION_META_KEY.to_string(), "zstd".to_string())]);
        verify_array_encoder(
            Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(field_meta),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: None }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 0 }) }",
        );
    }

    #[test]
    fn test_choose_encoder_for_zstd_compression_level() {
        let field_meta = HashMap::from([
            (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
            (COMPRESSION_LEVEL_META_KEY.to_string(), "22".to_string()),
        ]);
        verify_array_encoder(
            Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(field_meta),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: Some(22) }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 22 }) }",
        );
    }
}