1#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22 buffer::LanceBuffer,
23 compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24 constants::{
25 BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26 },
27 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28 encodings::{
29 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30 physical::{
31 binary::{
32 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33 VariableDecoder, VariableEncoder,
34 },
35 block::{
36 CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37 GeneralBlockDecompressor,
38 },
39 byte_stream_split::{
40 should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
41 },
42 constant::ConstantDecompressor,
43 fsst::{
44 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45 FsstPerValueEncoder,
46 },
47 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48 packed::{
49 PackedStructFixedWidthMiniBlockDecompressor,
50 PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51 PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52 VariablePackedStructFieldKind,
53 },
54 rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
55 value::{ValueDecompressor, ValueEncoder},
56 },
57 },
58 format::{
59 pb21::{compressive_encoding::Compression, CompressiveEncoding},
60 ProtobufUtils21,
61 },
62 statistics::{GetStat, Stat},
63 version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
68use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
69use snafu::location;
70use std::{str::FromStr, sync::Arc};
71
/// Fallback RLE threshold used when a field does not configure `rle_threshold`.
///
/// RLE is selected for mini-block data only when the run count is strictly less than
/// `num_values * threshold`.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;

/// Size (32 KiB) a block's `data_size()` must strictly exceed before general compression is
/// applied automatically, i.e. without the field requesting a scheme.
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 * 1024;
78
79pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
92 fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
97}
98
99pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
112 fn create_block_compressor(
114 &self,
115 field: &Field,
116 data: &DataBlock,
117 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;
118
119 fn create_per_value(
121 &self,
122 field: &Field,
123 data: &DataBlock,
124 ) -> Result<Box<dyn PerValueCompressor>>;
125
126 fn create_miniblock_compressor(
128 &self,
129 field: &Field,
130 data: &DataBlock,
131 ) -> Result<Box<dyn MiniBlockCompressor>>;
132}
133
134#[derive(Debug, Default, Clone)]
135pub struct DefaultCompressionStrategy {
136 params: CompressionParams,
138 version: LanceFileVersion,
140}
141
142fn try_bss_for_mini_block(
143 data: &FixedWidthDataBlock,
144 params: &CompressionFieldParams,
145) -> Option<Box<dyn MiniBlockCompressor>> {
146 if params.compression.is_none() || params.compression.as_deref() == Some("none") {
149 return None;
150 }
151
152 let mode = params.bss.unwrap_or(BssMode::Auto);
153 if should_use_bss(data, mode) {
155 return Some(Box::new(ByteStreamSplitEncoder::new(
156 data.bits_per_value as usize,
157 )));
158 }
159 None
160}
161
162fn try_rle_for_mini_block(
163 data: &FixedWidthDataBlock,
164 params: &CompressionFieldParams,
165) -> Option<Box<dyn MiniBlockCompressor>> {
166 let bits = data.bits_per_value;
167 if !matches!(bits, 8 | 16 | 32 | 64) {
168 return None;
169 }
170
171 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
172 let threshold = params
173 .rle_threshold
174 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
175
176 if (run_count as f64) < (data.num_values as f64) * threshold {
177 return Some(Box::new(RleMiniBlockEncoder::new()));
178 }
179 None
180}
181
182fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
183 #[cfg(feature = "bitpacking")]
184 {
185 use arrow_array::cast::AsArray;
186
187 let bits = _data.bits_per_value;
188 if !matches!(bits, 8 | 16 | 32 | 64) {
189 return None;
190 }
191
192 let bit_widths = _data.expect_stat(Stat::BitWidth);
193 let widths = bit_widths.as_primitive::<UInt64Type>();
194 let too_small = widths.len() == 1
195 && InlineBitpacking::min_size_bytes(widths.value(0)) >= _data.data_size();
196
197 if !too_small {
198 return Some(Box::new(InlineBitpacking::new(bits)));
199 }
200 None
201 }
202 #[cfg(not(feature = "bitpacking"))]
203 {
204 None
205 }
206}
207
208fn try_bitpack_for_block(
209 data: &FixedWidthDataBlock,
210) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
211 let bits = data.bits_per_value;
212 if !matches!(bits, 8 | 16 | 32 | 64) {
213 return None;
214 }
215
216 let bit_widths = data.expect_stat(Stat::BitWidth);
217 let widths = bit_widths.as_primitive::<UInt64Type>();
218 let has_all_zeros = widths.values().contains(&0);
219 let max_bit_width = *widths.values().iter().max().unwrap();
220
221 let too_small =
222 widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
223
224 if has_all_zeros || too_small {
225 return None;
226 }
227
228 if data.num_values <= 1024 {
229 let compressor = Box::new(InlineBitpacking::new(bits));
230 let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
231 Some((compressor, encoding))
232 } else {
233 let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
234 let encoding = ProtobufUtils21::out_of_line_bitpacking(
235 bits,
236 ProtobufUtils21::flat(max_bit_width, None),
237 );
238 Some((compressor, encoding))
239 }
240}
241
242fn maybe_wrap_general_for_mini_block(
243 inner: Box<dyn MiniBlockCompressor>,
244 params: &CompressionFieldParams,
245) -> Result<Box<dyn MiniBlockCompressor>> {
246 match params.compression.as_deref() {
247 None | Some("none") | Some("fsst") => Ok(inner),
248 Some(raw) => {
249 let scheme = CompressionScheme::from_str(raw).map_err(|_| {
250 lance_core::Error::invalid_input(
251 format!("Unknown compression scheme: {raw}"),
252 location!(),
253 )
254 })?;
255 let cfg = CompressionConfig::new(scheme, params.compression_level);
256 Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
257 }
258 }
259}
260
261fn try_general_compression(
262 version: LanceFileVersion,
263 field_params: &CompressionFieldParams,
264 data: &DataBlock,
265) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
266 if let Some(compression_scheme) = &field_params.compression {
269 if compression_scheme != "none" && version >= LanceFileVersion::V2_2 {
270 let scheme: CompressionScheme = compression_scheme.parse()?;
271 let config = CompressionConfig::new(scheme, field_params.compression_level);
272 let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
273 return Ok(Some((compressor, config)));
274 }
275 }
276
277 if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
279 && version >= LanceFileVersion::V2_2
280 {
281 let compressor = Box::new(CompressedBufferEncoder::default());
282 let config = compressor.compressor.config();
283 return Ok(Some((compressor, config)));
284 }
285
286 Ok(None)
287}
288
289impl DefaultCompressionStrategy {
290 pub fn new() -> Self {
292 Self::default()
293 }
294
295 pub fn with_params(params: CompressionParams) -> Self {
297 Self {
298 params,
299 version: LanceFileVersion::default(),
300 }
301 }
302
303 pub fn with_version(mut self, version: LanceFileVersion) -> Self {
305 self.version = version;
306 self
307 }
308
309 fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
311 let mut params = CompressionFieldParams::default();
312
313 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
315 params.compression = Some(compression.clone());
316 }
317
318 if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
320 params.compression_level = level.parse().ok();
321 }
322
323 if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
325 params.rle_threshold = threshold.parse().ok();
326 }
327
328 if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
330 match BssMode::parse(bss_str) {
331 Some(mode) => params.bss = Some(mode),
332 None => {
333 log::warn!("Invalid BSS mode '{}', using default", bss_str);
334 }
335 }
336 }
337
338 params
339 }
340
341 fn build_fixed_width_compressor(
342 &self,
343 params: &CompressionFieldParams,
344 data: &FixedWidthDataBlock,
345 ) -> Result<Box<dyn MiniBlockCompressor>> {
346 if params.compression.as_deref() == Some("none") {
347 return Ok(Box::new(ValueEncoder::default()));
348 }
349
350 let base = try_bss_for_mini_block(data, params)
351 .or_else(|| try_rle_for_mini_block(data, params))
352 .or_else(|| try_bitpack_for_mini_block(data))
353 .unwrap_or_else(|| Box::new(ValueEncoder::default()));
354
355 maybe_wrap_general_for_mini_block(base, params)
356 }
357
358 fn build_variable_width_compressor(
360 &self,
361 params: &CompressionFieldParams,
362 data: &VariableWidthBlock,
363 ) -> Result<Box<dyn MiniBlockCompressor>> {
364 if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
365 return Err(Error::invalid_input(
366 format!(
367 "Variable width compression not supported for {} bit offsets",
368 data.bits_per_offset
369 ),
370 location!(),
371 ));
372 }
373
374 let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
376 let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
377
378 if params.compression.as_deref() == Some("none") {
380 return Ok(Box::new(BinaryMiniBlockEncoder::default()));
381 }
382
383 if params.compression.as_deref() == Some("fsst") {
385 return Ok(Box::new(FsstMiniBlockEncoder::default()));
386 }
387
388 let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
390 >= FSST_LEAST_INPUT_MAX_LENGTH
391 && data_size >= FSST_LEAST_INPUT_SIZE as u64
392 {
393 Box::new(FsstMiniBlockEncoder::default())
394 } else {
395 Box::new(BinaryMiniBlockEncoder::default())
396 };
397
398 if let Some(compression_scheme) = ¶ms.compression {
400 if compression_scheme != "none" && compression_scheme != "fsst" {
401 let scheme: CompressionScheme = compression_scheme.parse()?;
402 let config = CompressionConfig::new(scheme, params.compression_level);
403 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
404 }
405 }
406
407 Ok(base_encoder)
408 }
409
410 fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
413 let mut field_params = self
414 .params
415 .get_field_params(&field.name, &field.data_type());
416
417 let metadata_params = Self::parse_field_metadata(field);
419 field_params.merge(&metadata_params);
420
421 field_params
422 }
423}
424
425impl CompressionStrategy for DefaultCompressionStrategy {
426 fn create_miniblock_compressor(
427 &self,
428 field: &Field,
429 data: &DataBlock,
430 ) -> Result<Box<dyn MiniBlockCompressor>> {
431 let field_params = self.get_merged_field_params(field);
432
433 match data {
434 DataBlock::FixedWidth(fixed_width_data) => {
435 self.build_fixed_width_compressor(&field_params, fixed_width_data)
436 }
437 DataBlock::VariableWidth(variable_width_data) => {
438 self.build_variable_width_compressor(&field_params, variable_width_data)
439 }
440 DataBlock::Struct(struct_data_block) => {
441 if struct_data_block.has_variable_width_child() {
444 return Err(Error::invalid_input(
445 "Packed struct mini-block encoding supports only fixed-width children",
446 location!(),
447 ));
448 }
449 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
450 }
451 DataBlock::FixedSizeList(_) => {
452 Ok(Box::new(ValueEncoder::default()))
460 }
461 _ => Err(Error::NotSupported {
462 source: format!(
463 "Mini-block compression not yet supported for block type {}",
464 data.name()
465 )
466 .into(),
467 location: location!(),
468 }),
469 }
470 }
471
472 fn create_per_value(
473 &self,
474 field: &Field,
475 data: &DataBlock,
476 ) -> Result<Box<dyn PerValueCompressor>> {
477 let field_params = self.get_merged_field_params(field);
478
479 match data {
480 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
481 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
482 DataBlock::Struct(struct_block) => {
483 if field.children.len() != struct_block.children.len() {
484 return Err(Error::invalid_input(
485 "Struct field metadata does not match data block children",
486 location!(),
487 ));
488 }
489 let has_variable_child = struct_block.has_variable_width_child();
490 if has_variable_child {
491 if self.version < LanceFileVersion::V2_2 {
492 return Err(Error::NotSupported {
493 source: "Variable packed struct encoding requires Lance file version 2.2 or later".into(),
494 location: location!(),
495 });
496 }
497 Ok(Box::new(PackedStructVariablePerValueEncoder::new(
498 self.clone(),
499 field.children.clone(),
500 )))
501 } else {
502 Err(Error::invalid_input(
503 "Packed struct per-value compression should not be used for fixed-width-only structs",
504 location!(),
505 ))
506 }
507 }
508 DataBlock::VariableWidth(variable_width) => {
509 if field_params.compression.as_deref() == Some("none") {
511 return Ok(Box::new(VariableEncoder::default()));
512 }
513
514 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
515 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
516
517 let per_value_requested =
522 if let Some(compression) = field_params.compression.as_deref() {
523 compression != "fsst"
524 } else {
525 false
526 };
527
528 if (max_len > 32 * 1024 || per_value_requested)
529 && data_size >= FSST_LEAST_INPUT_SIZE as u64
530 {
531 return Ok(Box::new(CompressedBufferEncoder::default()));
532 }
533
534 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
535 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
536 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
537
538 let variable_compression = Box::new(VariableEncoder::default());
539
540 if field_params.compression.as_deref() == Some("fsst")
542 || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
543 && data_size >= FSST_LEAST_INPUT_SIZE as u64)
544 {
545 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
546 } else {
547 Ok(variable_compression)
548 }
549 } else {
550 panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
551 }
552 }
553 _ => unreachable!(
554 "Per-value compression not yet supported for block type: {}",
555 data.name()
556 ),
557 }
558 }
559
560 fn create_block_compressor(
561 &self,
562 field: &Field,
563 data: &DataBlock,
564 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
565 let field_params = self.get_merged_field_params(field);
566
567 match data {
568 DataBlock::FixedWidth(fixed_width) => {
569 if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
570 return Ok((compressor, encoding));
571 }
572
573 if let Some((compressor, config)) =
575 try_general_compression(self.version, &field_params, data)?
576 {
577 let encoding = ProtobufUtils21::wrapped(
578 config,
579 ProtobufUtils21::flat(fixed_width.bits_per_value, None),
580 )?;
581 return Ok((compressor, encoding));
582 }
583
584 let encoder = Box::new(ValueEncoder::default());
585 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
586 Ok((encoder, encoding))
587 }
588 DataBlock::VariableWidth(variable_width) => {
589 if let Some((compressor, config)) =
591 try_general_compression(self.version, &field_params, data)?
592 {
593 let encoding = ProtobufUtils21::wrapped(
594 config,
595 ProtobufUtils21::variable(
596 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
597 None,
598 ),
599 )?;
600 return Ok((compressor, encoding));
601 }
602
603 let encoder = Box::new(VariableEncoder::default());
604 let encoding = ProtobufUtils21::variable(
605 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
606 None,
607 );
608 Ok((encoder, encoding))
609 }
610 _ => unreachable!(),
611 }
612 }
613}
614
615pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
616 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
617}
618
619pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
620 fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
622 fn bits_per_value(&self) -> u64;
626}
627
628pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
629 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
631}
632
633pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
634 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
635}
636
637pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
638 fn create_miniblock_decompressor(
639 &self,
640 description: &CompressiveEncoding,
641 decompression_strategy: &dyn DecompressionStrategy,
642 ) -> Result<Box<dyn MiniBlockDecompressor>>;
643
644 fn create_fixed_per_value_decompressor(
645 &self,
646 description: &CompressiveEncoding,
647 ) -> Result<Box<dyn FixedPerValueDecompressor>>;
648
649 fn create_variable_per_value_decompressor(
650 &self,
651 description: &CompressiveEncoding,
652 ) -> Result<Box<dyn VariablePerValueDecompressor>>;
653
654 fn create_block_decompressor(
655 &self,
656 description: &CompressiveEncoding,
657 ) -> Result<Box<dyn BlockDecompressor>>;
658}
659
/// The built-in [`DecompressionStrategy`]. It carries no state.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
662
663impl DecompressionStrategy for DefaultDecompressionStrategy {
664 fn create_miniblock_decompressor(
665 &self,
666 description: &CompressiveEncoding,
667 decompression_strategy: &dyn DecompressionStrategy,
668 ) -> Result<Box<dyn MiniBlockDecompressor>> {
669 match description.compression.as_ref().unwrap() {
670 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
671 #[cfg(feature = "bitpacking")]
672 Compression::InlineBitpacking(description) => {
673 Ok(Box::new(InlineBitpacking::from_description(description)))
674 }
675 #[cfg(not(feature = "bitpacking"))]
676 Compression::InlineBitpacking(_) => Err(Error::NotSupported {
677 source: "this runtime was not built with bitpacking support".into(),
678 location: location!(),
679 }),
680 Compression::Variable(variable) => {
681 let Compression::Flat(offsets) = variable
682 .offsets
683 .as_ref()
684 .unwrap()
685 .compression
686 .as_ref()
687 .unwrap()
688 else {
689 panic!("Variable compression only supports flat offsets")
690 };
691 Ok(Box::new(BinaryMiniBlockDecompressor::new(
692 offsets.bits_per_value as u8,
693 )))
694 }
695 Compression::Fsst(description) => {
696 let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
697 description.values.as_ref().unwrap(),
698 decompression_strategy,
699 )?;
700 Ok(Box::new(FsstMiniBlockDecompressor::new(
701 description,
702 inner_decompressor,
703 )))
704 }
705 Compression::PackedStruct(description) => Ok(Box::new(
706 PackedStructFixedWidthMiniBlockDecompressor::new(description),
707 )),
708 Compression::VariablePackedStruct(_) => Err(Error::NotSupported {
709 source: "variable packed struct decoding is not yet implemented".into(),
710 location: location!(),
711 }),
712 Compression::FixedSizeList(fsl) => {
713 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
716 }
717 Compression::Rle(rle) => {
718 let Compression::Flat(values) =
719 rle.values.as_ref().unwrap().compression.as_ref().unwrap()
720 else {
721 panic!("RLE compression only supports flat values")
722 };
723 let Compression::Flat(run_lengths) = rle
724 .run_lengths
725 .as_ref()
726 .unwrap()
727 .compression
728 .as_ref()
729 .unwrap()
730 else {
731 panic!("RLE compression only supports flat run lengths")
732 };
733 assert_eq!(
734 run_lengths.bits_per_value, 8,
735 "RLE compression only supports 8-bit run lengths"
736 );
737 Ok(Box::new(RleMiniBlockDecompressor::new(
738 values.bits_per_value,
739 )))
740 }
741 Compression::ByteStreamSplit(bss) => {
742 let Compression::Flat(values) =
743 bss.values.as_ref().unwrap().compression.as_ref().unwrap()
744 else {
745 panic!("ByteStreamSplit compression only supports flat values")
746 };
747 Ok(Box::new(ByteStreamSplitDecompressor::new(
748 values.bits_per_value as usize,
749 )))
750 }
751 Compression::General(general) => {
752 let inner_decompressor = self.create_miniblock_decompressor(
754 general.values.as_ref().ok_or_else(|| {
755 Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
756 })?,
757 decompression_strategy,
758 )?;
759
760 let compression = general.compression.as_ref().ok_or_else(|| {
762 Error::invalid_input("GeneralMiniBlock missing compression config", location!())
763 })?;
764
765 let scheme = compression.scheme().try_into()?;
766
767 let compression_config = crate::encodings::physical::block::CompressionConfig::new(
768 scheme,
769 compression.level,
770 );
771
772 Ok(Box::new(GeneralMiniBlockDecompressor::new(
773 inner_decompressor,
774 compression_config,
775 )))
776 }
777 _ => todo!(),
778 }
779 }
780
781 fn create_fixed_per_value_decompressor(
782 &self,
783 description: &CompressiveEncoding,
784 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
785 match description.compression.as_ref().unwrap() {
786 Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
787 constant
788 .value
789 .as_ref()
790 .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
791 ))),
792 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
793 Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
794 _ => todo!("fixed-per-value decompressor for {:?}", description),
795 }
796 }
797
798 fn create_variable_per_value_decompressor(
799 &self,
800 description: &CompressiveEncoding,
801 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
802 match description.compression.as_ref().unwrap() {
803 Compression::Variable(variable) => {
804 let Compression::Flat(offsets) = variable
805 .offsets
806 .as_ref()
807 .unwrap()
808 .compression
809 .as_ref()
810 .unwrap()
811 else {
812 panic!("Variable compression only supports flat offsets")
813 };
814 assert!(offsets.bits_per_value < u8::MAX as u64);
815 Ok(Box::new(VariableDecoder::default()))
816 }
817 Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
818 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
819 Box::new(VariableDecoder::default()),
820 ))),
821 Compression::General(ref general) => {
822 Ok(Box::new(CompressedBufferEncoder::from_scheme(
823 general.compression.as_ref().expect_ok()?.scheme(),
824 )?))
825 }
826 Compression::VariablePackedStruct(description) => {
827 let mut fields = Vec::with_capacity(description.fields.len());
828 for field in &description.fields {
829 let value_encoding = field.value.as_ref().ok_or_else(|| {
830 Error::invalid_input(
831 "VariablePackedStruct field is missing value encoding",
832 location!(),
833 )
834 })?;
835 let decoder = match field.layout.as_ref().ok_or_else(|| {
836 Error::invalid_input(
837 "VariablePackedStruct field is missing layout details",
838 location!(),
839 )
840 })? {
841 crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
842 bits_per_value,
843 ) => {
844 let decompressor =
845 self.create_fixed_per_value_decompressor(value_encoding)?;
846 VariablePackedStructFieldDecoder {
847 kind: VariablePackedStructFieldKind::Fixed {
848 bits_per_value: *bits_per_value,
849 decompressor: Arc::from(decompressor),
850 },
851 }
852 }
853 crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
854 bits_per_length,
855 ) => {
856 let decompressor =
857 self.create_variable_per_value_decompressor(value_encoding)?;
858 VariablePackedStructFieldDecoder {
859 kind: VariablePackedStructFieldKind::Variable {
860 bits_per_length: *bits_per_length,
861 decompressor: Arc::from(decompressor),
862 },
863 }
864 }
865 };
866 fields.push(decoder);
867 }
868 Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
869 fields,
870 )))
871 }
872 _ => todo!("variable-per-value decompressor for {:?}", description),
873 }
874 }
875
876 fn create_block_decompressor(
877 &self,
878 description: &CompressiveEncoding,
879 ) -> Result<Box<dyn BlockDecompressor>> {
880 match description.compression.as_ref().unwrap() {
881 Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
882 InlineBitpacking::from_description(inline_bitpacking),
883 )),
884 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
885 Compression::Constant(constant) => {
886 let scalar = constant
887 .value
888 .as_ref()
889 .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
890 Ok(Box::new(ConstantDecompressor::new(scalar)))
891 }
892 Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
893 Compression::FixedSizeList(fsl) => {
894 Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
895 }
896 Compression::OutOfLineBitpacking(out_of_line) => {
897 let compressed_bit_width = match out_of_line
899 .values
900 .as_ref()
901 .unwrap()
902 .compression
903 .as_ref()
904 .unwrap()
905 {
906 Compression::Flat(flat) => flat.bits_per_value,
907 _ => {
908 return Err(Error::InvalidInput {
909 location: location!(),
910 source: "OutOfLineBitpacking values must use Flat encoding".into(),
911 })
912 }
913 };
914 Ok(Box::new(OutOfLineBitpacking::new(
915 compressed_bit_width,
916 out_of_line.uncompressed_bits_per_value,
917 )))
918 }
919 Compression::General(general) => {
920 let inner_desc = general
921 .values
922 .as_ref()
923 .ok_or_else(|| {
924 Error::invalid_input(
925 "General compression missing inner encoding",
926 location!(),
927 )
928 })?
929 .as_ref();
930 let inner_decompressor = self.create_block_decompressor(inner_desc)?;
931
932 let compression = general.compression.as_ref().ok_or_else(|| {
933 Error::invalid_input(
934 "General compression missing compression config",
935 location!(),
936 )
937 })?;
938 let scheme = compression.scheme().try_into()?;
939 let config = CompressionConfig::new(scheme, compression.level);
940 let general_decompressor =
941 GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
942
943 Ok(Box::new(general_decompressor))
944 }
945 _ => todo!(),
946 }
947 }
948}
949
950#[cfg(test)]
951mod tests {
952 use super::*;
953 use crate::buffer::LanceBuffer;
954 use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
955 use crate::testing::extract_array_encoding_chain;
956 use arrow_schema::{DataType, Field as ArrowField};
957 use std::collections::HashMap;
958
959 fn create_test_field(name: &str, data_type: DataType) -> Field {
960 let arrow_field = ArrowField::new(name, data_type, true);
961 let mut field = Field::try_from(&arrow_field).unwrap();
962 field.id = -1;
963 field
964 }
965
966 fn create_fixed_width_block_with_stats(
967 bits_per_value: u64,
968 num_values: u64,
969 run_count: u64,
970 ) -> DataBlock {
971 let bytes_per_value = (bits_per_value / 8) as usize;
973 let total_bytes = bytes_per_value * num_values as usize;
974 let mut data = vec![0u8; total_bytes];
975
976 let values_per_run = (num_values / run_count).max(1);
978 let mut run_value = 0u8;
979
980 for i in 0..num_values as usize {
981 if i % values_per_run as usize == 0 {
982 run_value = run_value.wrapping_add(17); }
984 for j in 0..bytes_per_value {
986 let byte_offset = i * bytes_per_value + j;
987 if byte_offset < data.len() {
988 data[byte_offset] = run_value.wrapping_add(j as u8);
989 }
990 }
991 }
992
993 let mut block = FixedWidthDataBlock {
994 bits_per_value,
995 data: LanceBuffer::reinterpret_vec(data),
996 num_values,
997 block_info: BlockInfo::default(),
998 };
999
1000 use crate::statistics::ComputeStat;
1002 block.compute_stat();
1003
1004 DataBlock::FixedWidth(block)
1005 }
1006
1007 fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
1008 let bytes_per_value = (bits_per_value / 8) as usize;
1010 let total_bytes = bytes_per_value * num_values as usize;
1011 let mut data = vec![0u8; total_bytes];
1012
1013 for i in 0..num_values as usize {
1015 let byte_offset = i * bytes_per_value;
1016 if byte_offset < data.len() {
1017 data[byte_offset] = (i % 256) as u8;
1018 }
1019 }
1020
1021 let mut block = FixedWidthDataBlock {
1022 bits_per_value,
1023 data: LanceBuffer::reinterpret_vec(data),
1024 num_values,
1025 block_info: BlockInfo::default(),
1026 };
1027
1028 use crate::statistics::ComputeStat;
1030 block.compute_stat();
1031
1032 DataBlock::FixedWidth(block)
1033 }
1034
1035 fn create_variable_width_block(
1036 bits_per_offset: u8,
1037 num_values: u64,
1038 avg_value_size: usize,
1039 ) -> DataBlock {
1040 use crate::statistics::ComputeStat;
1041
1042 let mut offsets = Vec::with_capacity((num_values + 1) as usize);
1044 let mut current_offset = 0i64;
1045 offsets.push(current_offset);
1046
1047 for i in 0..num_values {
1049 let value_size = if avg_value_size == 0 {
1050 1
1051 } else {
1052 ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
1053 .min(avg_value_size * 2)
1054 };
1055 current_offset += value_size as i64;
1056 offsets.push(current_offset);
1057 }
1058
1059 let total_data_size = current_offset as usize;
1061 let mut data = vec![0u8; total_data_size];
1062
1063 for i in 0..num_values {
1065 let start_offset = offsets[i as usize] as usize;
1066 let end_offset = offsets[(i + 1) as usize] as usize;
1067
1068 let content = (i % 256) as u8;
1069 for j in 0..end_offset - start_offset {
1070 data[start_offset + j] = content.wrapping_add(j as u8);
1071 }
1072 }
1073
1074 let offsets_buffer = match bits_per_offset {
1076 32 => {
1077 let offsets_32: Vec<i32> = offsets.iter().map(|&o| o as i32).collect();
1078 LanceBuffer::reinterpret_vec(offsets_32)
1079 }
1080 64 => LanceBuffer::reinterpret_vec(offsets),
1081 _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
1082 };
1083
1084 let mut block = VariableWidthBlock {
1085 data: LanceBuffer::from(data),
1086 offsets: offsets_buffer,
1087 bits_per_offset,
1088 num_values,
1089 block_info: BlockInfo::default(),
1090 };
1091
1092 block.compute_stat();
1093 DataBlock::VariableWidth(block)
1094 }
1095
/// Column-pattern parameters (`*_id`) select RLE wrapped in general compression.
#[test]
fn test_parameter_based_compression() {
    let id_column_params = CompressionFieldParams {
        rle_threshold: Some(0.3),
        compression: Some("lz4".to_string()),
        compression_level: None,
        bss: Some(BssMode::Off),
    };

    let mut params = CompressionParams::new();
    params.columns.insert("*_id".to_string(), id_column_params);

    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("user_id", DataType::Int32);

    // 100 runs over 1000 values is below the 0.3 threshold, so RLE should be chosen.
    let data = create_fixed_width_block_with_stats(32, 1000, 100);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

    let rendered = format!("{:?}", compressor);
    assert!(rendered.contains("GeneralMiniBlockCompressor"));
    assert!(rendered.contains("RleMiniBlockEncoder"));
}
1126
/// Parameters registered for a data type (`Int32`) apply to columns of that type.
#[test]
fn test_type_level_parameters() {
    let int32_params = CompressionFieldParams {
        rle_threshold: Some(0.1),
        compression: Some("zstd".to_string()),
        compression_level: Some(3),
        bss: Some(BssMode::Off),
    };

    let mut params = CompressionParams::new();
    params.types.insert("Int32".to_string(), int32_params);

    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("some_column", DataType::Int32);
    // 50 runs over 1000 values is below the 0.1 threshold.
    let data = create_fixed_width_block_with_stats(32, 1000, 50);

    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let rendered = format!("{:?}", compressor);
    assert!(rendered.contains("RleMiniBlockEncoder"));
}
1151
1152 fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
1153 let chain = extract_array_encoding_chain(encoding);
1154 if variable {
1155 assert_eq!(chain.len(), 2);
1156 assert_eq!(chain.first().unwrap().as_str(), "variable");
1157 assert_eq!(chain.get(1).unwrap().as_str(), "flat");
1158 } else {
1159 assert_eq!(chain.len(), 1);
1160 assert_eq!(chain.first().unwrap().as_str(), "flat");
1161 }
1162 }
1163
/// Configures `compression: "none"` for the `embeddings` column and verifies,
/// via `check_uncompressed_encoding`, that both the miniblock path and the
/// per-value path report an uncompressed encoding for a fixed-width block and
/// for a variable-width block.
#[test]
fn test_none_compression() {
    let mut params = CompressionParams::new();
    let no_compression = CompressionFieldParams {
        compression: Some(String::from("none")),
        ..Default::default()
    };
    params
        .columns
        .insert(String::from("embeddings"), no_compression);

    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("embeddings", DataType::Float32);
    let fixed_data = create_fixed_width_block(32, 1000);
    let variable_data = create_variable_width_block(32, 10, 32 * 1024);

    // Miniblock path: fixed-width first, then variable-width. The blocks are
    // cloned because they are needed again for the per-value path below.
    for (block, is_variable) in [(&fixed_data, false), (&variable_data, true)] {
        let compressor = strategy
            .create_miniblock_compressor(&field, block)
            .unwrap();
        let (_compressed, encoding) = compressor.compress(block.clone()).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }

    // Per-value path: same order; each block is consumed by its last use.
    for (block, is_variable) in [(fixed_data, false), (variable_data, true)] {
        let compressor = strategy.create_per_value(&field, &block).unwrap();
        let (_compressed, encoding) = compressor.compress(block).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }
}
1202
/// Sets the compression metadata key to `"none"` directly on the Arrow field
/// (with empty strategy params) and verifies that the miniblock and per-value
/// paths both report an uncompressed encoding for fixed-width and
/// variable-width blocks.
#[test]
fn test_field_metadata_none_compression() {
    let metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), String::from("none"))]);
    let arrow_field =
        ArrowField::new("simple_col", DataType::Binary, true).with_metadata(metadata);
    let field = Field::try_from(&arrow_field).unwrap();

    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let fixed_data = create_fixed_width_block(32, 1000);
    let variable_data = create_variable_width_block(32, 10, 32 * 1024);

    // Miniblock path: fixed-width first, then variable-width. Clones keep the
    // blocks available for the per-value checks that follow.
    for (block, is_variable) in [(&fixed_data, false), (&variable_data, true)] {
        let compressor = strategy
            .create_miniblock_compressor(&field, block)
            .unwrap();
        let (_compressed, encoding) = compressor.compress(block.clone()).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }

    // Per-value path: same order; each block is moved into its final use.
    for (block, is_variable) in [(fixed_data, false), (variable_data, true)] {
        let compressor = strategy.create_per_value(&field, &block).unwrap();
        let (_compressed, encoding) = compressor.compress(block).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }
}
1239
/// Registers one entry under the type key `"Int32"` and another under the
/// column key `"user_id"`, then checks what `get_field_params` returns:
/// for `user_id` the column-level values are expected, and for a name with
/// no column entry (`other_field`) the type-level values are expected.
#[test]
fn test_parameter_merge_priority() {
    let type_level = CompressionFieldParams {
        rle_threshold: Some(0.5),
        compression: Some(String::from("lz4")),
        ..Default::default()
    };
    let column_level = CompressionFieldParams {
        rle_threshold: Some(0.2),
        compression: Some(String::from("zstd")),
        compression_level: Some(6),
        bss: None,
    };

    let mut params = CompressionParams::new();
    params.types.insert(String::from("Int32"), type_level);
    params.columns.insert(String::from("user_id"), column_level);
    let strategy = DefaultCompressionStrategy::with_params(params);

    // A field that has both a column entry and a type entry.
    let for_user_id = strategy
        .params
        .get_field_params("user_id", &DataType::Int32);
    assert_eq!(for_user_id.rle_threshold, Some(0.2));
    assert_eq!(for_user_id.compression.as_deref(), Some("zstd"));
    assert_eq!(for_user_id.compression_level, Some(6));

    // A field that only has the type entry to fall back on.
    let for_other = strategy
        .params
        .get_field_params("other_field", &DataType::Int32);
    assert_eq!(for_other.rle_threshold, Some(0.5));
    assert_eq!(for_other.compression.as_deref(), Some("lz4"));
    assert!(for_other.compression_level.is_none());
}
1285
/// Registers column parameters under the wildcard key `log_*` and checks
/// `get_field_params` for two names: `log_messages` is expected to receive
/// the configured compression settings, while `messages_log` is expected to
/// have no compression configured.
#[test]
fn test_pattern_matching() {
    let log_params = CompressionFieldParams {
        compression: Some(String::from("zstd")),
        compression_level: Some(6),
        ..Default::default()
    };

    let mut params = CompressionParams::new();
    params.columns.insert(String::from("log_*"), log_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    // Name that starts with the `log_` prefix.
    let prefixed = strategy
        .params
        .get_field_params("log_messages", &DataType::Utf8);
    assert_eq!(prefixed.compression.as_deref(), Some("zstd"));
    assert_eq!(prefixed.compression_level, Some(6));

    // Name that merely ends with `_log`.
    let suffixed = strategy
        .params
        .get_field_params("messages_log", &DataType::Utf8);
    assert!(suffixed.compression.is_none());
}
1315
/// With empty strategy params, sets the compression metadata key to `"none"`
/// on the field itself and asserts that the resulting miniblock compressor's
/// debug output mentions `ValueEncoder`.
#[test]
fn test_legacy_metadata_support() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let mut field = create_test_field("some_column", DataType::Int32);
    field.metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), String::from("none"))]);

    let block = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &block).unwrap();

    let rendered = format!("{:?}", compressor);
    assert!(rendered.contains("ValueEncoder"));
}
1333
/// With no parameters and no field metadata, asserts that the miniblock
/// compressor chosen for the stats-bearing block mentions either
/// `ValueEncoder` or `InlineBitpacking` in its debug output.
#[test]
fn test_default_behavior() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let field = create_test_field("random_column", DataType::Int32);
    let block = create_fixed_width_block_with_stats(32, 1000, 600);

    let rendered = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &block).unwrap()
    );

    // Either encoder name is acceptable here.
    let acceptable = ["ValueEncoder", "InlineBitpacking"];
    assert!(acceptable.iter().any(|name| rendered.contains(*name)));
}
1349
/// With empty strategy params, sets compression (`"zstd"`) and compression
/// level (`"6"`) through field metadata and asserts that the miniblock
/// compressor's debug output mentions `GeneralMiniBlockCompressor`.
#[test]
fn test_field_metadata_compression() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([
        (COMPRESSION_META_KEY.to_string(), String::from("zstd")),
        (COMPRESSION_LEVEL_META_KEY.to_string(), String::from("6")),
    ]);

    let block = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &block).unwrap();

    let rendered = format!("{:?}", compressor);
    assert!(rendered.contains("GeneralMiniBlockCompressor"));
}
1369
/// With empty strategy params, sets the RLE threshold (`"0.8"`) and BSS
/// (`"off"`) through field metadata and asserts that the miniblock
/// compressor's debug output mentions `RleMiniBlockEncoder`.
#[test]
fn test_field_metadata_rle_threshold() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([
        (RLE_THRESHOLD_META_KEY.to_string(), String::from("0.8")),
        (BSS_META_KEY.to_string(), String::from("off")),
    ]);

    // NOTE(review): the third helper argument (100) is presumably the run
    // count recorded in the block statistics -- confirm against the helper.
    let block = create_fixed_width_block_with_stats(32, 1000, 100);
    let compressor = strategy.create_miniblock_compressor(&field, &block).unwrap();

    let rendered = format!("{:?}", compressor);
    assert!(rendered.contains("RleMiniBlockEncoder"));
}
1392
/// Configures `lz4` for `test_column` through strategy params, then sets the
/// compression metadata key to `"none"` on the field itself. Asserts that the
/// resulting miniblock compressor's debug output mentions `ValueEncoder`.
#[test]
fn test_field_metadata_override_params() {
    let column_params = CompressionFieldParams {
        rle_threshold: Some(0.3),
        compression: Some(String::from("lz4")),
        compression_level: None,
        bss: None,
    };

    let mut params = CompressionParams::new();
    params
        .columns
        .insert(String::from("test_column"), column_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), String::from("none"))]);

    let block = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &block).unwrap();

    let rendered = format!("{:?}", compressor);
    assert!(rendered.contains("ValueEncoder"));
}
1421
/// Supplies the compression scheme (`lz4`) and RLE threshold through the
/// `"Int32"` type-level params while the compression level (`"3"`) comes from
/// field metadata. Asserts that the miniblock compressor's debug output
/// mentions `GeneralMiniBlockCompressor`.
#[test]
fn test_field_metadata_mixed_configuration() {
    let int32_params = CompressionFieldParams {
        rle_threshold: Some(0.5),
        compression: Some(String::from("lz4")),
        ..Default::default()
    };

    let mut params = CompressionParams::new();
    params.types.insert(String::from("Int32"), int32_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata =
        HashMap::from([(COMPRESSION_LEVEL_META_KEY.to_string(), String::from("3"))]);

    let block = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &block).unwrap();

    let rendered = format!("{:?}", compressor);
    assert!(rendered.contains("GeneralMiniBlockCompressor"));
}
1450
/// Enables BSS (`"on"`) together with `lz4` compression through Arrow field
/// metadata on a `Float32` field and asserts that the miniblock compressor's
/// debug output mentions `ByteStreamSplitEncoder`.
#[test]
fn test_bss_field_metadata() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let metadata = HashMap::from([
        (BSS_META_KEY.to_string(), String::from("on")),
        (COMPRESSION_META_KEY.to_string(), String::from("lz4")),
    ]);
    let arrow_field =
        ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
    let field = Field::try_from(&arrow_field).unwrap();

    let block = create_fixed_width_block(32, 100);

    let rendered = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &block).unwrap()
    );
    assert!(rendered.contains("ByteStreamSplitEncoder"));
}
1471
/// Enables BSS (`"on"`) together with `lz4` compression through Arrow field
/// metadata on a `Float64` field and asserts that the miniblock compressor's
/// debug output mentions both `GeneralMiniBlockCompressor` and
/// `ByteStreamSplitEncoder`.
#[test]
fn test_bss_with_compression() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let metadata = HashMap::from([
        (BSS_META_KEY.to_string(), String::from("on")),
        (COMPRESSION_META_KEY.to_string(), String::from("lz4")),
    ]);
    let arrow_field =
        ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
    let field = Field::try_from(&arrow_field).unwrap();

    let block = create_fixed_width_block(64, 100);
    let compressor = strategy.create_miniblock_compressor(&field, &block).unwrap();

    let rendered = format!("{:?}", compressor);
    // The general wrapper is checked first, then the BSS encoder.
    for expected in ["GeneralMiniBlockCompressor", "ByteStreamSplitEncoder"] {
        assert!(rendered.contains(expected));
    }
}
1494
/// Round-trips a 24-bit fixed-width block through the block compressor chosen
/// for file version 2.2 with general compression configured, then through the
/// default decompression strategy, and asserts that bit width, value count,
/// and raw bytes all survive unchanged.
///
/// Only compiled when at least one of the `lz4` / `zstd` features is enabled.
#[test]
#[cfg(any(feature = "lz4", feature = "zstd"))]
fn test_general_block_decompression_fixed_width_v2_2() {
    // Use lz4 when it is compiled in; otherwise zstd (guaranteed by the cfg).
    let scheme = if cfg!(feature = "lz4") { "lz4" } else { "zstd" };

    let mut params = CompressionParams::new();
    params.columns.insert(
        String::from("dict_values"),
        CompressionFieldParams {
            compression: Some(scheme.to_string()),
            ..Default::default()
        },
    );

    let mut strategy = DefaultCompressionStrategy::with_params(params);
    strategy.version = LanceFileVersion::V2_2;

    let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
    let data = create_fixed_width_block(24, 1024);
    let expected_block = match &data {
        DataBlock::FixedWidth(block) => block,
        _ => panic!("expected fixed width block"),
    };
    let expected_bits = expected_block.bits_per_value;
    let expected_num_values = expected_block.num_values;

    let (compressor, encoding) = strategy
        .create_block_compressor(&field, &data)
        .expect("general compression should be selected");
    if !matches!(encoding.compression.as_ref(), Some(Compression::General(_))) {
        panic!(
            "expected general compression, got {:?}",
            encoding.compression.as_ref()
        );
    }

    // `data` is cloned because `expected_block` still borrows it for the
    // byte-level comparison at the end.
    let compressed_buffer = compressor
        .compress(data.clone())
        .expect("write path general compression should succeed");

    let decompressor = DefaultDecompressionStrategy::default()
        .create_block_decompressor(&encoding)
        .expect("general block decompressor should be created");

    let decoded = decompressor
        .decompress(compressed_buffer, expected_num_values)
        .expect("decompression should succeed");

    let DataBlock::FixedWidth(block) = decoded else {
        panic!("expected fixed width block");
    };
    assert_eq!(block.bits_per_value, expected_bits);
    assert_eq!(block.num_values, expected_num_values);
    assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
}
1549}