1#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22 buffer::LanceBuffer,
23 compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24 constants::{
25 BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26 },
27 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28 encodings::{
29 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30 physical::{
31 binary::{
32 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33 VariableDecoder, VariableEncoder,
34 },
35 block::{
36 CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37 GeneralBlockDecompressor,
38 },
39 byte_stream_split::{
40 ByteStreamSplitDecompressor, ByteStreamSplitEncoder, should_use_bss,
41 },
42 constant::ConstantDecompressor,
43 fsst::{
44 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45 FsstPerValueEncoder,
46 },
47 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48 packed::{
49 PackedStructFixedWidthMiniBlockDecompressor,
50 PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51 PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52 VariablePackedStructFieldKind,
53 },
54 rle::{RleDecompressor, RleEncoder},
55 value::{ValueDecompressor, ValueEncoder},
56 },
57 },
58 format::{
59 ProtobufUtils21,
60 pb21::{CompressiveEncoding, compressive_encoding::Compression},
61 },
62 statistics::{GetStat, Stat},
63 version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use arrow_schema::DataType;
68use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
69use lance_core::{Error, Result, datatypes::Field, error::LanceOptionExt};
70use std::{str::FromStr, sync::Arc};
71
/// Default upper bound on `runs / values` for block-level RLE: a block is
/// run-length encoded only when its run count is below `num_values * threshold`.
/// Used when the field does not configure `rle_threshold`.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;

/// Blocks whose data size (in bytes) exceeds this get default general-purpose
/// compression when no explicit scheme is configured (file version V2_2+ only).
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 << 10;
81
/// Compresses an entire [`DataBlock`] into a single [`LanceBuffer`].
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Compresses `data`, returning the encoded bytes.
    ///
    /// # Errors
    /// Implementation-specific; returned when `data` cannot be encoded.
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
101
/// Chooses compressors for a field's data, one factory method per structural
/// layout (whole block, per-value, mini-block).
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Creates a compressor that encodes `data` as one block, together with the
    /// [`CompressiveEncoding`] describing the encoding that compressor produces.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;

    /// Creates a per-value compressor for `data` belonging to `field`.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Creates a mini-block compressor for `data` belonging to `field`.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
136
/// The default [`CompressionStrategy`]: picks encodings from data statistics,
/// user-supplied [`CompressionParams`], and per-field metadata overrides.
#[derive(Debug, Default, Clone)]
pub struct DefaultCompressionStrategy {
    /// Per-field compression parameters; merged with field metadata (metadata
    /// applied on top) before an encoding is chosen.
    params: CompressionParams,
    /// Target file version. Some encodings (block RLE, general block
    /// compression, variable packed structs) are only used for V2_2 and later.
    version: LanceFileVersion,
}
144
145fn try_bss_for_mini_block(
146 data: &FixedWidthDataBlock,
147 params: &CompressionFieldParams,
148) -> Option<Box<dyn MiniBlockCompressor>> {
149 if params.compression.is_none() || params.compression.as_deref() == Some("none") {
152 return None;
153 }
154
155 let mode = params.bss.unwrap_or(BssMode::Auto);
156 if should_use_bss(data, mode) {
158 return Some(Box::new(ByteStreamSplitEncoder::new(
159 data.bits_per_value as usize,
160 )));
161 }
162 None
163}
164
165fn try_rle_for_mini_block(
166 data: &FixedWidthDataBlock,
167 params: &CompressionFieldParams,
168) -> Option<Box<dyn MiniBlockCompressor>> {
169 let bits = data.bits_per_value;
170 if !matches!(bits, 8 | 16 | 32 | 64) {
171 return None;
172 }
173
174 let type_size = bits / 8;
175 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
176 let threshold = params
177 .rle_threshold
178 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
179
180 let passes_threshold = match params.rle_threshold {
183 Some(_) => (run_count as f64) < (data.num_values as f64) * threshold,
184 None => true,
185 };
186
187 if !passes_threshold {
188 return None;
189 }
190
191 let num_values = data.num_values;
197 let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values);
198
199 let raw_bytes = (num_values as u128) * (type_size as u128);
200 let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128);
201
202 if rle_bytes < raw_bytes {
203 #[cfg(feature = "bitpacking")]
204 {
205 if let Some(bitpack_bytes) = estimate_inline_bitpacking_bytes(data)
206 && (bitpack_bytes as u128) < rle_bytes
207 {
208 return None;
209 }
210 }
211 return Some(Box::new(RleEncoder::new()));
212 }
213 None
214}
215
216fn try_rle_for_block(
217 data: &FixedWidthDataBlock,
218 version: LanceFileVersion,
219 params: &CompressionFieldParams,
220) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
221 if version < LanceFileVersion::V2_2 {
222 return None;
223 }
224
225 let bits = data.bits_per_value;
226 if !matches!(bits, 8 | 16 | 32 | 64) {
227 return None;
228 }
229
230 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
231 let threshold = params
232 .rle_threshold
233 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
234
235 if (run_count as f64) < (data.num_values as f64) * threshold {
236 let compressor = Box::new(RleEncoder::new());
237 let encoding = ProtobufUtils21::rle(
238 ProtobufUtils21::flat(bits, None),
239 ProtobufUtils21::flat(8, None),
240 );
241 return Some((compressor, encoding));
242 }
243 None
244}
245
246fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
247 #[cfg(feature = "bitpacking")]
248 {
249 let bits = _data.bits_per_value;
250 if estimate_inline_bitpacking_bytes(_data).is_some() {
251 return Some(Box::new(InlineBitpacking::new(bits)));
252 }
253 None
254 }
255 #[cfg(not(feature = "bitpacking"))]
256 {
257 None
258 }
259}
260
/// Estimates the encoded size, in bytes, of inline bitpacking for `data`.
///
/// For each entry of the `BitWidth` statistic it counts one header word plus
/// `1024 * bit_width / bits` packed words. The estimate assumes each entry covers a
/// 1024-value chunk. Returns `None` for unsupported widths, empty input, when the
/// estimate is not smaller than the raw data, or when it does not fit in `u64`.
#[cfg(feature = "bitpacking")]
fn estimate_inline_bitpacking_bytes(data: &FixedWidthDataBlock) -> Option<u64> {
    let bits = data.bits_per_value;
    if !matches!(bits, 8 | 16 | 32 | 64) || data.num_values == 0 {
        return None;
    }

    let bit_width_stat = data.expect_stat(Stat::BitWidth);
    let chunk_widths = bit_width_stat.as_primitive::<UInt64Type>();
    let bytes_per_word = u128::from(bits / 8);

    let word_count = chunk_widths
        .values()
        .iter()
        .fold(0u128, |acc, &width| {
            let payload_words = 1024u128 * u128::from(width) / u128::from(bits);
            acc.saturating_add(payload_words.saturating_add(1))
        });

    let estimate = word_count.saturating_mul(bytes_per_word);
    if estimate >= data.data_size() as u128 {
        return None;
    }
    u64::try_from(estimate).ok()
}
294
295fn try_bitpack_for_block(
296 data: &FixedWidthDataBlock,
297) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
298 let bits = data.bits_per_value;
299 if !matches!(bits, 8 | 16 | 32 | 64) {
300 return None;
301 }
302
303 let bit_widths = data.expect_stat(Stat::BitWidth);
304 let widths = bit_widths.as_primitive::<UInt64Type>();
305 let max_bit_width = *widths.values().iter().max().unwrap();
306
307 let too_small =
308 widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
309
310 if too_small {
311 return None;
312 }
313
314 if data.num_values <= 1024 {
315 let compressor = Box::new(InlineBitpacking::new(bits));
316 let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
317 Some((compressor, encoding))
318 } else {
319 let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
320 let encoding = ProtobufUtils21::out_of_line_bitpacking(
321 bits,
322 ProtobufUtils21::flat(max_bit_width, None),
323 );
324 Some((compressor, encoding))
325 }
326}
327
328fn maybe_wrap_general_for_mini_block(
329 inner: Box<dyn MiniBlockCompressor>,
330 params: &CompressionFieldParams,
331) -> Result<Box<dyn MiniBlockCompressor>> {
332 match params.compression.as_deref() {
333 None | Some("none") | Some("fsst") => Ok(inner),
334 Some(raw) => {
335 let scheme = CompressionScheme::from_str(raw)
336 .map_err(|_| Error::invalid_input(format!("Unknown compression scheme: {raw}")))?;
337 let cfg = CompressionConfig::new(scheme, params.compression_level);
338 Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
339 }
340 }
341}
342
343fn try_general_compression(
344 version: LanceFileVersion,
345 field_params: &CompressionFieldParams,
346 data: &DataBlock,
347) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
348 if field_params.compression.as_deref() == Some("none") {
350 return Ok(None);
351 }
352
353 if let Some(compression_scheme) = &field_params.compression
356 && version >= LanceFileVersion::V2_2
357 {
358 let scheme: CompressionScheme = compression_scheme.parse()?;
359 let config = CompressionConfig::new(scheme, field_params.compression_level);
360 let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
361 return Ok(Some((compressor, config)));
362 }
363
364 if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
366 && version >= LanceFileVersion::V2_2
367 {
368 let compressor = Box::new(CompressedBufferEncoder::default());
369 let config = compressor.compressor.config();
370 return Ok(Some((compressor, config)));
371 }
372
373 Ok(None)
374}
375
376impl DefaultCompressionStrategy {
377 pub fn new() -> Self {
379 Self::default()
380 }
381
382 pub fn with_params(params: CompressionParams) -> Self {
384 Self {
385 params,
386 version: LanceFileVersion::default(),
387 }
388 }
389
390 pub fn with_version(mut self, version: LanceFileVersion) -> Self {
392 self.version = version;
393 self
394 }
395
396 fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams {
398 let mut params = CompressionFieldParams::default();
399
400 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
402 params.compression = Some(compression.clone());
403 }
404
405 if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
407 params.compression_level = level.parse().ok();
408 }
409
410 if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
412 params.rle_threshold = threshold.parse().ok();
413 }
414
415 if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
417 match BssMode::parse(bss_str) {
418 Some(mode) => params.bss = Some(mode),
419 None => {
420 log::warn!("Invalid BSS mode '{}', using default", bss_str);
421 }
422 }
423 }
424
425 if let Some(minichunk_size_str) = field
427 .metadata
428 .get(super::constants::MINICHUNK_SIZE_META_KEY)
429 {
430 if let Ok(minichunk_size) = minichunk_size_str.parse::<i64>() {
431 if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 {
433 log::warn!(
434 "minichunk_size '{}' too large for version '{}', using default",
435 minichunk_size,
436 version
437 );
438 } else {
439 params.minichunk_size = Some(minichunk_size);
440 }
441 } else {
442 log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str);
443 }
444 }
445
446 params
447 }
448
449 fn build_fixed_width_compressor(
450 &self,
451 params: &CompressionFieldParams,
452 data: &FixedWidthDataBlock,
453 ) -> Result<Box<dyn MiniBlockCompressor>> {
454 if params.compression.as_deref() == Some("none") {
455 return Ok(Box::new(ValueEncoder::default()));
456 }
457
458 let base = try_bss_for_mini_block(data, params)
459 .or_else(|| try_rle_for_mini_block(data, params))
460 .or_else(|| try_bitpack_for_mini_block(data))
461 .unwrap_or_else(|| Box::new(ValueEncoder::default()));
462
463 maybe_wrap_general_for_mini_block(base, params)
464 }
465
466 fn build_variable_width_compressor(
468 &self,
469 field: &Field,
470 data: &VariableWidthBlock,
471 ) -> Result<Box<dyn MiniBlockCompressor>> {
472 let params = self.get_merged_field_params(field);
473 let compression = params.compression.as_deref();
474 if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
475 return Err(Error::invalid_input(format!(
476 "Variable width compression not supported for {} bit offsets",
477 data.bits_per_offset
478 )));
479 }
480
481 let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
483 let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
484
485 if compression == Some("none") {
487 return Ok(Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size)));
488 }
489
490 let use_fsst = compression == Some("fsst")
491 || (compression.is_none()
492 && !matches!(field.data_type(), DataType::Binary | DataType::LargeBinary)
493 && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
494 && data_size >= FSST_LEAST_INPUT_SIZE as u64);
495
496 let mut base_encoder: Box<dyn MiniBlockCompressor> = if use_fsst {
498 Box::new(FsstMiniBlockEncoder::new(params.minichunk_size))
499 } else {
500 Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size))
501 };
502
503 if let Some(compression_scheme) = compression.filter(|scheme| *scheme != "fsst") {
505 let scheme: CompressionScheme = compression_scheme.parse()?;
506 let config = CompressionConfig::new(scheme, params.compression_level);
507 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
508 }
509
510 Ok(base_encoder)
511 }
512
513 fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
516 let mut field_params = self
517 .params
518 .get_field_params(&field.name, &field.data_type());
519
520 let metadata_params = Self::parse_field_metadata(field, &self.version);
522 field_params.merge(&metadata_params);
523
524 field_params
525 }
526}
527
528impl CompressionStrategy for DefaultCompressionStrategy {
529 fn create_miniblock_compressor(
530 &self,
531 field: &Field,
532 data: &DataBlock,
533 ) -> Result<Box<dyn MiniBlockCompressor>> {
534 match data {
535 DataBlock::FixedWidth(fixed_width_data) => {
536 let field_params = self.get_merged_field_params(field);
537 self.build_fixed_width_compressor(&field_params, fixed_width_data)
538 }
539 DataBlock::VariableWidth(variable_width_data) => {
540 self.build_variable_width_compressor(field, variable_width_data)
541 }
542 DataBlock::Struct(struct_data_block) => {
543 if struct_data_block.has_variable_width_child() {
546 return Err(Error::invalid_input(
547 "Packed struct mini-block encoding supports only fixed-width children",
548 ));
549 }
550 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
551 }
552 DataBlock::FixedSizeList(_) => {
553 Ok(Box::new(ValueEncoder::default()))
561 }
562 _ => Err(Error::not_supported_source(
563 format!(
564 "Mini-block compression not yet supported for block type {}",
565 data.name()
566 )
567 .into(),
568 )),
569 }
570 }
571
572 fn create_per_value(
573 &self,
574 field: &Field,
575 data: &DataBlock,
576 ) -> Result<Box<dyn PerValueCompressor>> {
577 let field_params = self.get_merged_field_params(field);
578
579 match data {
580 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
581 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
582 DataBlock::Struct(struct_block) => {
583 if field.children.len() != struct_block.children.len() {
584 return Err(Error::invalid_input(
585 "Struct field metadata does not match data block children",
586 ));
587 }
588 let has_variable_child = struct_block.has_variable_width_child();
589 if has_variable_child {
590 if self.version < LanceFileVersion::V2_2 {
591 return Err(Error::not_supported_source("Variable packed struct encoding requires Lance file version 2.2 or later".into()));
592 }
593 Ok(Box::new(PackedStructVariablePerValueEncoder::new(
594 self.clone(),
595 field.children.clone(),
596 )))
597 } else {
598 Err(Error::invalid_input(
599 "Packed struct per-value compression should not be used for fixed-width-only structs",
600 ))
601 }
602 }
603 DataBlock::VariableWidth(variable_width) => {
604 let compression = field_params.compression.as_deref();
605 if compression == Some("none") {
607 return Ok(Box::new(VariableEncoder::default()));
608 }
609
610 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
611 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
612
613 let per_value_requested =
618 compression.is_some_and(|compression| compression != "fsst");
619
620 if (max_len > 32 * 1024 || per_value_requested)
621 && data_size >= FSST_LEAST_INPUT_SIZE as u64
622 {
623 return Ok(Box::new(CompressedBufferEncoder::default()));
624 }
625
626 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
627 let variable_compression = Box::new(VariableEncoder::default());
628 let use_fsst = compression == Some("fsst")
629 || (compression.is_none()
630 && !matches!(
631 field.data_type(),
632 DataType::Binary | DataType::LargeBinary
633 )
634 && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
635 && data_size >= FSST_LEAST_INPUT_SIZE as u64);
636
637 if use_fsst {
639 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
640 } else {
641 Ok(variable_compression)
642 }
643 } else {
644 panic!(
645 "Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.",
646 variable_width.bits_per_offset
647 );
648 }
649 }
650 _ => unreachable!(
651 "Per-value compression not yet supported for block type: {}",
652 data.name()
653 ),
654 }
655 }
656
657 fn create_block_compressor(
658 &self,
659 field: &Field,
660 data: &DataBlock,
661 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
662 let field_params = self.get_merged_field_params(field);
663
664 match data {
665 DataBlock::FixedWidth(fixed_width) => {
666 if let Some((compressor, encoding)) =
667 try_rle_for_block(fixed_width, self.version, &field_params)
668 {
669 return Ok((compressor, encoding));
670 }
671 if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
672 return Ok((compressor, encoding));
673 }
674
675 if let Some((compressor, config)) =
677 try_general_compression(self.version, &field_params, data)?
678 {
679 let encoding = ProtobufUtils21::wrapped(
680 config,
681 ProtobufUtils21::flat(fixed_width.bits_per_value, None),
682 )?;
683 return Ok((compressor, encoding));
684 }
685
686 let encoder = Box::new(ValueEncoder::default());
687 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
688 Ok((encoder, encoding))
689 }
690 DataBlock::VariableWidth(variable_width) => {
691 if let Some((compressor, config)) =
693 try_general_compression(self.version, &field_params, data)?
694 {
695 let encoding = ProtobufUtils21::wrapped(
696 config,
697 ProtobufUtils21::variable(
698 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
699 None,
700 ),
701 )?;
702 return Ok((compressor, encoding));
703 }
704
705 let encoder = Box::new(VariableEncoder::default());
706 let encoding = ProtobufUtils21::variable(
707 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
708 None,
709 );
710 Ok((encoder, encoding))
711 }
712 _ => unreachable!(),
713 }
714 }
715}
716
/// Decodes mini-block buffers back into a [`DataBlock`].
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `data` (the buffers of one mini-block) holding `num_values` values.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
720
/// Decodes per-value encoded fixed-width data.
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `num_values` values from the fixed-width block `data`.
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// The number of bits per value reported by this decompressor
    /// (presumably the encoded width the caller must slice by — confirm with callers).
    fn bits_per_value(&self) -> u64;
}
729
/// Decodes per-value encoded variable-width data.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses the variable-width block `data`.
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
734
/// Decodes a single compressed buffer back into a [`DataBlock`].
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `data`, which holds `num_values` values.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
738
/// Builds decompressors from the [`CompressiveEncoding`] descriptions stored in a file.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Creates a mini-block decompressor for `description`.
    ///
    /// `decompression_strategy` is the strategy to consult for nested encodings
    /// (for example the values inside an FSST encoding).
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Creates a fixed-width per-value decompressor for `description`.
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Creates a variable-width per-value decompressor for `description`.
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Creates a whole-block decompressor for `description`.
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
761
/// The default [`DecompressionStrategy`], mapping each supported
/// [`CompressiveEncoding`] variant to the matching built-in decompressor.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
764
765impl DecompressionStrategy for DefaultDecompressionStrategy {
766 fn create_miniblock_decompressor(
767 &self,
768 description: &CompressiveEncoding,
769 decompression_strategy: &dyn DecompressionStrategy,
770 ) -> Result<Box<dyn MiniBlockDecompressor>> {
771 match description.compression.as_ref().unwrap() {
772 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
773 #[cfg(feature = "bitpacking")]
774 Compression::InlineBitpacking(description) => {
775 Ok(Box::new(InlineBitpacking::from_description(description)))
776 }
777 #[cfg(not(feature = "bitpacking"))]
778 Compression::InlineBitpacking(_) => Err(Error::not_supported_source(
779 "this runtime was not built with bitpacking support".into(),
780 )),
781 Compression::Variable(variable) => {
782 let Compression::Flat(offsets) = variable
783 .offsets
784 .as_ref()
785 .unwrap()
786 .compression
787 .as_ref()
788 .unwrap()
789 else {
790 panic!("Variable compression only supports flat offsets")
791 };
792 Ok(Box::new(BinaryMiniBlockDecompressor::new(
793 offsets.bits_per_value as u8,
794 )))
795 }
796 Compression::Fsst(description) => {
797 let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
798 description.values.as_ref().unwrap(),
799 decompression_strategy,
800 )?;
801 Ok(Box::new(FsstMiniBlockDecompressor::new(
802 description,
803 inner_decompressor,
804 )))
805 }
806 Compression::PackedStruct(description) => Ok(Box::new(
807 PackedStructFixedWidthMiniBlockDecompressor::new(description),
808 )),
809 Compression::VariablePackedStruct(_) => Err(Error::not_supported_source(
810 "variable packed struct decoding is not yet implemented".into(),
811 )),
812 Compression::FixedSizeList(fsl) => {
813 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
816 }
817 Compression::Rle(rle) => {
818 let bits_per_value = validate_rle_compression(rle)?;
819 Ok(Box::new(RleDecompressor::new(bits_per_value)))
820 }
821 Compression::ByteStreamSplit(bss) => {
822 let Compression::Flat(values) =
823 bss.values.as_ref().unwrap().compression.as_ref().unwrap()
824 else {
825 panic!("ByteStreamSplit compression only supports flat values")
826 };
827 Ok(Box::new(ByteStreamSplitDecompressor::new(
828 values.bits_per_value as usize,
829 )))
830 }
831 Compression::General(general) => {
832 let inner_decompressor = self.create_miniblock_decompressor(
834 general.values.as_ref().ok_or_else(|| {
835 Error::invalid_input("GeneralMiniBlock missing inner encoding")
836 })?,
837 decompression_strategy,
838 )?;
839
840 let compression = general.compression.as_ref().ok_or_else(|| {
842 Error::invalid_input("GeneralMiniBlock missing compression config")
843 })?;
844
845 let scheme = compression.scheme().try_into()?;
846
847 let compression_config = CompressionConfig::new(scheme, compression.level);
848
849 Ok(Box::new(GeneralMiniBlockDecompressor::new(
850 inner_decompressor,
851 compression_config,
852 )))
853 }
854 _ => todo!(),
855 }
856 }
857
858 fn create_fixed_per_value_decompressor(
859 &self,
860 description: &CompressiveEncoding,
861 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
862 match description.compression.as_ref().unwrap() {
863 Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
864 constant
865 .value
866 .as_ref()
867 .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
868 ))),
869 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
870 Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
871 _ => todo!("fixed-per-value decompressor for {:?}", description),
872 }
873 }
874
875 fn create_variable_per_value_decompressor(
876 &self,
877 description: &CompressiveEncoding,
878 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
879 match description.compression.as_ref().unwrap() {
880 Compression::Variable(variable) => {
881 let Compression::Flat(offsets) = variable
882 .offsets
883 .as_ref()
884 .unwrap()
885 .compression
886 .as_ref()
887 .unwrap()
888 else {
889 panic!("Variable compression only supports flat offsets")
890 };
891 assert!(offsets.bits_per_value < u8::MAX as u64);
892 Ok(Box::new(VariableDecoder::default()))
893 }
894 Compression::Fsst(fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
895 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
896 Box::new(VariableDecoder::default()),
897 ))),
898 Compression::General(general) => Ok(Box::new(CompressedBufferEncoder::from_scheme(
899 general.compression.as_ref().expect_ok()?.scheme(),
900 )?)),
901 Compression::VariablePackedStruct(description) => {
902 let mut fields = Vec::with_capacity(description.fields.len());
903 for field in &description.fields {
904 let value_encoding = field.value.as_ref().ok_or_else(|| {
905 Error::invalid_input("VariablePackedStruct field is missing value encoding")
906 })?;
907 let decoder = match field.layout.as_ref().ok_or_else(|| {
908 Error::invalid_input("VariablePackedStruct field is missing layout details")
909 })? {
910 crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
911 bits_per_value,
912 ) => {
913 let decompressor =
914 self.create_fixed_per_value_decompressor(value_encoding)?;
915 VariablePackedStructFieldDecoder {
916 kind: VariablePackedStructFieldKind::Fixed {
917 bits_per_value: *bits_per_value,
918 decompressor: Arc::from(decompressor),
919 },
920 }
921 }
922 crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
923 bits_per_length,
924 ) => {
925 let decompressor =
926 self.create_variable_per_value_decompressor(value_encoding)?;
927 VariablePackedStructFieldDecoder {
928 kind: VariablePackedStructFieldKind::Variable {
929 bits_per_length: *bits_per_length,
930 decompressor: Arc::from(decompressor),
931 },
932 }
933 }
934 };
935 fields.push(decoder);
936 }
937 Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
938 fields,
939 )))
940 }
941 _ => todo!("variable-per-value decompressor for {:?}", description),
942 }
943 }
944
945 fn create_block_decompressor(
946 &self,
947 description: &CompressiveEncoding,
948 ) -> Result<Box<dyn BlockDecompressor>> {
949 match description.compression.as_ref().unwrap() {
950 Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
951 InlineBitpacking::from_description(inline_bitpacking),
952 )),
953 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
954 Compression::Constant(constant) => {
955 let scalar = constant
956 .value
957 .as_ref()
958 .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
959 Ok(Box::new(ConstantDecompressor::new(scalar)))
960 }
961 Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
962 Compression::FixedSizeList(fsl) => {
963 Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
964 }
965 Compression::OutOfLineBitpacking(out_of_line) => {
966 let compressed_bit_width = match out_of_line
968 .values
969 .as_ref()
970 .unwrap()
971 .compression
972 .as_ref()
973 .unwrap()
974 {
975 Compression::Flat(flat) => flat.bits_per_value,
976 _ => {
977 return Err(Error::invalid_input_source(
978 "OutOfLineBitpacking values must use Flat encoding".into(),
979 ));
980 }
981 };
982 Ok(Box::new(OutOfLineBitpacking::new(
983 compressed_bit_width,
984 out_of_line.uncompressed_bits_per_value,
985 )))
986 }
987 Compression::General(general) => {
988 let inner_desc = general
989 .values
990 .as_ref()
991 .ok_or_else(|| {
992 Error::invalid_input("General compression missing inner encoding")
993 })?
994 .as_ref();
995 let inner_decompressor = self.create_block_decompressor(inner_desc)?;
996
997 let compression = general.compression.as_ref().ok_or_else(|| {
998 Error::invalid_input("General compression missing compression config")
999 })?;
1000 let scheme = compression.scheme().try_into()?;
1001 let config = CompressionConfig::new(scheme, compression.level);
1002 let general_decompressor =
1003 GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
1004
1005 Ok(Box::new(general_decompressor))
1006 }
1007 Compression::Rle(rle) => {
1008 let bits_per_value = validate_rle_compression(rle)?;
1009 Ok(Box::new(RleDecompressor::new(bits_per_value)))
1010 }
1011 _ => todo!(),
1012 }
1013 }
1014}
1015fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<u64> {
1017 let values = rle
1018 .values
1019 .as_ref()
1020 .ok_or_else(|| Error::invalid_input("RLE compression missing values encoding"))?;
1021 let run_lengths = rle
1022 .run_lengths
1023 .as_ref()
1024 .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths encoding"))?;
1025
1026 let values = values
1027 .compression
1028 .as_ref()
1029 .ok_or_else(|| Error::invalid_input("RLE compression missing values compression"))?;
1030 let Compression::Flat(values) = values else {
1031 return Err(Error::invalid_input(
1032 "RLE compression only supports flat values",
1033 ));
1034 };
1035
1036 let run_lengths = run_lengths
1037 .compression
1038 .as_ref()
1039 .ok_or_else(|| Error::invalid_input("RLE compression missing run lengths compression"))?;
1040 let Compression::Flat(run_lengths) = run_lengths else {
1041 return Err(Error::invalid_input(
1042 "RLE compression only supports flat run lengths",
1043 ));
1044 };
1045
1046 if run_lengths.bits_per_value != 8 {
1047 return Err(Error::invalid_input(format!(
1048 "RLE compression only supports 8-bit run lengths, got {}",
1049 run_lengths.bits_per_value
1050 )));
1051 }
1052
1053 Ok(values.bits_per_value)
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058 use super::*;
1059 use crate::buffer::LanceBuffer;
1060 use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
1061 use crate::statistics::ComputeStat;
1062 use crate::testing::extract_array_encoding_chain;
1063 use arrow_schema::{DataType, Field as ArrowField};
1064 use std::collections::HashMap;
1065
1066 fn create_test_field(name: &str, data_type: DataType) -> Field {
1067 let arrow_field = ArrowField::new(name, data_type, true);
1068 let mut field = Field::try_from(&arrow_field).unwrap();
1069 field.id = -1;
1070 field
1071 }
1072
1073 fn create_fixed_width_block_with_stats(
1074 bits_per_value: u64,
1075 num_values: u64,
1076 run_count: u64,
1077 ) -> DataBlock {
1078 let bytes_per_value = (bits_per_value / 8) as usize;
1080 let total_bytes = bytes_per_value * num_values as usize;
1081 let mut data = vec![0u8; total_bytes];
1082
1083 let values_per_run = (num_values / run_count).max(1);
1085 let mut run_value = 0u8;
1086
1087 for i in 0..num_values as usize {
1088 if i % values_per_run as usize == 0 {
1089 run_value = run_value.wrapping_add(17); }
1091 for j in 0..bytes_per_value {
1093 let byte_offset = i * bytes_per_value + j;
1094 if byte_offset < data.len() {
1095 data[byte_offset] = run_value.wrapping_add(j as u8);
1096 }
1097 }
1098 }
1099
1100 let mut block = FixedWidthDataBlock {
1101 bits_per_value,
1102 data: LanceBuffer::reinterpret_vec(data),
1103 num_values,
1104 block_info: BlockInfo::default(),
1105 };
1106
1107 use crate::statistics::ComputeStat;
1109 block.compute_stat();
1110
1111 DataBlock::FixedWidth(block)
1112 }
1113
1114 fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
1115 let bytes_per_value = (bits_per_value / 8) as usize;
1117 let total_bytes = bytes_per_value * num_values as usize;
1118 let mut data = vec![0u8; total_bytes];
1119
1120 for i in 0..num_values as usize {
1122 let byte_offset = i * bytes_per_value;
1123 if byte_offset < data.len() {
1124 data[byte_offset] = (i % 256) as u8;
1125 }
1126 }
1127
1128 let mut block = FixedWidthDataBlock {
1129 bits_per_value,
1130 data: LanceBuffer::reinterpret_vec(data),
1131 num_values,
1132 block_info: BlockInfo::default(),
1133 };
1134
1135 use crate::statistics::ComputeStat;
1137 block.compute_stat();
1138
1139 DataBlock::FixedWidth(block)
1140 }
1141
1142 fn create_variable_width_block(
1143 bits_per_offset: u8,
1144 num_values: u64,
1145 avg_value_size: usize,
1146 ) -> DataBlock {
1147 use crate::statistics::ComputeStat;
1148
1149 let mut offsets = Vec::with_capacity((num_values + 1) as usize);
1151 let mut current_offset = 0i64;
1152 offsets.push(current_offset);
1153
1154 for i in 0..num_values {
1156 let value_size = if avg_value_size == 0 {
1157 1
1158 } else {
1159 ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
1160 .min(avg_value_size * 2)
1161 };
1162 current_offset += value_size as i64;
1163 offsets.push(current_offset);
1164 }
1165
1166 let total_data_size = current_offset as usize;
1168 let mut data = vec![0u8; total_data_size];
1169
1170 for i in 0..num_values {
1172 let start_offset = offsets[i as usize] as usize;
1173 let end_offset = offsets[(i + 1) as usize] as usize;
1174
1175 let content = (i % 256) as u8;
1176 for j in 0..end_offset - start_offset {
1177 data[start_offset + j] = content.wrapping_add(j as u8);
1178 }
1179 }
1180
1181 let offsets_buffer = match bits_per_offset {
1183 32 => {
1184 let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect();
1185 LanceBuffer::reinterpret_vec(offsets_32)
1186 }
1187 64 => LanceBuffer::reinterpret_vec(offsets),
1188 _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
1189 };
1190
1191 let mut block = VariableWidthBlock {
1192 data: LanceBuffer::from(data),
1193 offsets: offsets_buffer,
1194 bits_per_offset,
1195 num_values,
1196 block_info: BlockInfo::default(),
1197 };
1198
1199 block.compute_stat();
1200 DataBlock::VariableWidth(block)
1201 }
1202
1203 fn create_fsst_candidate_variable_width_block() -> DataBlock {
1204 create_variable_width_block(32, 4096, FSST_LEAST_INPUT_MAX_LENGTH as usize + 16)
1205 }
1206
#[test]
fn test_parameter_based_compression() {
    // Columns matching `*_id`: RLE threshold 0.3, lz4 on top, BSS off.
    let id_params = CompressionFieldParams {
        rle_threshold: Some(0.3),
        compression: Some("lz4".to_string()),
        compression_level: None,
        bss: Some(BssMode::Off),
        minichunk_size: None,
    };
    let mut params = CompressionParams::new();
    params.columns.insert("*_id".to_string(), id_params);

    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("user_id", DataType::Int32);

    // 1000 values split into runs of 10.
    let data = create_fixed_width_block_with_stats(32, 1000, 100);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

    // Expect RLE wrapped in the general-purpose (lz4) compressor.
    let debug_str = format!("{:?}", compressor);
    assert!(debug_str.contains("GeneralMiniBlockCompressor"));
    assert!(debug_str.contains("RleEncoder"));
}
1238
#[test]
fn test_type_level_parameters() {
    // Settings applied to every Int32 column: RLE threshold 0.1, zstd level 3, BSS off.
    let int32_params = CompressionFieldParams {
        rle_threshold: Some(0.1),
        compression: Some("zstd".to_string()),
        compression_level: Some(3),
        bss: Some(BssMode::Off),
        minichunk_size: None,
    };
    let mut params = CompressionParams::new();
    params.types.insert("Int32".to_string(), int32_params);

    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("some_column", DataType::Int32);
    // 1000 values split into runs of 20.
    let data = create_fixed_width_block_with_stats(32, 1000, 50);

    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);
    assert!(debug_str.contains("RleEncoder"));
}
1264
#[test]
#[cfg(feature = "bitpacking")]
fn test_block_bitpacks_with_zero_segment() {
    let strategy = DefaultCompressionStrategy::new();
    let field = create_test_field("levels", DataType::UInt16);

    // First half all zeros, second half all ones.
    let values: Vec<u16> = std::iter::repeat_n(0u16, 1024)
        .chain(std::iter::repeat_n(1u16, 1024))
        .collect();
    let mut block = FixedWidthDataBlock {
        bits_per_value: 16,
        data: LanceBuffer::reinterpret_vec(values),
        num_values: 2048,
        block_info: BlockInfo::default(),
    };
    block.compute_stat();
    let data = DataBlock::FixedWidth(block);

    let (compressor, _encoding) = strategy.create_block_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);
    assert!(
        debug_str.contains("OutOfLineBitpacking"),
        "expected OutOfLineBitpacking, got: {debug_str}"
    );
}
1292
#[test]
#[cfg(feature = "bitpacking")]
fn test_low_cardinality_prefers_bitpacking_over_rle() {
    let strategy = DefaultCompressionStrategy::new();
    let field = create_test_field("int_score", DataType::Int64);

    // 64 runs of length 4 cycling through the values 3, 4, 5 (256 values total).
    let values: Vec<u64> = (0..64u64)
        .flat_map(|run_idx| std::iter::repeat_n(3 + run_idx % 3, 4))
        .collect();

    let mut block = FixedWidthDataBlock {
        bits_per_value: 64,
        data: LanceBuffer::reinterpret_vec(values),
        num_values: 256,
        block_info: BlockInfo::default(),
    };
    block.compute_stat();

    let data = DataBlock::FixedWidth(block);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    assert!(
        debug_str.contains("InlineBitpacking"),
        "expected InlineBitpacking, got: {debug_str}"
    );
    assert!(
        !debug_str.contains("RleEncoder"),
        "expected RLE to be skipped when bitpacking is smaller, got: {debug_str}"
    );
}
1333
1334 fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
1335 let chain = extract_array_encoding_chain(encoding);
1336 if variable {
1337 assert_eq!(chain.len(), 2);
1338 assert_eq!(chain.first().unwrap().as_str(), "variable");
1339 assert_eq!(chain.get(1).unwrap().as_str(), "flat");
1340 } else {
1341 assert_eq!(chain.len(), 1);
1342 assert_eq!(chain.first().unwrap().as_str(), "flat");
1343 }
1344 }
1345
#[test]
fn test_none_compression() {
    // Explicitly disable compression for the `embeddings` column.
    let mut params = CompressionParams::new();
    params.columns.insert(
        "embeddings".to_string(),
        CompressionFieldParams {
            compression: Some("none".to_string()),
            ..Default::default()
        },
    );
    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("embeddings", DataType::Float32);

    let fixed_data = create_fixed_width_block(32, 1000);
    let variable_data = create_variable_width_block(32, 10, 32 * 1024);

    // Mini-block path: both layouts must come out uncompressed.
    let miniblock_fixed = strategy
        .create_miniblock_compressor(&field, &fixed_data)
        .unwrap();
    let (_block, encoding) = miniblock_fixed.compress(fixed_data.clone()).unwrap();
    check_uncompressed_encoding(&encoding, false);

    let miniblock_variable = strategy
        .create_miniblock_compressor(&field, &variable_data)
        .unwrap();
    let (_block, encoding) = miniblock_variable
        .compress(variable_data.clone())
        .unwrap();
    check_uncompressed_encoding(&encoding, true);

    // Per-value path: same expectation. The input blocks are consumed here.
    let per_value_fixed = strategy.create_per_value(&field, &fixed_data).unwrap();
    let (_block, encoding) = per_value_fixed.compress(fixed_data).unwrap();
    check_uncompressed_encoding(&encoding, false);

    let per_value_variable = strategy.create_per_value(&field, &variable_data).unwrap();
    let (_block, encoding) = per_value_variable.compress(variable_data).unwrap();
    check_uncompressed_encoding(&encoding, true);
}
1384
#[test]
fn test_field_metadata_none_compression() {
    // Compression is disabled purely through Arrow field metadata.
    let metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), "none".to_string())]);
    let arrow_field =
        ArrowField::new("simple_col", DataType::Binary, true).with_metadata(metadata);
    let field = Field::try_from(&arrow_field).unwrap();

    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let fixed_data = create_fixed_width_block(32, 1000);
    let variable_data = create_variable_width_block(32, 10, 32 * 1024);

    // Mini-block path.
    let miniblock_fixed = strategy
        .create_miniblock_compressor(&field, &fixed_data)
        .unwrap();
    let (_block, encoding) = miniblock_fixed.compress(fixed_data.clone()).unwrap();
    check_uncompressed_encoding(&encoding, false);

    let miniblock_variable = strategy
        .create_miniblock_compressor(&field, &variable_data)
        .unwrap();
    let (_block, encoding) = miniblock_variable
        .compress(variable_data.clone())
        .unwrap();
    check_uncompressed_encoding(&encoding, true);

    // Per-value path. The input blocks are consumed here.
    let per_value_fixed = strategy.create_per_value(&field, &fixed_data).unwrap();
    let (_block, encoding) = per_value_fixed.compress(fixed_data).unwrap();
    check_uncompressed_encoding(&encoding, false);

    let per_value_variable = strategy.create_per_value(&field, &variable_data).unwrap();
    let (_block, encoding) = per_value_variable.compress(variable_data).unwrap();
    check_uncompressed_encoding(&encoding, true);
}
1421
/// Automatic FSST selection must not be applied to `Binary` columns.
#[test]
fn test_auto_fsst_disabled_for_binary_fields() {
    let field = create_test_field("bytes", DataType::Binary);
    let variable_data = create_fsst_candidate_variable_width_block();
    let strategy = DefaultCompressionStrategy::new();

    // Mini-block path falls back to the plain binary encoder.
    let miniblock_debug = format!(
        "{:?}",
        strategy
            .create_miniblock_compressor(&field, &variable_data)
            .unwrap()
    );
    assert!(
        miniblock_debug.contains("BinaryMiniBlockEncoder"),
        "expected BinaryMiniBlockEncoder, got: {miniblock_debug}"
    );
    assert!(
        !miniblock_debug.contains("FsstMiniBlockEncoder"),
        "did not expect FsstMiniBlockEncoder, got: {miniblock_debug}"
    );

    // Per-value path falls back to the plain variable encoder.
    let per_value_debug = format!(
        "{:?}",
        strategy.create_per_value(&field, &variable_data).unwrap()
    );
    assert!(
        per_value_debug.contains("VariableEncoder"),
        "expected VariableEncoder, got: {per_value_debug}"
    );
    assert!(
        !per_value_debug.contains("FsstPerValueEncoder"),
        "did not expect FsstPerValueEncoder, got: {per_value_debug}"
    );
}
1452
/// `Utf8` columns still get FSST automatically on both encoding paths.
#[test]
fn test_auto_fsst_still_enabled_for_utf8_fields() {
    let field = create_test_field("text", DataType::Utf8);
    let variable_data = create_fsst_candidate_variable_width_block();
    let strategy = DefaultCompressionStrategy::new();

    let miniblock_debug = format!(
        "{:?}",
        strategy
            .create_miniblock_compressor(&field, &variable_data)
            .unwrap()
    );
    assert!(
        miniblock_debug.contains("FsstMiniBlockEncoder"),
        "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
    );

    let per_value_debug = format!(
        "{:?}",
        strategy.create_per_value(&field, &variable_data).unwrap()
    );
    assert!(
        per_value_debug.contains("FsstPerValueEncoder"),
        "expected FsstPerValueEncoder, got: {per_value_debug}"
    );
}
1475
/// Explicitly requesting `fsst` still enables it for `Binary` columns.
#[test]
fn test_explicit_fsst_still_supported_for_binary_fields() {
    let fsst_params = CompressionFieldParams {
        compression: Some("fsst".to_string()),
        ..Default::default()
    };
    let mut params = CompressionParams::new();
    params.columns.insert("bytes".to_string(), fsst_params);

    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("bytes", DataType::Binary);
    let variable_data = create_fsst_candidate_variable_width_block();

    let miniblock_debug = format!(
        "{:?}",
        strategy
            .create_miniblock_compressor(&field, &variable_data)
            .unwrap()
    );
    assert!(
        miniblock_debug.contains("FsstMiniBlockEncoder"),
        "expected FsstMiniBlockEncoder, got: {miniblock_debug}"
    );

    let per_value_debug = format!(
        "{:?}",
        strategy.create_per_value(&field, &variable_data).unwrap()
    );
    assert!(
        per_value_debug.contains("FsstPerValueEncoder"),
        "expected FsstPerValueEncoder, got: {per_value_debug}"
    );
}
1507
#[test]
fn test_parameter_merge_priority() {
    let type_params = CompressionFieldParams {
        rle_threshold: Some(0.5),
        compression: Some("lz4".to_string()),
        ..Default::default()
    };
    let column_params = CompressionFieldParams {
        rle_threshold: Some(0.2),
        compression: Some("zstd".to_string()),
        compression_level: Some(6),
        bss: None,
        minichunk_size: None,
    };

    let mut params = CompressionParams::new();
    params.types.insert("Int32".to_string(), type_params);
    params.columns.insert("user_id".to_string(), column_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    // Column-level settings win over type-level ones for `user_id`.
    let user_id = strategy
        .params
        .get_field_params("user_id", &DataType::Int32);
    assert_eq!(user_id.rle_threshold, Some(0.2));
    assert_eq!(user_id.compression, Some("zstd".to_string()));
    assert_eq!(user_id.compression_level, Some(6));

    // Any other Int32 column falls back to the type-level settings.
    let other = strategy
        .params
        .get_field_params("other_field", &DataType::Int32);
    assert_eq!(other.rle_threshold, Some(0.5));
    assert_eq!(other.compression, Some("lz4".to_string()));
    assert_eq!(other.compression_level, None);
}
1554
#[test]
fn test_pattern_matching() {
    // Prefix wildcard: applies to column names starting with `log_`.
    let log_params = CompressionFieldParams {
        compression: Some("zstd".to_string()),
        compression_level: Some(6),
        ..Default::default()
    };
    let mut params = CompressionParams::new();
    params.columns.insert("log_*".to_string(), log_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    let matched = strategy
        .params
        .get_field_params("log_messages", &DataType::Utf8);
    assert_eq!(matched.compression, Some("zstd".to_string()));
    assert_eq!(matched.compression_level, Some(6));

    // A name that only ends with `log` must not match the prefix pattern.
    let unmatched = strategy
        .params
        .get_field_params("messages_log", &DataType::Utf8);
    assert_eq!(unmatched.compression, None);
}
1584
#[test]
fn test_legacy_metadata_support() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    // Compression disabled via field metadata only.
    let mut field = create_test_field("some_column", DataType::Int32);
    field.metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), "none".to_string())]);

    let data = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);
    assert!(debug_str.contains("ValueEncoder"));
}
1602
#[test]
fn test_default_behavior() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
    let field = create_test_field("random_column", DataType::Int32);

    // With 1000 values and run_count 600 the helper's run length rounds down to 1,
    // so every value starts a new run and consecutive values always differ.
    let data = create_fixed_width_block_with_stats(32, 1000, 600);

    let debug_str = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &data).unwrap()
    );
    assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
}
1618
#[test]
fn test_field_metadata_compression() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    // Request zstd at level 6 purely through field metadata.
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([
        (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
        (COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string()),
    ]);

    let data = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);
    assert!(debug_str.contains("GeneralMiniBlockCompressor"));
}
1638
#[test]
fn test_field_metadata_rle_threshold() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    // RLE threshold raised to 0.8 and BSS switched off, both via field metadata.
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([
        (RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string()),
        (BSS_META_KEY.to_string(), "off".to_string()),
    ]);

    // 1000 values split into runs of 10.
    let data = create_fixed_width_block_with_stats(32, 1000, 100);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);
    assert!(debug_str.contains("RleEncoder"));
}
1661
/// Field metadata takes precedence over column-level `CompressionParams`.
#[test]
fn test_field_metadata_override_params() {
    let mut params = CompressionParams::new();
    params.columns.insert(
        "test_column".to_string(),
        CompressionFieldParams {
            rle_threshold: Some(0.3),
            compression: Some("lz4".to_string()),
            compression_level: None,
            bss: None,
            minichunk_size: None,
        },
    );
    let strategy = DefaultCompressionStrategy::with_params(params);

    // Metadata `compression = none` should override the column's lz4 setting.
    let mut metadata = HashMap::new();
    metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = metadata;

    let data = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    assert!(
        debug_str.contains("ValueEncoder"),
        "expected ValueEncoder, got: {debug_str}"
    );
    // Matching `ValueEncoder` alone is not enough. The general compressor's Debug output
    // includes its inner encoder, so a failed override (lz4 wrapping a ValueEncoder)
    // would still satisfy the check above.
    assert!(
        !debug_str.contains("GeneralMiniBlockCompressor"),
        "metadata compression=none should override lz4 from params, got: {debug_str}"
    );
}
1691
#[test]
fn test_field_metadata_mixed_configuration() {
    // Type-level settings for Int32: RLE threshold 0.5 with lz4.
    let int32_params = CompressionFieldParams {
        rle_threshold: Some(0.5),
        compression: Some("lz4".to_string()),
        ..Default::default()
    };
    let mut params = CompressionParams::new();
    params.types.insert("Int32".to_string(), int32_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    // Field metadata supplies only a compression level. The codec is expected to come
    // from the type-level params.
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string())]);

    let data = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);
    assert!(debug_str.contains("GeneralMiniBlockCompressor"));
}
1720
#[test]
fn test_bss_field_metadata() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    // Byte-stream-split switched on (alongside lz4) through Arrow field metadata.
    let metadata = HashMap::from([
        (BSS_META_KEY.to_string(), "on".to_string()),
        (COMPRESSION_META_KEY.to_string(), "lz4".to_string()),
    ]);
    let field = Field::try_from(
        &ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata),
    )
    .unwrap();

    let data = create_fixed_width_block(32, 100);
    let debug_str = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &data).unwrap()
    );
    assert!(debug_str.contains("ByteStreamSplitEncoder"));
}
1741
#[test]
fn test_bss_with_compression() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let metadata = HashMap::from([
        (BSS_META_KEY.to_string(), "on".to_string()),
        (COMPRESSION_META_KEY.to_string(), "lz4".to_string()),
    ]);
    let field = Field::try_from(
        &ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata),
    )
    .unwrap();

    let data = create_fixed_width_block(64, 100);
    let debug_str = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &data).unwrap()
    );
    // Both layers must be present: general compression and the BSS encoder.
    assert!(debug_str.contains("GeneralMiniBlockCompressor"));
    assert!(debug_str.contains("ByteStreamSplitEncoder"));
}
1764
/// Round-trips a fixed-width block through general (lz4/zstd) block compression on V2.2.
#[test]
#[cfg(any(feature = "lz4", feature = "zstd"))]
fn test_general_block_decompression_fixed_width_v2_2() {
    let mut params = CompressionParams::new();
    params.columns.insert(
        "dict_values".to_string(),
        CompressionFieldParams {
            compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
            ..Default::default()
        },
    );
    // Select the file version through the builder, like the other version-specific
    // tests, instead of mutating the strategy after construction.
    let strategy =
        DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_2);

    let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
    let data = create_fixed_width_block(24, 1024);
    let DataBlock::FixedWidth(expected_block) = &data else {
        panic!("expected fixed width block");
    };

    let (compressor, encoding) = strategy
        .create_block_compressor(&field, &data)
        .expect("general compression should be selected");
    match encoding.compression.as_ref() {
        Some(Compression::General(_)) => {}
        other => panic!("expected general compression, got {:?}", other),
    }

    let compressed_buffer = compressor
        .compress(data.clone())
        .expect("write path general compression should succeed");

    let decompressor = DefaultDecompressionStrategy::default()
        .create_block_decompressor(&encoding)
        .expect("general block decompressor should be created");

    let decoded = decompressor
        .decompress(compressed_buffer, expected_block.num_values)
        .expect("decompression should succeed");

    let DataBlock::FixedWidth(block) = decoded else {
        panic!("expected fixed width block");
    };
    assert_eq!(block.bits_per_value, expected_block.bits_per_value);
    assert_eq!(block.num_values, expected_block.num_values);
    assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
}
1819
#[test]
#[cfg(any(feature = "lz4", feature = "zstd"))]
fn test_general_compression_not_selected_for_v2_1_even_if_requested() {
    let codec = if cfg!(feature = "lz4") { "lz4" } else { "zstd" };
    let mut params = CompressionParams::new();
    params.columns.insert(
        "dict_values".to_string(),
        CompressionFieldParams {
            compression: Some(codec.to_string()),
            ..Default::default()
        },
    );
    let strategy =
        DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_1);

    let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
    let data = create_fixed_width_block(24, 1024);

    let (_compressor, encoding) = strategy
        .create_block_compressor(&field, &data)
        .expect("block compressor selection should succeed");

    // Even though a codec was requested, V2.1 must not pick general compression.
    let picked_general = matches!(
        encoding.compression.as_ref(),
        Some(Compression::General(_))
    );
    assert!(
        !picked_general,
        "general compression should not be selected for V2.1"
    );
}
1846
#[test]
fn test_none_compression_disables_auto_general_block_compression() {
    let none_params = CompressionFieldParams {
        compression: Some("none".to_string()),
        ..Default::default()
    };
    let mut params = CompressionParams::new();
    params.columns.insert("dict_values".to_string(), none_params);
    let strategy =
        DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_2);

    let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
    let data = create_fixed_width_block(24, 20_000);

    // Guard: the block must be large enough that automatic general compression would
    // otherwise be considered.
    let block_is_large_enough = data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION;
    assert!(
        block_is_large_enough,
        "test requires block size above automatic general compression threshold"
    );

    let (_compressor, encoding) = strategy
        .create_block_compressor(&field, &data)
        .expect("block compressor selection should succeed");

    let picked_general = matches!(
        encoding.compression.as_ref(),
        Some(Compression::General(_))
    );
    assert!(
        !picked_general,
        "compression=none should disable automatic block general compression"
    );
}
1877
1878 #[test]
1879 fn test_rle_block_used_for_version_v2_2() {
1880 let field = create_test_field("test_repdef", DataType::UInt16);
1881
1882 let num_values = 1000u64;
1884 let mut data = Vec::with_capacity(num_values as usize);
1885 for i in 0..10 {
1886 for _ in 0..100 {
1887 data.push(i as u16);
1888 }
1889 }
1890
1891 let mut block = FixedWidthDataBlock {
1892 bits_per_value: 16,
1893 data: LanceBuffer::reinterpret_vec(data),
1894 num_values,
1895 block_info: BlockInfo::default(),
1896 };
1897
1898 block.compute_stat();
1899
1900 let data_block = DataBlock::FixedWidth(block);
1901
1902 let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1903 .with_version(LanceFileVersion::V2_2);
1904
1905 let (compressor, _) = strategy
1906 .create_block_compressor(&field, &data_block)
1907 .unwrap();
1908
1909 let debug_str = format!("{:?}", compressor);
1910 assert!(debug_str.contains("RleEncoder"));
1911 }
1912
1913 #[test]
1914 fn test_rle_block_not_used_for_version_v2_1() {
1915 let field = create_test_field("test_repdef", DataType::UInt16);
1916
1917 let num_values = 1000u64;
1919 let mut data = Vec::with_capacity(num_values as usize);
1920 for i in 0..10 {
1921 for _ in 0..100 {
1922 data.push(i as u16);
1923 }
1924 }
1925
1926 let mut block = FixedWidthDataBlock {
1927 bits_per_value: 16,
1928 data: LanceBuffer::reinterpret_vec(data),
1929 num_values,
1930 block_info: BlockInfo::default(),
1931 };
1932
1933 block.compute_stat();
1934
1935 let data_block = DataBlock::FixedWidth(block);
1936
1937 let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
1938 .with_version(LanceFileVersion::V2_1);
1939
1940 let (compressor, _) = strategy
1941 .create_block_compressor(&field, &data_block)
1942 .unwrap();
1943
1944 let debug_str = format!("{:?}", compressor);
1945 assert!(
1946 !debug_str.contains("RleEncoder"),
1947 "RLE should not be used for V2.1"
1948 );
1949 }
1950}