1use std::{collections::HashMap, env, sync::Arc};
4
5use arrow::array::AsArray;
6use arrow::datatypes::UInt64Type;
7use arrow_array::{Array, ArrayRef, RecordBatch, UInt8Array};
8use arrow_schema::DataType;
9use bytes::{Bytes, BytesMut};
10use futures::future::BoxFuture;
11use lance_core::datatypes::{
12 Field, Schema, BLOB_DESC_FIELD, BLOB_META_KEY, COMPRESSION_LEVEL_META_KEY,
13 COMPRESSION_META_KEY, PACKED_STRUCT_LEGACY_META_KEY, PACKED_STRUCT_META_KEY,
14};
15use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
16use lance_core::{Error, Result};
17use snafu::location;
18
19use crate::buffer::LanceBuffer;
20use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
21use crate::decoder::PageEncoding;
22use crate::encodings::logical::blob::BlobFieldEncoder;
23use crate::encodings::logical::list::ListStructuralEncoder;
24use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
25use crate::encodings::logical::r#struct::StructFieldEncoder;
26use crate::encodings::logical::r#struct::StructStructuralEncoder;
27use crate::encodings::physical::binary::{BinaryMiniBlockEncoder, VariableEncoder};
28use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
29use crate::encodings::physical::bitpack_fastlanes::{
30 compute_compressed_bit_width_for_non_neg, InlineBitpacking,
31};
32use crate::encodings::physical::block_compress::{
33 CompressedBufferEncoder, CompressionConfig, CompressionScheme,
34};
35use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
36use crate::encodings::physical::fsst::{
37 FsstArrayEncoder, FsstMiniBlockEncoder, FsstPerValueEncoder,
38};
39use crate::encodings::physical::packed_struct::PackedStructEncoder;
40use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockEncoder;
41use crate::format::ProtobufUtils;
42use crate::repdef::RepDefBuilder;
43use crate::statistics::{GetStat, Stat};
44use crate::version::LanceFileVersion;
45use crate::{
46 decoder::{ColumnInfo, PageInfo},
47 encodings::{
48 logical::{list::ListFieldEncoder, primitive::PrimitiveFieldEncoder},
49 physical::{
50 basic::BasicEncoder, binary::BinaryEncoder, dictionary::DictionaryEncoder,
51 fixed_size_binary::FixedSizeBinaryEncoder, fixed_size_list::FslEncoder,
52 value::ValueEncoder,
53 },
54 },
55 format::pb,
56};
57use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
58
59use hyperloglogplus::{HyperLogLog, HyperLogLogPlus};
60use std::collections::hash_map::RandomState;
61
/// The minimum alignment (in bytes) allowed for page buffers (see [`EncodingOptions::buffer_alignment`])
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;
64
/// An encoded array, along with a description of the encoding that produced it
#[derive(Debug)]
pub struct EncodedArray {
    /// The encoded data
    pub data: DataBlock,
    /// A description of the encoding used to encode the array
    pub encoding: pb::ArrayEncoding,
}
78
79impl EncodedArray {
80 pub fn new(data: DataBlock, encoding: pb::ArrayEncoding) -> Self {
81 Self { data, encoding }
82 }
83
84 pub fn into_buffers(self) -> (Vec<LanceBuffer>, pb::ArrayEncoding) {
85 let buffers = self.data.into_buffers();
86 (buffers, self.encoding)
87 }
88}
89
/// An encoded page of data, ready to be written out
#[derive(Debug)]
pub struct EncodedPage {
    /// The encoded buffers of data
    pub data: Vec<LanceBuffer>,
    /// A description of the encoding applied to the page
    pub description: PageEncoding,
    /// The number of rows in the page
    pub num_rows: u64,
    /// The row number of the first row in the page (used as the page's
    /// priority when building [`PageInfo`] — see `write_page_to_data_buffer`)
    pub row_number: u64,
    /// The index of the column this page belongs to
    pub column_idx: u32,
}
113
/// Metadata describing an encoded buffer
#[derive(Debug)]
pub struct EncodedBufferMeta {
    /// The number of bits per value stored in the buffer
    pub bits_per_value: u64,

    /// Present when the buffer was bitpacked
    pub bitpacking: Option<BitpackingBufferMeta>,

    /// Present when a block compression scheme was applied to the buffer
    pub compression_scheme: Option<CompressionScheme>,
}
122
/// Metadata describing how a buffer was bitpacked
#[derive(Debug)]
pub struct BitpackingBufferMeta {
    /// The packed bit width of the values
    pub bits_per_value: u64,

    /// Whether the packed values are signed
    pub signed: bool,
}
129
/// Encodes a block of data into an [`EncodedArray`]
pub trait ArrayEncoder: std::fmt::Debug + Send + Sync {
    /// Encodes `data` (whose logical type is `data_type`).
    ///
    /// `buffer_index` is a running counter for buffer indices — presumably
    /// incremented by implementations for each buffer they emit (TODO confirm
    /// against implementors).
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray>;
}
147
/// The maximum number of bytes in a mini-block chunk (chunk buffer sizes are
/// stored as `u16`; 6 bytes appear to be reserved, presumably for per-chunk
/// overhead — TODO confirm)
pub const MAX_MINIBLOCK_BYTES: u64 = 8 * 1024 - 6;
/// The maximum number of values in a mini-block chunk
pub const MAX_MINIBLOCK_VALUES: u64 = 4096;
150
/// The result of compressing a page of data into mini-block chunks
#[derive(Debug)]
pub struct MiniBlockCompressed {
    /// The compressed data buffers
    pub data: Vec<LanceBuffer>,
    /// Describes the size and value count of each chunk
    pub chunks: Vec<MiniBlockChunk>,
    /// The total number of values across all chunks
    pub num_values: u64,
}
162
/// Describes a single chunk in a mini-block layout
#[derive(Debug)]
pub struct MiniBlockChunk {
    /// The size (in bytes) of each buffer belonging to this chunk
    pub buffer_sizes: Vec<u16>,
    /// Log (base 2) of the number of values in this chunk.  A value of 0
    /// marks the final chunk, which holds all remaining values
    /// (see [`MiniBlockChunk::num_values`])
    pub log_num_values: u8,
}
188
189impl MiniBlockChunk {
190 pub fn num_values(&self, vals_in_prev_blocks: u64, total_num_values: u64) -> u64 {
197 if self.log_num_values == 0 {
198 total_num_values - vals_in_prev_blocks
199 } else {
200 1 << self.log_num_values
201 }
202 }
203}
204
/// Compresses a page of data into mini-block chunks
pub trait MiniBlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compresses `page` into chunks, returning the compressed data together
    /// with a description of the encoding that was applied
    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)>;
}
218
/// The output of per-value compression: either fixed-width values or
/// variable-width values with offsets
#[derive(Debug)]
pub enum PerValueDataBlock {
    Fixed(FixedWidthDataBlock),
    Variable(VariableWidthBlock),
}
230
231impl PerValueDataBlock {
232 pub fn data_size(&self) -> u64 {
233 match self {
234 Self::Fixed(fixed) => fixed.data_size(),
235 Self::Variable(variable) => variable.data_size(),
236 }
237 }
238}
239
/// Compresses data into a per-value layout (fixed-width values, or
/// variable-width values with offsets)
pub trait PerValueCompressor: std::fmt::Debug + Send + Sync {
    /// Compresses `data`, returning the per-value output together with a
    /// description of the encoding that was applied
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)>;
}
254
/// Compresses an entire block of data into a single output buffer
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compresses `data` into a single buffer
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
272
273pub fn values_column_encoding() -> pb::ColumnEncoding {
274 pb::ColumnEncoding {
275 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
276 }
277}
278
/// The result of encoding a column of data
pub struct EncodedColumn {
    /// Buffers that belong to the column itself (outside of any page)
    pub column_buffers: Vec<LanceBuffer>,
    /// A description of the column-level encoding
    pub encoding: pb::ColumnEncoding,
    /// Any pages produced when the column was finished
    pub final_pages: Vec<EncodedPage>,
}
284
285impl Default for EncodedColumn {
286 fn default() -> Self {
287 Self {
288 column_buffers: Default::default(),
289 encoding: pb::ColumnEncoding {
290 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
291 },
292 final_pages: Default::default(),
293 }
294 }
295}
296
/// A collection of buffers that will be written to the file outside of any page.
///
/// Tracks a running file position so that each added buffer can be assigned
/// its final (alignment-padded) position immediately.
pub struct OutOfLineBuffers {
    /// The file position that will be assigned to the next buffer
    position: u64,
    /// The alignment (in bytes) used to pad after each buffer
    buffer_alignment: u64,
    /// The buffers collected so far
    buffers: Vec<LanceBuffer>,
}
315
316impl OutOfLineBuffers {
317 pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
318 Self {
319 position: base_position,
320 buffer_alignment,
321 buffers: Vec::new(),
322 }
323 }
324
325 pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
326 let position = self.position;
327 self.position += buffer.len() as u64;
328 self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
329 self.buffers.push(buffer);
330 position
331 }
332
333 pub fn take_buffers(self) -> Vec<LanceBuffer> {
334 self.buffers
335 }
336
337 pub fn reset_position(&mut self, position: u64) {
338 self.position = position;
339 }
340}
341
/// A task that will eventually resolve to an encoded page of data
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;
344
/// Top-level encoding trait: turns arrays for a single field into one or more
/// columns of encoded pages
pub trait FieldEncoder: Send {
    /// Buffers `array`, returning encode tasks for any pages that are now ready.
    ///
    /// `external_buffers` collects buffers written outside of pages, `repdef`
    /// carries repetition/definition level information, `row_number` is the
    /// row number of the first row in `array`, and `num_rows` is the number of
    /// top-level rows it contains.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;
    /// Encodes any data still buffered, returning the resulting tasks
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
    /// Completes encoding, returning one [`EncodedColumn`] per column produced
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// The number of columns this encoder produces
    fn num_columns(&self) -> u32;
}
405
/// A strategy for choosing an [`ArrayEncoder`] for a field, given sample data
pub trait ArrayEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Picks an encoder for `field` based on the given sample `arrays`
    fn create_array_encoder(
        &self,
        arrays: &[ArrayRef],
        field: &Field,
    ) -> Result<Box<dyn ArrayEncoder>>;
}
418
/// A strategy for choosing compressors for the various page layouts
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Picks a block compressor for `data`, returning it together with a
    /// description of the encoding
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)>;

    /// Picks a per-value compressor for `data`
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Picks a mini-block compressor for `data`
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
453
/// The default array encoding strategy: a set of basic encodings that are
/// generally applicable
#[derive(Debug, Default)]
pub struct CoreArrayEncodingStrategy {
    /// The file version being written; gates which encodings may be used
    pub version: LanceFileVersion,
}
460
/// The Arrow binary-like types handled by the binary encoders
const BINARY_DATATYPES: [DataType; 4] = [
    DataType::Binary,
    DataType::LargeBinary,
    DataType::Utf8,
    DataType::LargeUtf8,
];
467
impl CoreArrayEncodingStrategy {
    /// True when FSST compression may be applied: requires file version 2.1+,
    /// a Utf8/Binary column, and more than 4MiB of data
    fn can_use_fsst(data_type: &DataType, data_size: u64, version: LanceFileVersion) -> bool {
        version >= LanceFileVersion::V2_1
            && matches!(data_type, DataType::Utf8 | DataType::Binary)
            && data_size > 4 * 1024 * 1024
    }

    /// Reads a user-requested compression scheme (and optional level) from
    /// field metadata.
    ///
    /// Returns `None` when the compression key is absent or the scheme string
    /// fails to parse (parse failures are silently ignored).
    fn get_field_compression(field_meta: &HashMap<String, String>) -> Option<CompressionConfig> {
        let compression = field_meta.get(COMPRESSION_META_KEY)?;
        let compression_scheme = compression.parse::<CompressionScheme>();
        match compression_scheme {
            Ok(compression_scheme) => Some(CompressionConfig::new(
                compression_scheme,
                field_meta
                    .get(COMPRESSION_LEVEL_META_KEY)
                    .and_then(|level| level.parse().ok()),
            )),
            Err(_) => None,
        }
    }

    /// Builds the default encoder for binary-like data.
    ///
    /// Offsets/indices are encoded with the UInt64 encoder.  Field metadata
    /// may force a specific compression scheme; otherwise FSST is layered on
    /// top when [`Self::can_use_fsst`] allows it.
    fn default_binary_encoder(
        arrays: &[ArrayRef],
        data_type: &DataType,
        field_meta: Option<&HashMap<String, String>>,
        data_size: u64,
        version: LanceFileVersion,
    ) -> Result<Box<dyn ArrayEncoder>> {
        let bin_indices_encoder =
            Self::choose_array_encoder(arrays, &DataType::UInt64, data_size, false, version, None)?;

        if let Some(compression) = field_meta.and_then(Self::get_field_compression) {
            if compression.scheme == CompressionScheme::Fsst {
                // FSST requested explicitly in metadata: wrap an uncompressed
                // binary encoder with the FSST encoder
                let raw_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
                Ok(Box::new(FsstArrayEncoder::new(raw_encoder)))
            } else {
                // Any other metadata-requested compression is handled by the
                // binary encoder itself
                Ok(Box::new(BinaryEncoder::new(
                    bin_indices_encoder,
                    Some(compression),
                )))
            }
        } else {
            let bin_encoder = Box::new(BinaryEncoder::new(bin_indices_encoder, None));
            if Self::can_use_fsst(data_type, data_size, version) {
                Ok(Box::new(FsstArrayEncoder::new(bin_encoder)))
            } else {
                Ok(bin_encoder)
            }
        }
    }

    /// Chooses an array encoder for `data_type`, using `arrays` as sample data.
    ///
    /// `data_size` is the total buffer size of the sample arrays and drives
    /// size-based heuristics; `field_meta` (when present) may override the
    /// chosen compression for binary data.
    fn choose_array_encoder(
        arrays: &[ArrayRef],
        data_type: &DataType,
        data_size: u64,
        use_dict_encoding: bool,
        version: LanceFileVersion,
        field_meta: Option<&HashMap<String, String>>,
    ) -> Result<Box<dyn ArrayEncoder>> {
        match data_type {
            // Fixed-size lists: encode the items, then wrap with the FSL encoder
            DataType::FixedSizeList(inner, dimension) => {
                Ok(Box::new(BasicEncoder::new(Box::new(FslEncoder::new(
                    Self::choose_array_encoder(
                        arrays,
                        inner.data_type(),
                        data_size,
                        use_dict_encoding,
                        version,
                        None,
                    )?,
                    *dimension as u32,
                )))))
            }
            // Input is already dictionary-encoded: encode keys and values separately
            DataType::Dictionary(key_type, value_type) => {
                let key_encoder =
                    Self::choose_array_encoder(arrays, key_type, data_size, false, version, None)?;
                let value_encoder = Self::choose_array_encoder(
                    arrays, value_type, data_size, false, version, None,
                )?;

                Ok(Box::new(AlreadyDictionaryEncoder::new(
                    key_encoder,
                    value_encoder,
                )))
            }
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => {
                if use_dict_encoding {
                    // NOTE(review): this sample range yields 255 values
                    // (0..=254); it appears to exist only so an encoder for
                    // the UInt8 indices can be chosen — confirm intent
                    let dict_indices_encoder = Self::choose_array_encoder(
                        &[Arc::new(UInt8Array::from_iter_values(0_u8..255_u8))],
                        &DataType::UInt8,
                        data_size,
                        false,
                        version,
                        None,
                    )?;
                    let dict_items_encoder = Self::choose_array_encoder(
                        arrays,
                        &DataType::Utf8,
                        data_size,
                        false,
                        version,
                        None,
                    )?;

                    Ok(Box::new(DictionaryEncoder::new(
                        dict_indices_encoder,
                        dict_items_encoder,
                    )))
                }
                // When all values share one byte length, fixed-size encoding
                // can drop the offsets entirely
                else if BINARY_DATATYPES.contains(arrays[0].data_type()) {
                    if let Some(byte_width) = check_fixed_size_encoding(arrays, version) {
                        let bytes_encoder = Self::choose_array_encoder(
                            arrays,
                            &DataType::UInt8,
                            data_size,
                            false,
                            version,
                            None,
                        )?;

                        Ok(Box::new(BasicEncoder::new(Box::new(
                            FixedSizeBinaryEncoder::new(bytes_encoder, byte_width as usize),
                        ))))
                    } else {
                        Self::default_binary_encoder(
                            arrays, data_type, field_meta, data_size, version,
                        )
                    }
                } else {
                    Self::default_binary_encoder(arrays, data_type, field_meta, data_size, version)
                }
            }
            // Structs here are packed: one encoder per child, then packed together
            DataType::Struct(fields) => {
                let num_fields = fields.len();
                let mut inner_encoders = Vec::new();

                for i in 0..num_fields {
                    let inner_datatype = fields[i].data_type();
                    let inner_encoder = Self::choose_array_encoder(
                        arrays,
                        inner_datatype,
                        data_size,
                        use_dict_encoding,
                        version,
                        None,
                    )?;
                    inner_encoders.push(inner_encoder);
                }

                Ok(Box::new(PackedStructEncoder::new(inner_encoders)))
            }
            DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
                // In 2.1+ bitpack unsigned integers, but only when the sample
                // arrays actually have this type (i.e. we are not choosing an
                // encoder for derived offsets/indices over other data)
                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
                        compressed_bit_width as usize,
                        data_type.clone(),
                    )))
                } else {
                    Ok(Box::new(BasicEncoder::new(Box::new(
                        ValueEncoder::default(),
                    ))))
                }
            }

            // NOTE(review): the bit-width helper is named "non_neg" but is
            // also applied to signed types here — confirm it handles negatives
            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
                if version >= LanceFileVersion::V2_1 && arrays[0].data_type() == data_type {
                    let compressed_bit_width = compute_compressed_bit_width_for_non_neg(arrays);
                    Ok(Box::new(BitpackedForNonNegArrayEncoder::new(
                        compressed_bit_width as usize,
                        data_type.clone(),
                    )))
                } else {
                    Ok(Box::new(BasicEncoder::new(Box::new(
                        ValueEncoder::default(),
                    ))))
                }
            }
            // Everything else falls back to plain value encoding
            _ => Ok(Box::new(BasicEncoder::new(Box::new(
                ValueEncoder::default(),
            )))),
        }
    }
}
665
/// Returns the cardinality threshold below which dictionary encoding is used.
///
/// Can be overridden via the `LANCE_DICT_ENCODING_THRESHOLD` environment
/// variable; an unset or unparseable value falls back to the default of 100.
fn get_dict_encoding_threshold() -> u64 {
    const DEFAULT_THRESHOLD: u64 = 100;
    match env::var("LANCE_DICT_ENCODING_THRESHOLD") {
        Ok(val) => val.parse().unwrap_or(DEFAULT_THRESHOLD),
        Err(_) => DEFAULT_THRESHOLD,
    }
}
672
/// Decides whether dictionary encoding looks worthwhile for string data.
///
/// Returns true only when there are at least `threshold` total rows AND the
/// estimated cardinality (via HyperLogLog++) stays below `threshold`.
fn check_dict_encoding(arrays: &[ArrayRef], threshold: u64) -> bool {
    let num_total_rows = arrays.iter().map(|arr| arr.len()).sum::<usize>();
    if num_total_rows < threshold as usize {
        // Too few rows for dictionary encoding to pay off
        return false;
    }
    // Precision used for the HyperLogLog++ cardinality estimate
    const PRECISION: u8 = 12;

    let mut hll: HyperLogLogPlus<String, RandomState> =
        HyperLogLogPlus::new(PRECISION, RandomState::new()).unwrap();

    for arr in arrays {
        let string_array = arrow_array::cast::as_string_array(arr);
        // `flatten` skips null entries
        for value in string_array.iter().flatten() {
            hll.insert(value);
            // NOTE(review): `count()` is re-evaluated after every insert so we
            // can exit early, but this may be expensive for large arrays —
            // confirm the early-exit is a net win
            let estimated_cardinality = hll.count() as u64;
            if estimated_cardinality >= threshold {
                return false;
            }
        }
    }

    true
}
703
/// Checks whether binary-like arrays can use fixed-size encoding.
///
/// Returns the common byte width when every non-zero-length value has the
/// same length (zero-length slots — presumably null entries, given the
/// non-empty check below — are ignored).  Requires file version 2.1+.
///
/// # Panics
///
/// Panics if the arrays are not string/binary (32- or 64-bit offsets).
fn check_fixed_size_encoding(arrays: &[ArrayRef], version: LanceFileVersion) -> Option<u64> {
    if version < LanceFileVersion::V2_1 || arrays.is_empty() {
        return None;
    }

    // Bail out if any non-null value is empty; this lets zero-length offset
    // windows below be treated as null slots
    if !arrays.iter().all(|arr| {
        if let Some(arr) = arr.as_string_opt::<i32>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_binary_opt::<i32>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_string_opt::<i64>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else if let Some(arr) = arr.as_binary_opt::<i64>() {
            arr.iter().flatten().all(|s| !s.is_empty())
        } else {
            panic!("wrong dtype");
        }
    }) {
        return None;
    }

    // Compute the byte length of every slot from the offsets buffer.
    // NOTE(review): the four branches are identical except for the concrete
    // offset type; a generic helper could remove the duplication
    let lengths = arrays
        .iter()
        .flat_map(|arr| {
            if let Some(arr) = arr.as_string_opt::<i32>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_binary_opt::<i32>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_string_opt::<i64>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else if let Some(arr) = arr.as_binary_opt::<i64>() {
                let offsets = arr.offsets().inner();
                offsets
                    .windows(2)
                    .map(|w| (w[1] - w[0]) as u64)
                    .collect::<Vec<_>>()
            } else {
                panic!("wrong dtype");
            }
        })
        .collect::<Vec<_>>();

    // All non-zero lengths must match the first non-zero length
    let first_non_zero = lengths.iter().position(|&x| x != 0);
    if let Some(first_non_zero) = first_non_zero {
        if !lengths
            .iter()
            .all(|&x| x == 0 || x == lengths[first_non_zero])
        {
            return None;
        }

        Some(lengths[first_non_zero])
    } else {
        None
    }
}
776
777impl ArrayEncodingStrategy for CoreArrayEncodingStrategy {
778 fn create_array_encoder(
779 &self,
780 arrays: &[ArrayRef],
781 field: &Field,
782 ) -> Result<Box<dyn ArrayEncoder>> {
783 let data_size = arrays
784 .iter()
785 .map(|arr| arr.get_buffer_memory_size() as u64)
786 .sum::<u64>();
787 let data_type = arrays[0].data_type();
788
789 let use_dict_encoding = data_type == &DataType::Utf8
790 && check_dict_encoding(arrays, get_dict_encoding_threshold());
791
792 Self::choose_array_encoder(
793 arrays,
794 data_type,
795 data_size,
796 use_dict_encoding,
797 self.version,
798 Some(&field.metadata),
799 )
800 }
801}
802
803impl CompressionStrategy for CoreArrayEncodingStrategy {
804 fn create_miniblock_compressor(
805 &self,
806 field: &Field,
807 data: &DataBlock,
808 ) -> Result<Box<dyn MiniBlockCompressor>> {
809 match data {
810 DataBlock::FixedWidth(fixed_width_data) => {
811 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
812 if compression == "none" {
813 return Ok(Box::new(ValueEncoder::default()));
814 }
815 }
816
817 let bit_widths = data.expect_stat(Stat::BitWidth);
818 let bit_widths = bit_widths.as_primitive::<UInt64Type>();
819 let has_all_zeros = bit_widths.values().iter().any(|v| *v == 0);
822 let too_small = bit_widths.len() == 1
825 && InlineBitpacking::min_size_bytes(bit_widths.value(0)) >= data.data_size();
826 if !has_all_zeros
827 && !too_small
828 && (fixed_width_data.bits_per_value == 8
829 || fixed_width_data.bits_per_value == 16
830 || fixed_width_data.bits_per_value == 32
831 || fixed_width_data.bits_per_value == 64)
832 {
833 Ok(Box::new(InlineBitpacking::new(
834 fixed_width_data.bits_per_value,
835 )))
836 } else {
837 Ok(Box::new(ValueEncoder::default()))
838 }
839 }
840 DataBlock::VariableWidth(variable_width_data) => {
841 if variable_width_data.bits_per_offset == 32 {
842 let data_size =
843 variable_width_data.expect_single_stat::<UInt64Type>(Stat::DataSize);
844 let max_len =
845 variable_width_data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
846
847 if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
848 && data_size >= FSST_LEAST_INPUT_SIZE as u64
849 {
850 Ok(Box::new(FsstMiniBlockEncoder::default()))
851 } else {
852 Ok(Box::new(BinaryMiniBlockEncoder::default()))
853 }
854 } else {
855 todo!("Implement MiniBlockCompression for VariableWidth DataBlock with 64 bits offsets.")
856 }
857 }
858 DataBlock::Struct(struct_data_block) => {
859 if struct_data_block
862 .children
863 .iter()
864 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
865 {
866 panic!("packed struct encoding currently only supports fixed-width fields.")
867 }
868 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
869 }
870 DataBlock::FixedSizeList(_) => {
871 Ok(Box::new(ValueEncoder::default()))
879 }
880 _ => Err(Error::NotSupported {
881 source: format!(
882 "Mini-block compression not yet supported for block type {}",
883 data.name()
884 )
885 .into(),
886 location: location!(),
887 }),
888 }
889 }
890
891 fn create_per_value(
892 &self,
893 _field: &Field,
894 data: &DataBlock,
895 ) -> Result<Box<dyn PerValueCompressor>> {
896 match data {
897 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
898 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
899 DataBlock::VariableWidth(variable_width) => {
900 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
901 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
902
903 if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
907 return Ok(Box::new(CompressedBufferEncoder::default()));
908 }
909
910 if variable_width.bits_per_offset == 32 {
911 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
912 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
913
914 let variable_compression = Box::new(VariableEncoder::default());
915
916 if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
917 && data_size >= FSST_LEAST_INPUT_SIZE as u64
918 {
919 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
920 } else {
921 Ok(variable_compression)
922 }
923 } else {
924 todo!("Implement MiniBlockCompression for VariableWidth DataBlock with 64 bits offsets.")
925 }
926 }
927 _ => unreachable!(
928 "Per-value compression not yet supported for block type: {}",
929 data.name()
930 ),
931 }
932 }
933
934 fn create_block_compressor(
935 &self,
936 _field: &Field,
937 data: &DataBlock,
938 ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)> {
939 match data {
941 DataBlock::FixedWidth(fixed_width) => {
944 let encoder = Box::new(ValueEncoder::default());
945 let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
946 Ok((encoder, encoding))
947 }
948 DataBlock::VariableWidth(variable_width) => {
949 let encoder = Box::new(VariableEncoder::default());
950 let encoding = ProtobufUtils::variable(variable_width.bits_per_offset);
951 Ok((encoder, encoding))
952 }
953 _ => unreachable!(),
954 }
955 }
956}
/// Hands out column indices in order and records which field claimed each one
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    // The next index to hand out
    current_index: u32,
    // Pairs of (field id, column index) assigned so far
    mapping: Vec<(u32, u32)>,
}
964
965impl ColumnIndexSequence {
966 pub fn next_column_index(&mut self, field_id: u32) -> u32 {
967 let idx = self.current_index;
968 self.current_index += 1;
969 self.mapping.push((field_id, idx));
970 idx
971 }
972
973 pub fn skip(&mut self) {
974 self.current_index += 1;
975 }
976}
977
/// Options shared by the field encoders
pub struct EncodingOptions {
    /// The number of bytes to cache per column before a page is written
    /// (passed to the list encoder; presumably a page-flush threshold —
    /// TODO confirm)
    pub cache_bytes_per_column: u64,
    /// The maximum size of a page, in bytes
    pub max_page_bytes: u64,
    /// Whether encoders keep a reference to the original array that was
    /// passed in (see `Default` for the standard settings)
    pub keep_original_array: bool,
    /// The alignment required for data buffers; must be a power of two and
    /// at least [`MIN_PAGE_BUFFER_ALIGNMENT`] (enforced by `encode_batch`)
    pub buffer_alignment: u64,
}
998
999impl Default for EncodingOptions {
1000 fn default() -> Self {
1001 Self {
1002 cache_bytes_per_column: 8 * 1024 * 1024,
1003 max_page_bytes: 32 * 1024 * 1024,
1004 keep_original_array: true,
1005 buffer_alignment: 64,
1006 }
1007 }
1008}
1009
/// A strategy for creating field encoders from fields
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Creates an encoder for `field`.
    ///
    /// `encoding_strategy_root` is the top-most strategy; implementations use
    /// it (rather than `self`) when recursing into child fields so a custom
    /// root strategy also applies to nested fields.  `column_index` assigns
    /// column indices as the schema is walked.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}
1035
1036pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
1037 match version.resolve() {
1038 LanceFileVersion::Legacy => panic!(),
1039 LanceFileVersion::V2_0 => Box::new(CoreFieldEncodingStrategy::default()),
1040 _ => Box::new(StructuralEncodingStrategy::default()),
1041 }
1042}
1043
/// The field encoding strategy used for 2.0 files
/// (see [`default_encoding_strategy`])
#[derive(Debug)]
pub struct CoreFieldEncodingStrategy {
    /// The strategy used to pick array encoders for primitive leaves
    pub array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    /// The file version being written
    pub version: LanceFileVersion,
}
1051
// This impl is derivable according to clippy (hence the allow) but is written
// out explicitly
#[allow(clippy::derivable_impls)]
impl Default for CoreFieldEncodingStrategy {
    fn default() -> Self {
        Self {
            array_encoding_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
            version: LanceFileVersion::default(),
        }
    }
}
1063
impl CoreFieldEncodingStrategy {
    /// Types that are encoded directly with the [`PrimitiveFieldEncoder`].
    ///
    /// Note that binary/string types count as "primitive" here.  This list is
    /// duplicated in `StructuralEncodingStrategy::is_primitive_type`.
    fn is_primitive_type(data_type: &DataType) -> bool {
        matches!(
            data_type,
            DataType::Boolean
                | DataType::Date32
                | DataType::Date64
                | DataType::Decimal128(_, _)
                | DataType::Decimal256(_, _)
                | DataType::Duration(_)
                | DataType::Float16
                | DataType::Float32
                | DataType::Float64
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Int8
                | DataType::Interval(_)
                | DataType::Null
                | DataType::Time32(_)
                | DataType::Time64(_)
                | DataType::Timestamp(_, _)
                | DataType::UInt16
                | DataType::UInt32
                | DataType::UInt64
                | DataType::UInt8
                | DataType::FixedSizeBinary(_)
                | DataType::FixedSizeList(_, _)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8,
        )
    }
}
1099
impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();
        if Self::is_primitive_type(&data_type) {
            let column_index = column_index.next_column_index(field.id as u32);
            if field.metadata.contains_key(BLOB_META_KEY) {
                // Blob fields are encoded through a descriptor field
                // (BLOB_DESC_FIELD) marked as a packed struct
                let mut packed_meta = HashMap::new();
                packed_meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());
                let desc_field =
                    Field::try_from(BLOB_DESC_FIELD.clone().with_metadata(packed_meta)).unwrap();
                let desc_encoder = Box::new(PrimitiveFieldEncoder::try_new(
                    options,
                    self.array_encoding_strategy.clone(),
                    column_index,
                    desc_field,
                )?);
                Ok(Box::new(BlobFieldEncoder::new(desc_encoder)))
            } else {
                Ok(Box::new(PrimitiveFieldEncoder::try_new(
                    options,
                    self.array_encoding_strategy.clone(),
                    column_index,
                    field.clone(),
                )?))
            }
        } else {
            match data_type {
                DataType::List(_child) | DataType::LargeList(_child) => {
                    // Lists claim one column index, then recurse into the child
                    // via the root strategy so custom strategies apply to
                    // nested fields as well
                    let list_idx = column_index.next_column_index(field.id as u32);
                    let inner_encoding = encoding_strategy_root.create_field_encoder(
                        encoding_strategy_root,
                        &field.children[0],
                        column_index,
                        options,
                    )?;
                    let offsets_encoder =
                        Arc::new(BasicEncoder::new(Box::new(ValueEncoder::default())));
                    Ok(Box::new(ListFieldEncoder::new(
                        inner_encoding,
                        offsets_encoder,
                        options.cache_bytes_per_column,
                        options.keep_original_array,
                        list_idx,
                    )))
                }
                DataType::Struct(_) => {
                    let field_metadata = &field.metadata;
                    // Packed structs are encoded as one primitive column.  The
                    // legacy metadata key must equal "true"; the new key only
                    // needs to be present
                    if field_metadata
                        .get(PACKED_STRUCT_LEGACY_META_KEY)
                        .map(|v| v == "true")
                        .unwrap_or(field_metadata.contains_key(PACKED_STRUCT_META_KEY))
                    {
                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
                            options,
                            self.array_encoding_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                        )?))
                    } else {
                        // Non-packed structs use a header column plus one
                        // encoder per child
                        let header_idx = column_index.next_column_index(field.id as u32);
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.create_field_encoder(
                                    encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructFieldEncoder::new(
                            children_encoders,
                            header_idx,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    // Only dictionaries with primitive value types are supported
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveFieldEncoder::try_new(
                            options,
                            self.array_encoding_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                        )?))
                    } else {
                        Err(Error::NotSupported { source: format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into(), location: location!() })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}
1207
/// The field encoding strategy used for 2.1+ files
/// (see [`default_encoding_strategy`])
#[derive(Debug)]
pub struct StructuralEncodingStrategy {
    /// The strategy used to pick compressors for leaf pages
    pub compression_strategy: Arc<dyn CompressionStrategy>,
    /// The file version being written
    pub version: LanceFileVersion,
}
1214
// This impl is derivable according to clippy (hence the allow) but is written
// out explicitly
#[allow(clippy::derivable_impls)]
impl Default for StructuralEncodingStrategy {
    fn default() -> Self {
        Self {
            compression_strategy: Arc::<CoreArrayEncodingStrategy>::default(),
            version: LanceFileVersion::default(),
        }
    }
}
1226
impl StructuralEncodingStrategy {
    /// Types that are encoded directly with the [`PrimitiveStructuralEncoder`].
    ///
    /// This list is duplicated in `CoreFieldEncodingStrategy::is_primitive_type`.
    fn is_primitive_type(data_type: &DataType) -> bool {
        matches!(
            data_type,
            DataType::Boolean
                | DataType::Date32
                | DataType::Date64
                | DataType::Decimal128(_, _)
                | DataType::Decimal256(_, _)
                | DataType::Duration(_)
                | DataType::Float16
                | DataType::Float32
                | DataType::Float64
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Int8
                | DataType::Interval(_)
                | DataType::Null
                | DataType::Time32(_)
                | DataType::Time64(_)
                | DataType::Timestamp(_, _)
                | DataType::UInt16
                | DataType::UInt32
                | DataType::UInt64
                | DataType::UInt8
                | DataType::FixedSizeBinary(_)
                | DataType::FixedSizeList(_, _)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8,
        )
    }

    /// Recursive worker for `create_field_encoder`.
    ///
    /// `root_field_metadata` is the metadata of the top-level field, threaded
    /// down to every nested primitive encoder.
    fn do_create_field_encoder(
        &self,
        _encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
        root_field_metadata: &HashMap<String, String>,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();
        if Self::is_primitive_type(&data_type) {
            Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                options,
                self.compression_strategy.clone(),
                column_index.next_column_index(field.id as u32),
                field.clone(),
                Arc::new(root_field_metadata.clone()),
            )?))
        } else {
            match data_type {
                DataType::List(_) | DataType::LargeList(_) => {
                    let child = field.children.first().expect("List should have a child");
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(ListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Struct(_) => {
                    // Packed structs are encoded as a single primitive column;
                    // otherwise each child gets its own encoder
                    if field.is_packed_struct() {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.do_create_field_encoder(
                                    _encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                    root_field_metadata,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructStructuralEncoder::new(
                            options.keep_original_array,
                            children_encoders,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    // Only dictionaries with primitive value types are supported
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        Err(Error::NotSupported { source: format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into(), location: location!() })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}
1348
impl FieldEncodingStrategy for StructuralEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        // Delegate to the recursive worker, using this (top-level) field's
        // metadata as the root metadata for all nested encoders
        self.do_create_field_encoder(
            encoding_strategy_root,
            field,
            column_index,
            options,
            &field.metadata,
        )
    }
}
1366
/// Encodes record batches by delegating to one field encoder per top-level field
pub struct BatchEncoder {
    /// One encoder per top-level field in the schema
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    /// Pairs of (field id, column index) assigned while building the encoders
    pub field_id_to_column_index: Vec<(u32, u32)>,
}
1373
1374impl BatchEncoder {
1375 pub fn try_new(
1376 schema: &Schema,
1377 strategy: &dyn FieldEncodingStrategy,
1378 options: &EncodingOptions,
1379 ) -> Result<Self> {
1380 let mut col_idx = 0;
1381 let mut col_idx_sequence = ColumnIndexSequence::default();
1382 let field_encoders = schema
1383 .fields
1384 .iter()
1385 .map(|field| {
1386 let encoder = strategy.create_field_encoder(
1387 strategy,
1388 field,
1389 &mut col_idx_sequence,
1390 options,
1391 )?;
1392 col_idx += encoder.as_ref().num_columns();
1393 Ok(encoder)
1394 })
1395 .collect::<Result<Vec<_>>>()?;
1396 Ok(Self {
1397 field_encoders,
1398 field_id_to_column_index: col_idx_sequence.mapping,
1399 })
1400 }
1401
1402 pub fn num_columns(&self) -> u32 {
1403 self.field_encoders
1404 .iter()
1405 .map(|field_encoder| field_encoder.num_columns())
1406 .sum::<u32>()
1407 }
1408}
1409
/// An encoded batch of data held entirely in memory, along with the page
/// table and schema needed to decode it.
#[derive(Debug)]
pub struct EncodedBatch {
    /// The concatenated page/column buffer bytes.
    pub data: Bytes,
    /// Per-column layout information (buffer offsets/sizes, page infos).
    pub page_table: Vec<Arc<ColumnInfo>>,
    /// The (Lance) schema the batch was encoded with.
    pub schema: Arc<Schema>,
    /// Column indices of the top-level fields, in schema order.
    pub top_level_columns: Vec<u32>,
    /// Number of rows in the encoded batch.
    pub num_rows: u64,
}
1421
1422fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
1423 let buffers = page.data;
1424 let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
1425 for buffer in buffers {
1426 let buffer_offset = data_buffer.len() as u64;
1427 data_buffer.extend_from_slice(&buffer);
1428 let size = data_buffer.len() as u64 - buffer_offset;
1429 buffer_offsets_and_sizes.push((buffer_offset, size));
1430 }
1431
1432 PageInfo {
1433 buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
1434 encoding: page.description,
1435 num_rows: page.num_rows,
1436 priority: page.row_number,
1437 }
1438}
1439
/// Encodes an entire [`RecordBatch`] into memory, returning the bytes and
/// the page table needed to decode them as an [`EncodedBatch`].
///
/// Each top-level field is encoded independently: data is driven through the
/// field's encoder, any out-of-line buffers are flushed into `data_buffer`,
/// and the pages produced by encode tasks are written out and collected into
/// per-column [`PageInfo`] lists.
///
/// # Errors
///
/// Returns `Error::InvalidInput` if `options.buffer_alignment` is not a
/// power of two of at least `MIN_PAGE_BUFFER_ALIGNMENT`, plus any error
/// surfaced by schema conversion or the encoders themselves.
pub async fn encode_batch(
    batch: &RecordBatch,
    schema: Arc<Schema>,
    encoding_strategy: &dyn FieldEncodingStrategy,
    options: &EncodingOptions,
) -> Result<EncodedBatch> {
    // Validate alignment up front so offset bookkeeping below is sound.
    if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
    {
        return Err(Error::InvalidInput {
            source: format!(
                "buffer_alignment must be a power of two and at least {}",
                MIN_PAGE_BUFFER_ALIGNMENT
            )
            .into(),
            location: location!(),
        });
    }

    let mut data_buffer = BytesMut::new();
    let lance_schema = Schema::try_from(batch.schema().as_ref())?;
    // In-memory encoding always keeps the original arrays alive, regardless
    // of what the caller requested.
    let options = EncodingOptions {
        keep_original_array: true,
        ..*options
    };
    let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
    let mut page_table = Vec::new();
    // Running base index: each field's encoder numbers its columns from 0,
    // so we offset them to get globally unique column indices.
    let mut col_idx_offset = 0;
    for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
        // Out-of-line buffers are positioned relative to the current end of
        // the data buffer, so a fresh tracker is created per phase.
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let repdef = RepDefBuilder::default();
        let encoder = encoder.as_mut();
        let num_rows = arr.len() as u64;
        // Feed the whole column in one call, then flush any data the
        // encoder was still holding back.
        let mut tasks =
            encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
        tasks.extend(encoder.flush(&mut external_buffers)?);
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        // Pages are grouped by the column index the encoder assigned them.
        let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
        for task in tasks {
            let encoded_page = task.await?;
            pages
                .entry(encoded_page.column_idx)
                .or_default()
                .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
        }
        // `finish` may emit more out-of-line buffers and final pages.
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let encoded_columns = encoder.finish(&mut external_buffers).await?;
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let num_columns = encoded_columns.len();
        for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
            // Shift the encoder-local column index into the global space.
            let col_idx = col_idx + col_idx_offset;
            let mut col_buffer_offsets_and_sizes = Vec::new();
            for buffer in encoded_column.column_buffers {
                let buffer_offset = data_buffer.len() as u64;
                data_buffer.extend_from_slice(&buffer);
                let size = data_buffer.len() as u64 - buffer_offset;
                col_buffer_offsets_and_sizes.push((buffer_offset, size));
            }
            for page in encoded_column.final_pages {
                pages
                    .entry(page.column_idx)
                    .or_default()
                    .push(write_page_to_data_buffer(page, &mut data_buffer));
            }
            // Take (not clone) this column's pages; the map entry is left
            // empty, which is fine since each column is visited once.
            let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
            page_table.push(Arc::new(ColumnInfo {
                index: col_idx as u32,
                buffer_offsets_and_sizes: Arc::from(
                    col_buffer_offsets_and_sizes.into_boxed_slice(),
                ),
                page_infos: Arc::from(col_pages.into_boxed_slice()),
                encoding: encoded_column.encoding,
            }))
        }
        col_idx_offset += num_columns;
    }
    let top_level_columns = batch_encoder
        .field_id_to_column_index
        .iter()
        .map(|(_, idx)| *idx)
        .collect();
    Ok(EncodedBatch {
        data: data_buffer.freeze(),
        top_level_columns,
        page_table,
        schema,
        num_rows: batch.num_rows() as u64,
    })
}
1539
#[cfg(test)]
pub mod tests {
    use crate::version::LanceFileVersion;
    use arrow_array::{ArrayRef, StringArray};
    use arrow_schema::Field;
    use lance_core::datatypes::{COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY};
    use std::collections::HashMap;
    use std::sync::Arc;

    use super::check_fixed_size_encoding;
    use super::{check_dict_encoding, ArrayEncodingStrategy, CoreArrayEncodingStrategy};

    /// Builds a `StringArray` from `arr` and asks whether dictionary
    /// encoding would be chosen under the given cardinality `threshold`.
    fn is_dict_encoding_applicable(arr: Vec<Option<&str>>, threshold: u64) -> bool {
        let arr = StringArray::from(arr);
        let arr = Arc::new(arr) as ArrayRef;
        check_dict_encoding(&[arr], threshold)
    }

    #[test]
    fn test_dict_encoding_should_be_applied_if_cardinality_less_than_threshold() {
        assert!(is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("a"), Some("b")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_larger_than_threshold() {
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("d")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_if_cardinality_equal_to_threshold() {
        assert!(!is_dict_encoding_applicable(
            vec![Some("a"), Some("b"), Some("c"), Some("a")],
            3,
        ));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_empty_arrays() {
        assert!(!is_dict_encoding_applicable(vec![], 3));
    }

    #[test]
    fn test_dict_encoding_should_not_be_applied_for_smaller_than_threshold_arrays() {
        assert!(!is_dict_encoding_applicable(vec![Some("a"), Some("a")], 3));
    }

    /// Converts each inner vec into a `StringArray` and asks whether the
    /// fixed-size-binary encoding would be chosen for the set of arrays.
    fn is_fixed_size_encoding_applicable(
        arrays: Vec<Vec<Option<&str>>>,
        version: LanceFileVersion,
    ) -> bool {
        // Fixed: the previous version cloned `final_arrays` only to borrow
        // it immediately (a redundant full Vec clone); it also built the Vec
        // with a manual push loop instead of an iterator.
        let final_arrays = arrays
            .into_iter()
            .map(|arr| Arc::new(StringArray::from(arr)) as ArrayRef)
            .collect::<Vec<_>>();

        check_fixed_size_encoding(&final_arrays, version).is_some()
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable() {
        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("abc"), Some("de")]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), None]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("pqr"), Some("")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), Some("")]],
            LanceFileVersion::V2_1
        ));
    }

    #[test]
    fn test_fixed_size_binary_encoding_applicable_multiple_arrays() {
        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), Some("b")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), Some("bc")], vec![Some("c"), Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some("ab"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(is_fixed_size_encoding_applicable(
            vec![vec![Some("a"), None], vec![None, Some("d")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![Some(""), None], vec![None, Some("")]],
            LanceFileVersion::V2_1
        ));

        assert!(!is_fixed_size_encoding_applicable(
            vec![vec![None, None], vec![None, None]],
            LanceFileVersion::V2_1
        ));
    }

    /// Builds an encoder for `array` (optionally with field metadata) under
    /// `version` and asserts its `Debug` representation matches
    /// `expected_encoder` exactly.
    fn verify_array_encoder(
        array: ArrayRef,
        field_meta: Option<HashMap<String, String>>,
        version: LanceFileVersion,
        expected_encoder: &str,
    ) {
        let encoding_strategy = CoreArrayEncodingStrategy { version };
        let mut field = Field::new("test_field", array.data_type().clone(), true);
        if let Some(field_meta) = field_meta {
            field.set_metadata(field_meta);
        }
        let lance_field = lance_core::datatypes::Field::try_from(field).unwrap();
        let encoder_result = encoding_strategy.create_array_encoder(&[array], &lance_field);
        assert!(encoder_result.is_ok());
        let encoder = encoder_result.unwrap();
        assert_eq!(format!("{:?}", encoder).as_str(), expected_encoder);
    }

    #[test]
    fn test_choose_encoder_for_zstd_compressed_string_field() {
        verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(HashMap::from([(COMPRESSION_META_KEY.to_string(), "zstd".to_string())])),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: None }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 0 }) }");
    }

    #[test]
    fn test_choose_encoder_for_zstd_compression_level() {
        verify_array_encoder(Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
            Some(HashMap::from([
                (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
                (COMPRESSION_LEVEL_META_KEY.to_string(), "22".to_string())
            ])),
            LanceFileVersion::V2_1,
            "BinaryEncoder { indices_encoder: BasicEncoder { values_encoder: ValueEncoder }, compression_config: Some(CompressionConfig { scheme: Zstd, level: Some(22) }), buffer_compressor: Some(ZstdBufferCompressor { compression_level: 22 }) }");
    }
}