1#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22 buffer::LanceBuffer,
23 compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24 constants::{
25 BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26 },
27 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28 encodings::{
29 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30 physical::{
31 binary::{
32 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33 VariableDecoder, VariableEncoder,
34 },
35 block::{
36 CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37 GeneralBlockDecompressor,
38 },
39 byte_stream_split::{
40 should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
41 },
42 constant::ConstantDecompressor,
43 fsst::{
44 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45 FsstPerValueEncoder,
46 },
47 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48 packed::{
49 PackedStructFixedWidthMiniBlockDecompressor,
50 PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51 PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52 VariablePackedStructFieldKind,
53 },
54 rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
55 value::{ValueDecompressor, ValueEncoder},
56 },
57 },
58 format::{
59 pb21::{compressive_encoding::Compression, CompressiveEncoding},
60 ProtobufUtils21,
61 },
62 statistics::{GetStat, Stat},
63 version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
68use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
69use snafu::location;
70use std::{str::FromStr, sync::Arc};
71
/// Value that `try_rle_for_mini_block` substitutes when a field does not set
/// `rle_threshold`.
///
/// NOTE(review): the run-count comparison in `try_rle_for_mini_block` is only
/// enforced when a threshold was explicitly provided, so this default does not
/// currently reject anything on its own.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;

/// Size (32 * 1024) that a block's `data_size()` must exceed before
/// `try_general_compression` applies the default general compressor when no
/// explicit per-field scheme was selected.
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 * 1024;
81
/// Compresses a whole [`DataBlock`] into a single buffer.
///
/// Implementors must be `Debug`, `Send` and `Sync`.
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Consumes `data` and returns the resulting [`LanceBuffer`], or an error
    /// if the block could not be compressed.
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
101
/// Chooses a compressor for a field's data.
///
/// Each method receives the `field` being written and the `data` block to be
/// compressed, and returns a boxed compressor (or an error when no compressor
/// can be created for that combination).
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Creates a block compressor for `data`.
    ///
    /// Returns the compressor together with the [`CompressiveEncoding`]
    /// description of the encoding it produces.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;

    /// Creates a per-value compressor for `data`.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Creates a mini-block compressor for `data`.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
136
/// The built-in [`CompressionStrategy`] implementation.
///
/// Compressor selection combines the configured `params`, per-field metadata
/// and the target file `version`.
#[derive(Debug, Default, Clone)]
pub struct DefaultCompressionStrategy {
    /// User-supplied compression parameters, looked up per field by name and
    /// data type.
    params: CompressionParams,
    /// Target file version; some encodings are only selected for newer
    /// versions (for example general compression requires 2.2 or later).
    version: LanceFileVersion,
}
144
145fn try_bss_for_mini_block(
146 data: &FixedWidthDataBlock,
147 params: &CompressionFieldParams,
148) -> Option<Box<dyn MiniBlockCompressor>> {
149 if params.compression.is_none() || params.compression.as_deref() == Some("none") {
152 return None;
153 }
154
155 let mode = params.bss.unwrap_or(BssMode::Auto);
156 if should_use_bss(data, mode) {
158 return Some(Box::new(ByteStreamSplitEncoder::new(
159 data.bits_per_value as usize,
160 )));
161 }
162 None
163}
164
165fn try_rle_for_mini_block(
166 data: &FixedWidthDataBlock,
167 params: &CompressionFieldParams,
168) -> Option<Box<dyn MiniBlockCompressor>> {
169 let bits = data.bits_per_value;
170 if !matches!(bits, 8 | 16 | 32 | 64) {
171 return None;
172 }
173
174 let type_size = bits / 8;
175 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
176 let threshold = params
177 .rle_threshold
178 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
179
180 let passes_threshold = match params.rle_threshold {
183 Some(_) => (run_count as f64) < (data.num_values as f64) * threshold,
184 None => true,
185 };
186
187 if !passes_threshold {
188 return None;
189 }
190
191 let num_values = data.num_values;
197 let estimated_pairs = (run_count + (num_values / 255)).min(num_values);
198
199 let raw_bytes = (num_values as u128) * (type_size as u128);
200 let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128);
201
202 if rle_bytes < raw_bytes {
203 #[cfg(feature = "bitpacking")]
204 {
205 if let Some(bitpack_bytes) = estimate_inline_bitpacking_bytes(data) {
206 if (bitpack_bytes as u128) < rle_bytes {
207 return None;
208 }
209 }
210 }
211 return Some(Box::new(RleMiniBlockEncoder::new()));
212 }
213 None
214}
215
216fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
217 #[cfg(feature = "bitpacking")]
218 {
219 let bits = _data.bits_per_value;
220 if estimate_inline_bitpacking_bytes(_data).is_some() {
221 return Some(Box::new(InlineBitpacking::new(bits)));
222 }
223 None
224 }
225 #[cfg(not(feature = "bitpacking"))]
226 {
227 None
228 }
229}
230
/// Estimates the size in bytes of `data` after inline bitpacking.
///
/// Returns `None` when the value width is not 8/16/32/64 bits, when the block
/// is empty, when the estimate is not smaller than `data.data_size()`, or
/// when the estimate does not fit in a `u64`.
///
/// The estimate walks the `BitWidth` statistic: every entry contributes one
/// header word plus `1024 * bit_width / bits` packed words, where a word is
/// `bits / 8` bytes wide.
#[cfg(feature = "bitpacking")]
fn estimate_inline_bitpacking_bytes(data: &FixedWidthDataBlock) -> Option<u64> {
    use arrow_array::cast::AsArray;

    let bits = data.bits_per_value;
    if !matches!(bits, 8 | 16 | 32 | 64) || data.num_values == 0 {
        return None;
    }

    let stat = data.expect_stat(Stat::BitWidth);
    let chunk_widths = stat.as_primitive::<UInt64Type>();

    let header_words: u128 = 1;
    let total_words = chunk_widths
        .values()
        .iter()
        .fold(0u128, |acc, &chunk_bit_width| {
            let packed_words = (1024u128 * (chunk_bit_width as u128)) / (bits as u128);
            acc.saturating_add(header_words.saturating_add(packed_words))
        });

    let bytes_per_word = (bits / 8) as u128;
    let estimated_bytes = total_words.saturating_mul(bytes_per_word);

    if estimated_bytes < data.data_size() as u128 {
        u64::try_from(estimated_bytes).ok()
    } else {
        None
    }
}
264
265fn try_bitpack_for_block(
266 data: &FixedWidthDataBlock,
267) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
268 let bits = data.bits_per_value;
269 if !matches!(bits, 8 | 16 | 32 | 64) {
270 return None;
271 }
272
273 let bit_widths = data.expect_stat(Stat::BitWidth);
274 let widths = bit_widths.as_primitive::<UInt64Type>();
275 let has_all_zeros = widths.values().contains(&0);
276 let max_bit_width = *widths.values().iter().max().unwrap();
277
278 let too_small =
279 widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
280
281 if has_all_zeros || too_small {
282 return None;
283 }
284
285 if data.num_values <= 1024 {
286 let compressor = Box::new(InlineBitpacking::new(bits));
287 let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
288 Some((compressor, encoding))
289 } else {
290 let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
291 let encoding = ProtobufUtils21::out_of_line_bitpacking(
292 bits,
293 ProtobufUtils21::flat(max_bit_width, None),
294 );
295 Some((compressor, encoding))
296 }
297}
298
299fn maybe_wrap_general_for_mini_block(
300 inner: Box<dyn MiniBlockCompressor>,
301 params: &CompressionFieldParams,
302) -> Result<Box<dyn MiniBlockCompressor>> {
303 match params.compression.as_deref() {
304 None | Some("none") | Some("fsst") => Ok(inner),
305 Some(raw) => {
306 let scheme = CompressionScheme::from_str(raw).map_err(|_| {
307 lance_core::Error::invalid_input(
308 format!("Unknown compression scheme: {raw}"),
309 location!(),
310 )
311 })?;
312 let cfg = CompressionConfig::new(scheme, params.compression_level);
313 Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
314 }
315 }
316}
317
318fn try_general_compression(
319 version: LanceFileVersion,
320 field_params: &CompressionFieldParams,
321 data: &DataBlock,
322) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
323 if let Some(compression_scheme) = &field_params.compression {
326 if compression_scheme != "none" && version >= LanceFileVersion::V2_2 {
327 let scheme: CompressionScheme = compression_scheme.parse()?;
328 let config = CompressionConfig::new(scheme, field_params.compression_level);
329 let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
330 return Ok(Some((compressor, config)));
331 }
332 }
333
334 if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
336 && version >= LanceFileVersion::V2_2
337 {
338 let compressor = Box::new(CompressedBufferEncoder::default());
339 let config = compressor.compressor.config();
340 return Ok(Some((compressor, config)));
341 }
342
343 Ok(None)
344}
345
346impl DefaultCompressionStrategy {
347 pub fn new() -> Self {
349 Self::default()
350 }
351
352 pub fn with_params(params: CompressionParams) -> Self {
354 Self {
355 params,
356 version: LanceFileVersion::default(),
357 }
358 }
359
360 pub fn with_version(mut self, version: LanceFileVersion) -> Self {
362 self.version = version;
363 self
364 }
365
366 fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams {
368 let mut params = CompressionFieldParams::default();
369
370 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
372 params.compression = Some(compression.clone());
373 }
374
375 if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
377 params.compression_level = level.parse().ok();
378 }
379
380 if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
382 params.rle_threshold = threshold.parse().ok();
383 }
384
385 if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
387 match BssMode::parse(bss_str) {
388 Some(mode) => params.bss = Some(mode),
389 None => {
390 log::warn!("Invalid BSS mode '{}', using default", bss_str);
391 }
392 }
393 }
394
395 if let Some(minichunk_size_str) = field
397 .metadata
398 .get(super::constants::MINICHUNK_SIZE_META_KEY)
399 {
400 if let Ok(minichunk_size) = minichunk_size_str.parse::<i64>() {
401 if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 {
403 log::warn!(
404 "minichunk_size '{}' too large for version '{}', using default",
405 minichunk_size,
406 version
407 );
408 } else {
409 params.minichunk_size = Some(minichunk_size);
410 }
411 } else {
412 log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str);
413 }
414 }
415
416 params
417 }
418
419 fn build_fixed_width_compressor(
420 &self,
421 params: &CompressionFieldParams,
422 data: &FixedWidthDataBlock,
423 ) -> Result<Box<dyn MiniBlockCompressor>> {
424 if params.compression.as_deref() == Some("none") {
425 return Ok(Box::new(ValueEncoder::default()));
426 }
427
428 let base = try_bss_for_mini_block(data, params)
429 .or_else(|| try_rle_for_mini_block(data, params))
430 .or_else(|| try_bitpack_for_mini_block(data))
431 .unwrap_or_else(|| Box::new(ValueEncoder::default()));
432
433 maybe_wrap_general_for_mini_block(base, params)
434 }
435
436 fn build_variable_width_compressor(
438 &self,
439 params: &CompressionFieldParams,
440 data: &VariableWidthBlock,
441 ) -> Result<Box<dyn MiniBlockCompressor>> {
442 if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
443 return Err(Error::invalid_input(
444 format!(
445 "Variable width compression not supported for {} bit offsets",
446 data.bits_per_offset
447 ),
448 location!(),
449 ));
450 }
451
452 let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
454 let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
455
456 if params.compression.as_deref() == Some("none") {
458 return Ok(Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size)));
459 }
460
461 if params.compression.as_deref() == Some("fsst") {
463 return Ok(Box::new(FsstMiniBlockEncoder::new(params.minichunk_size)));
464 }
465
466 let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
468 >= FSST_LEAST_INPUT_MAX_LENGTH
469 && data_size >= FSST_LEAST_INPUT_SIZE as u64
470 {
471 Box::new(FsstMiniBlockEncoder::new(params.minichunk_size))
472 } else {
473 Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size))
474 };
475
476 if let Some(compression_scheme) = ¶ms.compression {
478 if compression_scheme != "none" && compression_scheme != "fsst" {
479 let scheme: CompressionScheme = compression_scheme.parse()?;
480 let config = CompressionConfig::new(scheme, params.compression_level);
481 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
482 }
483 }
484
485 Ok(base_encoder)
486 }
487
488 fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
491 let mut field_params = self
492 .params
493 .get_field_params(&field.name, &field.data_type());
494
495 let metadata_params = Self::parse_field_metadata(field, &self.version);
497 field_params.merge(&metadata_params);
498
499 field_params
500 }
501}
502
503impl CompressionStrategy for DefaultCompressionStrategy {
504 fn create_miniblock_compressor(
505 &self,
506 field: &Field,
507 data: &DataBlock,
508 ) -> Result<Box<dyn MiniBlockCompressor>> {
509 let field_params = self.get_merged_field_params(field);
510
511 match data {
512 DataBlock::FixedWidth(fixed_width_data) => {
513 self.build_fixed_width_compressor(&field_params, fixed_width_data)
514 }
515 DataBlock::VariableWidth(variable_width_data) => {
516 self.build_variable_width_compressor(&field_params, variable_width_data)
517 }
518 DataBlock::Struct(struct_data_block) => {
519 if struct_data_block.has_variable_width_child() {
522 return Err(Error::invalid_input(
523 "Packed struct mini-block encoding supports only fixed-width children",
524 location!(),
525 ));
526 }
527 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
528 }
529 DataBlock::FixedSizeList(_) => {
530 Ok(Box::new(ValueEncoder::default()))
538 }
539 _ => Err(Error::NotSupported {
540 source: format!(
541 "Mini-block compression not yet supported for block type {}",
542 data.name()
543 )
544 .into(),
545 location: location!(),
546 }),
547 }
548 }
549
550 fn create_per_value(
551 &self,
552 field: &Field,
553 data: &DataBlock,
554 ) -> Result<Box<dyn PerValueCompressor>> {
555 let field_params = self.get_merged_field_params(field);
556
557 match data {
558 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
559 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
560 DataBlock::Struct(struct_block) => {
561 if field.children.len() != struct_block.children.len() {
562 return Err(Error::invalid_input(
563 "Struct field metadata does not match data block children",
564 location!(),
565 ));
566 }
567 let has_variable_child = struct_block.has_variable_width_child();
568 if has_variable_child {
569 if self.version < LanceFileVersion::V2_2 {
570 return Err(Error::NotSupported {
571 source: "Variable packed struct encoding requires Lance file version 2.2 or later".into(),
572 location: location!(),
573 });
574 }
575 Ok(Box::new(PackedStructVariablePerValueEncoder::new(
576 self.clone(),
577 field.children.clone(),
578 )))
579 } else {
580 Err(Error::invalid_input(
581 "Packed struct per-value compression should not be used for fixed-width-only structs",
582 location!(),
583 ))
584 }
585 }
586 DataBlock::VariableWidth(variable_width) => {
587 if field_params.compression.as_deref() == Some("none") {
589 return Ok(Box::new(VariableEncoder::default()));
590 }
591
592 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
593 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
594
595 let per_value_requested =
600 if let Some(compression) = field_params.compression.as_deref() {
601 compression != "fsst"
602 } else {
603 false
604 };
605
606 if (max_len > 32 * 1024 || per_value_requested)
607 && data_size >= FSST_LEAST_INPUT_SIZE as u64
608 {
609 return Ok(Box::new(CompressedBufferEncoder::default()));
610 }
611
612 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
613 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
614 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
615
616 let variable_compression = Box::new(VariableEncoder::default());
617
618 if field_params.compression.as_deref() == Some("fsst")
620 || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
621 && data_size >= FSST_LEAST_INPUT_SIZE as u64)
622 {
623 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
624 } else {
625 Ok(variable_compression)
626 }
627 } else {
628 panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
629 }
630 }
631 _ => unreachable!(
632 "Per-value compression not yet supported for block type: {}",
633 data.name()
634 ),
635 }
636 }
637
638 fn create_block_compressor(
639 &self,
640 field: &Field,
641 data: &DataBlock,
642 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
643 let field_params = self.get_merged_field_params(field);
644
645 match data {
646 DataBlock::FixedWidth(fixed_width) => {
647 if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
648 return Ok((compressor, encoding));
649 }
650
651 if let Some((compressor, config)) =
653 try_general_compression(self.version, &field_params, data)?
654 {
655 let encoding = ProtobufUtils21::wrapped(
656 config,
657 ProtobufUtils21::flat(fixed_width.bits_per_value, None),
658 )?;
659 return Ok((compressor, encoding));
660 }
661
662 let encoder = Box::new(ValueEncoder::default());
663 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
664 Ok((encoder, encoding))
665 }
666 DataBlock::VariableWidth(variable_width) => {
667 if let Some((compressor, config)) =
669 try_general_compression(self.version, &field_params, data)?
670 {
671 let encoding = ProtobufUtils21::wrapped(
672 config,
673 ProtobufUtils21::variable(
674 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
675 None,
676 ),
677 )?;
678 return Ok((compressor, encoding));
679 }
680
681 let encoder = Box::new(VariableEncoder::default());
682 let encoding = ProtobufUtils21::variable(
683 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
684 None,
685 );
686 Ok((encoder, encoding))
687 }
688 _ => unreachable!(),
689 }
690 }
691}
692
/// Decompresses the buffers of one mini-block back into a [`DataBlock`].
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `data` into a block; `num_values` is supplied by the caller
    /// alongside the buffers (presumably the number of values they hold).
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
696
/// Decompresses per-value-compressed data that is stored as a fixed-width
/// block.
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `data` into a block; `num_values` is supplied by the caller
    /// alongside the data.
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// Returns a bit width for this decompressor.
    ///
    /// NOTE(review): presumably the width of each compressed value as stored;
    /// confirm against implementors.
    fn bits_per_value(&self) -> u64;
}
705
/// Decompresses per-value-compressed data that is stored as a variable-width
/// block.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `data` into a block.
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
710
/// Decompresses a single buffer produced by a block compressor back into a
/// [`DataBlock`].
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `data` into a block; `num_values` is supplied by the caller
    /// alongside the buffer.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
714
/// Maps a [`CompressiveEncoding`] description to the decompressor able to
/// decode it.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Creates a mini-block decompressor for `description`.
    ///
    /// `decompression_strategy` is the strategy to use when the encoding
    /// wraps another encoding and an inner decompressor has to be created.
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Creates a per-value decompressor for fixed-width data described by
    /// `description`.
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Creates a per-value decompressor for variable-width data described by
    /// `description`.
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Creates a block decompressor for `description`.
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
737
/// The built-in [`DecompressionStrategy`] implementation. It carries no
/// state.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
740
741impl DecompressionStrategy for DefaultDecompressionStrategy {
742 fn create_miniblock_decompressor(
743 &self,
744 description: &CompressiveEncoding,
745 decompression_strategy: &dyn DecompressionStrategy,
746 ) -> Result<Box<dyn MiniBlockDecompressor>> {
747 match description.compression.as_ref().unwrap() {
748 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
749 #[cfg(feature = "bitpacking")]
750 Compression::InlineBitpacking(description) => {
751 Ok(Box::new(InlineBitpacking::from_description(description)))
752 }
753 #[cfg(not(feature = "bitpacking"))]
754 Compression::InlineBitpacking(_) => Err(Error::NotSupported {
755 source: "this runtime was not built with bitpacking support".into(),
756 location: location!(),
757 }),
758 Compression::Variable(variable) => {
759 let Compression::Flat(offsets) = variable
760 .offsets
761 .as_ref()
762 .unwrap()
763 .compression
764 .as_ref()
765 .unwrap()
766 else {
767 panic!("Variable compression only supports flat offsets")
768 };
769 Ok(Box::new(BinaryMiniBlockDecompressor::new(
770 offsets.bits_per_value as u8,
771 )))
772 }
773 Compression::Fsst(description) => {
774 let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
775 description.values.as_ref().unwrap(),
776 decompression_strategy,
777 )?;
778 Ok(Box::new(FsstMiniBlockDecompressor::new(
779 description,
780 inner_decompressor,
781 )))
782 }
783 Compression::PackedStruct(description) => Ok(Box::new(
784 PackedStructFixedWidthMiniBlockDecompressor::new(description),
785 )),
786 Compression::VariablePackedStruct(_) => Err(Error::NotSupported {
787 source: "variable packed struct decoding is not yet implemented".into(),
788 location: location!(),
789 }),
790 Compression::FixedSizeList(fsl) => {
791 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
794 }
795 Compression::Rle(rle) => {
796 let Compression::Flat(values) =
797 rle.values.as_ref().unwrap().compression.as_ref().unwrap()
798 else {
799 panic!("RLE compression only supports flat values")
800 };
801 let Compression::Flat(run_lengths) = rle
802 .run_lengths
803 .as_ref()
804 .unwrap()
805 .compression
806 .as_ref()
807 .unwrap()
808 else {
809 panic!("RLE compression only supports flat run lengths")
810 };
811 assert_eq!(
812 run_lengths.bits_per_value, 8,
813 "RLE compression only supports 8-bit run lengths"
814 );
815 Ok(Box::new(RleMiniBlockDecompressor::new(
816 values.bits_per_value,
817 )))
818 }
819 Compression::ByteStreamSplit(bss) => {
820 let Compression::Flat(values) =
821 bss.values.as_ref().unwrap().compression.as_ref().unwrap()
822 else {
823 panic!("ByteStreamSplit compression only supports flat values")
824 };
825 Ok(Box::new(ByteStreamSplitDecompressor::new(
826 values.bits_per_value as usize,
827 )))
828 }
829 Compression::General(general) => {
830 let inner_decompressor = self.create_miniblock_decompressor(
832 general.values.as_ref().ok_or_else(|| {
833 Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
834 })?,
835 decompression_strategy,
836 )?;
837
838 let compression = general.compression.as_ref().ok_or_else(|| {
840 Error::invalid_input("GeneralMiniBlock missing compression config", location!())
841 })?;
842
843 let scheme = compression.scheme().try_into()?;
844
845 let compression_config = crate::encodings::physical::block::CompressionConfig::new(
846 scheme,
847 compression.level,
848 );
849
850 Ok(Box::new(GeneralMiniBlockDecompressor::new(
851 inner_decompressor,
852 compression_config,
853 )))
854 }
855 _ => todo!(),
856 }
857 }
858
859 fn create_fixed_per_value_decompressor(
860 &self,
861 description: &CompressiveEncoding,
862 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
863 match description.compression.as_ref().unwrap() {
864 Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
865 constant
866 .value
867 .as_ref()
868 .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
869 ))),
870 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
871 Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
872 _ => todo!("fixed-per-value decompressor for {:?}", description),
873 }
874 }
875
876 fn create_variable_per_value_decompressor(
877 &self,
878 description: &CompressiveEncoding,
879 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
880 match description.compression.as_ref().unwrap() {
881 Compression::Variable(variable) => {
882 let Compression::Flat(offsets) = variable
883 .offsets
884 .as_ref()
885 .unwrap()
886 .compression
887 .as_ref()
888 .unwrap()
889 else {
890 panic!("Variable compression only supports flat offsets")
891 };
892 assert!(offsets.bits_per_value < u8::MAX as u64);
893 Ok(Box::new(VariableDecoder::default()))
894 }
895 Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
896 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
897 Box::new(VariableDecoder::default()),
898 ))),
899 Compression::General(ref general) => {
900 Ok(Box::new(CompressedBufferEncoder::from_scheme(
901 general.compression.as_ref().expect_ok()?.scheme(),
902 )?))
903 }
904 Compression::VariablePackedStruct(description) => {
905 let mut fields = Vec::with_capacity(description.fields.len());
906 for field in &description.fields {
907 let value_encoding = field.value.as_ref().ok_or_else(|| {
908 Error::invalid_input(
909 "VariablePackedStruct field is missing value encoding",
910 location!(),
911 )
912 })?;
913 let decoder = match field.layout.as_ref().ok_or_else(|| {
914 Error::invalid_input(
915 "VariablePackedStruct field is missing layout details",
916 location!(),
917 )
918 })? {
919 crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
920 bits_per_value,
921 ) => {
922 let decompressor =
923 self.create_fixed_per_value_decompressor(value_encoding)?;
924 VariablePackedStructFieldDecoder {
925 kind: VariablePackedStructFieldKind::Fixed {
926 bits_per_value: *bits_per_value,
927 decompressor: Arc::from(decompressor),
928 },
929 }
930 }
931 crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
932 bits_per_length,
933 ) => {
934 let decompressor =
935 self.create_variable_per_value_decompressor(value_encoding)?;
936 VariablePackedStructFieldDecoder {
937 kind: VariablePackedStructFieldKind::Variable {
938 bits_per_length: *bits_per_length,
939 decompressor: Arc::from(decompressor),
940 },
941 }
942 }
943 };
944 fields.push(decoder);
945 }
946 Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
947 fields,
948 )))
949 }
950 _ => todo!("variable-per-value decompressor for {:?}", description),
951 }
952 }
953
954 fn create_block_decompressor(
955 &self,
956 description: &CompressiveEncoding,
957 ) -> Result<Box<dyn BlockDecompressor>> {
958 match description.compression.as_ref().unwrap() {
959 Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
960 InlineBitpacking::from_description(inline_bitpacking),
961 )),
962 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
963 Compression::Constant(constant) => {
964 let scalar = constant
965 .value
966 .as_ref()
967 .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
968 Ok(Box::new(ConstantDecompressor::new(scalar)))
969 }
970 Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
971 Compression::FixedSizeList(fsl) => {
972 Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
973 }
974 Compression::OutOfLineBitpacking(out_of_line) => {
975 let compressed_bit_width = match out_of_line
977 .values
978 .as_ref()
979 .unwrap()
980 .compression
981 .as_ref()
982 .unwrap()
983 {
984 Compression::Flat(flat) => flat.bits_per_value,
985 _ => {
986 return Err(Error::InvalidInput {
987 location: location!(),
988 source: "OutOfLineBitpacking values must use Flat encoding".into(),
989 })
990 }
991 };
992 Ok(Box::new(OutOfLineBitpacking::new(
993 compressed_bit_width,
994 out_of_line.uncompressed_bits_per_value,
995 )))
996 }
997 Compression::General(general) => {
998 let inner_desc = general
999 .values
1000 .as_ref()
1001 .ok_or_else(|| {
1002 Error::invalid_input(
1003 "General compression missing inner encoding",
1004 location!(),
1005 )
1006 })?
1007 .as_ref();
1008 let inner_decompressor = self.create_block_decompressor(inner_desc)?;
1009
1010 let compression = general.compression.as_ref().ok_or_else(|| {
1011 Error::invalid_input(
1012 "General compression missing compression config",
1013 location!(),
1014 )
1015 })?;
1016 let scheme = compression.scheme().try_into()?;
1017 let config = CompressionConfig::new(scheme, compression.level);
1018 let general_decompressor =
1019 GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
1020
1021 Ok(Box::new(general_decompressor))
1022 }
1023 _ => todo!(),
1024 }
1025 }
1026}
1027
1028#[cfg(test)]
1029mod tests {
1030 use super::*;
1031 use crate::buffer::LanceBuffer;
1032 use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
1033 use crate::testing::extract_array_encoding_chain;
1034 use arrow_schema::{DataType, Field as ArrowField};
1035 use std::collections::HashMap;
1036
1037 fn create_test_field(name: &str, data_type: DataType) -> Field {
1038 let arrow_field = ArrowField::new(name, data_type, true);
1039 let mut field = Field::try_from(&arrow_field).unwrap();
1040 field.id = -1;
1041 field
1042 }
1043
1044 fn create_fixed_width_block_with_stats(
1045 bits_per_value: u64,
1046 num_values: u64,
1047 run_count: u64,
1048 ) -> DataBlock {
1049 let bytes_per_value = (bits_per_value / 8) as usize;
1051 let total_bytes = bytes_per_value * num_values as usize;
1052 let mut data = vec![0u8; total_bytes];
1053
1054 let values_per_run = (num_values / run_count).max(1);
1056 let mut run_value = 0u8;
1057
1058 for i in 0..num_values as usize {
1059 if i % values_per_run as usize == 0 {
1060 run_value = run_value.wrapping_add(17); }
1062 for j in 0..bytes_per_value {
1064 let byte_offset = i * bytes_per_value + j;
1065 if byte_offset < data.len() {
1066 data[byte_offset] = run_value.wrapping_add(j as u8);
1067 }
1068 }
1069 }
1070
1071 let mut block = FixedWidthDataBlock {
1072 bits_per_value,
1073 data: LanceBuffer::reinterpret_vec(data),
1074 num_values,
1075 block_info: BlockInfo::default(),
1076 };
1077
1078 use crate::statistics::ComputeStat;
1080 block.compute_stat();
1081
1082 DataBlock::FixedWidth(block)
1083 }
1084
1085 fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
1086 let bytes_per_value = (bits_per_value / 8) as usize;
1088 let total_bytes = bytes_per_value * num_values as usize;
1089 let mut data = vec![0u8; total_bytes];
1090
1091 for i in 0..num_values as usize {
1093 let byte_offset = i * bytes_per_value;
1094 if byte_offset < data.len() {
1095 data[byte_offset] = (i % 256) as u8;
1096 }
1097 }
1098
1099 let mut block = FixedWidthDataBlock {
1100 bits_per_value,
1101 data: LanceBuffer::reinterpret_vec(data),
1102 num_values,
1103 block_info: BlockInfo::default(),
1104 };
1105
1106 use crate::statistics::ComputeStat;
1108 block.compute_stat();
1109
1110 DataBlock::FixedWidth(block)
1111 }
1112
1113 fn create_variable_width_block(
1114 bits_per_offset: u8,
1115 num_values: u64,
1116 avg_value_size: usize,
1117 ) -> DataBlock {
1118 use crate::statistics::ComputeStat;
1119
1120 let mut offsets = Vec::with_capacity((num_values + 1) as usize);
1122 let mut current_offset = 0i64;
1123 offsets.push(current_offset);
1124
1125 for i in 0..num_values {
1127 let value_size = if avg_value_size == 0 {
1128 1
1129 } else {
1130 ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
1131 .min(avg_value_size * 2)
1132 };
1133 current_offset += value_size as i64;
1134 offsets.push(current_offset);
1135 }
1136
1137 let total_data_size = current_offset as usize;
1139 let mut data = vec![0u8; total_data_size];
1140
1141 for i in 0..num_values {
1143 let start_offset = offsets[i as usize] as usize;
1144 let end_offset = offsets[(i + 1) as usize] as usize;
1145
1146 let content = (i % 256) as u8;
1147 for j in 0..end_offset - start_offset {
1148 data[start_offset + j] = content.wrapping_add(j as u8);
1149 }
1150 }
1151
1152 let offsets_buffer = match bits_per_offset {
1154 32 => {
1155 let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect();
1156 LanceBuffer::reinterpret_vec(offsets_32)
1157 }
1158 64 => LanceBuffer::reinterpret_vec(offsets),
1159 _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
1160 };
1161
1162 let mut block = VariableWidthBlock {
1163 data: LanceBuffer::from(data),
1164 offsets: offsets_buffer,
1165 bits_per_offset,
1166 num_values,
1167 block_info: BlockInfo::default(),
1168 };
1169
1170 block.compute_stat();
1171 DataBlock::VariableWidth(block)
1172 }
1173
/// Registers column parameters under the pattern `*_id` (lz4, RLE threshold 0.3, BSS off)
/// and expects the mini-block compressor chosen for `user_id` to mention both
/// `GeneralMiniBlockCompressor` and `RleMiniBlockEncoder` in its `Debug` output.
#[test]
fn test_parameter_based_compression() {
    let mut params = CompressionParams::new();

    params.columns.insert(
        "*_id".to_string(),
        CompressionFieldParams {
            rle_threshold: Some(0.3),
            compression: Some("lz4".to_string()),
            compression_level: None,
            bss: Some(BssMode::Off),
            minichunk_size: None,
        },
    );

    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("user_id", DataType::Int32);

    // NOTE(review): the helper is defined outside this view; its arguments are presumably
    // (bits per value, value count, run count) — confirm against its definition.
    let data = create_fixed_width_block_with_stats(32, 1000, 100);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    // Include the selected compressor in the failure output so a mismatch is diagnosable.
    assert!(
        debug_str.contains("GeneralMiniBlockCompressor"),
        "expected GeneralMiniBlockCompressor, got: {debug_str}"
    );
    assert!(
        debug_str.contains("RleMiniBlockEncoder"),
        "expected RleMiniBlockEncoder, got: {debug_str}"
    );
}
1205
/// Registers parameters for the `Int32` type (zstd level 3, RLE threshold 0.1, BSS off)
/// and expects the mini-block compressor chosen for an `Int32` field to mention
/// `RleMiniBlockEncoder` in its `Debug` output.
#[test]
fn test_type_level_parameters() {
    let mut params = CompressionParams::new();

    params.types.insert(
        "Int32".to_string(),
        CompressionFieldParams {
            rle_threshold: Some(0.1),
            compression: Some("zstd".to_string()),
            compression_level: Some(3),
            bss: Some(BssMode::Off),
            minichunk_size: None,
        },
    );

    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("some_column", DataType::Int32);
    let data = create_fixed_width_block_with_stats(32, 1000, 50);

    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    // Include the selected compressor in the failure output so a mismatch is diagnosable.
    assert!(
        debug_str.contains("RleMiniBlockEncoder"),
        "expected RleMiniBlockEncoder, got: {debug_str}"
    );
}
1231
/// Feeds 64 runs of length 4 (values cycling through 3, 4, 5) as 64-bit data and expects
/// the chosen mini-block compressor to mention `InlineBitpacking` but not
/// `RleMiniBlockEncoder` in its `Debug` output.
#[test]
#[cfg(feature = "bitpacking")]
fn test_low_cardinality_prefers_bitpacking_over_rle() {
    use crate::statistics::ComputeStat;

    let strategy = DefaultCompressionStrategy::new();
    let field = create_test_field("int_score", DataType::Int64);

    // Run `r` repeats the value `3 + r % 3` four times: 3,3,3,3,4,4,4,4,5,5,5,5,3,...
    let mut values: Vec<u64> = Vec::with_capacity(256);
    values.extend((0..64u64).flat_map(|run_idx| std::iter::repeat_n(3 + run_idx % 3, 4)));

    let mut fixed = FixedWidthDataBlock {
        bits_per_value: 64,
        data: LanceBuffer::reinterpret_vec(values),
        num_values: 256,
        block_info: BlockInfo::default(),
    };
    fixed.compute_stat();
    let data = DataBlock::FixedWidth(fixed);

    let debug_str = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &data).unwrap()
    );

    assert!(
        debug_str.contains("InlineBitpacking"),
        "expected InlineBitpacking, got: {debug_str}"
    );
    assert!(
        !debug_str.contains("RleMiniBlockEncoder"),
        "expected RLE to be skipped when bitpacking is smaller, got: {debug_str}"
    );
}
1272
1273 fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
1274 let chain = extract_array_encoding_chain(encoding);
1275 if variable {
1276 assert_eq!(chain.len(), 2);
1277 assert_eq!(chain.first().unwrap().as_str(), "variable");
1278 assert_eq!(chain.get(1).unwrap().as_str(), "flat");
1279 } else {
1280 assert_eq!(chain.len(), 1);
1281 assert_eq!(chain.first().unwrap().as_str(), "flat");
1282 }
1283 }
1284
/// Configures `compression: "none"` for the `embeddings` column and checks that both the
/// mini-block and the per-value compressors emit the uncompressed encoding chain for
/// fixed-width and variable-width data.
#[test]
fn test_none_compression() {
    let uncompressed = CompressionFieldParams {
        compression: Some("none".to_string()),
        ..Default::default()
    };
    let mut params = CompressionParams::new();
    params.columns.insert("embeddings".to_string(), uncompressed);

    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("embeddings", DataType::Float32);
    let fixed_data = create_fixed_width_block(32, 1000);
    let variable_data = create_variable_width_block(32, 10, 32 * 1024);

    // Mini-block path: fixed-width first, then variable-width.
    for (block, is_variable) in [(&fixed_data, false), (&variable_data, true)] {
        let compressor = strategy
            .create_miniblock_compressor(&field, block)
            .unwrap();
        let (_compressed, encoding) = compressor.compress(block.clone()).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }

    // Per-value path, same order; the blocks are consumed here.
    for (block, is_variable) in [(fixed_data, false), (variable_data, true)] {
        let compressor = strategy.create_per_value(&field, &block).unwrap();
        let (_compressed, encoding) = compressor.compress(block).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }
}
1323
/// Sets `COMPRESSION_META_KEY` to `"none"` in the field metadata (no strategy-level
/// parameters) and checks that both the mini-block and the per-value compressors emit the
/// uncompressed encoding chain for fixed-width and variable-width data.
#[test]
fn test_field_metadata_none_compression() {
    let arrow_field = ArrowField::new("simple_col", DataType::Binary, true).with_metadata(
        HashMap::from([(COMPRESSION_META_KEY.to_string(), "none".to_string())]),
    );
    let field = Field::try_from(&arrow_field).unwrap();

    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let fixed_data = create_fixed_width_block(32, 1000);
    let variable_data = create_variable_width_block(32, 10, 32 * 1024);

    // Mini-block path: fixed-width first, then variable-width.
    for (block, is_variable) in [(&fixed_data, false), (&variable_data, true)] {
        let compressor = strategy
            .create_miniblock_compressor(&field, block)
            .unwrap();
        let (_compressed, encoding) = compressor.compress(block.clone()).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }

    // Per-value path, same order; the blocks are consumed here.
    for (block, is_variable) in [(fixed_data, false), (variable_data, true)] {
        let compressor = strategy.create_per_value(&field, &block).unwrap();
        let (_compressed, encoding) = compressor.compress(block).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }
}
1360
/// Registers parameters for both the `Int32` type and the `user_id` column, then checks
/// that `get_field_params` returns the column values for `user_id` and the type values
/// for any other `Int32` field.
#[test]
fn test_parameter_merge_priority() {
    let type_level = CompressionFieldParams {
        rle_threshold: Some(0.5),
        compression: Some("lz4".to_string()),
        ..Default::default()
    };
    let column_level = CompressionFieldParams {
        rle_threshold: Some(0.2),
        compression: Some("zstd".to_string()),
        compression_level: Some(6),
        bss: None,
        minichunk_size: None,
    };

    let mut params = CompressionParams::new();
    params.types.insert("Int32".to_string(), type_level);
    params.columns.insert("user_id".to_string(), column_level);

    let strategy = DefaultCompressionStrategy::with_params(params);

    // Both entries match `user_id`; the column-level values are expected.
    let for_user_id = strategy
        .params
        .get_field_params("user_id", &DataType::Int32);
    assert_eq!(for_user_id.rle_threshold, Some(0.2));
    assert_eq!(for_user_id.compression, Some("zstd".to_string()));
    assert_eq!(for_user_id.compression_level, Some(6));

    // Only the type entry matches here.
    let for_other = strategy
        .params
        .get_field_params("other_field", &DataType::Int32);
    assert_eq!(for_other.rle_threshold, Some(0.5));
    assert_eq!(for_other.compression, Some("lz4".to_string()));
    assert_eq!(for_other.compression_level, None);
}
1407
/// Registers column parameters under the pattern `log_*` and checks that
/// `get_field_params` applies them to `log_messages` but not to `messages_log`.
#[test]
fn test_pattern_matching() {
    let log_params = CompressionFieldParams {
        compression: Some("zstd".to_string()),
        compression_level: Some(6),
        ..Default::default()
    };

    let mut params = CompressionParams::new();
    params.columns.insert("log_*".to_string(), log_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    // Name starts with the `log_` prefix.
    let matching = strategy
        .params
        .get_field_params("log_messages", &DataType::Utf8);
    assert_eq!(matching.compression, Some("zstd".to_string()));
    assert_eq!(matching.compression_level, Some(6));

    // `log` appears only as a suffix.
    let non_matching = strategy
        .params
        .get_field_params("messages_log", &DataType::Utf8);
    assert_eq!(non_matching.compression, None);
}
1437
/// Sets `COMPRESSION_META_KEY` to `"none"` directly on the field metadata, with no
/// strategy-level parameters, and expects the mini-block compressor to mention
/// `ValueEncoder` in its `Debug` output.
#[test]
fn test_legacy_metadata_support() {
    let params = CompressionParams::new();
    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut metadata = HashMap::new();
    metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
    let mut field = create_test_field("some_column", DataType::Int32);
    field.metadata = metadata;

    let data = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    // Include the selected compressor in the failure output so a mismatch is diagnosable.
    assert!(
        debug_str.contains("ValueEncoder"),
        "expected ValueEncoder, got: {debug_str}"
    );
}
1455
/// With no parameters and no field metadata, expects the mini-block compressor for an
/// `Int32` field to mention either `ValueEncoder` or `InlineBitpacking` in its `Debug`
/// output.
#[test]
fn test_default_behavior() {
    let params = CompressionParams::new();
    let strategy = DefaultCompressionStrategy::with_params(params);

    let field = create_test_field("random_column", DataType::Int32);
    let data = create_fixed_width_block_with_stats(32, 1000, 600);

    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    // Include the selected compressor in the failure output so a mismatch is diagnosable.
    assert!(
        debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"),
        "expected ValueEncoder or InlineBitpacking, got: {debug_str}"
    );
}
1471
/// Sets `COMPRESSION_META_KEY` to `"zstd"` and `COMPRESSION_LEVEL_META_KEY` to `"6"` on
/// the field metadata and expects the mini-block compressor to mention
/// `GeneralMiniBlockCompressor` in its `Debug` output.
#[test]
fn test_field_metadata_compression() {
    let params = CompressionParams::new();
    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut metadata = HashMap::new();
    metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
    metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = metadata;

    let data = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    // Include the selected compressor in the failure output so a mismatch is diagnosable.
    assert!(
        debug_str.contains("GeneralMiniBlockCompressor"),
        "expected GeneralMiniBlockCompressor, got: {debug_str}"
    );
}
1491
/// Sets `RLE_THRESHOLD_META_KEY` to `"0.8"` and `BSS_META_KEY` to `"off"` on the field
/// metadata and expects the mini-block compressor to mention `RleMiniBlockEncoder` in its
/// `Debug` output.
#[test]
fn test_field_metadata_rle_threshold() {
    let params = CompressionParams::new();
    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut metadata = HashMap::new();
    metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
    metadata.insert(BSS_META_KEY.to_string(), "off".to_string());
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = metadata;

    let data = create_fixed_width_block_with_stats(32, 1000, 100);

    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    // Include the selected compressor in the failure output so a mismatch is diagnosable.
    assert!(
        debug_str.contains("RleMiniBlockEncoder"),
        "expected RleMiniBlockEncoder, got: {debug_str}"
    );
}
1514
/// Registers lz4 column parameters for `test_column`, then sets `COMPRESSION_META_KEY` to
/// `"none"` on the field itself, and expects the mini-block compressor to mention
/// `ValueEncoder` in its `Debug` output (i.e. the field metadata is expected to win).
#[test]
fn test_field_metadata_override_params() {
    let mut params = CompressionParams::new();
    params.columns.insert(
        "test_column".to_string(),
        CompressionFieldParams {
            rle_threshold: Some(0.3),
            compression: Some("lz4".to_string()),
            compression_level: None,
            bss: None,
            minichunk_size: None,
        },
    );

    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut metadata = HashMap::new();
    metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = metadata;

    let data = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    // Include the selected compressor in the failure output so a mismatch is diagnosable.
    assert!(
        debug_str.contains("ValueEncoder"),
        "expected ValueEncoder, got: {debug_str}"
    );
}
1544
/// Registers lz4 parameters for the `Int32` type and sets only
/// `COMPRESSION_LEVEL_META_KEY` (`"3"`) on the field metadata, then expects the
/// mini-block compressor to mention `GeneralMiniBlockCompressor` in its `Debug` output.
#[test]
fn test_field_metadata_mixed_configuration() {
    let mut params = CompressionParams::new();
    params.types.insert(
        "Int32".to_string(),
        CompressionFieldParams {
            rle_threshold: Some(0.5),
            compression: Some("lz4".to_string()),
            ..Default::default()
        },
    );

    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut metadata = HashMap::new();
    metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = metadata;

    let data = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    // Include the selected compressor in the failure output so a mismatch is diagnosable.
    assert!(
        debug_str.contains("GeneralMiniBlockCompressor"),
        "expected GeneralMiniBlockCompressor, got: {debug_str}"
    );
}
1573
/// Sets `BSS_META_KEY` to `"on"` and `COMPRESSION_META_KEY` to `"lz4"` on a `Float32`
/// field and expects the mini-block compressor to mention `ByteStreamSplitEncoder` in its
/// `Debug` output.
#[test]
fn test_bss_field_metadata() {
    let params = CompressionParams::new();
    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut metadata = HashMap::new();
    metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
    metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
    let arrow_field =
        ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
    let field = Field::try_from(&arrow_field).unwrap();

    let data = create_fixed_width_block(32, 100);

    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    // Include the selected compressor in the failure output so a mismatch is diagnosable.
    assert!(
        debug_str.contains("ByteStreamSplitEncoder"),
        "expected ByteStreamSplitEncoder, got: {debug_str}"
    );
}
1594
/// Sets `BSS_META_KEY` to `"on"` and `COMPRESSION_META_KEY` to `"lz4"` on a `Float64`
/// field and expects the mini-block compressor to mention both
/// `GeneralMiniBlockCompressor` and `ByteStreamSplitEncoder` in its `Debug` output.
#[test]
fn test_bss_with_compression() {
    let params = CompressionParams::new();
    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut metadata = HashMap::new();
    metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
    metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
    let arrow_field =
        ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
    let field = Field::try_from(&arrow_field).unwrap();

    let data = create_fixed_width_block(64, 100);

    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);

    // Include the selected compressor in the failure output so a mismatch is diagnosable.
    assert!(
        debug_str.contains("GeneralMiniBlockCompressor"),
        "expected GeneralMiniBlockCompressor, got: {debug_str}"
    );
    assert!(
        debug_str.contains("ByteStreamSplitEncoder"),
        "expected ByteStreamSplitEncoder, got: {debug_str}"
    );
}
1617
/// Round-trips a 24-bit fixed-width block through the block compressor selected for the
/// `dict_values` column (lz4 when that feature is enabled, otherwise zstd) with the
/// strategy version set to `V2_2`, and checks the decoded block matches the input.
#[test]
#[cfg(any(feature = "lz4", feature = "zstd"))]
fn test_general_block_decompression_fixed_width_v2_2() {
    let scheme = if cfg!(feature = "lz4") { "lz4" } else { "zstd" };
    let column_params = CompressionFieldParams {
        compression: Some(scheme.to_string()),
        ..Default::default()
    };

    let mut params = CompressionParams::new();
    params
        .columns
        .insert("dict_values".to_string(), column_params);

    let mut strategy = DefaultCompressionStrategy::with_params(params);
    strategy.version = LanceFileVersion::V2_2;

    let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
    let data = create_fixed_width_block(24, 1024);
    let DataBlock::FixedWidth(expected_block) = &data else {
        panic!("expected fixed width block");
    };
    let expected_bits = expected_block.bits_per_value;
    let expected_num_values = expected_block.num_values;

    let (compressor, encoding) = strategy
        .create_block_compressor(&field, &data)
        .expect("general compression should be selected");
    let selected = encoding.compression.as_ref();
    if !matches!(selected, Some(Compression::General(_))) {
        panic!("expected general compression, got {:?}", selected);
    }

    let compressed_buffer = compressor
        .compress(data.clone())
        .expect("write path general compression should succeed");

    let decoded = DefaultDecompressionStrategy::default()
        .create_block_decompressor(&encoding)
        .expect("general block decompressor should be created")
        .decompress(compressed_buffer, expected_num_values)
        .expect("decompression should succeed");

    let DataBlock::FixedWidth(block) = decoded else {
        panic!("expected fixed width block");
    };
    assert_eq!(block.bits_per_value, expected_bits);
    assert_eq!(block.num_values, expected_num_values);
    assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
}
1672}