1use std::{collections::HashMap, env, sync::Arc};
4
5use arrow::array::AsArray;
6use arrow::datatypes::UInt64Type;
7use arrow_array::{Array, ArrayRef, RecordBatch, UInt8Array};
8use arrow_schema::DataType;
9use bytes::{Bytes, BytesMut};
10use futures::future::BoxFuture;
11use lance_core::datatypes::{
12 Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_LEVEL_META_KEY,
13 COMPRESSION_META_KEY, PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
14};
15use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
16use lance_core::{Error, Result};
17use snafu::location;
18
19use crate::buffer::LanceBuffer;
20use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
21use crate::decoder::PageEncoding;
22use crate::encodings::logical::blob::BlobFieldEncoder;
23use crate::encodings::logical::list::ListStructuralEncoder;
24use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
25use crate::encodings::logical::r#struct::StructFieldEncoder;
26use crate::encodings::logical::r#struct::StructStructuralEncoder;
27use crate::encodings::physical::binary::{BinaryMiniBlockEncoder, VariableEncoder};
28use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
29use crate::encodings::physical::bitpack_fastlanes::{
30 compute_compressed_bit_width_for_non_neg, InlineBitpacking,
31};
32use crate::encodings::physical::block_compress::{
33 CompressedBufferEncoder, CompressionConfig, CompressionScheme,
34};
35use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
36use crate::encodings::physical::fsst::{
37 FsstArrayEncoder, FsstMiniBlockEncoder, FsstPerValueEncoder,
38};
39use crate::encodings::physical::packed_struct::PackedStructEncoder;
40use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockEncoder;
41use crate::format::ProtobufUtils;
42use crate::repdef::RepDefBuilder;
43use crate::statistics::{GetStat, Stat};
44use crate::version::LanceFileVersion;
45use crate::{
46 decoder::{ColumnInfo, PageInfo},
47 encodings::{
48 logical::{list::ListFieldEncoder, primitive::PrimitiveFieldEncoder},
49 physical::{
50 basic::BasicEncoder, binary::BinaryEncoder, dictionary::DictionaryEncoder,
51 fixed_size_binary::FixedSizeBinaryEncoder, fixed_size_list::FslEncoder,
52 value::ValueEncoder,
53 },
54 },
55 format::pb,
56};
57use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
58
59use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
60use std::collections::hash_map::RandomState;
61
/// Smallest `buffer_alignment` value that `encode_batch` accepts; smaller
/// alignments are rejected there with an `InvalidInput` error.
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;
64
65#[derive(Debug)]
72pub struct EncodedArray {
73 pub data: DataBlock,
75 pub encoding: pb::ArrayEncoding,
77}
78
79impl EncodedArray {
80 pub fn new(data: DataBlock, encoding: pb::ArrayEncoding) -> Self {
81 Self { data, encoding }
82 }
83
84 pub fn into_buffers(self) -> (Vec<LanceBuffer>, pb::ArrayEncoding) {
85 let buffers = self.data.into_buffers();
86 (buffers, self.encoding)
87 }
88}
89
/// A page of encoded data produced by a field encoder.
#[derive(Debug)]
pub struct EncodedPage {
    /// The buffers that make up the page.
    pub data: Vec<LanceBuffer>,
    /// Description of the encoding used for the page.
    pub description: PageEncoding,
    /// Number of rows stored in the page.
    pub num_rows: u64,
    /// Row number associated with the page; `write_page_to_data_buffer` stores
    /// it as the page's `priority`.
    /// NOTE(review): presumably the number of the first row in the page — confirm.
    pub row_number: u64,
    /// Index of the column the page belongs to.
    pub column_idx: u32,
}
113
/// Metadata describing an encoded buffer.
#[derive(Debug)]
pub struct EncodedBufferMeta {
    /// Number of bits used per value.
    pub bits_per_value: u64,

    /// Bitpacking details, if any.
    /// NOTE(review): presumably `None` when the buffer is not bitpacked — confirm.
    pub bitpacking: Option<BitpackingBufferMeta>,

    /// Compression scheme, if any.
    /// NOTE(review): presumably `None` when the buffer is not compressed — confirm.
    pub compression_scheme: Option<CompressionScheme>,
}
122
/// Metadata describing a bitpacked buffer.
#[derive(Debug)]
pub struct BitpackingBufferMeta {
    /// Number of bits used per value.
    pub bits_per_value: u64,

    /// Signedness flag.
    /// NOTE(review): presumably true when the packed values are signed — confirm.
    pub signed: bool,
}
129
/// Encodes a [`DataBlock`] into an [`EncodedArray`].
///
/// Implementations must be `Debug`, `Send` and `Sync`.
pub trait ArrayEncoder: std::fmt::Debug + Send + Sync {
    /// Encodes `data`, whose Arrow type is `data_type`.
    ///
    /// NOTE(review): `buffer_index` looks like a running counter that the
    /// encoder advances for each buffer it emits — confirm against implementors.
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray>;
}
147
/// 8 KiB minus 6 bytes (8186).
/// NOTE(review): presumably the largest size, in bytes, of a single mini-block
/// chunk, with the 6 bytes reserved for per-chunk overhead — confirm.
pub const MAX_MINIBLOCK_BYTES: u64 = 8 * 1024 - 6;
/// NOTE(review): presumably the largest number of values in a single
/// mini-block chunk — confirm.
pub const MAX_MINIBLOCK_VALUES: u64 = 4096;
150
/// The output of a [`MiniBlockCompressor`]: compressed buffers plus the
/// description of the chunks they are divided into.
#[derive(Debug)]
pub struct MiniBlockCompressed {
    /// The compressed buffers.
    pub data: Vec<LanceBuffer>,
    /// Descriptions of the chunks.
    pub chunks: Vec<MiniBlockChunk>,
    /// Total number of values.
    /// NOTE(review): presumably across all chunks — confirm.
    pub num_values: u64,
}
162
/// Describes one chunk of mini-block data.
#[derive(Debug)]
pub struct MiniBlockChunk {
    /// Buffer sizes for this chunk.
    /// NOTE(review): presumably one entry per buffer, in bytes — confirm.
    pub buffer_sizes: Vec<u16>,
    /// Base-2 logarithm of the number of values in the chunk.
    ///
    /// Zero is a sentinel: `num_values` then derives the count from the totals
    /// it is given instead of from this field.
    pub log_num_values: u8,
}

impl MiniBlockChunk {
    /// Returns the number of values held by this chunk.
    ///
    /// When `log_num_values` is zero the chunk holds whatever remains, i.e.
    /// `total_num_values - vals_in_prev_blocks`; otherwise it holds exactly
    /// `2^log_num_values` values and both arguments are ignored.
    pub fn num_values(&self, vals_in_prev_blocks: u64, total_num_values: u64) -> u64 {
        match self.log_num_values {
            0 => total_num_values - vals_in_prev_blocks,
            log => 1 << log,
        }
    }
}
204
/// Compresses a page of data into mini-block form.
///
/// Implementations must be `Debug`, `Send` and `Sync`.
pub trait MiniBlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compresses `page`, returning the compressed chunks together with the
    /// protobuf description of the encoding that was applied.
    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)>;
}
218
219#[derive(Debug)]
226pub enum PerValueDataBlock {
227 Fixed(FixedWidthDataBlock),
228 Variable(VariableWidthBlock),
229}
230
231impl PerValueDataBlock {
232 pub fn data_size(&self) -> u64 {
233 match self {
234 Self::Fixed(fixed) => fixed.data_size(),
235 Self::Variable(variable) => variable.data_size(),
236 }
237 }
238}
239
/// Compresses a data block into a [`PerValueDataBlock`].
///
/// Implementations must be `Debug`, `Send` and `Sync`.
pub trait PerValueCompressor: std::fmt::Debug + Send + Sync {
    /// Compresses `data`, returning the compressed block together with the
    /// protobuf description of the encoding that was applied.
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)>;
}
254
/// Compresses a whole data block into a single buffer.
///
/// Implementations must be `Debug`, `Send` and `Sync`.
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compresses `data` into one [`LanceBuffer`].
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
272
273pub fn values_column_encoding() -> pb::ColumnEncoding {
274 pb::ColumnEncoding {
275 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
276 }
277}
278
279pub struct EncodedColumn {
280 pub column_buffers: Vec<LanceBuffer>,
281 pub encoding: pb::ColumnEncoding,
282 pub final_pages: Vec<EncodedPage>,
283}
284
285impl Default for EncodedColumn {
286 fn default() -> Self {
287 Self {
288 column_buffers: Default::default(),
289 encoding: pb::ColumnEncoding {
290 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
291 },
292 final_pages: Default::default(),
293 }
294 }
295}
296
297pub struct OutOfLineBuffers {
311 position: u64,
312 buffer_alignment: u64,
313 buffers: Vec<LanceBuffer>,
314}
315
316impl OutOfLineBuffers {
317 pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
318 Self {
319 position: base_position,
320 buffer_alignment,
321 buffers: Vec::new(),
322 }
323 }
324
325 pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
326 let position = self.position;
327 self.position += buffer.len() as u64;
328 self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
329 self.buffers.push(buffer);
330 position
331 }
332
333 pub fn take_buffers(self) -> Vec<LanceBuffer> {
334 self.buffers
335 }
336
337 pub fn reset_position(&mut self, position: u64) {
338 self.position = position;
339 }
340}
341
/// A boxed `'static` future that resolves to an [`EncodedPage`] or an error.
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;
344
/// Encodes the data of a single field into pages and columns.
pub trait FieldEncoder: Send {
    /// Offers `array` to the encoder and returns the encode tasks it produced.
    ///
    /// `external_buffers` collects buffers stored outside the pages,
    /// `row_number` and `num_rows` describe the rows being offered.
    ///
    /// NOTE(review): the name suggests the encoder may buffer input and return
    /// no tasks until enough data has accumulated — confirm against implementors.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;
    /// Returns encode tasks for data the encoder is still holding.
    /// NOTE(review): presumably forces out any buffered data — confirm.
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
    /// Finishes encoding and resolves to the encoded columns for this field.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// Number of columns this encoder produces.
    fn num_columns(&self) -> u32;
}
405
/// Chooses the [`ArrayEncoder`] to use for a set of arrays.
///
/// Implementations must be `Send`, `Sync` and `Debug`.
pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Creates an encoder suited to `arrays`, which hold data for `field`.
    fn create_array_encoder(
        &self,
        arrays: &[ArrayRef],
        field: &Field,
    ) -> Result<Box<dyn ArrayEncoder>>;
}
418
/// Chooses the compressor to use for a block of data.
///
/// Implementations must be `Send`, `Sync` and `Debug`.
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Creates a block compressor for `data` and returns it together with the
    /// protobuf description of the encoding it will produce.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)>;

    /// Creates a per-value compressor for `data`.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Creates a mini-block compressor for `data`.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
453
/// The built-in strategy for choosing array encoders and compressors.
///
/// Implements both [`ArrayEncodingStrategy`] and [`CompressionStrategy`].
#[derive(Debug, Default)]
pub struct CoreArrayEncodingStrategy {
    /// File version being written; several encodings are only chosen for
    /// `V2_1` and later.
    pub version: LanceFileVersion,
}
460
/// The variable-width string and binary Arrow types (32-bit and 64-bit offset
/// variants). `choose_array_encoder` only considers the fixed-size binary
/// encoding when the first array has one of these types.
const BINARY_DATATYPES: [DataType; 4] = [
    DataType::Binary,
    DataType::LargeBinary,
    DataType::Utf8,
    DataType::LargeUtf8,
];
467
468impl CoreArrayEncodingStrategy {
469 fn can_use_fsst(data_type: &DataType, data_size: u64, version: LanceFileVersion) -> bool {
470 version >= LanceFileVersion::V2_1
471 && matches!(data_type, DataType::Utf8 | DataType::Binary)
472 && data_size > 4 * 1024 * 1024
473 }
474
475 fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionConfig> {
476 let compression = field_meta.get(COMPRESSION_META_KEY)?;
477 let compression_scheme = compression.parse::<CompressionScheme>();
478 match compression_scheme {
479 Ok(compression_scheme) => Some(CompressionConfig::new(
480 compression_scheme,
481 field_meta
482 .get(COMPRESSION_LEVEL_META_KEY)
483 .and_then(|level| level.parse().ok()),
484 )),
485 Err(_) => None,
486 }
487 }
488
489 fn default_binary_encoder(
490 arrays: &[ArrayRef],
491 data_type: &DataType,
492 field_meta: Option<&HashMap<String, String>>,
493 data_size: u64,
494 version: LanceFileVersion,
495 ) -> Result<Box<dyn ArrayEncoder>> {
496 let bin_indices_encoder =
497 Self::choose_array_encoder(arrays, &DataType::UInt64, data_size, false, version, None)?;
498
499 if let Some(compression) = field_meta.and_then(Self::get_field_compression) {
500 if compression.scheme == CompressionScheme::Fsst {
501 let raw_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
503 Ok(Box::new(FsstArrayEncoder::new(raw_encoder)))
504 } else {
505 Ok(Box::new(BinaryEncoder::new(
507 bin_indices_encoder,
508 Some(compression),
509 )))
510 }
511 } else {
512 let bin_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
514 if Self::can_use_fsst(data_type, data_size, version) {
515 Ok(Box::new(FsstArrayEncoder::new(bin_encoder)))
516 } else {
517 Ok(bin_encoder)
518 }
519 }
520 }
521
522 fn choose_array_encoder(
523 arrays: &[ArrayRef],
524 data_type: &DataType,
525 data_size: u64,
526 use_dict_encoding: bool,
527 version: LanceFileVersion,
528 field_meta: Option<&HashMap<String, String>>,
529 ) -> Result<Box<dyn ArrayEncoder>> {
530 match data_type {
531 DataType::FixedSizeList(inner, dimension) => {
532 Ok(Box::new(BasicEncoder::new(Box::new(FslEncoder::new(
533 Self::choose_array_encoder(
534 arrays,
535 inner.data_type(),
536 data_size,
537 use_dict_encoding,
538 version,
539 None,
540 )?,
541 *dimension as u32,
542 )))))
543 }
544 DataType::Dictionary(key_type, value_type) => {
545 let key_encoder =
546 Self::choose_array_encoder(arrays, key_type, data_size, false, version, None)?;
547 let value_encoder = Self::choose_array_encoder(
548 arrays, value_type, data_size, false, version, None,
549 )?;
550
551 Ok(Box::new(AlreadyDictionaryEncoder::new(
552 key_encoder,
553 value_encoder,
554 )))
555 }
556 DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
557 if use_dict_encoding {
558 let dict_indices_encoder = Self::choose_array_encoder(
559 &[Arc::new(UInt8Array::from_iter_values(0_u8..255_u8))],
564 &DataType::UInt8,
565 data_size,
566 false,
567 version,
568 None,
569 )?;
570 let dict_items_encoder = Self::choose_array_encoder(
571 arrays,
572 &DataType::Utf8,
573 data_size,
574 false,
575 version,
576 None,
577 )?;
578
579 Ok(Box::new(DictionaryEncoder::new(
580 dict_indices_encoder,
581 dict_items_encoder,
582 )))
583 }
584 else if BINARY_DATATYPES.contains(arrays[0].data_type()) {
587 if let Some(byte_width) = check_fixed_size_encoding(arrays, version) {
588 let bytes_encoder = Self::choose_array_encoder(
590 arrays,
591 &DataType::UInt8,
592 data_size,
593 false,
594 version,
595 None,
596 )?;
597
598 Ok(Box::new(BasicEncoder::new(Box::new(
599 FixedSizeBinaryEncoder::new(bytes_encoder, byte_width as usize),
600 ))))
601 } else {
602 Self::default_binary_encoder(
603 arrays, data_type, field_meta, data_size, version,
604 )
605 }
606 } else {
607 Self::default_binary_encoder(arrays, data_type, field_meta, data_size, version)
608 }
609 }
610 DataType::Struct(fields) => {
611 let num_fields = fields.len();
612 let mut inner_encoders = Vec::new();
613
614 for i in 0..num_fields {
615 let inner_datatype = fields[i].data_type();
616 let inner_encoder = Self::choose_array_encoder(
617 arrays,
618 inner_datatype,
619 data_size,
620 use_dict_encoding,
621 version,
622 None,
623 )?;
624 inner_encoders.push(inner_encoder);
625 }
626
627 Ok(Box::new(PackedStructEncoder::new(inner_encoders)))
628 }
629 DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
630 if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
631 let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
632 Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
633 compressed_bit_width as usize,
634 data_type.clone(),
635 )))
636 } else {
637 Ok(Box::new(BasicEncoder::new(Box::new(
638 ValueEncoder::default(),
639 ))))
640 }
641 }
642
643 DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
647 if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
648 let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
649 Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
650 compressed_bit_width as usize,
651 data_type.clone(),
652 )))
653 } else {
654 Ok(Box::new(BasicEncoder::new(Box::new(
655 ValueEncoder::default(),
656 ))))
657 }
658 }
659 _ => Ok(Box::new(BasicEncoder::new(Box::new(
660 ValueEncoder::default(),
661 )))),
662 }
663 }
664}
665
/// Returns the dictionary-encoding threshold.
///
/// The value comes from the `LANCE_DICT_ENCODING_THRESHOLD` environment
/// variable; when the variable is unset or does not parse as a `u64`, the
/// default of 100 is used.
fn get_dict_encoding_threshold() -> u64 {
    match env::var("LANCE_DICT_ENCODING_THRESHOLD") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(100),
        Err(_) => 100,
    }
}
672
673fn check_dict_encoding(arrays: &[ArrayRef], threshold: u64) -> bool {
681 let num_total_rows = arrays.iter().map(|arr| arr.len()).sum::<usize>();
682 if num_total_rows < threshold as usize {
683 return false;
684 }
685 const PRECISION: u8 = 12;
686
687 let mut hll: HyperLogLogPlus<String, RandomState> =
688 HyperLogLogPlus::new(PRECISION, RandomState::new()).unwrap();
689
690 for arr in arrays {
691 let string_array = arrow_array::cast::as_string_array(arr);
692 for value in string_array.iter().flatten() {
693 hll.insert(value);
694 let estimated_cardinality = hll.count() as u64;
695 if estimated_cardinality >= threshold {
696 return false;
697 }
698 }
699 }
700
701 true
702}
703
704fn check_fixed_size_encoding(arrays: &[ArrayRef], version: LanceFileVersion) -> Option<u64> {
705 if version < LanceFileVersion::V2_1 || arrays.is_empty() {
706 return None;
707 }
708
709 if !arrays.iter().all(|arr| {
711 if let Some(arr) = arr.as_string_opt::<i32>() {
712 arr.iter().flatten().all(|s| !s.is_empty())
713 } else if let Some(arr) = arr.as_binary_opt::<i32>() {
714 arr.iter().flatten().all(|s| !s.is_empty())
715 } else if let Some(arr) = arr.as_string_opt::<i64>() {
716 arr.iter().flatten().all(|s| !s.is_empty())
717 } else if let Some(arr) = arr.as_binary_opt::<i64>() {
718 arr.iter().flatten().all(|s| !s.is_empty())
719 } else {
720 panic!("wrong dtype");
721 }
722 }) {
723 return None;
724 }
725
726 let lengths = arrays
727 .iter()
728 .flat_map(|arr| {
729 if let Some(arr) = arr.as_string_opt::<i32>() {
730 let offsets = arr.offsets().inner();
731 offsets
732 .windows(2)
733 .map(|w| (w[1] - w[0]) as u64)
734 .collect::<Vec<_>>()
735 } else if let Some(arr) = arr.as_binary_opt::<i32>() {
736 let offsets = arr.offsets().inner();
737 offsets
738 .windows(2)
739 .map(|w| (w[1] - w[0]) as u64)
740 .collect::<Vec<_>>()
741 } else if let Some(arr) = arr.as_string_opt::<i64>() {
742 let offsets = arr.offsets().inner();
743 offsets
744 .windows(2)
745 .map(|w| (w[1] - w[0]) as u64)
746 .collect::<Vec<_>>()
747 } else if let Some(arr) = arr.as_binary_opt::<i64>() {
748 let offsets = arr.offsets().inner();
749 offsets
750 .windows(2)
751 .map(|w| (w[1] - w[0]) as u64)
752 .collect::<Vec<_>>()
753 } else {
754 panic!("wrong dtype");
755 }
756 })
757 .collect::<Vec<_>>();
758
759 let first_non_zero = lengths.iter().position(|&x| x != 0);
761 if let Some(first_non_zero) = first_non_zero {
762 if !lengths
764 .iter()
765 .all(|&x| x == 0 || x == lengths[first_non_zero])
766 {
767 return None;
768 }
769
770 Some(lengths[first_non_zero])
772 } else {
773 None
774 }
775}
776
777impl ArrayEncodingStrategy for CoreArrayEncodingStrategy {
778 fn create_array_encoder(
779 &self,
780 arrays: &[ArrayRef],
781 field: &Field,
782 ) -> Result<Box<dyn ArrayEncoder>> {
783 let data_size = arrays
784 .iter()
785 .map(|arr| arr.get_buffer_memory_size() as u64)
786 .sum::<u64>();
787 let data_type = arrays[0].data_type();
788
789 let use_dict_encoding = data_type == &DataType::Utf8
790 && check_dict_encoding(arrays, get_dict_encoding_threshold());
791
792 Self::choose_array_encoder(
793 arrays,
794 data_type,
795 data_size,
796 use_dict_encoding,
797 self.version,
798 Some(&field.metadata),
799 )
800 }
801}
802
803impl CompressionStrategy for CoreArrayEncodingStrategy {
804 fn create_miniblock_compressor(
805 &self,
806 field: &Field,
807 data: &DataBlock,
808 ) -> Result<Box<dyn MiniBlockCompressor>> {
809 match data {
810 DataBlock::FixedWidth(fixed_width_data) => {
811 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
812 if compression == "none" {
813 return Ok(Box::new(ValueEncoder::default()));
814 }
815 }
816
817 let bit_widths = data.expect_stat(Stat::BitWidth);
818 let bit_widths = bit_widths.as_primitive::<UInt64Type>();
819 let has_all_zeros = bit_widths.values().iter().any(|v| *v == 0);
822 let too_small = bit_widths.len() == 1
825 && InlineBitpacking::min_size_bytes(bit_widths.value(0)) >= data.data_size();
826 if !has_all_zeros
827 && !too_small
828 && (fixed_width_data.bits_per_value == 8
829 || fixed_width_data.bits_per_value == 16
830 || fixed_width_data.bits_per_value == 32
831 || fixed_width_data.bits_per_value == 64)
832 {
833 Ok(Box::new(InlineBitpacking::new(
834 fixed_width_data.bits_per_value,
835 )))
836 } else {
837 Ok(Box::new(ValueEncoder::default()))
838 }
839 }
840 DataBlock::VariableWidth(variable_width_data) => {
841 if variable_width_data.bits_per_offset == 32 {
842 let data_size =
843 variable_width_data.expect_single_stat::<UInt64Type>(Stat::DataSize);
844 let max_len =
845 variable_width_data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
846
847 if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
848 && data_size >= FSST_LEAST_INPUT_SIZE as u64
849 {
850 Ok(Box::new(FsstMiniBlockEncoder::default()))
851 } else {
852 Ok(Box::new(BinaryMiniBlockEncoder::default()))
853 }
854 } else {
855 todo!("Implement MiniBlockCompression for VariableWidth DataBlock with 64 bits offsets.")
856 }
857 }
858 DataBlock::Struct(struct_data_block) => {
859 if struct_data_block
862 .children
863 .iter()
864 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
865 {
866 panic!("packed struct encoding currently only supports fixed-width fields.")
867 }
868 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
869 }
870 DataBlock::FixedSizeList(_) => {
871 Ok(Box::new(ValueEncoder::default()))
879 }
880 _ => Err(Error::NotSupported {
881 source: format!(
882 "Mini-block compression not yet supported for block type {}",
883 data.name()
884 )
885 .into(),
886 location: location!(),
887 }),
888 }
889 }
890
891 fn create_per_value(
892 &self,
893 _field: &Field,
894 data: &DataBlock,
895 ) -> Result<Box<dyn PerValueCompressor>> {
896 match data {
897 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
898 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
899 DataBlock::VariableWidth(variable_width) => {
900 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
901 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
902
903 if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
907 return Ok(Box::new(CompressedBufferEncoder::default()));
908 }
909
910 if variable_width.bits_per_offset == 32 {
911 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
912 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
913
914 let variable_compression = Box::new(VariableEncoder::default());
915
916 if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
917 && data_size >= FSST_LEAST_INPUT_SIZE as u64
918 {
919 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
920 } else {
921 Ok(variable_compression)
922 }
923 } else {
924 todo!("Implement MiniBlockCompression for VariableWidth DataBlock with 64 bits offsets.")
925 }
926 }
927 _ => unreachable!(
928 "Per-value compression not yet supported for block type: {}",
929 data.name()
930 ),
931 }
932 }
933
934 fn create_block_compressor(
935 &self,
936 _field: &Field,
937 data: &DataBlock,
938 ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)> {
939 match data {
940 DataBlock::FixedWidth(fixed_width) => {
943 let encoder = Box::new(ValueEncoder::default());
944 let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
945 Ok((encoder, encoding))
946 }
947 DataBlock::VariableWidth(variable_width) => {
948 let encoder = Box::new(VariableEncoder::default());
949 let encoding = ProtobufUtils::variable(variable_width.bits_per_offset);
950 Ok((encoder, encoding))
951 }
952 _ => unreachable!(),
953 }
954 }
955}
/// Hands out consecutive column indices, starting at zero, and remembers
/// which field id each assigned index belongs to.
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    /// The next index to hand out.
    current_index: u32,
    /// `(field_id, column_index)` pairs in the order they were assigned.
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    /// Assigns the next column index to `field_id`, records the pairing and
    /// returns the index.
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let assigned = self.current_index;
        self.skip();
        self.mapping.push((field_id, assigned));
        assigned
    }

    /// Advances past one column index without assigning it to any field.
    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}
976
/// Tunable settings shared by the field encoders.
pub struct EncodingOptions {
    /// Cache size, in bytes, given to each column's encoder.
    /// NOTE(review): presumably how much data an encoder may buffer before it
    /// emits a page — confirm.
    pub cache_bytes_per_column: u64,
    /// NOTE(review): presumably the upper bound on the size of a single page,
    /// in bytes — confirm.
    pub max_page_bytes: u64,
    /// Passed through to the encoders; `encode_batch` always forces it to `true`.
    /// NOTE(review): presumably controls whether encoders keep the caller's
    /// array rather than copying it — confirm.
    pub keep_original_array: bool,
    /// Alignment applied to buffers. `encode_batch` requires a power of two
    /// that is at least `MIN_PAGE_BUFFER_ALIGNMENT`.
    pub buffer_alignment: u64,
}

impl Default for EncodingOptions {
    /// 8 MiB of cache per column, 32 MiB pages, original arrays kept, and
    /// 64-byte buffer alignment.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        Self {
            buffer_alignment: 64,
            keep_original_array: true,
            max_page_bytes: 32 * MIB,
            cache_bytes_per_column: 8 * MIB,
        }
    }
}
1008
/// Chooses the [`FieldEncoder`] to use for a field.
///
/// Implementations must be `Send`, `Sync` and `Debug`.
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Creates an encoder for `field`.
    ///
    /// `encoding_strategy_root` is the strategy to use when creating encoders
    /// for child fields, `column_index` hands out the column indices the
    /// encoder occupies, and `options` carries the shared encoding settings.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}
1034
1035pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
1036 match version.resolve() {
1037 LanceFileVersion::Legacy => panic!(),
1038 LanceFileVersion::V2_0 => Box::new(CoreFieldEncodingStrategy::default()),
1039 _ => Box::new(StructuralEncodingStrategy::default()),
1040 }
1041}
1042
1043#[derive(Debug)]
1046pub struct CoreFieldEncodingStrategy {
1047 pub array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
1048 pub version: LanceFileVersion,
1049}
1050
1051#[allow(clippy::derivable_impls)]
1054impl Default for CoreFieldEncodingStrategy {
1055 fn default() -> Self {
1056 Self {
1057 array_encoding_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
1058 version: LanceFileVersion::default(),
1059 }
1060 }
1061}
1062
1063impl CoreFieldEncodingStrategy {
1064 fn is_primitive_type(data_type: &DataType) -> bool {
1065 matches!(
1066 data_type,
1067 DataType::Boolean
1068 | DataType::Date32
1069 | DataType::Date64
1070 | DataType::Decimal128(_, _)
1071 | DataType::Decimal256(_, _)
1072 | DataType::Duration(_)
1073 | DataType::Float16
1074 | DataType::Float32
1075 | DataType::Float64
1076 | DataType::Int16
1077 | DataType::Int32
1078 | DataType::Int64
1079 | DataType::Int8
1080 | DataType::Interval(_)
1081 | DataType::Null
1082 | DataType::Time32(_)
1083 | DataType::Time64(_)
1084 | DataType::Timestamp(_, _)
1085 | DataType::UInt16
1086 | DataType::UInt32
1087 | DataType::UInt64
1088 | DataType::UInt8
1089 | DataType::FixedSizeBinary(_)
1090 | DataType::FixedSizeList(_, _)
1091 | DataType::Binary
1092 | DataType::LargeBinary
1093 | DataType::Utf8
1094 | DataType::LargeUtf8,
1095 )
1096 }
1097}
1098
1099impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
1100 fn create_field_encoder(
1101 &self,
1102 encoding_strategy_root: &dyn FieldEncodingStrategy,
1103 field: &Field,
1104 column_index: &mut ColumnIndexSequence,
1105 options: &EncodingOptions,
1106 ) -> Result<Box<dyn FieldEncoder>> {
1107 let data_type = field.data_type();
1108 if Self::is_primitive_type(&data_type) {
1109 let column_index = column_index.next_column_index(field.id as u32);
1110 if field.metadata.contains_key(BLOB_META_KEY) {
1111 let mut packed_meta = HashMap::new();
1112 packed_meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
1113 let desc_field =
1114 Field::try_from(BLOB_DESC_FIELD.clone().with_metadata(packed_meta)).unwrap();
1115 let desc_encoder = Box::new(PrimitiveFieldEncoder::try_new(
1116 options,
1117 self.array_encoding_strategy.clone(),
1118 column_index,
1119 desc_field,
1120 )?);
1121 Ok(Box::new(BlobFieldEncoder::new(desc_encoder)))
1122 } else {
1123 Ok(Box::new(PrimitiveFieldEncoder::try_new(
1124 options,
1125 self.array_encoding_strategy.clone(),
1126 column_index,
1127 field.clone(),
1128 )?))
1129 }
1130 } else {
1131 match data_type {
1132 DataType::List(_child) | DataType::LargeList(_child) => {
1133 let list_idx = column_index.next_column_index(field.id as u32);
1134 let inner_encoding = encoding_strategy_root.create_field_encoder(
1135 encoding_strategy_root,
1136 &field.children[0],
1137 column_index,
1138 options,
1139 )?;
1140 let offsets_encoder =
1141 Arc::new(BasicEncoder::new(Box::new(ValueEncoder::default())));
1142 Ok(Box::new(ListFieldEncoder::new(
1143 inner_encoding,
1144 offsets_encoder,
1145 options.cache_bytes_per_column,
1146 options.keep_original_array,
1147 list_idx,
1148 )))
1149 }
1150 DataType::Struct(_) => {
1151 let field_metadata = &field.metadata;
1152 if field_metadata
1153 .get(PACKED_STRUCT_LEGACY_META_KEY)
1154 .map(|v| v == "true")
1155 .unwrap_or(field_metadata.contains_key(PACKED_STRUCT_META_KEY))
1156 {
1157 Ok(Box::new(PrimitiveFieldEncoder::try_new(
1158 options,
1159 self.array_encoding_strategy.clone(),
1160 column_index.next_column_index(field.id as u32),
1161 field.clone(),
1162 )?))
1163 } else {
1164 let header_idx = column_index.next_column_index(field.id as u32);
1165 let children_encoders = field
1166 .children
1167 .iter()
1168 .map(|field| {
1169 self.create_field_encoder(
1170 encoding_strategy_root,
1171 field,
1172 column_index,
1173 options,
1174 )
1175 })
1176 .collect::<Result<Vec<_>>>()?;
1177 Ok(Box::new(StructFieldEncoder::new(
1178 children_encoders,
1179 header_idx,
1180 )))
1181 }
1182 }
1183 DataType::Dictionary(_, value_type) => {
1184 if Self::is_primitive_type(&value_type) {
1186 Ok(Box::new(PrimitiveFieldEncoder::try_new(
1187 options,
1188 self.array_encoding_strategy.clone(),
1189 column_index.next_column_index(field.id as u32),
1190 field.clone(),
1191 )?))
1192 } else {
1193 Err(Error::NotSupported { source: format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into(), location: location!() })
1199 }
1200 }
1201 _ => todo!("Implement encoding for field {}", field),
1202 }
1203 }
1204 }
1205}
1206
1207#[derive(Debug)]
1209pub struct StructuralEncodingStrategy {
1210 pub compression_strategy: Arc<dyn CompressionStrategy>,
1211 pub version: LanceFileVersion,
1212}
1213
1214#[allow(clippy::derivable_impls)]
1217impl Default for StructuralEncodingStrategy {
1218 fn default() -> Self {
1219 Self {
1220 compression_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
1221 version: LanceFileVersion::default(),
1222 }
1223 }
1224}
1225
1226impl StructuralEncodingStrategy {
1227 fn is_primitive_type(data_type: &DataType) -> bool {
1228 matches!(
1229 data_type,
1230 DataType::Boolean
1231 | DataType::Date32
1232 | DataType::Date64
1233 | DataType::Decimal128(_, _)
1234 | DataType::Decimal256(_, _)
1235 | DataType::Duration(_)
1236 | DataType::Float16
1237 | DataType::Float32
1238 | DataType::Float64
1239 | DataType::Int16
1240 | DataType::Int32
1241 | DataType::Int64
1242 | DataType::Int8
1243 | DataType::Interval(_)
1244 | DataType::Null
1245 | DataType::Time32(_)
1246 | DataType::Time64(_)
1247 | DataType::Timestamp(_, _)
1248 | DataType::UInt16
1249 | DataType::UInt32
1250 | DataType::UInt64
1251 | DataType::UInt8
1252 | DataType::FixedSizeBinary(_)
1253 | DataType::FixedSizeList(_, _)
1254 | DataType::Binary
1255 | DataType::LargeBinary
1256 | DataType::Utf8
1257 | DataType::LargeUtf8,
1258 )
1259 }
1260
1261 fn do_create_field_encoder(
1262 &self,
1263 _encoding_strategy_root: &dyn FieldEncodingStrategy,
1264 field: &Field,
1265 column_index: &mut ColumnIndexSequence,
1266 options: &EncodingOptions,
1267 root_field_metadata: &HashMap<String, String>,
1268 ) -> Result<Box<dyn FieldEncoder>> {
1269 let data_type = field.data_type();
1270 if Self::is_primitive_type(&data_type) {
1271 Ok(Box::new(PrimitiveStructuralEncoder::try_new(
1272 options,
1273 self.compression_strategy.clone(),
1274 column_index.next_column_index(field.id as u32),
1275 field.clone(),
1276 Arc::new(root_field_metadata.clone()),
1277 )?))
1278 } else {
1279 match data_type {
1280 DataType::List(_) | DataType::LargeList(_) => {
1281 let child = field.children.first().expect("List should have a child");
1282 let child_encoder = self.do_create_field_encoder(
1283 _encoding_strategy_root,
1284 child,
1285 column_index,
1286 options,
1287 root_field_metadata,
1288 )?;
1289 Ok(Box::new(ListStructuralEncoder::new(child_encoder)))
1290 }
1291 DataType::Struct(_) => {
1292 if field.is_packed_struct() {
1293 Ok(Box::new(PrimitiveStructuralEncoder::try_new(
1294 options,
1295 self.compression_strategy.clone(),
1296 column_index.next_column_index(field.id as u32),
1297 field.clone(),
1298 Arc::new(root_field_metadata.clone()),
1299 )?))
1300 } else {
1301 let children_encoders = field
1302 .children
1303 .iter()
1304 .map(|field| {
1305 self.do_create_field_encoder(
1306 _encoding_strategy_root,
1307 field,
1308 column_index,
1309 options,
1310 root_field_metadata,
1311 )
1312 })
1313 .collect::<Result<Vec<_>>>()?;
1314 Ok(Box::new(StructStructuralEncoder::new(children_encoders)))
1315 }
1316 }
1317 DataType::Dictionary(_, value_type) => {
1318 if Self::is_primitive_type(&value_type) {
1320 Ok(Box::new(PrimitiveStructuralEncoder::try_new(
1321 options,
1322 self.compression_strategy.clone(),
1323 column_index.next_column_index(field.id as u32),
1324 field.clone(),
1325 Arc::new(root_field_metadata.clone()),
1326 )?))
1327 } else {
1328 Err(Error::NotSupported { source: format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into(), location: location!() })
1334 }
1335 }
1336 _ => todo!("Implement encoding for field {}", field),
1337 }
1338 }
1339 }
1340}
1341
1342impl FieldEncodingStrategy for StructuralEncodingStrategy {
1343 fn create_field_encoder(
1344 &self,
1345 encoding_strategy_root: &dyn FieldEncodingStrategy,
1346 field: &Field,
1347 column_index: &mut ColumnIndexSequence,
1348 options: &EncodingOptions,
1349 ) -> Result<Box<dyn FieldEncoder>> {
1350 self.do_create_field_encoder(
1351 encoding_strategy_root,
1352 field,
1353 column_index,
1354 options,
1355 &field.metadata,
1356 )
1357 }
1358}
1359
1360pub struct BatchEncoder {
1363 pub field_encoders: Vec<Box<dyn FieldEncoder>>,
1364 pub field_id_to_column_index: Vec<(u32, u32)>,
1365}
1366
1367impl BatchEncoder {
1368 pub fn try_new(
1369 schema: &Schema,
1370 strategy: &dyn FieldEncodingStrategy,
1371 options: &EncodingOptions,
1372 ) -> Result<Self> {
1373 let mut col_idx = 0;
1374 let mut col_idx_sequence = ColumnIndexSequence::default();
1375 let field_encoders = schema
1376 .fields
1377 .iter()
1378 .map(|field| {
1379 let encoder = strategy.create_field_encoder(
1380 strategy,
1381 field,
1382 &mut col_idx_sequence,
1383 options,
1384 )?;
1385 col_idx += encoder.as_ref().num_columns();
1386 Ok(encoder)
1387 })
1388 .collect::<Result<Vec<_>>>()?;
1389 Ok(Self {
1390 field_encoders,
1391 field_id_to_column_index: col_idx_sequence.mapping,
1392 })
1393 }
1394
1395 pub fn num_columns(&self) -> u32 {
1396 self.field_encoders
1397 .iter()
1398 .map(|field_encoder| field_encoder.num_columns())
1399 .sum::<u32>()
1400 }
1401}
1402
1403#[derive(Debug)]
1407pub struct EncodedBatch {
1408 pub data: Bytes,
1409 pub page_table: Vec<Arc<ColumnInfo>>,
1410 pub schema: Arc<Schema>,
1411 pub top_level_columns: Vec<u32>,
1412 pub num_rows: u64,
1413}
1414
1415fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
1416 let buffers = page.data;
1417 let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
1418 for buffer in buffers {
1419 let buffer_offset = data_buffer.len() as u64;
1420 data_buffer.extend_from_slice(&buffer);
1421 let size = data_buffer.len() as u64 - buffer_offset;
1422 buffer_offsets_and_sizes.push((buffer_offset, size));
1423 }
1424
1425 PageInfo {
1426 buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
1427 encoding: page.description,
1428 num_rows: page.num_rows,
1429 priority: page.row_number,
1430 }
1431}
1432
1433pub async fn encode_batch(
1438 batch: &RecordBatch,
1439 schema: Arc<Schema>,
1440 encoding_strategy: &dyn FieldEncodingStrategy,
1441 options: &EncodingOptions,
1442) -> Result<EncodedBatch> {
1443 if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
1444 {
1445 return Err(Error::InvalidInput {
1446 source: format!(
1447 "buffer_alignment must be a power of two and at least {}",
1448 MIN_PAGE_BUFFER_ALIGNMENT
1449 )
1450 .into(),
1451 location: location!(),
1452 });
1453 }
1454
1455 let mut data_buffer = BytesMut::new();
1456 let lance_schema = Schema::try_from(batch.schema().as_ref())?;
1457 let options = EncodingOptions {
1458 keep_original_array: true,
1459 ..*options
1460 };
1461 let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
1462 let mut page_table = Vec::new();
1463 let mut col_idx_offset = 0;
1464 for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
1465 let mut external_buffers =
1466 OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
1467 let repdef = RepDefBuilder::default();
1468 let encoder = encoder.as_mut();
1469 let num_rows = arr.len() as u64;
1470 let mut tasks =
1471 encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
1472 tasks.extend(encoder.flush(&mut external_buffers)?);
1473 for buffer in external_buffers.take_buffers() {
1474 data_buffer.extend_from_slice(&buffer);
1475 }
1476 let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
1477 for task in tasks {
1478 let encoded_page = task.await?;
1479 pages
1481 .entry(encoded_page.column_idx)
1482 .or_default()
1483 .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
1484 }
1485 let mut external_buffers =
1486 OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
1487 let encoded_columns = encoder.finish(&mut external_buffers).await?;
1488 for buffer in external_buffers.take_buffers() {
1489 data_buffer.extend_from_slice(&buffer);
1490 }
1491 let num_columns = encoded_columns.len();
1492 for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
1493 let col_idx = col_idx + col_idx_offset;
1494 let mut col_buffer_offsets_and_sizes = Vec::new();
1495 for buffer in encoded_column.column_buffers {
1496 let buffer_offset = data_buffer.len() as u64;
1497 data_buffer.extend_from_slice(&buffer);
1498 let size = data_buffer.len() as u64 - buffer_offset;
1499 col_buffer_offsets_and_sizes.push((buffer_offset, size));
1500 }
1501 for page in encoded_column.final_pages {
1502 pages
1503 .entry(page.column_idx)
1504 .or_default()
1505 .push(write_page_to_data_buffer(page, &mut data_buffer));
1506 }
1507 let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
1508 page_table.push(Arc::new(ColumnInfo {
1509 index: col_idx as u32,
1510 buffer_offsets_and_sizes: Arc::from(
1511 col_buffer_offsets_and_sizes.into_boxed_slice(),
1512 ),
1513 page_infos: Arc::from(col_pages.into_boxed_slice()),
1514 encoding: encoded_column.encoding,
1515 }))
1516 }
1517 col_idx_offset += num_columns;
1518 }
1519 let top_level_columns = batch_encoder
1520 .field_id_to_column_index
1521 .iter()
1522 .map(|(_, idx)| *idx)
1523 .collect();
1524 Ok(EncodedBatch {
1525 data: data_buffer.freeze(),
1526 top_level_columns,
1527 page_table,
1528 schema,
1529 num_rows: batch.num_rows() as u64,
1530 })
1531}
1532
#[cfg(test)]
pub mod tests {
    use crate::version::LanceFileVersion;
    use arrow_array::{ArrayRef, StringArray};
    use arrow_schema::Field;
    use lance_core::datatypes::{COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY};
    use std::collections::HashMap;
    use std::sync::Arc;

    use super::check_fixed_size_encoding;
    use super::{check_dict_encoding, ArrayEncodingStrategy, CoreArrayEncodingStrategy};

    /// Wraps `arr` in a single string array and asks `check_dict_encoding` whether
    /// dictionary encoding applies for the given `threshold`.
    fn is_dict_encoding_applicable(arr: Vec<Option<&str>>, threshold: u64) -> bool {
        let arr = Arc::new(StringArray::from(arr)) as ArrayRef;
        check_dict_encoding(&[arr], threshold)
    }

    #[test]
    fn test_dict_encoding_should_be_applied_if_cardinality_less_than_threshold() {
        assert!(is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("a"), Some("b")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_larger_than_threshold() {
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("d")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_equal_to_threshold() {
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("a")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_empty_arrays() {
        assert!(!is_dict_encoding_applicable(vec![], 3));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_smaller_than_threshold_arrays() {
        assert!(!is_dict_encoding_applicable(vec![Some("a"), Some("a")], 3));
    }

    /// Converts each inner vector into a string array and reports whether
    /// `check_fixed_size_encoding` selects an encoding for them under `version`.
    fn is_fixed_size_encoding_applicable(
        arrays: Vec<Vec<Option<&str>>>,
        version: LanceFileVersion,
    ) -> bool {
        let final_arrays: Vec<ArrayRef> = arrays
            .into_iter()
            .map(|arr| Arc::new(StringArray::from(arr)) as ArrayRef)
            .collect();

        // Borrow the arrays directly; cloning the vector first is unnecessary.
        check_fixed_size_encoding(&final_arrays, version).is_some()
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable() {
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("abc"), Some("de")]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), None]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), Some("")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), Some("")]],
            LanceFileVersion::V2_1
        ));
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable_multiple_arrays() {
        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), Some("bc")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), None], vec![None, Some("")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![None, None], vec![None, None]],
            LanceFileVersion::V2_1
        ));
    }

    /// Builds a nullable field named `test_field` for `array` (optionally carrying
    /// `field_meta`), asks `CoreArrayEncodingStrategy` for an encoder, and checks
    /// that the encoder's `Debug` output equals `expected_encoder`.
    fn verify_array_encoder(
        array: ArrayRef,
        field_meta: Option<HashMap<String, String>>,
        version: LanceFileVersion,
        expected_encoder: &str,
    ) {
        let encoding_strategy = CoreArrayEncodingStrategy { version };
        let mut field = Field::new("test_field", array.data_type().clone(), true);
        if let Some(field_meta) = field_meta {
            field.set_metadata(field_meta);
        }
        let lance_field = lance_core::datatypes::Field::try_from(field).unwrap();
        // `expect` reports the underlying error if encoder creation fails.
        let encoder = encoding_strategy
            .create_array_encoder(&[array], &lance_field)
            .expect("failed to create an array encoder for the test field");
        assert_eq!(format!("{:?}", encoder).as_str(), expected_encoder);
    }

    #[test]
    fn test_choose_encoder_for_zstd_compressed_string_field() {
        verify_array_encoder(
            Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(HashMap::from([(
                COMPRESSION_META_KEY.to_string(),
                "zstd".to_string(),
            )])),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: None }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 0 }) }",
        );
    }

    #[test]
    fn test_choose_encoder_for_zstd_compression_level() {
        verify_array_encoder(
            Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(HashMap::from([
                (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
                (COMPRESSION_LEVEL_META_KEY.to_string(), "22".to_string()),
            ])),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: Some(22) }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 22 }) }",
        );
    }
}