1#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22 buffer::LanceBuffer,
23 compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24 constants::{
25 BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26 },
27 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28 encodings::{
29 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30 physical::{
31 binary::{
32 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33 VariableDecoder, VariableEncoder,
34 },
35 block::{
36 CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37 GeneralBlockDecompressor,
38 },
39 byte_stream_split::{
40 ByteStreamSplitDecompressor, ByteStreamSplitEncoder, should_use_bss,
41 },
42 constant::ConstantDecompressor,
43 fsst::{
44 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45 FsstPerValueEncoder,
46 },
47 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48 packed::{
49 PackedStructFixedWidthMiniBlockDecompressor,
50 PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51 PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52 VariablePackedStructFieldKind,
53 },
54 rle::{RleDecompressor, RleEncoder},
55 value::{ValueDecompressor, ValueEncoder},
56 },
57 },
58 format::{
59 ProtobufUtils21,
60 pb21::{CompressiveEncoding, compressive_encoding::Compression},
61 },
62 statistics::{GetStat, Stat},
63 version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use arrow_schema::DataType;
68use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
69use lance_core::{Error, Result, datatypes::Field, error::LanceOptionExt};
70use std::{str::FromStr, sync::Arc};
71
/// Fallback RLE gate used for block-level RLE when a field does not set
/// `rle_threshold`: RLE is chosen only if the run count is below this fraction
/// of the number of values.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
78
/// Size in bytes (32 KiB) a block must strictly exceed before default
/// general-purpose compression is applied when no scheme is configured
/// (file version 2.2 and later only).
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 * 1024;
81
/// Compresses a whole [`DataBlock`] into a single buffer.
///
/// Instances are produced by `CompressionStrategy::create_block_compressor`
/// together with the encoding description needed to decode the output.
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compresses `data` (consuming it) and returns the encoded bytes.
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
101
/// Chooses how a field's data is compressed for each structural layout
/// (whole block, per value, or mini-block).
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Creates a compressor that encodes `data` as one block, returned together
    /// with the [`CompressiveEncoding`] description a reader uses to decode it.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;

    /// Creates a compressor for the per-value layout of `field`.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Creates a compressor for the mini-block layout of `field`.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
136
/// The built-in [`CompressionStrategy`].
///
/// Per-field settings come from `params` merged with the field's metadata;
/// `version` gates encodings that only newer file versions can read.
#[derive(Debug, Default, Clone)]
pub struct DefaultCompressionStrategy {
    // User-supplied parameters, looked up by field name and data type.
    params: CompressionParams,
    // Target Lance file version (e.g. RLE/general block compression need 2.2+).
    version: LanceFileVersion,
}
144
145fn try_bss_for_mini_block(
146 data: &FixedWidthDataBlock,
147 params: &CompressionFieldParams,
148) -> Option<Box<dyn MiniBlockCompressor>> {
149 if params.compression.is_none() || params.compression.as_deref() == Some("none") {
152 return None;
153 }
154
155 let mode = params.bss.unwrap_or(BssMode::Auto);
156 if should_use_bss(data, mode) {
158 return Some(Box::new(ByteStreamSplitEncoder::new(
159 data.bits_per_value as usize,
160 )));
161 }
162 None
163}
164
165fn try_rle_for_mini_block(
166 data: &FixedWidthDataBlock,
167 params: &CompressionFieldParams,
168) -> Option<Box<dyn MiniBlockCompressor>> {
169 let bits = data.bits_per_value;
170 if !matches!(bits, 8 | 16 | 32 | 64) {
171 return None;
172 }
173
174 let type_size = bits / 8;
175 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
176 let threshold = params
177 .rle_threshold
178 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
179
180 let passes_threshold = match params.rle_threshold {
183 Some(_) => (run_count as f64) < (data.num_values as f64) * threshold,
184 None => true,
185 };
186
187 if !passes_threshold {
188 return None;
189 }
190
191 let num_values = data.num_values;
197 let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values);
198
199 let raw_bytes = (num_values as u128) * (type_size as u128);
200 let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128);
201
202 if rle_bytes < raw_bytes {
203 #[cfg(feature = "bitpacking")]
204 {
205 if let Some(bitpack_bytes) = estimate_inline_bitpacking_bytes(data)
206 && (bitpack_bytes as u128) < rle_bytes
207 {
208 return None;
209 }
210 }
211 return Some(Box::new(RleEncoder::new()));
212 }
213 None
214}
215
216fn try_rle_for_block(
217 data: &FixedWidthDataBlock,
218 version: LanceFileVersion,
219 params: &CompressionFieldParams,
220) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
221 if version < LanceFileVersion::V2_2 {
222 return None;
223 }
224
225 let bits = data.bits_per_value;
226 if !matches!(bits, 8 | 16 | 32 | 64) {
227 return None;
228 }
229
230 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
231 let threshold = params
232 .rle_threshold
233 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
234
235 if (run_count as f64) < (data.num_values as f64) * threshold {
236 let compressor = Box::new(RleEncoder::new());
237 let encoding = ProtobufUtils21::rle(
238 ProtobufUtils21::flat(bits, None),
239 ProtobufUtils21::flat(8, None),
240 );
241 return Some((compressor, encoding));
242 }
243 None
244}
245
246fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
247 #[cfg(feature = "bitpacking")]
248 {
249 let bits = _data.bits_per_value;
250 if estimate_inline_bitpacking_bytes(_data).is_some() {
251 return Some(Box::new(InlineBitpacking::new(bits)));
252 }
253 None
254 }
255 #[cfg(not(feature = "bitpacking"))]
256 {
257 None
258 }
259}
260
/// Estimates the encoded size, in bytes, of inline bitpacking for `data`.
///
/// Each entry of the block's `BitWidth` statistic is treated as a full chunk
/// of 1024 values packed at that width, plus one extra word per chunk
/// (presumably the chunk's bit-width header — confirm against the encoder).
///
/// Returns `None` when the value width is not 8/16/32/64 bits, the block is
/// empty, the estimate is not smaller than the raw data, or the estimate does
/// not fit in a `u64`.
#[cfg(feature = "bitpacking")]
fn estimate_inline_bitpacking_bytes(data: &FixedWidthDataBlock) -> Option<u64> {
    let bits = data.bits_per_value;
    if !matches!(bits, 8 | 16 | 32 | 64) || data.num_values == 0 {
        return None;
    }

    // `AsArray` is already imported at module level; no local `use` needed.
    let bit_widths = data.expect_stat(Stat::BitWidth);
    let widths = bit_widths.as_primitive::<UInt64Type>();

    let word_bits = u128::from(bits);
    let total_words = widths
        .values()
        .iter()
        .fold(0u128, |words, &bit_width| {
            let packed_words = (1024u128 * u128::from(bit_width)) / word_bits;
            words.saturating_add(1u128.saturating_add(packed_words))
        });

    let estimated_bytes = total_words.saturating_mul(word_bits / 8);
    let raw_bytes = data.data_size() as u128;
    if estimated_bytes >= raw_bytes {
        return None;
    }

    u64::try_from(estimated_bytes).ok()
}
294
295fn try_bitpack_for_block(
296 data: &FixedWidthDataBlock,
297) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
298 let bits = data.bits_per_value;
299 if !matches!(bits, 8 | 16 | 32 | 64) {
300 return None;
301 }
302
303 let bit_widths = data.expect_stat(Stat::BitWidth);
304 let widths = bit_widths.as_primitive::<UInt64Type>();
305 let has_all_zeros = widths.values().contains(&0);
306 let max_bit_width = *widths.values().iter().max().unwrap();
307
308 let too_small =
309 widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
310
311 if has_all_zeros || too_small {
312 return None;
313 }
314
315 if data.num_values <= 1024 {
316 let compressor = Box::new(InlineBitpacking::new(bits));
317 let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
318 Some((compressor, encoding))
319 } else {
320 let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
321 let encoding = ProtobufUtils21::out_of_line_bitpacking(
322 bits,
323 ProtobufUtils21::flat(max_bit_width, None),
324 );
325 Some((compressor, encoding))
326 }
327}
328
329fn maybe_wrap_general_for_mini_block(
330 inner: Box<dyn MiniBlockCompressor>,
331 params: &CompressionFieldParams,
332) -> Result<Box<dyn MiniBlockCompressor>> {
333 match params.compression.as_deref() {
334 None | Some("none") | Some("fsst") => Ok(inner),
335 Some(raw) => {
336 let scheme = CompressionScheme::from_str(raw)
337 .map_err(|_| Error::invalid_input(format!("Unknown compression scheme: {raw}")))?;
338 let cfg = CompressionConfig::new(scheme, params.compression_level);
339 Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
340 }
341 }
342}
343
344fn try_general_compression(
345 version: LanceFileVersion,
346 field_params: &CompressionFieldParams,
347 data: &DataBlock,
348) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
349 if field_params.compression.as_deref() == Some("none") {
351 return Ok(None);
352 }
353
354 if let Some(compression_scheme) = &field_params.compression
357 && version >= LanceFileVersion::V2_2
358 {
359 let scheme: CompressionScheme = compression_scheme.parse()?;
360 let config = CompressionConfig::new(scheme, field_params.compression_level);
361 let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
362 return Ok(Some((compressor, config)));
363 }
364
365 if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
367 && version >= LanceFileVersion::V2_2
368 {
369 let compressor = Box::new(CompressedBufferEncoder::default());
370 let config = compressor.compressor.config();
371 return Ok(Some((compressor, config)));
372 }
373
374 Ok(None)
375}
376
377impl DefaultCompressionStrategy {
378 pub fn new() -> Self {
380 Self::default()
381 }
382
383 pub fn with_params(params: CompressionParams) -> Self {
385 Self {
386 params,
387 version: LanceFileVersion::default(),
388 }
389 }
390
391 pub fn with_version(mut self, version: LanceFileVersion) -> Self {
393 self.version = version;
394 self
395 }
396
397 fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams {
399 let mut params = CompressionFieldParams::default();
400
401 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
403 params.compression = Some(compression.clone());
404 }
405
406 if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
408 params.compression_level = level.parse().ok();
409 }
410
411 if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
413 params.rle_threshold = threshold.parse().ok();
414 }
415
416 if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
418 match BssMode::parse(bss_str) {
419 Some(mode) => params.bss = Some(mode),
420 None => {
421 log::warn!("Invalid BSS mode '{}', using default", bss_str);
422 }
423 }
424 }
425
426 if let Some(minichunk_size_str) = field
428 .metadata
429 .get(super::constants::MINICHUNK_SIZE_META_KEY)
430 {
431 if let Ok(minichunk_size) = minichunk_size_str.parse::<i64>() {
432 if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 {
434 log::warn!(
435 "minichunk_size '{}' too large for version '{}', using default",
436 minichunk_size,
437 version
438 );
439 } else {
440 params.minichunk_size = Some(minichunk_size);
441 }
442 } else {
443 log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str);
444 }
445 }
446
447 params
448 }
449
450 fn build_fixed_width_compressor(
451 &self,
452 params: &CompressionFieldParams,
453 data: &FixedWidthDataBlock,
454 ) -> Result<Box<dyn MiniBlockCompressor>> {
455 if params.compression.as_deref() == Some("none") {
456 return Ok(Box::new(ValueEncoder::default()));
457 }
458
459 let base = try_bss_for_mini_block(data, params)
460 .or_else(|| try_rle_for_mini_block(data, params))
461 .or_else(|| try_bitpack_for_mini_block(data))
462 .unwrap_or_else(|| Box::new(ValueEncoder::default()));
463
464 maybe_wrap_general_for_mini_block(base, params)
465 }
466
467 fn build_variable_width_compressor(
469 &self,
470 field: &Field,
471 data: &VariableWidthBlock,
472 ) -> Result<Box<dyn MiniBlockCompressor>> {
473 let params = self.get_merged_field_params(field);
474 let compression = params.compression.as_deref();
475 if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
476 return Err(Error::invalid_input(format!(
477 "Variable width compression not supported for {} bit offsets",
478 data.bits_per_offset
479 )));
480 }
481
482 let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
484 let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
485
486 if compression == Some("none") {
488 return Ok(Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size)));
489 }
490
491 let use_fsst = compression == Some("fsst")
492 || (compression.is_none()
493 && !matches!(field.data_type(), DataType::Binary | DataType::LargeBinary)
494 && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
495 && data_size >= FSST_LEAST_INPUT_SIZE as u64);
496
497 let mut base_encoder: Box<dyn MiniBlockCompressor> = if use_fsst {
499 Box::new(FsstMiniBlockEncoder::new(params.minichunk_size))
500 } else {
501 Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size))
502 };
503
504 if let Some(compression_scheme) = compression.filter(|scheme| *scheme != "fsst") {
506 let scheme: CompressionScheme = compression_scheme.parse()?;
507 let config = CompressionConfig::new(scheme, params.compression_level);
508 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
509 }
510
511 Ok(base_encoder)
512 }
513
514 fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
517 let mut field_params = self
518 .params
519 .get_field_params(&field.name, &field.data_type());
520
521 let metadata_params = Self::parse_field_metadata(field, &self.version);
523 field_params.merge(&metadata_params);
524
525 field_params
526 }
527}
528
529impl CompressionStrategy for DefaultCompressionStrategy {
530 fn create_miniblock_compressor(
531 &self,
532 field: &Field,
533 data: &DataBlock,
534 ) -> Result<Box<dyn MiniBlockCompressor>> {
535 match data {
536 DataBlock::FixedWidth(fixed_width_data) => {
537 let field_params = self.get_merged_field_params(field);
538 self.build_fixed_width_compressor(&field_params, fixed_width_data)
539 }
540 DataBlock::VariableWidth(variable_width_data) => {
541 self.build_variable_width_compressor(field, variable_width_data)
542 }
543 DataBlock::Struct(struct_data_block) => {
544 if struct_data_block.has_variable_width_child() {
547 return Err(Error::invalid_input(
548 "Packed struct mini-block encoding supports only fixed-width children",
549 ));
550 }
551 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
552 }
553 DataBlock::FixedSizeList(_) => {
554 Ok(Box::new(ValueEncoder::default()))
562 }
563 _ => Err(Error::not_supported_source(
564 format!(
565 "Mini-block compression not yet supported for block type {}",
566 data.name()
567 )
568 .into(),
569 )),
570 }
571 }
572
573 fn create_per_value(
574 &self,
575 field: &Field,
576 data: &DataBlock,
577 ) -> Result<Box<dyn PerValueCompressor>> {
578 let field_params = self.get_merged_field_params(field);
579
580 match data {
581 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
582 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
583 DataBlock::Struct(struct_block) => {
584 if field.children.len() != struct_block.children.len() {
585 return Err(Error::invalid_input(
586 "Struct field metadata does not match data block children",
587 ));
588 }
589 let has_variable_child = struct_block.has_variable_width_child();
590 if has_variable_child {
591 if self.version < LanceFileVersion::V2_2 {
592 return Err(Error::not_supported_source("Variable packed struct encoding requires Lance file version 2.2 or later".into()));
593 }
594 Ok(Box::new(PackedStructVariablePerValueEncoder::new(
595 self.clone(),
596 field.children.clone(),
597 )))
598 } else {
599 Err(Error::invalid_input(
600 "Packed struct per-value compression should not be used for fixed-width-only structs",
601 ))
602 }
603 }
604 DataBlock::VariableWidth(variable_width) => {
605 let compression = field_params.compression.as_deref();
606 if compression == Some("none") {
608 return Ok(Box::new(VariableEncoder::default()));
609 }
610
611 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
612 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
613
614 let per_value_requested =
619 compression.is_some_and(|compression| compression != "fsst");
620
621 if (max_len > 32 * 1024 || per_value_requested)
622 && data_size >= FSST_LEAST_INPUT_SIZE as u64
623 {
624 return Ok(Box::new(CompressedBufferEncoder::default()));
625 }
626
627 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
628 let variable_compression = Box::new(VariableEncoder::default());
629 let use_fsst = compression == Some("fsst")
630 || (compression.is_none()
631 && !matches!(
632 field.data_type(),
633 DataType::Binary | DataType::LargeBinary
634 )
635 && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
636 && data_size >= FSST_LEAST_INPUT_SIZE as u64);
637
638 if use_fsst {
640 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
641 } else {
642 Ok(variable_compression)
643 }
644 } else {
645 panic!(
646 "Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.",
647 variable_width.bits_per_offset
648 );
649 }
650 }
651 _ => unreachable!(
652 "Per-value compression not yet supported for block type: {}",
653 data.name()
654 ),
655 }
656 }
657
658 fn create_block_compressor(
659 &self,
660 field: &Field,
661 data: &DataBlock,
662 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
663 let field_params = self.get_merged_field_params(field);
664
665 match data {
666 DataBlock::FixedWidth(fixed_width) => {
667 if let Some((compressor, encoding)) =
668 try_rle_for_block(fixed_width, self.version, &field_params)
669 {
670 return Ok((compressor, encoding));
671 }
672 if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
673 return Ok((compressor, encoding));
674 }
675
676 if let Some((compressor, config)) =
678 try_general_compression(self.version, &field_params, data)?
679 {
680 let encoding = ProtobufUtils21::wrapped(
681 config,
682 ProtobufUtils21::flat(fixed_width.bits_per_value, None),
683 )?;
684 return Ok((compressor, encoding));
685 }
686
687 let encoder = Box::new(ValueEncoder::default());
688 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
689 Ok((encoder, encoding))
690 }
691 DataBlock::VariableWidth(variable_width) => {
692 if let Some((compressor, config)) =
694 try_general_compression(self.version, &field_params, data)?
695 {
696 let encoding = ProtobufUtils21::wrapped(
697 config,
698 ProtobufUtils21::variable(
699 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
700 None,
701 ),
702 )?;
703 return Ok((compressor, encoding));
704 }
705
706 let encoder = Box::new(VariableEncoder::default());
707 let encoding = ProtobufUtils21::variable(
708 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
709 None,
710 );
711 Ok((encoder, encoding))
712 }
713 _ => unreachable!(),
714 }
715 }
716}
717
/// Decodes mini-block encoded data back into a [`DataBlock`].
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `num_values` values from the encoded buffers in `data`.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
721
/// Decodes per-value data stored as a fixed-width block.
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `num_values` values from `data`.
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// Width in bits of each value handled by this decompressor
    /// (presumably the encoded width — confirm against implementations).
    fn bits_per_value(&self) -> u64;
}
730
/// Decodes per-value data stored as a variable-width block.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes the values in `data`.
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
735
/// Decodes a buffer produced by block-level compression.
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `num_values` values from the single buffer `data`.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
739
/// Builds decompressors from [`CompressiveEncoding`] descriptions.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Creates a decompressor for mini-block encoded data.
    ///
    /// `decompression_strategy` is used to build decompressors for nested
    /// encodings (e.g. the values inside an FSST encoding); it may differ from
    /// `self`.
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Creates a decompressor for fixed-width per-value data.
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Creates a decompressor for variable-width per-value data.
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Creates a decompressor for block-compressed data.
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
762
/// The built-in, stateless [`DecompressionStrategy`].
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
765
766impl DecompressionStrategy for DefaultDecompressionStrategy {
767 fn create_miniblock_decompressor(
768 &self,
769 description: &CompressiveEncoding,
770 decompression_strategy: &dyn DecompressionStrategy,
771 ) -> Result<Box<dyn MiniBlockDecompressor>> {
772 match description.compression.as_ref().unwrap() {
773 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
774 #[cfg(feature = "bitpacking")]
775 Compression::InlineBitpacking(description) => {
776 Ok(Box::new(InlineBitpacking::from_description(description)))
777 }
778 #[cfg(not(feature = "bitpacking"))]
779 Compression::InlineBitpacking(_) => Err(Error::not_supported_source(
780 "this runtime was not built with bitpacking support".into(),
781 )),
782 Compression::Variable(variable) => {
783 let Compression::Flat(offsets) = variable
784 .offsets
785 .as_ref()
786 .unwrap()
787 .compression
788 .as_ref()
789 .unwrap()
790 else {
791 panic!("Variable compression only supports flat offsets")
792 };
793 Ok(Box::new(BinaryMiniBlockDecompressor::new(
794 offsets.bits_per_value as u8,
795 )))
796 }
797 Compression::Fsst(description) => {
798 let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
799 description.values.as_ref().unwrap(),
800 decompression_strategy,
801 )?;
802 Ok(Box::new(FsstMiniBlockDecompressor::new(
803 description,
804 inner_decompressor,
805 )))
806 }
807 Compression::PackedStruct(description) => Ok(Box::new(
808 PackedStructFixedWidthMiniBlockDecompressor::new(description),
809 )),
810 Compression::VariablePackedStruct(_) => Err(Error::not_supported_source(
811 "variable packed struct decoding is not yet implemented".into(),
812 )),
813 Compression::FixedSizeList(fsl) => {
814 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
817 }
818 Compression::Rle(rle) => {
819 let bits_per_value = validate_rle_compression(rle)?;
820 Ok(Box::new(RleDecompressor::new(bits_per_value)))
821 }
822 Compression::ByteStreamSplit(bss) => {
823 let Compression::Flat(values) =
824 bss.values.as_ref().unwrap().compression.as_ref().unwrap()
825 else {
826 panic!("ByteStreamSplit compression only supports flat values")
827 };
828 Ok(Box::new(ByteStreamSplitDecompressor::new(
829 values.bits_per_value as usize,
830 )))
831 }
832 Compression::General(general) => {
833 let inner_decompressor = self.create_miniblock_decompressor(
835 general.values.as_ref().ok_or_else(|| {
836 Error::invalid_input("GeneralMiniBlock missing inner encoding")
837 })?,
838 decompression_strategy,
839 )?;
840
841 let compression = general.compression.as_ref().ok_or_else(|| {
843 Error::invalid_input("GeneralMiniBlock missing compression config")
844 })?;
845
846 let scheme = compression.scheme().try_into()?;
847
848 let compression_config = CompressionConfig::new(scheme, compression.level);
849
850 Ok(Box::new(GeneralMiniBlockDecompressor::new(
851 inner_decompressor,
852 compression_config,
853 )))
854 }
855 _ => todo!(),
856 }
857 }
858
859 fn create_fixed_per_value_decompressor(
860 &self,
861 description: &CompressiveEncoding,
862 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
863 match description.compression.as_ref().unwrap() {
864 Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
865 constant
866 .value
867 .as_ref()
868 .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
869 ))),
870 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
871 Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
872 _ => todo!("fixed-per-value decompressor for {:?}", description),
873 }
874 }
875
876 fn create_variable_per_value_decompressor(
877 &self,
878 description: &CompressiveEncoding,
879 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
880 match description.compression.as_ref().unwrap() {
881 Compression::Variable(variable) => {
882 let Compression::Flat(offsets) = variable
883 .offsets
884 .as_ref()
885 .unwrap()
886 .compression
887 .as_ref()
888 .unwrap()
889 else {
890 panic!("Variable compression only supports flat offsets")
891 };
892 assert!(offsets.bits_per_value < u8::MAX as u64);
893 Ok(Box::new(VariableDecoder::default()))
894 }
895 Compression::Fsst(fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
896 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
897 Box::new(VariableDecoder::default()),
898 ))),
899 Compression::General(general) => Ok(Box::new(CompressedBufferEncoder::from_scheme(
900 general.compression.as_ref().expect_ok()?.scheme(),
901 )?)),
902 Compression::VariablePackedStruct(description) => {
903 let mut fields = Vec::with_capacity(description.fields.len());
904 for field in &description.fields {
905 let value_encoding = field.value.as_ref().ok_or_else(|| {
906 Error::invalid_input("VariablePackedStruct field is missing value encoding")
907 })?;
908 let decoder = match field.layout.as_ref().ok_or_else(|| {
909 Error::invalid_input("VariablePackedStruct field is missing layout details")
910 })? {
911 crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
912 bits_per_value,
913 ) => {
914 let decompressor =
915 self.create_fixed_per_value_decompressor(value_encoding)?;
916 VariablePackedStructFieldDecoder {
917 kind: VariablePackedStructFieldKind::Fixed {
918 bits_per_value: *bits_per_value,
919 decompressor: Arc::from(decompressor),
920 },
921 }
922 }
923 crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
924 bits_per_length,
925 ) => {
926 let decompressor =
927 self.create_variable_per_value_decompressor(value_encoding)?;
928 VariablePackedStructFieldDecoder {
929 kind: VariablePackedStructFieldKind::Variable {
930 bits_per_length: *bits_per_length,
931 decompressor: Arc::from(decompressor),
932 },
933 }
934 }
935 };
936 fields.push(decoder);
937 }
938 Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
939 fields,
940 )))
941 }
942 _ => todo!("variable-per-value decompressor for {:?}", description),
943 }
944 }
945
946 fn create_block_decompressor(
947 &self,
948 description: &CompressiveEncoding,
949 ) -> Result<Box<dyn BlockDecompressor>> {
950 match description.compression.as_ref().unwrap() {
951 Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
952 InlineBitpacking::from_description(inline_bitpacking),
953 )),
954 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
955 Compression::Constant(constant) => {
956 let scalar = constant
957 .value
958 .as_ref()
959 .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
960 Ok(Box::new(ConstantDecompressor::new(scalar)))
961 }
962 Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
963 Compression::FixedSizeList(fsl) => {
964 Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
965 }
966 Compression::OutOfLineBitpacking(out_of_line) => {
967 let compressed_bit_width = match out_of_line
969 .values
970 .as_ref()
971 .unwrap()
972 .compression
973 .as_ref()
974 .unwrap()
975 {
976 Compression::Flat(flat) => flat.bits_per_value,
977 _ => {
978 return Err(Error::invalid_input_source(
979 "OutOfLineBitpacking values must use Flat encoding".into(),
980 ));
981 }
982 };
983 Ok(Box::new(OutOfLineBitpacking::new(
984 compressed_bit_width,
985 out_of_line.uncompressed_bits_per_value,
986 )))
987 }
988 Compression::General(general) => {
989 let inner_desc = general
990 .values
991 .as_ref()
992 .ok_or_else(|| {
993 Error::invalid_input("General compression missing inner encoding")
994 })?
995 .as_ref();
996 let inner_decompressor = self.create_block_decompressor(inner_desc)?;
997
998 let compression = general.compression.as_ref().ok_or_else(|| {
999 Error::invalid_input("General compression missing compression config")
1000 })?;
1001 let scheme = compression.scheme().try_into()?;
1002 let config = CompressionConfig::new(scheme, compression.level);
1003 let general_decompressor =
1004 GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
1005
1006 Ok(Box::new(general_decompressor))
1007 }
1008 Compression::Rle(rle) => {
1009 let bits_per_value = validate_rle_compression(rle)?;
1010 Ok(Box::new(RleDecompressor::new(bits_per_value)))
1011 }
1012 _ => todo!(),
1013 }
1014 }
1015}
1016fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<u64> {
1018 let values = rle
1019 .values
1020 .as_ref()
1021 .ok_or_else(|| Error::invalid_input("RLE compression missing values encoding"))?;
1022 let run_lengths = rle
1023 .run_lengths
1024 .as_ref()
1025 .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths encoding"))?;
1026
1027 let values = values
1028 .compression
1029 .as_ref()
1030 .ok_or_else(|| Error::invalid_input("RLE compression missing values compression"))?;
1031 let Compression::Flat(values) = values else {
1032 return Err(Error::invalid_input(
1033 "RLE compression only supports flat values",
1034 ));
1035 };
1036
1037 let run_lengths = run_lengths
1038 .compression
1039 .as_ref()
1040 .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths compression"))?;
1041 let Compression::Flat(run_lengths) = run_lengths else {
1042 return Err(Error::invalid_input(
1043 "RLE compression only supports flat run lengths",
1044 ));
1045 };
1046
1047 if run_lengths.bits_per_value != 8 {
1048 return Err(Error::invalid_input(format!(
1049 "RLE compression only supports 8-bit run lengths, got {}",
1050 run_lengths.bits_per_value
1051 )));
1052 }
1053
1054 Ok(values.bits_per_value)
1055}
1056
1057#[cfg(test)]
1058mod tests {
1059 use super::*;
1060 use crate::buffer::LanceBuffer;
1061 use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
1062 use crate::statistics::ComputeStat;
1063 use crate::testing::extract_array_encoding_chain;
1064 use arrow_schema::{DataType, Field as ArrowField};
1065 use std::collections::HashMap;
1066
1067 fn create_test_field(name: &str, data_type: DataType) -> Field {
1068 let arrow_field = ArrowField::new(name, data_type, true);
1069 let mut field = Field::try_from(&arrow_field).unwrap();
1070 field.id = -1;
1071 field
1072 }
1073
1074 fn create_fixed_width_block_with_stats(
1075 bits_per_value: u64,
1076 num_values: u64,
1077 run_count: u64,
1078 ) -> DataBlock {
1079 let bytes_per_value = (bits_per_value / 8) as usize;
1081 let total_bytes = bytes_per_value * num_values as usize;
1082 let mut data = vec![0u8; total_bytes];
1083
1084 let values_per_run = (num_values / run_count).max(1);
1086 let mut run_value = 0u8;
1087
1088 for i in 0..num_values as usize {
1089 if i % values_per_run as usize == 0 {
1090 run_value = run_value.wrapping_add(17); }
1092 for j in 0..bytes_per_value {
1094 let byte_offset = i * bytes_per_value + j;
1095 if byte_offset < data.len() {
1096 data[byte_offset] = run_value.wrapping_add(j as u8);
1097 }
1098 }
1099 }
1100
1101 let mut block = FixedWidthDataBlock {
1102 bits_per_value,
1103 data: LanceBuffer::reinterpret_vec(data),
1104 num_values,
1105 block_info: BlockInfo::default(),
1106 };
1107
1108 use crate::statistics::ComputeStat;
1110 block.compute_stat();
1111
1112 DataBlock::FixedWidth(block)
1113 }
1114
1115 fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
1116 let bytes_per_value = (bits_per_value / 8) as usize;
1118 let total_bytes = bytes_per_value * num_values as usize;
1119 let mut data = vec![0u8; total_bytes];
1120
1121 for i in 0..num_values as usize {
1123 let byte_offset = i * bytes_per_value;
1124 if byte_offset < data.len() {
1125 data[byte_offset] = (i % 256) as u8;
1126 }
1127 }
1128
1129 let mut block = FixedWidthDataBlock {
1130 bits_per_value,
1131 data: LanceBuffer::reinterpret_vec(data),
1132 num_values,
1133 block_info: BlockInfo::default(),
1134 };
1135
1136 use crate::statistics::ComputeStat;
1138 block.compute_stat();
1139
1140 DataBlock::FixedWidth(block)
1141 }
1142
1143 fn create_variable_width_block(
1144 bits_per_offset: u8,
1145 num_values: u64,
1146 avg_value_size: usize,
1147 ) -> DataBlock {
1148 use crate::statistics::ComputeStat;
1149
1150 let mut offsets = Vec::with_capacity((num_values + 1) as usize);
1152 let mut current_offset = 0i64;
1153 offsets.push(current_offset);
1154
1155 for i in 0..num_values {
1157 let value_size = if avg_value_size == 0 {
1158 1
1159 } else {
1160 ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
1161 .min(avg_value_size * 2)
1162 };
1163 current_offset += value_size as i64;
1164 offsets.push(current_offset);
1165 }
1166
1167 let total_data_size = current_offset as usize;
1169 let mut data = vec![0u8; total_data_size];
1170
1171 for i in 0..num_values {
1173 let start_offset = offsets[i as usize] as usize;
1174 let end_offset = offsets[(i + 1) as usize] as usize;
1175
1176 let content = (i % 256) as u8;
1177 for j in 0..end_offset - start_offset {
1178 data[start_offset + j] = content.wrapping_add(j as u8);
1179 }
1180 }
1181
1182 let offsets_buffer = match bits_per_offset {
1184 32 => {
1185 let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect();
1186 LanceBuffer::reinterpret_vec(offsets_32)
1187 }
1188 64 => LanceBuffer::reinterpret_vec(offsets),
1189 _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
1190 };
1191
1192 let mut block = VariableWidthBlock {
1193 data: LanceBuffer::from(data),
1194 offsets: offsets_buffer,
1195 bits_per_offset,
1196 num_values,
1197 block_info: BlockInfo::default(),
1198 };
1199
1200 block.compute_stat();
1201 DataBlock::VariableWidth(block)
1202 }
1203
1204 fn create_fsst_candidate_variable_width_block() -> DataBlock {
1205 create_variable_width_block(32, 4096, FSST_LEAST_INPUT_MAX_LENGTH as usize + 16)
1206 }
1207
1208 #[test]
1209 fn test_parameter_based_compression() {
1210 let mut params = CompressionParams::new();
1211
1212 params.columns.insert(
1214 "*_id".to_string(),
1215 CompressionFieldParams {
1216 rle_threshold: Some(0.3),
1217 compression: Some("lz4".to_string()),
1218 compression_level: None,
1219 bss: Some(BssMode::Off), minichunk_size: None,
1221 },
1222 );
1223
1224 let strategy = DefaultCompressionStrategy::with_params(params);
1225 let field = create_test_field("user_id", DataType::Int32);
1226
1227 let data = create_fixed_width_block_with_stats(32, 1000, 100); let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1232 let debug_str = format!("{:?}", compressor);
1234
1235 assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1237 assert!(debug_str.contains("RleEncoder"));
1238 }
1239
1240 #[test]
1241 fn test_type_level_parameters() {
1242 let mut params = CompressionParams::new();
1243
1244 params.types.insert(
1246 "Int32".to_string(),
1247 CompressionFieldParams {
1248 rle_threshold: Some(0.1), compression: Some("zstd".to_string()),
1250 compression_level: Some(3),
1251 bss: Some(BssMode::Off), minichunk_size: None,
1253 },
1254 );
1255
1256 let strategy = DefaultCompressionStrategy::with_params(params);
1257 let field = create_test_field("some_column", DataType::Int32);
1258 let data = create_fixed_width_block_with_stats(32, 1000, 50);
1260
1261 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1262 assert!(format!("{:?}", compressor).contains("RleEncoder"));
1264 }
1265
1266 #[test]
1267 #[cfg(feature = "bitpacking")]
1268 fn test_low_cardinality_prefers_bitpacking_over_rle() {
1269 let strategy = DefaultCompressionStrategy::new();
1270 let field = create_test_field("int_score", DataType::Int64);
1271
1272 let mut values: Vec<u64> = Vec::with_capacity(256);
1275 for run_idx in 0..64 {
1276 let value = match run_idx % 3 {
1277 0 => 3u64,
1278 1 => 4u64,
1279 _ => 5u64,
1280 };
1281 values.extend(std::iter::repeat_n(value, 4));
1282 }
1283
1284 let mut block = FixedWidthDataBlock {
1285 bits_per_value: 64,
1286 data: LanceBuffer::reinterpret_vec(values),
1287 num_values: 256,
1288 block_info: BlockInfo::default(),
1289 };
1290
1291 use crate::statistics::ComputeStat;
1292 block.compute_stat();
1293
1294 let data = DataBlock::FixedWidth(block);
1295 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1296 let debug_str = format!("{:?}", compressor);
1297 assert!(
1298 debug_str.contains("InlineBitpacking"),
1299 "expected InlineBitpacking, got: {debug_str}"
1300 );
1301 assert!(
1302 !debug_str.contains("RleEncoder"),
1303 "expected RLE to be skipped when bitpacking is smaller, got: {debug_str}"
1304 );
1305 }
1306
1307 fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
1308 let chain = extract_array_encoding_chain(encoding);
1309 if variable {
1310 assert_eq!(chain.len(), 2);
1311 assert_eq!(chain.first().unwrap().as_str(), "variable");
1312 assert_eq!(chain.get(1).unwrap().as_str(), "flat");
1313 } else {
1314 assert_eq!(chain.len(), 1);
1315 assert_eq!(chain.first().unwrap().as_str(), "flat");
1316 }
1317 }
1318
1319 #[test]
1320 fn test_none_compression() {
1321 let mut params = CompressionParams::new();
1322
1323 params.columns.insert(
1325 "embeddings".to_string(),
1326 CompressionFieldParams {
1327 compression: Some("none".to_string()),
1328 ..Default::default()
1329 },
1330 );
1331
1332 let strategy = DefaultCompressionStrategy::with_params(params);
1333 let field = create_test_field("embeddings", DataType::Float32);
1334 let fixed_data = create_fixed_width_block(32, 1000);
1335 let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1336
1337 let compressor = strategy
1339 .create_miniblock_compressor(&field, &fixed_data)
1340 .unwrap();
1341 let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1342 check_uncompressed_encoding(&encoding, false);
1343 let compressor = strategy
1344 .create_miniblock_compressor(&field, &variable_data)
1345 .unwrap();
1346 let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1347 check_uncompressed_encoding(&encoding, true);
1348
1349 let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1351 let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1352 check_uncompressed_encoding(&encoding, false);
1353 let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1354 let (_block, encoding) = compressor.compress(variable_data).unwrap();
1355 check_uncompressed_encoding(&encoding, true);
1356 }
1357
1358 #[test]
1359 fn test_field_metadata_none_compression() {
1360 let mut arrow_field = ArrowField::new("simple_col", DataType::Binary, true);
1362 let mut metadata = HashMap::new();
1363 metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1364 arrow_field = arrow_field.with_metadata(metadata);
1365 let field = Field::try_from(&arrow_field).unwrap();
1366
1367 let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
1368
1369 let fixed_data = create_fixed_width_block(32, 1000);
1371 let variable_data = create_variable_width_block(32, 10, 32 * 1024);
1372
1373 let compressor = strategy
1374 .create_miniblock_compressor(&field, &fixed_data)
1375 .unwrap();
1376 let (_block, encoding) = compressor.compress(fixed_data.clone()).unwrap();
1377 check_uncompressed_encoding(&encoding, false);
1378
1379 let compressor = strategy
1380 .create_miniblock_compressor(&field, &variable_data)
1381 .unwrap();
1382 let (_block, encoding) = compressor.compress(variable_data.clone()).unwrap();
1383 check_uncompressed_encoding(&encoding, true);
1384
1385 let compressor = strategy.create_per_value(&field, &fixed_data).unwrap();
1387 let (_block, encoding) = compressor.compress(fixed_data).unwrap();
1388 check_uncompressed_encoding(&encoding, false);
1389
1390 let compressor = strategy.create_per_value(&field, &variable_data).unwrap();
1391 let (_block, encoding) = compressor.compress(variable_data).unwrap();
1392 check_uncompressed_encoding(&encoding, true);
1393 }
1394
1395 #[test]
1396 fn test_auto_fsst_disabled_for_binary_fields() {
1397 let strategy = DefaultCompressionStrategy::new();
1398 let field = create_test_field("bytes", DataType::Binary);
1399 let variable_data = create_fsst_candidate_variable_width_block();
1400
1401 let miniblock = strategy
1402 .create_miniblock_compressor(&field, &variable_data)
1403 .unwrap();
1404 let miniblock_debug = format!("{:?}", miniblock);
1405 assert!(
1406 miniblock_debug.contains("BinaryMiniBlockEncoder"),
1407 "expected BinaryMiniBlockEncoder, got: {miniblock_debug}"
1408 );
1409 assert!(
1410 !miniblock_debug.contains("FsstMiniBlockEncoder"),
1411 "did not expect FsstMiniBlockEncoder, got: {miniblock_debug}"
1412 );
1413
1414 let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1415 let per_value_debug = format!("{:?}", per_value);
1416 assert!(
1417 per_value_debug.contains("VariableEncoder"),
1418 "expected VariableEncoder, got: {per_value_debug}"
1419 );
1420 assert!(
1421 !per_value_debug.contains("FsstPerValueEncoder"),
1422 "did not expect FsstPerValueEncoder, got: {per_value_debug}"
1423 );
1424 }
1425
1426 #[test]
1427 fn test_auto_fsst_still_enabled_for_utf8_fields() {
1428 let strategy = DefaultCompressionStrategy::new();
1429 let field = create_test_field("text", DataType::Utf8);
1430 let variable_data = create_fsst_candidate_variable_width_block();
1431
1432 let miniblock = strategy
1433 .create_miniblock_compressor(&field, &variable_data)
1434 .unwrap();
1435 let miniblock_debug = format!("{:?}", miniblock);
1436 assert!(
1437 miniblock_debug.contains("FsstMiniBlockEncoder"),
1438 "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
1439 );
1440
1441 let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1442 let per_value_debug = format!("{:?}", per_value);
1443 assert!(
1444 per_value_debug.contains("FsstPerValueEncoder"),
1445 "expected FsstPerValueEncoder, got: {per_value_debug}"
1446 );
1447 }
1448
1449 #[test]
1450 fn test_explicit_fsst_still_supported_for_binary_fields() {
1451 let mut params = CompressionParams::new();
1452 params.columns.insert(
1453 "bytes".to_string(),
1454 CompressionFieldParams {
1455 compression: Some("fsst".to_string()),
1456 ..Default::default()
1457 },
1458 );
1459
1460 let strategy = DefaultCompressionStrategy::with_params(params);
1461 let field = create_test_field("bytes", DataType::Binary);
1462 let variable_data = create_fsst_candidate_variable_width_block();
1463
1464 let miniblock = strategy
1465 .create_miniblock_compressor(&field, &variable_data)
1466 .unwrap();
1467 let miniblock_debug = format!("{:?}", miniblock);
1468 assert!(
1469 miniblock_debug.contains("FsstMiniBlockEncoder"),
1470 "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
1471 );
1472
1473 let per_value = strategy.create_per_value(&field, &variable_data).unwrap();
1474 let per_value_debug = format!("{:?}", per_value);
1475 assert!(
1476 per_value_debug.contains("FsstPerValueEncoder"),
1477 "expected FsstPerValueEncoder, got: {per_value_debug}"
1478 );
1479 }
1480
1481 #[test]
1482 fn test_parameter_merge_priority() {
1483 let mut params = CompressionParams::new();
1484
1485 params.types.insert(
1487 "Int32".to_string(),
1488 CompressionFieldParams {
1489 rle_threshold: Some(0.5),
1490 compression: Some("lz4".to_string()),
1491 ..Default::default()
1492 },
1493 );
1494
1495 params.columns.insert(
1497 "user_id".to_string(),
1498 CompressionFieldParams {
1499 rle_threshold: Some(0.2),
1500 compression: Some("zstd".to_string()),
1501 compression_level: Some(6),
1502 bss: None,
1503 minichunk_size: None,
1504 },
1505 );
1506
1507 let strategy = DefaultCompressionStrategy::with_params(params);
1508
1509 let merged = strategy
1511 .params
1512 .get_field_params("user_id", &DataType::Int32);
1513
1514 assert_eq!(merged.rle_threshold, Some(0.2));
1516 assert_eq!(merged.compression, Some("zstd".to_string()));
1517 assert_eq!(merged.compression_level, Some(6));
1518
1519 let merged = strategy
1521 .params
1522 .get_field_params("other_field", &DataType::Int32);
1523 assert_eq!(merged.rle_threshold, Some(0.5));
1524 assert_eq!(merged.compression, Some("lz4".to_string()));
1525 assert_eq!(merged.compression_level, None);
1526 }
1527
1528 #[test]
1529 fn test_pattern_matching() {
1530 let mut params = CompressionParams::new();
1531
1532 params.columns.insert(
1534 "log_*".to_string(),
1535 CompressionFieldParams {
1536 compression: Some("zstd".to_string()),
1537 compression_level: Some(6),
1538 ..Default::default()
1539 },
1540 );
1541
1542 let strategy = DefaultCompressionStrategy::with_params(params);
1543
1544 let merged = strategy
1546 .params
1547 .get_field_params("log_messages", &DataType::Utf8);
1548 assert_eq!(merged.compression, Some("zstd".to_string()));
1549 assert_eq!(merged.compression_level, Some(6));
1550
1551 let merged = strategy
1553 .params
1554 .get_field_params("messages_log", &DataType::Utf8);
1555 assert_eq!(merged.compression, None);
1556 }
1557
1558 #[test]
1559 fn test_legacy_metadata_support() {
1560 let params = CompressionParams::new();
1561 let strategy = DefaultCompressionStrategy::with_params(params);
1562
1563 let mut metadata = HashMap::new();
1565 metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1566 let mut field = create_test_field("some_column", DataType::Int32);
1567 field.metadata = metadata;
1568
1569 let data = create_fixed_width_block(32, 1000);
1570 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1571
1572 assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1574 }
1575
1576 #[test]
1577 fn test_default_behavior() {
1578 let params = CompressionParams::new();
1580 let strategy = DefaultCompressionStrategy::with_params(params);
1581
1582 let field = create_test_field("random_column", DataType::Int32);
1583 let data = create_fixed_width_block_with_stats(32, 1000, 600);
1585
1586 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1587 let debug_str = format!("{:?}", compressor);
1589 assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
1590 }
1591
1592 #[test]
1593 fn test_field_metadata_compression() {
1594 let params = CompressionParams::new();
1595 let strategy = DefaultCompressionStrategy::with_params(params);
1596
1597 let mut metadata = HashMap::new();
1599 metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
1600 metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
1601 let mut field = create_test_field("test_column", DataType::Int32);
1602 field.metadata = metadata;
1603
1604 let data = create_fixed_width_block(32, 1000);
1605 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1606
1607 let debug_str = format!("{:?}", compressor);
1609 assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1610 }
1611
1612 #[test]
1613 fn test_field_metadata_rle_threshold() {
1614 let params = CompressionParams::new();
1615 let strategy = DefaultCompressionStrategy::with_params(params);
1616
1617 let mut metadata = HashMap::new();
1619 metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
1620 metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); let mut field = create_test_field("test_column", DataType::Int32);
1622 field.metadata = metadata;
1623
1624 let data = create_fixed_width_block_with_stats(32, 1000, 100);
1627
1628 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1629
1630 let debug_str = format!("{:?}", compressor);
1632 assert!(debug_str.contains("RleEncoder"));
1633 }
1634
1635 #[test]
1636 fn test_field_metadata_override_params() {
1637 let mut params = CompressionParams::new();
1639 params.columns.insert(
1640 "test_column".to_string(),
1641 CompressionFieldParams {
1642 rle_threshold: Some(0.3),
1643 compression: Some("lz4".to_string()),
1644 compression_level: None,
1645 bss: None,
1646 minichunk_size: None,
1647 },
1648 );
1649
1650 let strategy = DefaultCompressionStrategy::with_params(params);
1651
1652 let mut metadata = HashMap::new();
1654 metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
1655 let mut field = create_test_field("test_column", DataType::Int32);
1656 field.metadata = metadata;
1657
1658 let data = create_fixed_width_block(32, 1000);
1659 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1660
1661 assert!(format!("{:?}", compressor).contains("ValueEncoder"));
1663 }
1664
1665 #[test]
1666 fn test_field_metadata_mixed_configuration() {
1667 let mut params = CompressionParams::new();
1669 params.types.insert(
1670 "Int32".to_string(),
1671 CompressionFieldParams {
1672 rle_threshold: Some(0.5),
1673 compression: Some("lz4".to_string()),
1674 ..Default::default()
1675 },
1676 );
1677
1678 let strategy = DefaultCompressionStrategy::with_params(params);
1679
1680 let mut metadata = HashMap::new();
1682 metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
1683 let mut field = create_test_field("test_column", DataType::Int32);
1684 field.metadata = metadata;
1685
1686 let data = create_fixed_width_block(32, 1000);
1687 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1688
1689 let debug_str = format!("{:?}", compressor);
1691 assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1692 }
1693
1694 #[test]
1695 fn test_bss_field_metadata() {
1696 let params = CompressionParams::new();
1697 let strategy = DefaultCompressionStrategy::with_params(params);
1698
1699 let mut metadata = HashMap::new();
1701 metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1702 metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1703 let arrow_field =
1704 ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
1705 let field = Field::try_from(&arrow_field).unwrap();
1706
1707 let data = create_fixed_width_block(32, 100);
1709
1710 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1711 let debug_str = format!("{:?}", compressor);
1712 assert!(debug_str.contains("ByteStreamSplitEncoder"));
1713 }
1714
1715 #[test]
1716 fn test_bss_with_compression() {
1717 let params = CompressionParams::new();
1718 let strategy = DefaultCompressionStrategy::with_params(params);
1719
1720 let mut metadata = HashMap::new();
1722 metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
1723 metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
1724 let arrow_field =
1725 ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
1726 let field = Field::try_from(&arrow_field).unwrap();
1727
1728 let data = create_fixed_width_block(64, 100);
1730
1731 let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
1732 let debug_str = format!("{:?}", compressor);
1733 assert!(debug_str.contains("GeneralMiniBlockCompressor"));
1735 assert!(debug_str.contains("ByteStreamSplitEncoder"));
1736 }
1737
1738 #[test]
1739 #[cfg(any(feature = "lz4", feature = "zstd"))]
1740 fn test_general_block_decompression_fixed_width_v2_2() {
1741 let mut params = CompressionParams::new();
1743 params.columns.insert(
1744 "dict_values".to_string(),
1745 CompressionFieldParams {
1746 compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1747 ..Default::default()
1748 },
1749 );
1750
1751 let mut strategy = DefaultCompressionStrategy::with_params(params);
1752 strategy.version = LanceFileVersion::V2_2;
1753
1754 let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1755 let data = create_fixed_width_block(24, 1024);
1756 let DataBlock::FixedWidth(expected_block) = &data else {
1757 panic!("expected fixed width block");
1758 };
1759 let expected_bits = expected_block.bits_per_value;
1760 let expected_num_values = expected_block.num_values;
1761 let num_values = expected_num_values;
1762
1763 let (compressor, encoding) = strategy
1764 .create_block_compressor(&field, &data)
1765 .expect("general compression should be selected");
1766 match encoding.compression.as_ref() {
1767 Some(Compression::General(_)) => {}
1768 other => panic!("expected general compression, got {:?}", other),
1769 }
1770
1771 let compressed_buffer = compressor
1772 .compress(data.clone())
1773 .expect("write path general compression should succeed");
1774
1775 let decompressor = DefaultDecompressionStrategy::default()
1776 .create_block_decompressor(&encoding)
1777 .expect("general block decompressor should be created");
1778
1779 let decoded = decompressor
1780 .decompress(compressed_buffer, num_values)
1781 .expect("decompression should succeed");
1782
1783 match decoded {
1784 DataBlock::FixedWidth(block) => {
1785 assert_eq!(block.bits_per_value, expected_bits);
1786 assert_eq!(block.num_values, expected_num_values);
1787 assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
1788 }
1789 _ => panic!("expected fixed width block"),
1790 }
1791 }
1792
1793 #[test]
1794 #[cfg(any(feature = "lz4", feature = "zstd"))]
1795 fn test_general_compression_not_selected_for_v2_1_even_if_requested() {
1796 let mut params = CompressionParams::new();
1797 params.columns.insert(
1798 "dict_values".to_string(),
1799 CompressionFieldParams {
1800 compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
1801 ..Default::default()
1802 },
1803 );
1804
1805 let strategy =
1806 DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_1);
1807 let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1808 let data = create_fixed_width_block(24, 1024);
1809
1810 let (_compressor, encoding) = strategy
1811 .create_block_compressor(&field, &data)
1812 .expect("block compressor selection should succeed");
1813
1814 assert!(
1815 !matches!(encoding.compression.as_ref(), Some(Compression::General(_))),
1816 "general compression should not be selected for V2.1"
1817 );
1818 }
1819
1820 #[test]
1821 fn test_none_compression_disables_auto_general_block_compression() {
1822 let mut params = CompressionParams::new();
1823 params.columns.insert(
1824 "dict_values".to_string(),
1825 CompressionFieldParams {
1826 compression: Some("none".to_string()),
1827 ..Default::default()
1828 },
1829 );
1830
1831 let strategy =
1832 DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_2);
1833 let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
1834 let data = create_fixed_width_block(24, 20_000);
1835
1836 assert!(
1837 data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION,
1838 "test requires block size above automatic general compression threshold"
1839 );
1840
1841 let (_compressor, encoding) = strategy
1842 .create_block_compressor(&field, &data)
1843 .expect("block compressor selection should succeed");
1844
1845 assert!(
1846 !matches!(encoding.compression.as_ref(), Some(Compression::General(_))),
1847 "compression=none should disable automatic block general compression"
1848 );
1849 }
1850
1851 #[test]
1852 fn test_rle_block_used_for_version_v2_2() {
1853 let field = create_test_field("test_repdef", DataType::UInt16);
1854
1855 let num_values = 1000u64;
1857 let mut data = Vec::with_capacity(num_values as usize);
1858 for i in 0..10 {
1859 for _ in 0..100 {
1860 data.push(i as u16);
1861 }
1862 }
1863
1864 let mut block = FixedWidthDataBlock {
1865 bits_per_value: 16,
1866 data: LanceBuffer::reinterpret_vec(data),
1867 num_values,
1868 block_info: BlockInfo::default(),
1869 };
1870
1871 block.compute_stat();
1872
1873 let data_block = DataBlock::FixedWidth(block);
1874
1875 let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1876 .with_version(LanceFileVersion::V2_2);
1877
1878 let (compressor, _) = strategy
1879 .create_block_compressor(&field, &data_block)
1880 .unwrap();
1881
1882 let debug_str = format!("{:?}", compressor);
1883 assert!(debug_str.contains("RleEncoder"));
1884 }
1885
1886 #[test]
1887 fn test_rle_block_not_used_for_version_v2_1() {
1888 let field = create_test_field("test_repdef", DataType::UInt16);
1889
1890 let num_values = 1000u64;
1892 let mut data = Vec::with_capacity(num_values as usize);
1893 for i in 0..10 {
1894 for _ in 0..100 {
1895 data.push(i as u16);
1896 }
1897 }
1898
1899 let mut block = FixedWidthDataBlock {
1900 bits_per_value: 16,
1901 data: LanceBuffer::reinterpret_vec(data),
1902 num_values,
1903 block_info: BlockInfo::default(),
1904 };
1905
1906 block.compute_stat();
1907
1908 let data_block = DataBlock::FixedWidth(block);
1909
1910 let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1911 .with_version(LanceFileVersion::V2_1);
1912
1913 let (compressor, _) = strategy
1914 .create_block_compressor(&field, &data_block)
1915 .unwrap();
1916
1917 let debug_str = format!("{:?}", compressor);
1918 assert!(
1919 !debug_str.contains("RleEncoder"),
1920 "RLE should not be used for V2.1"
1921 );
1922 }
1923}