1#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22 buffer::LanceBuffer,
23 compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24 constants::{
25 BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26 },
27 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28 encodings::{
29 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30 physical::{
31 binary::{
32 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33 VariableDecoder, VariableEncoder,
34 },
35 block::{
36 CompressedBufferEncoder, CompressionConfig, CompressionScheme,
37 GeneralBlockDecompressor,
38 },
39 byte_stream_split::{
40 should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
41 },
42 constant::ConstantDecompressor,
43 fsst::{
44 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
45 FsstPerValueEncoder,
46 },
47 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
48 packed::{
49 PackedStructFixedWidthMiniBlockDecompressor,
50 PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51 PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52 VariablePackedStructFieldKind,
53 },
54 rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
55 value::{ValueDecompressor, ValueEncoder},
56 },
57 },
58 format::{
59 pb21::{compressive_encoding::Compression, CompressiveEncoding},
60 ProtobufUtils21,
61 },
62 statistics::{GetStat, Stat},
63 version::LanceFileVersion,
64};
65
66use arrow_array::{cast::AsArray, types::UInt64Type};
67use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
68use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
69use snafu::location;
70use std::{str::FromStr, sync::Arc};
71
/// Default RLE threshold, used when a field does not configure `rle_threshold`.
///
/// RLE is selected for a mini-block when `run_count < num_values * threshold`.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;

/// Blocks whose `data_size()` exceeds this value get general compression
/// automatically (only for file version 2.2 and later).
// NOTE(review): presumably a byte count (32 KiB) — confirm the unit of `data_size()`.
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 * 1024;
78
/// Compresses a whole [`DataBlock`] into a single [`LanceBuffer`].
///
/// Implementors must be `Debug + Send + Sync` so they can be boxed as trait
/// objects and shared across threads.
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Consumes `data` and returns the compressed representation as one buffer.
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation fails to compress the block.
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
98
/// Chooses a compressor for a field's data, one method per compressor flavor
/// (block, per-value, mini-block).
///
/// Every method receives the field being written and the data block to be
/// compressed, so implementations can base the choice on both.
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Creates a block compressor for `data`, together with the
    /// [`CompressiveEncoding`] description of the chosen encoding.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;

    /// Creates a per-value compressor for `data`.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Creates a mini-block compressor for `data`.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
133
/// The default [`CompressionStrategy`]: picks compressors from user-supplied
/// parameters, field metadata and block statistics.
#[derive(Debug, Default, Clone)]
pub struct DefaultCompressionStrategy {
    /// User-supplied compression parameters, looked up per field by name and
    /// data type.
    params: CompressionParams,
    /// Target file version; general compression and variable packed structs
    /// are only used for version 2.2 and later.
    version: LanceFileVersion,
}
141
142fn try_bss_for_mini_block(
143 data: &FixedWidthDataBlock,
144 params: &CompressionFieldParams,
145) -> Option<Box<dyn MiniBlockCompressor>> {
146 if params.compression.is_none() || params.compression.as_deref() == Some("none") {
149 return None;
150 }
151
152 let mode = params.bss.unwrap_or(BssMode::Auto);
153 if should_use_bss(data, mode) {
155 return Some(Box::new(ByteStreamSplitEncoder::new(
156 data.bits_per_value as usize,
157 )));
158 }
159 None
160}
161
162fn try_rle_for_mini_block(
163 data: &FixedWidthDataBlock,
164 params: &CompressionFieldParams,
165) -> Option<Box<dyn MiniBlockCompressor>> {
166 let bits = data.bits_per_value;
167 if !matches!(bits, 8 | 16 | 32 | 64) {
168 return None;
169 }
170
171 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
172 let threshold = params
173 .rle_threshold
174 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
175
176 if (run_count as f64) < (data.num_values as f64) * threshold {
177 return Some(Box::new(RleMiniBlockEncoder::new()));
178 }
179 None
180}
181
182fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
183 #[cfg(feature = "bitpacking")]
184 {
185 use arrow_array::cast::AsArray;
186
187 let bits = _data.bits_per_value;
188 if !matches!(bits, 8 | 16 | 32 | 64) {
189 return None;
190 }
191
192 let bit_widths = _data.expect_stat(Stat::BitWidth);
193 let widths = bit_widths.as_primitive::<UInt64Type>();
194 let too_small = widths.len() == 1
195 && InlineBitpacking::min_size_bytes(widths.value(0)) >= _data.data_size();
196
197 if !too_small {
198 return Some(Box::new(InlineBitpacking::new(bits)));
199 }
200 None
201 }
202 #[cfg(not(feature = "bitpacking"))]
203 {
204 None
205 }
206}
207
208fn try_bitpack_for_block(
209 data: &FixedWidthDataBlock,
210) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
211 let bits = data.bits_per_value;
212 if !matches!(bits, 8 | 16 | 32 | 64) {
213 return None;
214 }
215
216 let bit_widths = data.expect_stat(Stat::BitWidth);
217 let widths = bit_widths.as_primitive::<UInt64Type>();
218 let has_all_zeros = widths.values().contains(&0);
219 let max_bit_width = *widths.values().iter().max().unwrap();
220
221 let too_small =
222 widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
223
224 if has_all_zeros || too_small {
225 return None;
226 }
227
228 if data.num_values <= 1024 {
229 let compressor = Box::new(InlineBitpacking::new(bits));
230 let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
231 Some((compressor, encoding))
232 } else {
233 let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
234 let encoding = ProtobufUtils21::out_of_line_bitpacking(
235 bits,
236 ProtobufUtils21::flat(max_bit_width, None),
237 );
238 Some((compressor, encoding))
239 }
240}
241
242fn maybe_wrap_general_for_mini_block(
243 inner: Box<dyn MiniBlockCompressor>,
244 params: &CompressionFieldParams,
245) -> Result<Box<dyn MiniBlockCompressor>> {
246 match params.compression.as_deref() {
247 None | Some("none") | Some("fsst") => Ok(inner),
248 Some(raw) => {
249 let scheme = CompressionScheme::from_str(raw).map_err(|_| {
250 lance_core::Error::invalid_input(
251 format!("Unknown compression scheme: {raw}"),
252 location!(),
253 )
254 })?;
255 let cfg = CompressionConfig::new(scheme, params.compression_level);
256 Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
257 }
258 }
259}
260
261fn try_general_compression(
262 version: LanceFileVersion,
263 field_params: &CompressionFieldParams,
264 data: &DataBlock,
265) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
266 if let Some(compression_scheme) = &field_params.compression {
269 if compression_scheme != "none" && version >= LanceFileVersion::V2_2 {
270 let scheme: CompressionScheme = compression_scheme.parse()?;
271 let config = CompressionConfig::new(scheme, field_params.compression_level);
272 let compressor = Box::new(CompressedBufferEncoder::try_new(config)?);
273 return Ok(Some((compressor, config)));
274 }
275 }
276
277 if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION
279 && version >= LanceFileVersion::V2_2
280 {
281 let compressor = Box::new(CompressedBufferEncoder::default());
282 let config = compressor.compressor.config();
283 return Ok(Some((compressor, config)));
284 }
285
286 Ok(None)
287}
288
289impl DefaultCompressionStrategy {
    /// Creates a strategy with default parameters and the default file version.
    pub fn new() -> Self {
        Self::default()
    }
294
    /// Creates a strategy that uses the given compression parameters and the
    /// default file version.
    pub fn with_params(params: CompressionParams) -> Self {
        Self {
            params,
            version: LanceFileVersion::default(),
        }
    }
302
    /// Builder-style setter: replaces the target file version and returns the
    /// updated strategy.
    pub fn with_version(mut self, version: LanceFileVersion) -> Self {
        self.version = version;
        self
    }
308
309 fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
311 let mut params = CompressionFieldParams::default();
312
313 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
315 params.compression = Some(compression.clone());
316 }
317
318 if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
320 params.compression_level = level.parse().ok();
321 }
322
323 if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
325 params.rle_threshold = threshold.parse().ok();
326 }
327
328 if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
330 match BssMode::parse(bss_str) {
331 Some(mode) => params.bss = Some(mode),
332 None => {
333 log::warn!("Invalid BSS mode '{}', using default", bss_str);
334 }
335 }
336 }
337
338 params
339 }
340
341 fn build_fixed_width_compressor(
342 &self,
343 params: &CompressionFieldParams,
344 data: &FixedWidthDataBlock,
345 ) -> Result<Box<dyn MiniBlockCompressor>> {
346 if params.compression.as_deref() == Some("none") {
347 return Ok(Box::new(ValueEncoder::default()));
348 }
349
350 let base = try_bss_for_mini_block(data, params)
351 .or_else(|| try_rle_for_mini_block(data, params))
352 .or_else(|| try_bitpack_for_mini_block(data))
353 .unwrap_or_else(|| Box::new(ValueEncoder::default()));
354
355 maybe_wrap_general_for_mini_block(base, params)
356 }
357
358 fn build_variable_width_compressor(
360 &self,
361 params: &CompressionFieldParams,
362 data: &VariableWidthBlock,
363 ) -> Result<Box<dyn MiniBlockCompressor>> {
364 if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
365 return Err(Error::invalid_input(
366 format!(
367 "Variable width compression not supported for {} bit offsets",
368 data.bits_per_offset
369 ),
370 location!(),
371 ));
372 }
373
374 let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
376 let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
377
378 if params.compression.as_deref() == Some("none") {
380 return Ok(Box::new(BinaryMiniBlockEncoder::default()));
381 }
382
383 if params.compression.as_deref() == Some("fsst") {
385 return Ok(Box::new(FsstMiniBlockEncoder::default()));
386 }
387
388 let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
390 >= FSST_LEAST_INPUT_MAX_LENGTH
391 && data_size >= FSST_LEAST_INPUT_SIZE as u64
392 {
393 Box::new(FsstMiniBlockEncoder::default())
394 } else {
395 Box::new(BinaryMiniBlockEncoder::default())
396 };
397
398 if let Some(compression_scheme) = ¶ms.compression {
400 if compression_scheme != "none" && compression_scheme != "fsst" {
401 let scheme: CompressionScheme = compression_scheme.parse()?;
402 let config = CompressionConfig::new(scheme, params.compression_level);
403 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
404 }
405 }
406
407 Ok(base_encoder)
408 }
409
410 fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
413 let mut field_params = self
414 .params
415 .get_field_params(&field.name, &field.data_type());
416
417 let metadata_params = Self::parse_field_metadata(field);
419 field_params.merge(&metadata_params);
420
421 field_params
422 }
423}
424
425impl CompressionStrategy for DefaultCompressionStrategy {
426 fn create_miniblock_compressor(
427 &self,
428 field: &Field,
429 data: &DataBlock,
430 ) -> Result<Box<dyn MiniBlockCompressor>> {
431 let field_params = self.get_merged_field_params(field);
432
433 match data {
434 DataBlock::FixedWidth(fixed_width_data) => {
435 self.build_fixed_width_compressor(&field_params, fixed_width_data)
436 }
437 DataBlock::VariableWidth(variable_width_data) => {
438 self.build_variable_width_compressor(&field_params, variable_width_data)
439 }
440 DataBlock::Struct(struct_data_block) => {
441 if struct_data_block.has_variable_width_child() {
444 return Err(Error::invalid_input(
445 "Packed struct mini-block encoding supports only fixed-width children",
446 location!(),
447 ));
448 }
449 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
450 }
451 DataBlock::FixedSizeList(_) => {
452 Ok(Box::new(ValueEncoder::default()))
460 }
461 _ => Err(Error::NotSupported {
462 source: format!(
463 "Mini-block compression not yet supported for block type {}",
464 data.name()
465 )
466 .into(),
467 location: location!(),
468 }),
469 }
470 }
471
472 fn create_per_value(
473 &self,
474 field: &Field,
475 data: &DataBlock,
476 ) -> Result<Box<dyn PerValueCompressor>> {
477 let field_params = Self::parse_field_metadata(field);
478
479 match data {
480 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
481 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
482 DataBlock::Struct(struct_block) => {
483 if field.children.len() != struct_block.children.len() {
484 return Err(Error::invalid_input(
485 "Struct field metadata does not match data block children",
486 location!(),
487 ));
488 }
489 let has_variable_child = struct_block.has_variable_width_child();
490 if has_variable_child {
491 if self.version < LanceFileVersion::V2_2 {
492 return Err(Error::NotSupported {
493 source: "Variable packed struct encoding requires Lance file version 2.2 or later".into(),
494 location: location!(),
495 });
496 }
497 Ok(Box::new(PackedStructVariablePerValueEncoder::new(
498 self.clone(),
499 field.children.clone(),
500 )))
501 } else {
502 Err(Error::invalid_input(
503 "Packed struct per-value compression should not be used for fixed-width-only structs",
504 location!(),
505 ))
506 }
507 }
508 DataBlock::VariableWidth(variable_width) => {
509 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
510 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
511
512 let per_value_requested =
517 if let Some(compression) = field_params.compression.as_deref() {
518 compression != "none" && compression != "fsst"
519 } else {
520 false
521 };
522
523 if (max_len > 32 * 1024 || per_value_requested)
524 && data_size >= FSST_LEAST_INPUT_SIZE as u64
525 {
526 return Ok(Box::new(CompressedBufferEncoder::default()));
527 }
528
529 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
530 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
531 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
532
533 let variable_compression = Box::new(VariableEncoder::default());
534
535 if field_params.compression.as_deref() == Some("fsst")
537 || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
538 && data_size >= FSST_LEAST_INPUT_SIZE as u64)
539 {
540 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
541 } else {
542 Ok(variable_compression)
543 }
544 } else {
545 panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
546 }
547 }
548 _ => unreachable!(
549 "Per-value compression not yet supported for block type: {}",
550 data.name()
551 ),
552 }
553 }
554
555 fn create_block_compressor(
556 &self,
557 field: &Field,
558 data: &DataBlock,
559 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
560 let field_params = self.get_merged_field_params(field);
561
562 match data {
563 DataBlock::FixedWidth(fixed_width) => {
564 if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
565 return Ok((compressor, encoding));
566 }
567
568 if let Some((compressor, config)) =
570 try_general_compression(self.version, &field_params, data)?
571 {
572 let encoding = ProtobufUtils21::wrapped(
573 config,
574 ProtobufUtils21::flat(fixed_width.bits_per_value, None),
575 )?;
576 return Ok((compressor, encoding));
577 }
578
579 let encoder = Box::new(ValueEncoder::default());
580 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
581 Ok((encoder, encoding))
582 }
583 DataBlock::VariableWidth(variable_width) => {
584 if let Some((compressor, config)) =
586 try_general_compression(self.version, &field_params, data)?
587 {
588 let encoding = ProtobufUtils21::wrapped(
589 config,
590 ProtobufUtils21::variable(
591 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
592 None,
593 ),
594 )?;
595 return Ok((compressor, encoding));
596 }
597
598 let encoder = Box::new(VariableEncoder::default());
599 let encoding = ProtobufUtils21::variable(
600 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
601 None,
602 );
603 Ok((encoder, encoding))
604 }
605 _ => unreachable!(),
606 }
607 }
608}
609
/// Decompresses the buffers of a mini-block back into a [`DataBlock`].
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `data` into a block.
    // NOTE(review): `num_values` is presumably the number of values expected
    // in the decoded block — confirm against implementors.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
613
/// Decompresses per-value encoded data that arrives as a fixed-width block.
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `data` into a block.
    // NOTE(review): `num_values` is presumably the number of values held in
    // `data` — confirm against implementors.
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// Returns a bit width per value.
    // NOTE(review): presumably the width of the compressed input values —
    // confirm against implementors.
    fn bits_per_value(&self) -> u64;
}
622
/// Decompresses per-value encoded data that arrives as a variable-width block.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Consumes `data` and returns the decompressed block.
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
627
/// Decompresses a single buffer back into a [`DataBlock`]; the counterpart of
/// a block compressor.
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `data` into a block.
    // NOTE(review): `num_values` is presumably the number of values expected
    // in the decoded block — confirm against implementors.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
631
/// Builds decompressors from a [`CompressiveEncoding`] description, one method
/// per decompressor flavor.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Creates a mini-block decompressor for `description`.
    ///
    /// `decompression_strategy` is the strategy to use when a nested encoding
    /// needs its own inner decompressor.
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Creates a per-value decompressor for fixed-width data.
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Creates a per-value decompressor for variable-width data.
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Creates a block decompressor for `description`.
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
654
/// The default [`DecompressionStrategy`]; a stateless dispatcher over the
/// encoding description.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
657
658impl DecompressionStrategy for DefaultDecompressionStrategy {
    /// Builds the mini-block decompressor that matches `description`.
    ///
    /// # Errors
    ///
    /// Returns a not-supported error for inline bitpacking when built without
    /// the `bitpacking` feature and for variable packed structs, and an
    /// invalid-input error when a general encoding lacks its inner encoding or
    /// compression config.
    ///
    /// # Panics
    ///
    /// Panics when `description.compression` (or a required nested encoding)
    /// is missing, when variable offsets, RLE values/run lengths or BSS values
    /// are not flat-encoded, when RLE run lengths are not 8 bits wide, and
    /// (via `todo!`) for any encoding not listed here.
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>> {
        match description.compression.as_ref().unwrap() {
            Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
            #[cfg(feature = "bitpacking")]
            Compression::InlineBitpacking(description) => {
                Ok(Box::new(InlineBitpacking::from_description(description)))
            }
            // Without the feature the bitpacking types are not compiled in,
            // so bitpacked data cannot be read.
            #[cfg(not(feature = "bitpacking"))]
            Compression::InlineBitpacking(_) => Err(Error::NotSupported {
                source: "this runtime was not built with bitpacking support".into(),
                location: location!(),
            }),
            Compression::Variable(variable) => {
                // Only the offsets' bit width is needed, and only flat
                // offsets are supported.
                let Compression::Flat(offsets) = variable
                    .offsets
                    .as_ref()
                    .unwrap()
                    .compression
                    .as_ref()
                    .unwrap()
                else {
                    panic!("Variable compression only supports flat offsets")
                };
                Ok(Box::new(BinaryMiniBlockDecompressor::new(
                    offsets.bits_per_value as u8,
                )))
            }
            Compression::Fsst(description) => {
                // FSST wraps another encoding; build its decompressor through
                // the caller-supplied strategy.
                let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
                    description.values.as_ref().unwrap(),
                    decompression_strategy,
                )?;
                Ok(Box::new(FsstMiniBlockDecompressor::new(
                    description,
                    inner_decompressor,
                )))
            }
            Compression::PackedStruct(description) => Ok(Box::new(
                PackedStructFixedWidthMiniBlockDecompressor::new(description),
            )),
            Compression::VariablePackedStruct(_) => Err(Error::NotSupported {
                source: "variable packed struct decoding is not yet implemented".into(),
                location: location!(),
            }),
            Compression::FixedSizeList(fsl) => {
                Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
            }
            Compression::Rle(rle) => {
                // Both RLE components must be flat-encoded.
                let Compression::Flat(values) =
                    rle.values.as_ref().unwrap().compression.as_ref().unwrap()
                else {
                    panic!("RLE compression only supports flat values")
                };
                let Compression::Flat(run_lengths) = rle
                    .run_lengths
                    .as_ref()
                    .unwrap()
                    .compression
                    .as_ref()
                    .unwrap()
                else {
                    panic!("RLE compression only supports flat run lengths")
                };
                assert_eq!(
                    run_lengths.bits_per_value, 8,
                    "RLE compression only supports 8-bit run lengths"
                );
                Ok(Box::new(RleMiniBlockDecompressor::new(
                    values.bits_per_value,
                )))
            }
            Compression::ByteStreamSplit(bss) => {
                let Compression::Flat(values) =
                    bss.values.as_ref().unwrap().compression.as_ref().unwrap()
                else {
                    panic!("ByteStreamSplit compression only supports flat values")
                };
                Ok(Box::new(ByteStreamSplitDecompressor::new(
                    values.bits_per_value as usize,
                )))
            }
            Compression::General(general) => {
                // Build the decompressor for the wrapped encoding first.
                let inner_decompressor = self.create_miniblock_decompressor(
                    general.values.as_ref().ok_or_else(|| {
                        Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
                    })?,
                    decompression_strategy,
                )?;

                let compression = general.compression.as_ref().ok_or_else(|| {
                    Error::invalid_input("GeneralMiniBlock missing compression config", location!())
                })?;

                // Convert the protobuf scheme into the internal scheme type.
                let scheme = compression.scheme().try_into()?;

                let compression_config = crate::encodings::physical::block::CompressionConfig::new(
                    scheme,
                    compression.level,
                );

                Ok(Box::new(GeneralMiniBlockDecompressor::new(
                    inner_decompressor,
                    compression_config,
                )))
            }
            _ => todo!(),
        }
    }
775
776 fn create_fixed_per_value_decompressor(
777 &self,
778 description: &CompressiveEncoding,
779 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
780 match description.compression.as_ref().unwrap() {
781 Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
782 constant
783 .value
784 .as_ref()
785 .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
786 ))),
787 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
788 Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
789 _ => todo!("fixed-per-value decompressor for {:?}", description),
790 }
791 }
792
    /// Builds the variable-width per-value decompressor that matches
    /// `description`.
    ///
    /// Handles variable, FSST, general and variable packed struct encodings.
    /// For a variable packed struct, one decoder is built per field, fixed or
    /// variable according to that field's layout.
    ///
    /// # Errors
    ///
    /// Returns an invalid-input error when a packed struct field lacks its
    /// value encoding or layout, and propagates errors from nested
    /// decompressor construction.
    ///
    /// # Panics
    ///
    /// Panics when `description.compression` is missing, when variable offsets
    /// are absent or not flat-encoded, when the offset bit width is not below
    /// `u8::MAX`, and (via `todo!`) for any other encoding.
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>> {
        match description.compression.as_ref().unwrap() {
            Compression::Variable(variable) => {
                let Compression::Flat(offsets) = variable
                    .offsets
                    .as_ref()
                    .unwrap()
                    .compression
                    .as_ref()
                    .unwrap()
                else {
                    panic!("Variable compression only supports flat offsets")
                };
                // The offsets are only validated here; the decoder itself
                // takes no configuration.
                assert!(offsets.bits_per_value < u8::MAX as u64);
                Ok(Box::new(VariableDecoder::default()))
            }
            Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
                LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
                Box::new(VariableDecoder::default()),
            ))),
            Compression::General(ref general) => {
                Ok(Box::new(CompressedBufferEncoder::from_scheme(
                    general.compression.as_ref().expect_ok()?.scheme(),
                )?))
            }
            Compression::VariablePackedStruct(description) => {
                let mut fields = Vec::with_capacity(description.fields.len());
                for field in &description.fields {
                    let value_encoding = field.value.as_ref().ok_or_else(|| {
                        Error::invalid_input(
                            "VariablePackedStruct field is missing value encoding",
                            location!(),
                        )
                    })?;
                    // The layout decides whether the field is decoded as
                    // fixed-width or variable-width.
                    let decoder = match field.layout.as_ref().ok_or_else(|| {
                        Error::invalid_input(
                            "VariablePackedStruct field is missing layout details",
                            location!(),
                        )
                    })? {
                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
                            bits_per_value,
                        ) => {
                            let decompressor =
                                self.create_fixed_per_value_decompressor(value_encoding)?;
                            VariablePackedStructFieldDecoder {
                                kind: VariablePackedStructFieldKind::Fixed {
                                    bits_per_value: *bits_per_value,
                                    decompressor: Arc::from(decompressor),
                                },
                            }
                        }
                        crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
                            bits_per_length,
                        ) => {
                            let decompressor =
                                self.create_variable_per_value_decompressor(value_encoding)?;
                            VariablePackedStructFieldDecoder {
                                kind: VariablePackedStructFieldKind::Variable {
                                    bits_per_length: *bits_per_length,
                                    decompressor: Arc::from(decompressor),
                                },
                            }
                        }
                    };
                    fields.push(decoder);
                }
                Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
                    fields,
                )))
            }
            _ => todo!("variable-per-value decompressor for {:?}", description),
        }
    }
870
871 fn create_block_decompressor(
872 &self,
873 description: &CompressiveEncoding,
874 ) -> Result<Box<dyn BlockDecompressor>> {
875 match description.compression.as_ref().unwrap() {
876 Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
877 InlineBitpacking::from_description(inline_bitpacking),
878 )),
879 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
880 Compression::Constant(constant) => {
881 let scalar = constant
882 .value
883 .as_ref()
884 .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
885 Ok(Box::new(ConstantDecompressor::new(scalar)))
886 }
887 Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
888 Compression::FixedSizeList(fsl) => {
889 Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
890 }
891 Compression::OutOfLineBitpacking(out_of_line) => {
892 let compressed_bit_width = match out_of_line
894 .values
895 .as_ref()
896 .unwrap()
897 .compression
898 .as_ref()
899 .unwrap()
900 {
901 Compression::Flat(flat) => flat.bits_per_value,
902 _ => {
903 return Err(Error::InvalidInput {
904 location: location!(),
905 source: "OutOfLineBitpacking values must use Flat encoding".into(),
906 })
907 }
908 };
909 Ok(Box::new(OutOfLineBitpacking::new(
910 compressed_bit_width,
911 out_of_line.uncompressed_bits_per_value,
912 )))
913 }
914 Compression::General(general) => {
915 let inner_desc = general
916 .values
917 .as_ref()
918 .ok_or_else(|| {
919 Error::invalid_input(
920 "General compression missing inner encoding",
921 location!(),
922 )
923 })?
924 .as_ref();
925 let inner_decompressor = self.create_block_decompressor(inner_desc)?;
926
927 let compression = general.compression.as_ref().ok_or_else(|| {
928 Error::invalid_input(
929 "General compression missing compression config",
930 location!(),
931 )
932 })?;
933 let scheme = compression.scheme().try_into()?;
934 let config = CompressionConfig::new(scheme, compression.level);
935 let general_decompressor =
936 GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
937
938 Ok(Box::new(general_decompressor))
939 }
940 _ => todo!(),
941 }
942 }
943}
944
945#[cfg(test)]
946mod tests {
947 use super::*;
948 use crate::buffer::LanceBuffer;
949 use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
950 use arrow_schema::{DataType, Field as ArrowField};
951 use std::collections::HashMap;
952
    /// Builds a nullable Lance [`Field`] with the given name and Arrow data
    /// type, with its id set to `-1`.
    fn create_test_field(name: &str, data_type: DataType) -> Field {
        let arrow_field = ArrowField::new(name, data_type, true);
        let mut field = Field::try_from(&arrow_field).unwrap();
        field.id = -1;
        field
    }
959
    /// Builds a fixed-width block whose values repeat in runs, with statistics
    /// computed.
    ///
    /// The run value changes every `num_values / run_count` values (at least
    /// every value). `run_count` must be non-zero, otherwise the division
    /// panics.
    // NOTE(review): the run count reported by the computed statistics is
    // presumably close to `run_count` but is not checked here.
    fn create_fixed_width_block_with_stats(
        bits_per_value: u64,
        num_values: u64,
        run_count: u64,
    ) -> DataBlock {
        let bytes_per_value = (bits_per_value / 8) as usize;
        let total_bytes = bytes_per_value * num_values as usize;
        let mut data = vec![0u8; total_bytes];

        let values_per_run = (num_values / run_count).max(1);
        let mut run_value = 0u8;

        for i in 0..num_values as usize {
            // Start a new run with a different base byte.
            if i % values_per_run as usize == 0 {
                run_value = run_value.wrapping_add(17);
            }
            // Byte `j` of each value is the run's base byte plus `j`.
            for j in 0..bytes_per_value {
                let byte_offset = i * bytes_per_value + j;
                if byte_offset < data.len() {
                    data[byte_offset] = run_value.wrapping_add(j as u8);
                }
            }
        }

        let mut block = FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::reinterpret_vec(data),
            num_values,
            block_info: BlockInfo::default(),
        };

        // The compressor selection reads block statistics, so compute them.
        use crate::statistics::ComputeStat;
        block.compute_stat();

        DataBlock::FixedWidth(block)
    }
1000
    /// Builds a fixed-width block in which the first byte of value `i` is
    /// `i % 256` and every other byte is zero, with statistics computed.
    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
        let bytes_per_value = (bits_per_value / 8) as usize;
        let total_bytes = bytes_per_value * num_values as usize;
        let mut data = vec![0u8; total_bytes];

        for i in 0..num_values as usize {
            let byte_offset = i * bytes_per_value;
            if byte_offset < data.len() {
                data[byte_offset] = (i % 256) as u8;
            }
        }

        let mut block = FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::reinterpret_vec(data),
            num_values,
            block_info: BlockInfo::default(),
        };

        // The compressor selection reads block statistics, so compute them.
        use crate::statistics::ComputeStat;
        block.compute_stat();

        DataBlock::FixedWidth(block)
    }
1028
    /// Column parameters registered under the `*_id` pattern apply to
    /// `user_id`: the resulting compressor is RLE wrapped in general
    /// compression.
    #[test]
    fn test_parameter_based_compression() {
        let mut params = CompressionParams::new();

        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.3),
                compression: Some("lz4".to_string()),
                compression_level: None,
                bss: Some(BssMode::Off),
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("user_id", DataType::Int32);

        // 1000 values built with a requested run count of 100.
        let data = create_fixed_width_block_with_stats(32, 1000, 100);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        let debug_str = format!("{:?}", compressor);

        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
        assert!(debug_str.contains("RleMiniBlockEncoder"));
    }
1059
    /// Parameters registered for the `Int32` type apply to an Int32 column
    /// with no column-specific entry: RLE is selected.
    #[test]
    fn test_type_level_parameters() {
        let mut params = CompressionParams::new();

        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.1),
                compression: Some("zstd".to_string()),
                compression_level: Some(3),
                bss: Some(BssMode::Off),
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("some_column", DataType::Int32);
        // 1000 values built with a requested run count of 50.
        let data = create_fixed_width_block_with_stats(32, 1000, 50);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
    }
1084
    /// A column configured with `"none"` compression gets a `ValueEncoder`.
    #[test]
    fn test_none_compression() {
        let mut params = CompressionParams::new();

        params.columns.insert(
            "embeddings".to_string(),
            CompressionFieldParams {
                compression: Some("none".to_string()),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("embeddings", DataType::Float32);
        let data = create_fixed_width_block(32, 1000);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
    }
1106
    /// Column-level parameters take precedence over type-level parameters,
    /// and type-level parameters apply to columns without their own entry.
    #[test]
    fn test_parameter_merge_priority() {
        let mut params = CompressionParams::new();

        // Type-level entry for all Int32 columns.
        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                ..Default::default()
            },
        );

        // Column-level entry for `user_id` only.
        params.columns.insert(
            "user_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.2),
                compression: Some("zstd".to_string()),
                compression_level: Some(6),
                bss: None,
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        let merged = strategy
            .params
            .get_field_params("user_id", &DataType::Int32);

        // The column-level values win for `user_id`.
        assert_eq!(merged.rle_threshold, Some(0.2));
        assert_eq!(merged.compression, Some("zstd".to_string()));
        assert_eq!(merged.compression_level, Some(6));

        // Another Int32 column only sees the type-level values.
        let merged = strategy
            .params
            .get_field_params("other_field", &DataType::Int32);
        assert_eq!(merged.rle_threshold, Some(0.5));
        assert_eq!(merged.compression, Some("lz4".to_string()));
        assert_eq!(merged.compression_level, None);
    }
1152
    /// The column pattern `log_*` matches `log_messages` but not
    /// `messages_log`.
    #[test]
    fn test_pattern_matching() {
        let mut params = CompressionParams::new();

        params.columns.insert(
            "log_*".to_string(),
            CompressionFieldParams {
                compression: Some("zstd".to_string()),
                compression_level: Some(6),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        // The name starts with `log_`, so the pattern applies.
        let merged = strategy
            .params
            .get_field_params("log_messages", &DataType::Utf8);
        assert_eq!(merged.compression, Some("zstd".to_string()));
        assert_eq!(merged.compression_level, Some(6));

        // The name only ends with `log`, so nothing is configured.
        let merged = strategy
            .params
            .get_field_params("messages_log", &DataType::Utf8);
        assert_eq!(merged.compression, None);
    }
1182
/// Checks that a field whose metadata sets `COMPRESSION_META_KEY` to `"none"`
/// gets a miniblock compressor whose `Debug` output mentions `ValueEncoder`,
/// using a strategy built from a fresh `CompressionParams::new()`.
#[test]
fn test_legacy_metadata_support() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let mut field = create_test_field("some_column", DataType::Int32);
    field.metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), "none".to_string())]);

    let block = create_fixed_width_block(32, 1000);
    let compressor = strategy
        .create_miniblock_compressor(&field, &block)
        .unwrap();

    let debug_str = format!("{compressor:?}");
    assert!(debug_str.contains("ValueEncoder"));
}
1200
/// Checks the compressor chosen when neither parameters nor field metadata
/// are configured: its `Debug` output must mention either `ValueEncoder` or
/// `InlineBitpacking`.
#[test]
fn test_default_behavior() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let field = create_test_field("random_column", DataType::Int32);
    // NOTE(review): the third argument presumably feeds a block statistic used
    // by the strategy -- confirm against `create_fixed_width_block_with_stats`.
    let block = create_fixed_width_block_with_stats(32, 1000, 600);

    let compressor = strategy
        .create_miniblock_compressor(&field, &block)
        .unwrap();
    let debug_str = format!("{compressor:?}");

    let acceptable = ["ValueEncoder", "InlineBitpacking"];
    assert!(acceptable.iter().any(|&name| debug_str.contains(name)));
}
1216
/// Checks that field metadata setting `COMPRESSION_META_KEY` to `"zstd"` and
/// `COMPRESSION_LEVEL_META_KEY` to `"6"` yields a miniblock compressor whose
/// `Debug` output mentions `GeneralMiniBlockCompressor`.
#[test]
fn test_field_metadata_compression() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([
        (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
        (COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string()),
    ]);

    let block = create_fixed_width_block(32, 1000);
    let compressor = strategy
        .create_miniblock_compressor(&field, &block)
        .unwrap();

    let rendered = format!("{compressor:?}");
    assert!(rendered.contains("GeneralMiniBlockCompressor"));
}
1236
/// Checks that field metadata setting `RLE_THRESHOLD_META_KEY` to `"0.8"`
/// (with `BSS_META_KEY` set to `"off"`) yields a miniblock compressor whose
/// `Debug` output mentions `RleMiniBlockEncoder`.
#[test]
fn test_field_metadata_rle_threshold() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([
        (RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string()),
        (BSS_META_KEY.to_string(), "off".to_string()),
    ]);

    // NOTE(review): the third argument presumably feeds a block statistic that
    // is weighed against the RLE threshold -- confirm against the helper.
    let block = create_fixed_width_block_with_stats(32, 1000, 100);

    let compressor = strategy
        .create_miniblock_compressor(&field, &block)
        .unwrap();

    let rendered = format!("{compressor:?}");
    assert!(rendered.contains("RleMiniBlockEncoder"));
}
1259
/// Checks that field metadata wins over column-level parameters: the column
/// is configured with `"lz4"`, the field metadata says `"none"`, and the
/// resulting compressor's `Debug` output must mention `ValueEncoder`.
#[test]
fn test_field_metadata_override_params() {
    let column_params = CompressionFieldParams {
        rle_threshold: Some(0.3),
        compression: Some(String::from("lz4")),
        compression_level: None,
        bss: None,
    };

    let mut params = CompressionParams::new();
    params
        .columns
        .insert("test_column".to_string(), column_params);

    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), "none".to_string())]);

    let block = create_fixed_width_block(32, 1000);
    let compressor = strategy
        .create_miniblock_compressor(&field, &block)
        .unwrap();

    let rendered = format!("{compressor:?}");
    assert!(rendered.contains("ValueEncoder"));
}
1288
/// Checks a configuration split across two sources: type-level parameters for
/// `"Int32"` supply `"lz4"` and an RLE threshold, while the field metadata
/// supplies only `COMPRESSION_LEVEL_META_KEY = "3"`. The resulting
/// compressor's `Debug` output must mention `GeneralMiniBlockCompressor`.
#[test]
fn test_field_metadata_mixed_configuration() {
    let int32_params = CompressionFieldParams {
        rle_threshold: Some(0.5),
        compression: Some(String::from("lz4")),
        ..Default::default()
    };

    let mut params = CompressionParams::new();
    params.types.insert("Int32".to_string(), int32_params);

    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string())]);

    let block = create_fixed_width_block(32, 1000);
    let compressor = strategy
        .create_miniblock_compressor(&field, &block)
        .unwrap();

    let rendered = format!("{compressor:?}");
    assert!(rendered.contains("GeneralMiniBlockCompressor"));
}
1317
/// Checks that a `Float32` field whose metadata sets `BSS_META_KEY` to `"on"`
/// and `COMPRESSION_META_KEY` to `"lz4"` yields a miniblock compressor whose
/// `Debug` output mentions `ByteStreamSplitEncoder`.
#[test]
fn test_bss_field_metadata() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let field_metadata = HashMap::from([
        (BSS_META_KEY.to_string(), "on".to_string()),
        (COMPRESSION_META_KEY.to_string(), "lz4".to_string()),
    ]);
    let arrow_field =
        ArrowField::new("temperature", DataType::Float32, false).with_metadata(field_metadata);
    let field = Field::try_from(&arrow_field).unwrap();

    let block = create_fixed_width_block(32, 100);

    let compressor = strategy
        .create_miniblock_compressor(&field, &block)
        .unwrap();
    let rendered = format!("{compressor:?}");
    assert!(rendered.contains("ByteStreamSplitEncoder"));
}
1338
/// Checks that a `Float64` field whose metadata sets `BSS_META_KEY` to `"on"`
/// and `COMPRESSION_META_KEY` to `"lz4"` yields a miniblock compressor whose
/// `Debug` output mentions both `GeneralMiniBlockCompressor` and
/// `ByteStreamSplitEncoder`.
#[test]
fn test_bss_with_compression() {
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

    let field_metadata = HashMap::from([
        (BSS_META_KEY.to_string(), "on".to_string()),
        (COMPRESSION_META_KEY.to_string(), "lz4".to_string()),
    ]);
    let arrow_field =
        ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(field_metadata);
    let field = Field::try_from(&arrow_field).unwrap();

    let block = create_fixed_width_block(64, 100);

    let compressor = strategy
        .create_miniblock_compressor(&field, &block)
        .unwrap();
    let rendered = format!("{compressor:?}");

    // Both names must appear in the debug output, checked in the same order as before.
    for expected in ["GeneralMiniBlockCompressor", "ByteStreamSplitEncoder"] {
        assert!(rendered.contains(expected));
    }
}
1361
/// Round-trips a fixed-width block through the block compressor and
/// decompressor with the strategy's file version set to `V2_2`.
///
/// The column is configured with `"lz4"` when that feature is enabled and
/// `"zstd"` otherwise. The test asserts that the reported encoding is
/// `Compression::General`, then that the decoded block matches the input in
/// `bits_per_value`, `num_values`, and raw bytes.
#[test]
#[cfg(any(feature = "lz4", feature = "zstd"))]
fn test_general_block_decompression_fixed_width_v2_2() {
    let scheme = if cfg!(feature = "lz4") { "lz4" } else { "zstd" };
    let column_params = CompressionFieldParams {
        compression: Some(scheme.to_string()),
        ..Default::default()
    };

    let mut params = CompressionParams::new();
    params
        .columns
        .insert("dict_values".to_string(), column_params);

    let mut strategy = DefaultCompressionStrategy::with_params(params);
    strategy.version = LanceFileVersion::V2_2;

    let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
    let data = create_fixed_width_block(24, 1024);
    let DataBlock::FixedWidth(expected_block) = &data else {
        panic!("expected fixed width block");
    };
    let expected_bits = expected_block.bits_per_value;
    let expected_num_values = expected_block.num_values;

    let (compressor, encoding) = strategy
        .create_block_compressor(&field, &data)
        .expect("general compression should be selected");
    assert!(
        matches!(
            encoding.compression.as_ref(),
            Some(Compression::General(_))
        ),
        "expected general compression, got {:?}",
        encoding.compression.as_ref()
    );

    // `data` is still borrowed through `expected_block`, so the compressor gets a clone.
    let compressed_buffer = compressor
        .compress(data.clone())
        .expect("write path general compression should succeed");

    let decompressor = DefaultDecompressionStrategy::default()
        .create_block_decompressor(&encoding)
        .expect("general block decompressor should be created");

    let decoded = decompressor
        .decompress(compressed_buffer, expected_num_values)
        .expect("decompression should succeed");

    let DataBlock::FixedWidth(block) = decoded else {
        panic!("expected fixed width block");
    };
    assert_eq!(block.bits_per_value, expected_bits);
    assert_eq!(block.num_values, expected_num_values);
    assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
}
1416}