1#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22 buffer::LanceBuffer,
23 compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24 constants::{
25 BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26 },
27 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28 encodings::{
29 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30 physical::{
31 binary::{
32 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33 VariableDecoder, VariableEncoder,
34 },
35 block::{
36 CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37 GeneralBlockDecompressor,
38 },
39 byte_stream_split::{
40 ByteStreamSplitDecompressor, ByteStreamSplitEncoder, should_use_bss,
41 },
42 constant::ConstantDecompressor,
43 fsst::{
44 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45 FsstPerValueEncoder,
46 },
47 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48 packed::{
49 PackedStructFixedWidthMiniBlockDecompressor,
50 PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51 PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52 VariablePackedStructFieldKind,
53 },
54 rle::{RleDecompressor, RleEncoder},
55 value::{ValueDecompressor, ValueEncoder},
56 },
57 },
58 format::{
59 ProtobufUtils21,
60 pb21::{CompressiveEncoding, compressive_encoding::Compression},
61 },
62 statistics::{GetStat, Stat},
63 version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use arrow_schema::DataType;
68use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
69use lance_core::{Error, Result, datatypes::Field, error::LanceOptionExt};
70use std::{str::FromStr, sync::Arc};
71
/// Default ratio of runs to values used by RLE block compression: with `0.5`, a block qualifies
/// when it has fewer than half as many runs as values. A field may override it through its
/// `rle_threshold` setting.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;

/// Blocks whose data size exceeds this many bytes (32 KiB) receive the default general-purpose
/// compression on file versions 2.2+ even when no scheme is configured for the field.
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 << 10;
81
/// Compresses an entire [`DataBlock`] into a single buffer.
///
/// Instances are produced by [`CompressionStrategy::create_block_compressor`], which pairs the
/// compressor with the [`CompressiveEncoding`] describing the chosen encoding.
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compresses `data` and returns the encoded bytes as one [`LanceBuffer`].
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
101
/// Chooses compressors for the data blocks of a field.
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Creates a compressor that encodes all of `data` as one buffer, together with the
    /// [`CompressiveEncoding`] that describes the choice.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;

    /// Creates a [`PerValueCompressor`] for `data` belonging to `field`.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Creates a [`MiniBlockCompressor`] for `data` belonging to `field`.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
136
/// The built-in [`CompressionStrategy`].
///
/// Per-field settings are looked up in `params` and merged with overrides read from the field's
/// metadata. `version` gates encodings that need newer file versions: RLE block compression
/// and general-purpose block compression are only chosen for 2.2+.
#[derive(Debug, Default, Clone)]
pub struct DefaultCompressionStrategy {
    /// Compression parameters, looked up per field by name and data type.
    params: CompressionParams,
    /// Lance file version that the selected encodings must be compatible with.
    version: LanceFileVersion,
}
144
145fn try_bss_for_mini_block(
146 data: &FixedWidthDataBlock,
147 params: &CompressionFieldParams,
148) -> Option<Box<dyn MiniBlockCompressor>> {
149 if params.compression.is_none() || params.compression.as_deref() == Some("none") {
152 return None;
153 }
154
155 let mode = params.bss.unwrap_or(BssMode::Auto);
156 if should_use_bss(data, mode) {
158 return Some(Box::new(ByteStreamSplitEncoder::new(
159 data.bits_per_value as usize,
160 )));
161 }
162 None
163}
164
165fn try_rle_for_mini_block(
166 data: &FixedWidthDataBlock,
167 params: &CompressionFieldParams,
168) -> Option<Box<dyn MiniBlockCompressor>> {
169 let bits = data.bits_per_value;
170 if !matches!(bits, 8 | 16 | 32 | 64) {
171 return None;
172 }
173
174 let type_size = bits / 8;
175 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
176 let threshold = params
177 .rle_threshold
178 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
179
180 let passes_threshold = match params.rle_threshold {
183 Some(_) => (run_count as f64) < (data.num_values as f64) * threshold,
184 None => true,
185 };
186
187 if !passes_threshold {
188 return None;
189 }
190
191 let num_values = data.num_values;
197 let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values);
198
199 let raw_bytes = (num_values as u128) * (type_size as u128);
200 let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128);
201
202 if rle_bytes < raw_bytes {
203 #[cfg(feature = "bitpacking")]
204 {
205 if let Some(bitpack_bytes) = estimate_inline_bitpacking_bytes(data)
206 && (bitpack_bytes as u128) < rle_bytes
207 {
208 return None;
209 }
210 }
211 return Some(Box::new(RleEncoder::new()));
212 }
213 None
214}
215
216fn try_rle_for_block(
217 data: &FixedWidthDataBlock,
218 version: LanceFileVersion,
219 params: &CompressionFieldParams,
220) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
221 if version < LanceFileVersion::V2_2 {
222 return None;
223 }
224
225 let bits = data.bits_per_value;
226 if !matches!(bits, 8 | 16 | 32 | 64) {
227 return None;
228 }
229
230 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
231 let threshold = params
232 .rle_threshold
233 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
234
235 if (run_count as f64) < (data.num_values as f64) * threshold {
236 let compressor = Box::new(RleEncoder::new());
237 let encoding = ProtobufUtils21::rle(
238 ProtobufUtils21::flat(bits, None),
239 ProtobufUtils21::flat(8, None),
240 );
241 return Some((compressor, encoding));
242 }
243 None
244}
245
246fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
247 #[cfg(feature = "bitpacking")]
248 {
249 let bits = _data.bits_per_value;
250 if estimate_inline_bitpacking_bytes(_data).is_some() {
251 return Some(Box::new(InlineBitpacking::new(bits)));
252 }
253 None
254 }
255 #[cfg(not(feature = "bitpacking"))]
256 {
257 None
258 }
259}
260
/// Estimates the size in bytes of `data` after inline bitpacking.
///
/// Each entry of the [`Stat::BitWidth`] statistic is charged as one 1024-value chunk: one header
/// word plus `1024 * bit_width / bits_per_value` packed words, where a word is one value of the
/// original width.
///
/// Returns `None` when inline bitpacking does not apply (unsupported value width or an empty
/// block), when the estimate is not smaller than the raw data, or when it does not fit in `u64`.
#[cfg(feature = "bitpacking")]
fn estimate_inline_bitpacking_bytes(data: &FixedWidthDataBlock) -> Option<u64> {
    let bits = data.bits_per_value;
    let width_supported = matches!(bits, 8 | 16 | 32 | 64);
    if !width_supported || data.num_values == 0 {
        return None;
    }

    let bit_width_stat = data.expect_stat(Stat::BitWidth);
    let chunk_widths = bit_width_stat.as_primitive::<UInt64Type>();
    let value_bits = u128::from(bits);

    let total_words = chunk_widths
        .values()
        .iter()
        .map(|&width| (1024u128 * u128::from(width)) / value_bits)
        .fold(0u128, |acc, packed| {
            acc.saturating_add(packed.saturating_add(1))
        });
    let estimated_bytes = total_words.saturating_mul(u128::from(bits / 8));

    if estimated_bytes < data.data_size() as u128 {
        u64::try_from(estimated_bytes).ok()
    } else {
        None
    }
}
294
295fn try_bitpack_for_block(
296 data: &FixedWidthDataBlock,
297) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
298 let bits = data.bits_per_value;
299 if !matches!(bits, 8 | 16 | 32 | 64) {
300 return None;
301 }
302
303 let bit_widths = data.expect_stat(Stat::BitWidth);
304 let widths = bit_widths.as_primitive::<UInt64Type>();
305 let has_all_zeros = widths.values().contains(&0);
306 let max_bit_width = *widths.values().iter().max().unwrap();
307
308 let too_small =
309 widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
310
311 if has_all_zeros || too_small {
312 return None;
313 }
314
315 if data.num_values <= 1024 {
316 let compressor = Box::new(InlineBitpacking::new(bits));
317 let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
318 Some((compressor, encoding))
319 } else {
320 let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
321 let encoding = ProtobufUtils21::out_of_line_bitpacking(
322 bits,
323 ProtobufUtils21::flat(max_bit_width, None),
324 );
325 Some((compressor, encoding))
326 }
327}
328
329fn maybe_wrap_general_for_mini_block(
330 inner: Box<dyn MiniBlockCompressor>,
331 params: &CompressionFieldParams,
332) -> Result<Box<dyn MiniBlockCompressor>> {
333 match params.compression.as_deref() {
334 None | Some("none") | Some("fsst") => Ok(inner),
335 Some(raw) => {
336 let scheme = CompressionScheme::from_str(raw)
337 .map_err(|_| Error::invalid_input(format!("Unknown compression scheme: {raw}")))?;
338 let cfg = CompressionConfig::new(scheme, params.compression_level);
339 Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
340 }
341 }
342}
343
344fn try_general_compression(
345 version: LanceFileVersion,
346 field_params: &CompressionFieldParams,
347 data: &DataBlock,
348) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
349 if field_params.compression.as_deref() == Some("none") {
351 return Ok(None);
352 }
353
354 if let Some(compression_scheme) = &field_params.compression
357 && version >= LanceFileVersion::V2_2
358 {
359 let scheme: CompressionScheme = compression_scheme.parse()?;
360 let config = CompressionConfig::new(scheme, field_params.compression_level);
361 let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
362 return Ok(Some((compressor, config)));
363 }
364
365 if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
367 && version >= LanceFileVersion::V2_2
368 {
369 let compressor = Box::new(CompressedBufferEncoder::default());
370 let config = compressor.compressor.config();
371 return Ok(Some((compressor, config)));
372 }
373
374 Ok(None)
375}
376
377impl DefaultCompressionStrategy {
378 pub fn new() -> Self {
380 Self::default()
381 }
382
383 pub fn with_params(params: CompressionParams) -> Self {
385 Self {
386 params,
387 version: LanceFileVersion::default(),
388 }
389 }
390
391 pub fn with_version(mut self, version: LanceFileVersion) -> Self {
393 self.version = version;
394 self
395 }
396
397 fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams {
399 let mut params = CompressionFieldParams::default();
400
401 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
403 params.compression = Some(compression.clone());
404 }
405
406 if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
408 params.compression_level = level.parse().ok();
409 }
410
411 if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
413 params.rle_threshold = threshold.parse().ok();
414 }
415
416 if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
418 match BssMode::parse(bss_str) {
419 Some(mode) => params.bss = Some(mode),
420 None => {
421 log::warn!("Invalid BSS mode '{}', using default", bss_str);
422 }
423 }
424 }
425
426 if let Some(minichunk_size_str) = field
428 .metadata
429 .get(super::constants::MINICHUNK_SIZE_META_KEY)
430 {
431 if let Ok(minichunk_size) = minichunk_size_str.parse::<i64>() {
432 if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 {
434 log::warn!(
435 "minichunk_size '{}' too large for version '{}', using default",
436 minichunk_size,
437 version
438 );
439 } else {
440 params.minichunk_size = Some(minichunk_size);
441 }
442 } else {
443 log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str);
444 }
445 }
446
447 params
448 }
449
450 fn build_fixed_width_compressor(
451 &self,
452 params: &CompressionFieldParams,
453 data: &FixedWidthDataBlock,
454 ) -> Result<Box<dyn MiniBlockCompressor>> {
455 if params.compression.as_deref() == Some("none") {
456 return Ok(Box::new(ValueEncoder::default()));
457 }
458
459 let base = try_bss_for_mini_block(data, params)
460 .or_else(|| try_rle_for_mini_block(data, params))
461 .or_else(|| try_bitpack_for_mini_block(data))
462 .unwrap_or_else(|| Box::new(ValueEncoder::default()));
463
464 maybe_wrap_general_for_mini_block(base, params)
465 }
466
467 fn build_variable_width_compressor(
469 &self,
470 field: &Field,
471 data: &VariableWidthBlock,
472 ) -> Result<Box<dyn MiniBlockCompressor>> {
473 let params = self.get_merged_field_params(field);
474 let compression = params.compression.as_deref();
475 if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
476 return Err(Error::invalid_input(format!(
477 "Variable width compression not supported for {} bit offsets",
478 data.bits_per_offset
479 )));
480 }
481
482 let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
484 let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
485
486 if compression == Some("none") {
488 return Ok(Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size)));
489 }
490
491 let use_fsst = compression == Some("fsst")
492 || (!matches!(field.data_type(), DataType::Binary | DataType::LargeBinary)
493 && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
494 && data_size >= FSST_LEAST_INPUT_SIZE as u64);
495
496 let mut base_encoder: Box<dyn MiniBlockCompressor> = if use_fsst {
498 Box::new(FsstMiniBlockEncoder::new(params.minichunk_size))
499 } else {
500 Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size))
501 };
502
503 if let Some(compression_scheme) = compression.filter(|scheme| *scheme != "fsst") {
505 let scheme: CompressionScheme = compression_scheme.parse()?;
506 let config = CompressionConfig::new(scheme, params.compression_level);
507 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
508 }
509
510 Ok(base_encoder)
511 }
512
513 fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
516 let mut field_params = self
517 .params
518 .get_field_params(&field.name, &field.data_type());
519
520 let metadata_params = Self::parse_field_metadata(field, &self.version);
522 field_params.merge(&metadata_params);
523
524 field_params
525 }
526}
527
528impl CompressionStrategy for DefaultCompressionStrategy {
529 fn create_miniblock_compressor(
530 &self,
531 field: &Field,
532 data: &DataBlock,
533 ) -> Result<Box<dyn MiniBlockCompressor>> {
534 match data {
535 DataBlock::FixedWidth(fixed_width_data) => {
536 let field_params = self.get_merged_field_params(field);
537 self.build_fixed_width_compressor(&field_params, fixed_width_data)
538 }
539 DataBlock::VariableWidth(variable_width_data) => {
540 self.build_variable_width_compressor(field, variable_width_data)
541 }
542 DataBlock::Struct(struct_data_block) => {
543 if struct_data_block.has_variable_width_child() {
546 return Err(Error::invalid_input(
547 "Packed struct mini-block encoding supports only fixed-width children",
548 ));
549 }
550 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
551 }
552 DataBlock::FixedSizeList(_) => {
553 Ok(Box::new(ValueEncoder::default()))
561 }
562 _ => Err(Error::not_supported_source(
563 format!(
564 "Mini-block compression not yet supported for block type {}",
565 data.name()
566 )
567 .into(),
568 )),
569 }
570 }
571
572 fn create_per_value(
573 &self,
574 field: &Field,
575 data: &DataBlock,
576 ) -> Result<Box<dyn PerValueCompressor>> {
577 let field_params = self.get_merged_field_params(field);
578
579 match data {
580 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
581 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
582 DataBlock::Struct(struct_block) => {
583 if field.children.len() != struct_block.children.len() {
584 return Err(Error::invalid_input(
585 "Struct field metadata does not match data block children",
586 ));
587 }
588 let has_variable_child = struct_block.has_variable_width_child();
589 if has_variable_child {
590 if self.version < LanceFileVersion::V2_2 {
591 return Err(Error::not_supported_source("Variable packed struct encoding requires Lance file version 2.2 or later".into()));
592 }
593 Ok(Box::new(PackedStructVariablePerValueEncoder::new(
594 self.clone(),
595 field.children.clone(),
596 )))
597 } else {
598 Err(Error::invalid_input(
599 "Packed struct per-value compression should not be used for fixed-width-only structs",
600 ))
601 }
602 }
603 DataBlock::VariableWidth(variable_width) => {
604 let compression = field_params.compression.as_deref();
605 if compression == Some("none") {
607 return Ok(Box::new(VariableEncoder::default()));
608 }
609
610 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
611 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
612
613 let per_value_requested =
618 compression.is_some_and(|compression| compression != "fsst");
619
620 if (max_len > 32 * 1024 || per_value_requested)
621 && data_size >= FSST_LEAST_INPUT_SIZE as u64
622 {
623 return Ok(Box::new(CompressedBufferEncoder::default()));
624 }
625
626 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
627 let variable_compression = Box::new(VariableEncoder::default());
628 let use_fsst = compression == Some("fsst")
629 || (!matches!(field.data_type(), DataType::Binary | DataType::LargeBinary)
630 && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
631 && data_size >= FSST_LEAST_INPUT_SIZE as u64);
632
633 if use_fsst {
635 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
636 } else {
637 Ok(variable_compression)
638 }
639 } else {
640 panic!(
641 "Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.",
642 variable_width.bits_per_offset
643 );
644 }
645 }
646 _ => unreachable!(
647 "Per-value compression not yet supported for block type: {}",
648 data.name()
649 ),
650 }
651 }
652
653 fn create_block_compressor(
654 &self,
655 field: &Field,
656 data: &DataBlock,
657 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
658 let field_params = self.get_merged_field_params(field);
659
660 match data {
661 DataBlock::FixedWidth(fixed_width) => {
662 if let Some((compressor, encoding)) =
663 try_rle_for_block(fixed_width, self.version, &field_params)
664 {
665 return Ok((compressor, encoding));
666 }
667 if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
668 return Ok((compressor, encoding));
669 }
670
671 if let Some((compressor, config)) =
673 try_general_compression(self.version, &field_params, data)?
674 {
675 let encoding = ProtobufUtils21::wrapped(
676 config,
677 ProtobufUtils21::flat(fixed_width.bits_per_value, None),
678 )?;
679 return Ok((compressor, encoding));
680 }
681
682 let encoder = Box::new(ValueEncoder::default());
683 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
684 Ok((encoder, encoding))
685 }
686 DataBlock::VariableWidth(variable_width) => {
687 if let Some((compressor, config)) =
689 try_general_compression(self.version, &field_params, data)?
690 {
691 let encoding = ProtobufUtils21::wrapped(
692 config,
693 ProtobufUtils21::variable(
694 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
695 None,
696 ),
697 )?;
698 return Ok((compressor, encoding));
699 }
700
701 let encoder = Box::new(VariableEncoder::default());
702 let encoding = ProtobufUtils21::variable(
703 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
704 None,
705 );
706 Ok((encoder, encoding))
707 }
708 _ => unreachable!(),
709 }
710 }
711}
712
/// Decompressor for mini-block encoded data, created by
/// [`DecompressionStrategy::create_miniblock_decompressor`].
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `data` into a [`DataBlock`]. `num_values` is the number of values the buffers
    /// hold (presumably the values of a single mini-block — confirm).
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
716
/// Decompressor for fixed-width per-value data, created by
/// [`DecompressionStrategy::create_fixed_per_value_decompressor`].
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `num_values` values from the fixed-width block `data`.
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// Bits per value handled by this decompressor (presumably the width of the encoded values
    /// it expects in `data` — confirm).
    fn bits_per_value(&self) -> u64;
}
725
/// Decompressor for variable-width per-value data, created by
/// [`DecompressionStrategy::create_variable_per_value_decompressor`].
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes the variable-width block `data` into a [`DataBlock`].
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
730
/// Decompressor for data encoded as a single buffer, created by
/// [`DecompressionStrategy::create_block_decompressor`].
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `num_values` values from the buffer `data`.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
734
/// Builds decompressors from [`CompressiveEncoding`] descriptions.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Creates a [`MiniBlockDecompressor`] for `description`.
    ///
    /// `decompression_strategy` is the strategy to use for nested encodings. For example,
    /// [`DefaultDecompressionStrategy`] builds the inner decompressor of an FSST encoding
    /// through it.
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Creates a [`FixedPerValueDecompressor`] for `description`.
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Creates a [`VariablePerValueDecompressor`] for `description`.
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Creates a [`BlockDecompressor`] for `description`.
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
757
/// The built-in [`DecompressionStrategy`]; it holds no state.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
760
761impl DecompressionStrategy for DefaultDecompressionStrategy {
762 fn create_miniblock_decompressor(
763 &self,
764 description: &CompressiveEncoding,
765 decompression_strategy: &dyn DecompressionStrategy,
766 ) -> Result<Box<dyn MiniBlockDecompressor>> {
767 match description.compression.as_ref().unwrap() {
768 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
769 #[cfg(feature = "bitpacking")]
770 Compression::InlineBitpacking(description) => {
771 Ok(Box::new(InlineBitpacking::from_description(description)))
772 }
773 #[cfg(not(feature = "bitpacking"))]
774 Compression::InlineBitpacking(_) => Err(Error::not_supported_source(
775 "this runtime was not built with bitpacking support".into(),
776 )),
777 Compression::Variable(variable) => {
778 let Compression::Flat(offsets) = variable
779 .offsets
780 .as_ref()
781 .unwrap()
782 .compression
783 .as_ref()
784 .unwrap()
785 else {
786 panic!("Variable compression only supports flat offsets")
787 };
788 Ok(Box::new(BinaryMiniBlockDecompressor::new(
789 offsets.bits_per_value as u8,
790 )))
791 }
792 Compression::Fsst(description) => {
793 let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
794 description.values.as_ref().unwrap(),
795 decompression_strategy,
796 )?;
797 Ok(Box::new(FsstMiniBlockDecompressor::new(
798 description,
799 inner_decompressor,
800 )))
801 }
802 Compression::PackedStruct(description) => Ok(Box::new(
803 PackedStructFixedWidthMiniBlockDecompressor::new(description),
804 )),
805 Compression::VariablePackedStruct(_) => Err(Error::not_supported_source(
806 "variable packed struct decoding is not yet implemented".into(),
807 )),
808 Compression::FixedSizeList(fsl) => {
809 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
812 }
813 Compression::Rle(rle) => {
814 let bits_per_value = validate_rle_compression(rle)?;
815 Ok(Box::new(RleDecompressor::new(bits_per_value)))
816 }
817 Compression::ByteStreamSplit(bss) => {
818 let Compression::Flat(values) =
819 bss.values.as_ref().unwrap().compression.as_ref().unwrap()
820 else {
821 panic!("ByteStreamSplit compression only supports flat values")
822 };
823 Ok(Box::new(ByteStreamSplitDecompressor::new(
824 values.bits_per_value as usize,
825 )))
826 }
827 Compression::General(general) => {
828 let inner_decompressor = self.create_miniblock_decompressor(
830 general.values.as_ref().ok_or_else(|| {
831 Error::invalid_input("GeneralMiniBlock missing inner encoding")
832 })?,
833 decompression_strategy,
834 )?;
835
836 let compression = general.compression.as_ref().ok_or_else(|| {
838 Error::invalid_input("GeneralMiniBlock missing compression config")
839 })?;
840
841 let scheme = compression.scheme().try_into()?;
842
843 let compression_config = CompressionConfig::new(scheme, compression.level);
844
845 Ok(Box::new(GeneralMiniBlockDecompressor::new(
846 inner_decompressor,
847 compression_config,
848 )))
849 }
850 _ => todo!(),
851 }
852 }
853
854 fn create_fixed_per_value_decompressor(
855 &self,
856 description: &CompressiveEncoding,
857 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
858 match description.compression.as_ref().unwrap() {
859 Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
860 constant
861 .value
862 .as_ref()
863 .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
864 ))),
865 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
866 Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
867 _ => todo!("fixed-per-value decompressor for {:?}", description),
868 }
869 }
870
871 fn create_variable_per_value_decompressor(
872 &self,
873 description: &CompressiveEncoding,
874 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
875 match description.compression.as_ref().unwrap() {
876 Compression::Variable(variable) => {
877 let Compression::Flat(offsets) = variable
878 .offsets
879 .as_ref()
880 .unwrap()
881 .compression
882 .as_ref()
883 .unwrap()
884 else {
885 panic!("Variable compression only supports flat offsets")
886 };
887 assert!(offsets.bits_per_value < u8::MAX as u64);
888 Ok(Box::new(VariableDecoder::default()))
889 }
890 Compression::Fsst(fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
891 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
892 Box::new(VariableDecoder::default()),
893 ))),
894 Compression::General(general) => Ok(Box::new(CompressedBufferEncoder::from_scheme(
895 general.compression.as_ref().expect_ok()?.scheme(),
896 )?)),
897 Compression::VariablePackedStruct(description) => {
898 let mut fields = Vec::with_capacity(description.fields.len());
899 for field in &description.fields {
900 let value_encoding = field.value.as_ref().ok_or_else(|| {
901 Error::invalid_input("VariablePackedStruct field is missing value encoding")
902 })?;
903 let decoder = match field.layout.as_ref().ok_or_else(|| {
904 Error::invalid_input("VariablePackedStruct field is missing layout details")
905 })? {
906 crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
907 bits_per_value,
908 ) => {
909 let decompressor =
910 self.create_fixed_per_value_decompressor(value_encoding)?;
911 VariablePackedStructFieldDecoder {
912 kind: VariablePackedStructFieldKind::Fixed {
913 bits_per_value: *bits_per_value,
914 decompressor: Arc::from(decompressor),
915 },
916 }
917 }
918 crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
919 bits_per_length,
920 ) => {
921 let decompressor =
922 self.create_variable_per_value_decompressor(value_encoding)?;
923 VariablePackedStructFieldDecoder {
924 kind: VariablePackedStructFieldKind::Variable {
925 bits_per_length: *bits_per_length,
926 decompressor: Arc::from(decompressor),
927 },
928 }
929 }
930 };
931 fields.push(decoder);
932 }
933 Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
934 fields,
935 )))
936 }
937 _ => todo!("variable-per-value decompressor for {:?}", description),
938 }
939 }
940
941 fn create_block_decompressor(
942 &self,
943 description: &CompressiveEncoding,
944 ) -> Result<Box<dyn BlockDecompressor>> {
945 match description.compression.as_ref().unwrap() {
946 Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
947 InlineBitpacking::from_description(inline_bitpacking),
948 )),
949 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
950 Compression::Constant(constant) => {
951 let scalar = constant
952 .value
953 .as_ref()
954 .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
955 Ok(Box::new(ConstantDecompressor::new(scalar)))
956 }
957 Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
958 Compression::FixedSizeList(fsl) => {
959 Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
960 }
961 Compression::OutOfLineBitpacking(out_of_line) => {
962 let compressed_bit_width = match out_of_line
964 .values
965 .as_ref()
966 .unwrap()
967 .compression
968 .as_ref()
969 .unwrap()
970 {
971 Compression::Flat(flat) => flat.bits_per_value,
972 _ => {
973 return Err(Error::invalid_input_source(
974 "OutOfLineBitpacking values must use Flat encoding".into(),
975 ));
976 }
977 };
978 Ok(Box::new(OutOfLineBitpacking::new(
979 compressed_bit_width,
980 out_of_line.uncompressed_bits_per_value,
981 )))
982 }
983 Compression::General(general) => {
984 let inner_desc = general
985 .values
986 .as_ref()
987 .ok_or_else(|| {
988 Error::invalid_input("General compression missing inner encoding")
989 })?
990 .as_ref();
991 let inner_decompressor = self.create_block_decompressor(inner_desc)?;
992
993 let compression = general.compression.as_ref().ok_or_else(|| {
994 Error::invalid_input("General compression missing compression config")
995 })?;
996 let scheme = compression.scheme().try_into()?;
997 let config = CompressionConfig::new(scheme, compression.level);
998 let general_decompressor =
999 GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
1000
1001 Ok(Box::new(general_decompressor))
1002 }
1003 Compression::Rle(rle) => {
1004 let bits_per_value = validate_rle_compression(rle)?;
1005 Ok(Box::new(RleDecompressor::new(bits_per_value)))
1006 }
1007 _ => todo!(),
1008 }
1009 }
1010}
1011fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<u64> {
1013 let values = rle
1014 .values
1015 .as_ref()
1016 .ok_or_else(|| Error::invalid_input("RLE compression missing values encoding"))?;
1017 let run_lengths = rle
1018 .run_lengths
1019 .as_ref()
1020 .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths encoding"))?;
1021
1022 let values = values
1023 .compression
1024 .as_ref()
1025 .ok_or_else(|| Error::invalid_input("RLE compression missing values compression"))?;
1026 let Compression::Flat(values) = values else {
1027 return Err(Error::invalid_input(
1028 "RLE compression only supports flat values",
1029 ));
1030 };
1031
1032 let run_lengths = run_lengths
1033 .compression
1034 .as_ref()
1035 .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths compression"))?;
1036 let Compression::Flat(run_lengths) = run_lengths else {
1037 return Err(Error::invalid_input(
1038 "RLE compression only supports flat run lengths",
1039 ));
1040 };
1041
1042 if run_lengths.bits_per_value != 8 {
1043 return Err(Error::invalid_input(format!(
1044 "RLE compression only supports 8-bit run lengths, got {}",
1045 run_lengths.bits_per_value
1046 )));
1047 }
1048
1049 Ok(values.bits_per_value)
1050}
1051
1052#[cfg(test)]
1053mod tests {
1054 use super::*;
1055 use crate::buffer::LanceBuffer;
1056 use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
1057 use crate::statistics::ComputeStat;
1058 use crate::testing::extract_array_encoding_chain;
1059 use arrow_schema::{DataType, Field as ArrowField};
1060 use std::collections::HashMap;
1061
1062 fn create_test_field(name: &str, data_type: DataType) -> Field {
1063 let arrow_field = ArrowField::new(name, data_type, true);
1064 let mut field = Field::try_from(&arrow_field).unwrap();
1065 field.id = -1;
1066 field
1067 }
1068
1069 fn create_fixed_width_block_with_stats(
1070 bits_per_value: u64,
1071 num_values: u64,
1072 run_count: u64,
1073 ) -> DataBlock {
1074 let bytes_per_value = (bits_per_value / 8) as usize;
1076 let total_bytes = bytes_per_value * num_values as usize;
1077 let mut data = vec![0u8; total_bytes];
1078
1079 let values_per_run = (num_values / run_count).max(1);
1081 let mut run_value = 0u8;
1082
1083 for i in 0..num_values as usize {
1084 if i % values_per_run as usize == 0 {
1085 run_value = run_value.wrapping_add(17); }
1087 for j in 0..bytes_per_value {
1089 let byte_offset = i * bytes_per_value + j;
1090 if byte_offset < data.len() {
1091 data[byte_offset] = run_value.wrapping_add(j as u8);
1092 }
1093 }
1094 }
1095
1096 let mut block = FixedWidthDataBlock {
1097 bits_per_value,
1098 data: LanceBuffer::reinterpret_vec(data),
1099 num_values,
1100 block_info: BlockInfo::default(),
1101 };
1102
1103 use crate::statistics::ComputeStat;
1105 block.compute_stat();
1106
1107 DataBlock::FixedWidth(block)
1108 }
1109
1110 fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
1111 let bytes_per_value = (bits_per_value / 8) as usize;
1113 let total_bytes = bytes_per_value * num_values as usize;
1114 let mut data = vec![0u8; total_bytes];
1115
1116 for i in 0..num_values as usize {
1118 let byte_offset = i * bytes_per_value;
1119 if byte_offset < data.len() {
1120 data[byte_offset] = (i % 256) as u8;
1121 }
1122 }
1123
1124 let mut block = FixedWidthDataBlock {
1125 bits_per_value,
1126 data: LanceBuffer::reinterpret_vec(data),
1127 num_values,
1128 block_info: BlockInfo::default(),
1129 };
1130
1131 use crate::statistics::ComputeStat;
1133 block.compute_stat();
1134
1135 DataBlock::FixedWidth(block)
1136 }
1137
1138 fn create_variable_width_block(
1139 bits_per_offset: u8,
1140 num_values: u64,
1141 avg_value_size: usize,
1142 ) -> DataBlock {
1143 use crate::statistics::ComputeStat;
1144
1145 let mut offsets = Vec::with_capacity((num_values + 1) as usize);
1147 let mut current_offset = 0i64;
1148 offsets.push(current_offset);
1149
1150 for i in 0..num_values {
1152 let value_size = if avg_value_size == 0 {
1153 1
1154 } else {
1155 ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
1156 .min(avg_value_size * 2)
1157 };
1158 current_offset += value_size as i64;
1159 offsets.push(current_offset);
1160 }
1161
1162 let total_data_size = current_offset as usize;
1164 let mut data = vec![0u8; total_data_size];
1165
1166 for i in 0..num_values {
1168 let start_offset = offsets[i as usize] as usize;
1169 let end_offset = offsets[(i + 1) as usize] as usize;
1170
1171 let content = (i % 256) as u8;
1172 for j in 0..end_offset - start_offset {
1173 data[start_offset + j] = content.wrapping_add(j as u8);
1174 }
1175 }
1176
1177 let offsets_buffer = match bits_per_offset {
1179 32 => {
1180 let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect();
1181 LanceBuffer::reinterpret_vec(offsets_32)
1182 }
1183 64 => LanceBuffer::reinterpret_vec(offsets),
1184 _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
1185 };
1186
1187 let mut block = VariableWidthBlock {
1188 data: LanceBuffer::from(data),
1189 offsets: offsets_buffer,
1190 bits_per_offset,
1191 num_values,
1192 block_info: BlockInfo::default(),
1193 };
1194
1195 block.compute_stat();
1196 DataBlock::VariableWidth(block)
1197 }
1198
1199 fn create_fsst_candidate_variable_width_block() -> DataBlock {
1200 create_variable_width_block(32, 4096, FSST_LEAST_INPUT_MAX_LENGTH as usize + 16)
1201 }
1202
1203 #[test]
1204 fn test_parameter_based_compression() {
1205 let mut params = CompressionParams::new();
1206
1207 params.columns.insert(
1209 "*_id".to_string(),
1210 CompressionFieldParams {
1211 rle_threshold: Some(0.3),
1212 compression: Some("lz4".to_string()),
1213 compression_level: None,
1214 bss: Some(BssMode::Off), minichunk_size: None,
1216 },
1217 );
1218
1219 let strategy = DefaultCompressionStrategy::with_params(params);
1220 let field = create_test_field("user_id", DataType::Int32);
1221
1222 let data = create_fixed_width_block_with_stats(32, 1000, 100); let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1227 let debug_str = format!("{:?}", compressor);
1229
1230 assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1232 assert!(debug_str.contains("RleEncoder"));
1233 }
1234
1235 #[test]
1236 fn test_type_level_parameters() {
1237 let mut params = CompressionParams::new();
1238
1239 params.types.insert(
1241 "Int32".to_string(),
1242 CompressionFieldParams {
1243 rle_threshold: Some(0.1), compression: Some("zstd".to_string()),
1245 compression_level: Some(3),
1246 bss: Some(BssMode::Off), minichunk_size: None,
1248 },
1249 );
1250
1251 let strategy = DefaultCompressionStrategy::with_params(params);
1252 let field = create_test_field("some_column", DataType::Int32);
1253 let data = create_fixed_width_block_with_stats(32, 1000, 50);
1255
1256 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1257 assert!(format!("{:?}", compressor).contains("RleEncoder"));
1259 }
1260
1261 #[test]
1262 #[cfg(feature = "bitpacking")]
1263 fn test_low_cardinality_prefers_bitpacking_over_rle() {
1264 let strategy = DefaultCompressionStrategy::new();
1265 let field = create_test_field("int_score", DataType::Int64);
1266
1267 let mut values: Vec<u64> = Vec::with_capacity(256);
1270 for run_idx in 0..64 {
1271 let value = match run_idx % 3 {
1272 0 => 3u64,
1273 1 => 4u64,
1274 _ => 5u64,
1275 };
1276 values.extend(std::iter::repeat_n(value, 4));
1277 }
1278
1279 let mut block = FixedWidthDataBlock {
1280 bits_per_value: 64,
1281 data: LanceBuffer::reinterpret_vec(values),
1282 num_values: 256,
1283 block_info: BlockInfo::default(),
1284 };
1285
1286 use crate::statistics::ComputeStat;
1287 block.compute_stat();
1288
1289 let data = DataBlock::FixedWidth(block);
1290 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1291 let debug_str = format!("{:?}", compressor);
1292 assert!(
1293 debug_str.contains("InlineBitpacking"),
1294 "expected InlineBitpacking, got: {debug_str}"
1295 );
1296 assert!(
1297 !debug_str.contains("RleEncoder"),
1298 "expected RLE to be skipped when bitpacking is smaller, got: {debug_str}"
1299 );
1300 }
1301
1302 fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
1303 let chain = extract_array_encoding_chain(encoding);
1304 if variable {
1305 assert_eq!(chain.len(), 2);
1306 assert_eq!(chain.first().unwrap().as_str(), "variable");
1307 assert_eq!(chain.get(1).unwrap().as_str(), "flat");
1308 } else {
1309 assert_eq!(chain.len(), 1);
1310 assert_eq!(chain.first().unwrap().as_str(), "flat");
1311 }
1312 }
1313
1314 #[test]
1315 fn test_none_compression() {
1316 let mut params = CompressionParams::new();
1317
1318 params.columns.insert(
1320 "embeddings".to_string(),
1321 CompressionFieldParams {
1322 compression: Some("none".to_string()),
1323 ..Default::default()
1324 },
1325 );
1326
1327 let strategy = DefaultCompressionStrategy::with_params(params);
1328 let field = create_test_field("embeddings", DataType::Float32);
1329 let fixed_data = create_fixed_width_block(32, 1000);
1330 let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1331
1332 let compressor = strategy
1334 .create_miniblock_compressor(&field, &fixed_data)
1335 .unwrap();
1336 let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1337 check_uncompressed_encoding(&encoding, false);
1338 let compressor = strategy
1339 .create_miniblock_compressor(&field, &variable_data)
1340 .unwrap();
1341 let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1342 check_uncompressed_encoding(&encoding, true);
1343
1344 let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1346 let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1347 check_uncompressed_encoding(&encoding, false);
1348 let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1349 let (_block, encoding) = compressor.compress(variable_data).unwrap();
1350 check_uncompressed_encoding(&encoding, true);
1351 }
1352
1353 #[test]
1354 fn test_field_metadata_none_compression() {
1355 let mut arrow_field = ArrowField::new("simple_col", DataType::Binary, true);
1357 let mut metadata = HashMap::new();
1358 metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1359 arrow_field = arrow_field.with_metadata(metadata);
1360 let field = Field::try_from(&arrow_field).unwrap();
1361
1362 let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
1363
1364 let fixed_data = create_fixed_width_block(32, 1000);
1366 let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1367
1368 let compressor = strategy
1369 .create_miniblock_compressor(&field, &fixed_data)
1370 .unwrap();
1371 let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1372 check_uncompressed_encoding(&encoding, false);
1373
1374 let compressor = strategy
1375 .create_miniblock_compressor(&field, &variable_data)
1376 .unwrap();
1377 let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1378 check_uncompressed_encoding(&encoding, true);
1379
1380 let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1382 let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1383 check_uncompressed_encoding(&encoding, false);
1384
1385 let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1386 let (_block, encoding) = compressor.compress(variable_data).unwrap();
1387 check_uncompressed_encoding(&encoding, true);
1388 }
1389
1390 #[test]
1391 fn test_auto_fsst_disabled_for_binary_fields() {
1392 let strategy = DefaultCompressionStrategy::new();
1393 let field = create_test_field("bytes", DataType::Binary);
1394 let variable_data = create_fsst_candidate_variable_width_block();
1395
1396 let miniblock = strategy
1397 .create_miniblock_compressor(&field, &variable_data)
1398 .unwrap();
1399 let miniblock_debug = format!("{:?}", miniblock);
1400 assert!(
1401 miniblock_debug.contains("BinaryMiniBlockEncoder"),
1402 "expected BinaryMiniBlockEncoder, got: {miniblock_debug}"
1403 );
1404 assert!(
1405 !miniblock_debug.contains("FsstMiniBlockEncoder"),
1406 "did not expect FsstMiniBlockEncoder, got: {miniblock_debug}"
1407 );
1408
1409 let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1410 let per_value_debug = format!("{:?}", per_value);
1411 assert!(
1412 per_value_debug.contains("VariableEncoder"),
1413 "expected VariableEncoder, got: {per_value_debug}"
1414 );
1415 assert!(
1416 !per_value_debug.contains("FsstPerValueEncoder"),
1417 "did not expect FsstPerValueEncoder, got: {per_value_debug}"
1418 );
1419 }
1420
1421 #[test]
1422 fn test_auto_fsst_still_enabled_for_utf8_fields() {
1423 let strategy = DefaultCompressionStrategy::new();
1424 let field = create_test_field("text", DataType::Utf8);
1425 let variable_data = create_fsst_candidate_variable_width_block();
1426
1427 let miniblock = strategy
1428 .create_miniblock_compressor(&field, &variable_data)
1429 .unwrap();
1430 let miniblock_debug = format!("{:?}", miniblock);
1431 assert!(
1432 miniblock_debug.contains("FsstMiniBlockEncoder"),
1433 "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
1434 );
1435
1436 let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1437 let per_value_debug = format!("{:?}", per_value);
1438 assert!(
1439 per_value_debug.contains("FsstPerValueEncoder"),
1440 "expected FsstPerValueEncoder, got: {per_value_debug}"
1441 );
1442 }
1443
1444 #[test]
1445 fn test_explicit_fsst_still_supported_for_binary_fields() {
1446 let mut params = CompressionParams::new();
1447 params.columns.insert(
1448 "bytes".to_string(),
1449 CompressionFieldParams {
1450 compression: Some("fsst".to_string()),
1451 ..Default::default()
1452 },
1453 );
1454
1455 let strategy = DefaultCompressionStrategy::with_params(params);
1456 let field = create_test_field("bytes", DataType::Binary);
1457 let variable_data = create_fsst_candidate_variable_width_block();
1458
1459 let miniblock = strategy
1460 .create_miniblock_compressor(&field, &variable_data)
1461 .unwrap();
1462 let miniblock_debug = format!("{:?}", miniblock);
1463 assert!(
1464 miniblock_debug.contains("FsstMiniBlockEncoder"),
1465 "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
1466 );
1467
1468 let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1469 let per_value_debug = format!("{:?}", per_value);
1470 assert!(
1471 per_value_debug.contains("FsstPerValueEncoder"),
1472 "expected FsstPerValueEncoder, got: {per_value_debug}"
1473 );
1474 }
1475
1476 #[test]
1477 fn test_parameter_merge_priority() {
1478 let mut params = CompressionParams::new();
1479
1480 params.types.insert(
1482 "Int32".to_string(),
1483 CompressionFieldParams {
1484 rle_threshold: Some(0.5),
1485 compression: Some("lz4".to_string()),
1486 ..Default::default()
1487 },
1488 );
1489
1490 params.columns.insert(
1492 "user_id".to_string(),
1493 CompressionFieldParams {
1494 rle_threshold: Some(0.2),
1495 compression: Some("zstd".to_string()),
1496 compression_level: Some(6),
1497 bss: None,
1498 minichunk_size: None,
1499 },
1500 );
1501
1502 let strategy = DefaultCompressionStrategy::with_params(params);
1503
1504 let merged = strategy
1506 .params
1507 .get_field_params("user_id", &DataType::Int32);
1508
1509 assert_eq!(merged.rle_threshold, Some(0.2));
1511 assert_eq!(merged.compression, Some("zstd".to_string()));
1512 assert_eq!(merged.compression_level, Some(6));
1513
1514 let merged = strategy
1516 .params
1517 .get_field_params("other_field", &DataType::Int32);
1518 assert_eq!(merged.rle_threshold, Some(0.5));
1519 assert_eq!(merged.compression, Some("lz4".to_string()));
1520 assert_eq!(merged.compression_level, None);
1521 }
1522
1523 #[test]
1524 fn test_pattern_matching() {
1525 let mut params = CompressionParams::new();
1526
1527 params.columns.insert(
1529 "log_*".to_string(),
1530 CompressionFieldParams {
1531 compression: Some("zstd".to_string()),
1532 compression_level: Some(6),
1533 ..Default::default()
1534 },
1535 );
1536
1537 let strategy = DefaultCompressionStrategy::with_params(params);
1538
1539 let merged = strategy
1541 .params
1542 .get_field_params("log_messages", &DataType::Utf8);
1543 assert_eq!(merged.compression, Some("zstd".to_string()));
1544 assert_eq!(merged.compression_level, Some(6));
1545
1546 let merged = strategy
1548 .params
1549 .get_field_params("messages_log", &DataType::Utf8);
1550 assert_eq!(merged.compression, None);
1551 }
1552
1553 #[test]
1554 fn test_legacy_metadata_support() {
1555 let params = CompressionParams::new();
1556 let strategy = DefaultCompressionStrategy::with_params(params);
1557
1558 let mut metadata = HashMap::new();
1560 metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1561 let mut field = create_test_field("some_column", DataType::Int32);
1562 field.metadata = metadata;
1563
1564 let data = create_fixed_width_block(32, 1000);
1565 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1566
1567 assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1569 }
1570
1571 #[test]
1572 fn test_default_behavior() {
1573 let params = CompressionParams::new();
1575 let strategy = DefaultCompressionStrategy::with_params(params);
1576
1577 let field = create_test_field("random_column", DataType::Int32);
1578 let data = create_fixed_width_block_with_stats(32, 1000, 600);
1580
1581 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1582 let debug_str = format!("{:?}", compressor);
1584 assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
1585 }
1586
1587 #[test]
1588 fn test_field_metadata_compression() {
1589 let params = CompressionParams::new();
1590 let strategy = DefaultCompressionStrategy::with_params(params);
1591
1592 let mut metadata = HashMap::new();
1594 metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
1595 metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
1596 let mut field = create_test_field("test_column", DataType::Int32);
1597 field.metadata = metadata;
1598
1599 let data = create_fixed_width_block(32, 1000);
1600 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1601
1602 let debug_str = format!("{:?}", compressor);
1604 assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1605 }
1606
1607 #[test]
1608 fn test_field_metadata_rle_threshold() {
1609 let params = CompressionParams::new();
1610 let strategy = DefaultCompressionStrategy::with_params(params);
1611
1612 let mut metadata = HashMap::new();
1614 metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
1615 metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); let mut field = create_test_field("test_column", DataType::Int32);
1617 field.metadata = metadata;
1618
1619 let data = create_fixed_width_block_with_stats(32, 1000, 100);
1622
1623 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1624
1625 let debug_str = format!("{:?}", compressor);
1627 assert!(debug_str.contains("RleEncoder"));
1628 }
1629
1630 #[test]
1631 fn test_field_metadata_override_params() {
1632 let mut params = CompressionParams::new();
1634 params.columns.insert(
1635 "test_column".to_string(),
1636 CompressionFieldParams {
1637 rle_threshold: Some(0.3),
1638 compression: Some("lz4".to_string()),
1639 compression_level: None,
1640 bss: None,
1641 minichunk_size: None,
1642 },
1643 );
1644
1645 let strategy = DefaultCompressionStrategy::with_params(params);
1646
1647 let mut metadata = HashMap::new();
1649 metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1650 let mut field = create_test_field("test_column", DataType::Int32);
1651 field.metadata = metadata;
1652
1653 let data = create_fixed_width_block(32, 1000);
1654 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1655
1656 assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1658 }
1659
1660 #[test]
1661 fn test_field_metadata_mixed_configuration() {
1662 let mut params = CompressionParams::new();
1664 params.types.insert(
1665 "Int32".to_string(),
1666 CompressionFieldParams {
1667 rle_threshold: Some(0.5),
1668 compression: Some("lz4".to_string()),
1669 ..Default::default()
1670 },
1671 );
1672
1673 let strategy = DefaultCompressionStrategy::with_params(params);
1674
1675 let mut metadata = HashMap::new();
1677 metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
1678 let mut field = create_test_field("test_column", DataType::Int32);
1679 field.metadata = metadata;
1680
1681 let data = create_fixed_width_block(32, 1000);
1682 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1683
1684 let debug_str = format!("{:?}", compressor);
1686 assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1687 }
1688
1689 #[test]
1690 fn test_bss_field_metadata() {
1691 let params = CompressionParams::new();
1692 let strategy = DefaultCompressionStrategy::with_params(params);
1693
1694 let mut metadata = HashMap::new();
1696 metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1697 metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1698 let arrow_field =
1699 ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
1700 let field = Field::try_from(&arrow_field).unwrap();
1701
1702 let data = create_fixed_width_block(32, 100);
1704
1705 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1706 let debug_str = format!("{:?}", compressor);
1707 assert!(debug_str.contains("ByteStreamSplitEncoder"));
1708 }
1709
1710 #[test]
1711 fn test_bss_with_compression() {
1712 let params = CompressionParams::new();
1713 let strategy = DefaultCompressionStrategy::with_params(params);
1714
1715 let mut metadata = HashMap::new();
1717 metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1718 metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1719 let arrow_field =
1720 ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
1721 let field = Field::try_from(&arrow_field).unwrap();
1722
1723 let data = create_fixed_width_block(64, 100);
1725
1726 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1727 let debug_str = format!("{:?}", compressor);
1728 assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1730 assert!(debug_str.contains("ByteStreamSplitEncoder"));
1731 }
1732
1733 #[test]
1734 #[cfg(any(feature = "lz4", feature = "zstd"))]
1735 fn test_general_block_decompression_fixed_width_v2_2() {
1736 let mut params = CompressionParams::new();
1738 params.columns.insert(
1739 "dict_values".to_string(),
1740 CompressionFieldParams {
1741 compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1742 ..Default::default()
1743 },
1744 );
1745
1746 let mut strategy = DefaultCompressionStrategy::with_params(params);
1747 strategy.version = LanceFileVersion::V2_2;
1748
1749 let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1750 let data = create_fixed_width_block(24, 1024);
1751 let DataBlock::FixedWidth(expected_block) = &data else {
1752 panic!("expected fixed width block");
1753 };
1754 let expected_bits = expected_block.bits_per_value;
1755 let expected_num_values = expected_block.num_values;
1756 let num_values = expected_num_values;
1757
1758 let (compressor, encoding) = strategy
1759 .create_block_compressor(&field, &data)
1760 .expect("general compression should be selected");
1761 match encoding.compression.as_ref() {
1762 Some(Compression::General(_)) => {}
1763 other => panic!("expected general compression, got {:?}", other),
1764 }
1765
1766 let compressed_buffer = compressor
1767 .compress(data.clone())
1768 .expect("write path general compression should succeed");
1769
1770 let decompressor = DefaultDecompressionStrategy::default()
1771 .create_block_decompressor(&encoding)
1772 .expect("general block decompressor should be created");
1773
1774 let decoded = decompressor
1775 .decompress(compressed_buffer, num_values)
1776 .expect("decompression should succeed");
1777
1778 match decoded {
1779 DataBlock::FixedWidth(block) => {
1780 assert_eq!(block.bits_per_value, expected_bits);
1781 assert_eq!(block.num_values, expected_num_values);
1782 assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
1783 }
1784 _ => panic!("expected fixed width block"),
1785 }
1786 }
1787
1788 #[test]
1789 #[cfg(any(feature = "lz4", feature = "zstd"))]
1790 fn test_general_compression_not_selected_for_v2_1_even_if_requested() {
1791 let mut params = CompressionParams::new();
1792 params.columns.insert(
1793 "dict_values".to_string(),
1794 CompressionFieldParams {
1795 compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1796 ..Default::default()
1797 },
1798 );
1799
1800 let strategy =
1801 DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_1);
1802 let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1803 let data = create_fixed_width_block(24, 1024);
1804
1805 let (_compressor, encoding) = strategy
1806 .create_block_compressor(&field, &data)
1807 .expect("block compressor selection should succeed");
1808
1809 assert!(
1810 !matches!(encoding.compression.as_ref(), Some(Compression::General(_))),
1811 "general compression should not be selected for V2.1"
1812 );
1813 }
1814
1815 #[test]
1816 fn test_none_compression_disables_auto_general_block_compression() {
1817 let mut params = CompressionParams::new();
1818 params.columns.insert(
1819 "dict_values".to_string(),
1820 CompressionFieldParams {
1821 compression: Some("none".to_string()),
1822 ..Default::default()
1823 },
1824 );
1825
1826 let strategy =
1827 DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_2);
1828 let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1829 let data = create_fixed_width_block(24, 20_000);
1830
1831 assert!(
1832 data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION,
1833 "test requires block size above automatic general compression threshold"
1834 );
1835
1836 let (_compressor, encoding) = strategy
1837 .create_block_compressor(&field, &data)
1838 .expect("block compressor selection should succeed");
1839
1840 assert!(
1841 !matches!(encoding.compression.as_ref(), Some(Compression::General(_))),
1842 "compression=none should disable automatic block general compression"
1843 );
1844 }
1845
1846 #[test]
1847 fn test_rle_block_used_for_version_v2_2() {
1848 let field = create_test_field("test_repdef", DataType::UInt16);
1849
1850 let num_values = 1000u64;
1852 let mut data = Vec::with_capacity(num_values as usize);
1853 for i in 0..10 {
1854 for _ in 0..100 {
1855 data.push(i as u16);
1856 }
1857 }
1858
1859 let mut block = FixedWidthDataBlock {
1860 bits_per_value: 16,
1861 data: LanceBuffer::reinterpret_vec(data),
1862 num_values,
1863 block_info: BlockInfo::default(),
1864 };
1865
1866 block.compute_stat();
1867
1868 let data_block = DataBlock::FixedWidth(block);
1869
1870 let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1871 .with_version(LanceFileVersion::V2_2);
1872
1873 let (compressor, _) = strategy
1874 .create_block_compressor(&field, &data_block)
1875 .unwrap();
1876
1877 let debug_str = format!("{:?}", compressor);
1878 assert!(debug_str.contains("RleEncoder"));
1879 }
1880
1881 #[test]
1882 fn test_rle_block_not_used_for_version_v2_1() {
1883 let field = create_test_field("test_repdef", DataType::UInt16);
1884
1885 let num_values = 1000u64;
1887 let mut data = Vec::with_capacity(num_values as usize);
1888 for i in 0..10 {
1889 for _ in 0..100 {
1890 data.push(i as u16);
1891 }
1892 }
1893
1894 let mut block = FixedWidthDataBlock {
1895 bits_per_value: 16,
1896 data: LanceBuffer::reinterpret_vec(data),
1897 num_values,
1898 block_info: BlockInfo::default(),
1899 };
1900
1901 block.compute_stat();
1902
1903 let data_block = DataBlock::FixedWidth(block);
1904
1905 let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1906 .with_version(LanceFileVersion::V2_1);
1907
1908 let (compressor, _) = strategy
1909 .create_block_compressor(&field, &data_block)
1910 .unwrap();
1911
1912 let debug_str = format!("{:?}", compressor);
1913 assert!(
1914 !debug_str.contains("RleEncoder"),
1915 "RLE should not be used for V2.1"
1916 );
1917 }
1918}