1#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::InlineBitpacking;
21use crate::{
22 buffer::LanceBuffer,
23 compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24 constants::{
25 BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26 },
27 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28 encodings::{
29 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30 physical::{
31 binary::{
32 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33 VariableDecoder, VariableEncoder,
34 },
35 block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
36 byte_stream_split::{
37 should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
38 },
39 constant::ConstantDecompressor,
40 fsst::{
41 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
42 FsstPerValueEncoder,
43 },
44 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
45 packed::{
46 PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
47 },
48 rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
49 value::{ValueDecompressor, ValueEncoder},
50 },
51 },
52 format::{
53 pb21::{compressive_encoding::Compression, CompressiveEncoding},
54 ProtobufUtils21,
55 },
56 statistics::{GetStat, Stat},
57};
58
59use arrow_array::types::UInt64Type;
60use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
61use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
62use snafu::location;
63use std::str::FromStr;
64
/// Fallback run-count ratio for choosing RLE when no `rle_threshold` is configured.
///
/// RLE is selected only when `run_count < num_values * threshold`.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5_f64;
68
69pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
82 fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
87}
88
89pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
102 fn create_block_compressor(
104 &self,
105 field: &Field,
106 data: &DataBlock,
107 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;
108
109 fn create_per_value(
111 &self,
112 field: &Field,
113 data: &DataBlock,
114 ) -> Result<Box<dyn PerValueCompressor>>;
115
116 fn create_miniblock_compressor(
118 &self,
119 field: &Field,
120 data: &DataBlock,
121 ) -> Result<Box<dyn MiniBlockCompressor>>;
122}
123
124#[derive(Debug, Default)]
125pub struct DefaultCompressionStrategy {
126 params: CompressionParams,
128}
129
130fn try_bss_for_mini_block(
131 data: &FixedWidthDataBlock,
132 params: &CompressionFieldParams,
133) -> Option<Box<dyn MiniBlockCompressor>> {
134 if params.compression.is_none() || params.compression.as_deref() == Some("none") {
137 return None;
138 }
139
140 let mode = params.bss.unwrap_or(BssMode::Auto);
141 if should_use_bss(data, mode) {
143 return Some(Box::new(ByteStreamSplitEncoder::new(
144 data.bits_per_value as usize,
145 )));
146 }
147 None
148}
149
150fn try_rle_for_mini_block(
151 data: &FixedWidthDataBlock,
152 params: &CompressionFieldParams,
153) -> Option<Box<dyn MiniBlockCompressor>> {
154 let bits = data.bits_per_value;
155 if !matches!(bits, 8 | 16 | 32 | 64) {
156 return None;
157 }
158
159 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
160 let threshold = params
161 .rle_threshold
162 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
163
164 if (run_count as f64) < (data.num_values as f64) * threshold {
165 return Some(Box::new(RleMiniBlockEncoder::new()));
166 }
167 None
168}
169
170fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
171 #[cfg(feature = "bitpacking")]
172 {
173 use arrow_array::cast::AsArray;
174
175 let bits = _data.bits_per_value;
176 if !matches!(bits, 8 | 16 | 32 | 64) {
177 return None;
178 }
179
180 let bit_widths = _data.expect_stat(Stat::BitWidth);
181 let widths = bit_widths.as_primitive::<UInt64Type>();
182 let too_small = widths.len() == 1
183 && InlineBitpacking::min_size_bytes(widths.value(0)) >= _data.data_size();
184
185 if !too_small {
186 return Some(Box::new(InlineBitpacking::new(bits)));
187 }
188 None
189 }
190 #[cfg(not(feature = "bitpacking"))]
191 {
192 None
193 }
194}
195
196fn maybe_wrap_general_for_mini_block(
197 inner: Box<dyn MiniBlockCompressor>,
198 params: &CompressionFieldParams,
199) -> Result<Box<dyn MiniBlockCompressor>> {
200 match params.compression.as_deref() {
201 None | Some("none") | Some("fsst") => Ok(inner),
202 Some(raw) => {
203 let scheme = CompressionScheme::from_str(raw).map_err(|_| {
204 lance_core::Error::invalid_input(
205 format!("Unknown compression scheme: {raw}"),
206 location!(),
207 )
208 })?;
209 let cfg = CompressionConfig::new(scheme, params.compression_level);
210 Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
211 }
212 }
213}
214
215impl DefaultCompressionStrategy {
216 pub fn new() -> Self {
218 Self::default()
219 }
220
221 pub fn with_params(params: CompressionParams) -> Self {
223 Self { params }
224 }
225
226 fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
228 let mut params = CompressionFieldParams::default();
229
230 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
232 params.compression = Some(compression.clone());
233 }
234
235 if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
237 params.compression_level = level.parse().ok();
238 }
239
240 if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
242 params.rle_threshold = threshold.parse().ok();
243 }
244
245 if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
247 match BssMode::parse(bss_str) {
248 Some(mode) => params.bss = Some(mode),
249 None => {
250 log::warn!("Invalid BSS mode '{}', using default", bss_str);
251 }
252 }
253 }
254
255 params
256 }
257
258 fn build_fixed_width_compressor(
259 &self,
260 params: &CompressionFieldParams,
261 data: &FixedWidthDataBlock,
262 ) -> Result<Box<dyn MiniBlockCompressor>> {
263 if params.compression.as_deref() == Some("none") {
264 return Ok(Box::new(ValueEncoder::default()));
265 }
266
267 let base = try_bss_for_mini_block(data, params)
268 .or_else(|| try_rle_for_mini_block(data, params))
269 .or_else(|| try_bitpack_for_mini_block(data))
270 .unwrap_or_else(|| Box::new(ValueEncoder::default()));
271
272 maybe_wrap_general_for_mini_block(base, params)
273 }
274
275 fn build_variable_width_compressor(
277 &self,
278 params: &CompressionFieldParams,
279 data: &VariableWidthBlock,
280 ) -> Result<Box<dyn MiniBlockCompressor>> {
281 if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
282 return Err(Error::invalid_input(
283 format!(
284 "Variable width compression not supported for {} bit offsets",
285 data.bits_per_offset
286 ),
287 location!(),
288 ));
289 }
290
291 let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
293 let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
294
295 if params.compression.as_deref() == Some("none") {
297 return Ok(Box::new(BinaryMiniBlockEncoder::default()));
298 }
299
300 if params.compression.as_deref() == Some("fsst") {
302 return Ok(Box::new(FsstMiniBlockEncoder::default()));
303 }
304
305 let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
307 >= FSST_LEAST_INPUT_MAX_LENGTH
308 && data_size >= FSST_LEAST_INPUT_SIZE as u64
309 {
310 Box::new(FsstMiniBlockEncoder::default())
311 } else {
312 Box::new(BinaryMiniBlockEncoder::default())
313 };
314
315 if let Some(compression_scheme) = ¶ms.compression {
317 if compression_scheme != "none" && compression_scheme != "fsst" {
318 let scheme: CompressionScheme = compression_scheme.parse()?;
319 let config = CompressionConfig::new(scheme, params.compression_level);
320 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
321 }
322 }
323
324 Ok(base_encoder)
325 }
326}
327
328impl CompressionStrategy for DefaultCompressionStrategy {
329 fn create_miniblock_compressor(
330 &self,
331 field: &Field,
332 data: &DataBlock,
333 ) -> Result<Box<dyn MiniBlockCompressor>> {
334 let mut field_params = self
335 .params
336 .get_field_params(&field.name, &field.data_type());
337
338 let metadata_params = Self::parse_field_metadata(field);
340 field_params.merge(&metadata_params);
341
342 match data {
343 DataBlock::FixedWidth(fixed_width_data) => {
344 self.build_fixed_width_compressor(&field_params, fixed_width_data)
345 }
346 DataBlock::VariableWidth(variable_width_data) => {
347 self.build_variable_width_compressor(&field_params, variable_width_data)
348 }
349 DataBlock::Struct(struct_data_block) => {
350 if struct_data_block
353 .children
354 .iter()
355 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
356 {
357 panic!("packed struct encoding currently only supports fixed-width fields.")
358 }
359 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
360 }
361 DataBlock::FixedSizeList(_) => {
362 Ok(Box::new(ValueEncoder::default()))
370 }
371 _ => Err(Error::NotSupported {
372 source: format!(
373 "Mini-block compression not yet supported for block type {}",
374 data.name()
375 )
376 .into(),
377 location: location!(),
378 }),
379 }
380 }
381
382 fn create_per_value(
383 &self,
384 field: &Field,
385 data: &DataBlock,
386 ) -> Result<Box<dyn PerValueCompressor>> {
387 let field_params = Self::parse_field_metadata(field);
388
389 match data {
390 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
391 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
392 DataBlock::VariableWidth(variable_width) => {
393 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
394 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
395
396 if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
400 return Ok(Box::new(CompressedBufferEncoder::default()));
401 }
402
403 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
404 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
405 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
406
407 let variable_compression = Box::new(VariableEncoder::default());
408
409 if field_params.compression.as_deref() == Some("fsst")
411 || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
412 && data_size >= FSST_LEAST_INPUT_SIZE as u64)
413 {
414 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
415 } else {
416 Ok(variable_compression)
417 }
418 } else {
419 panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
420 }
421 }
422 _ => unreachable!(
423 "Per-value compression not yet supported for block type: {}",
424 data.name()
425 ),
426 }
427 }
428
429 fn create_block_compressor(
430 &self,
431 _field: &Field,
432 data: &DataBlock,
433 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
434 match data {
436 DataBlock::FixedWidth(fixed_width) => {
439 let encoder = Box::new(ValueEncoder::default());
440 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
441 Ok((encoder, encoding))
442 }
443 DataBlock::VariableWidth(variable_width) => {
444 let encoder = Box::new(VariableEncoder::default());
445 let encoding = ProtobufUtils21::variable(
446 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
447 None,
448 );
449 Ok((encoder, encoding))
450 }
451 _ => unreachable!(),
452 }
453 }
454}
455
456pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
457 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
458}
459
460pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
461 fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
463 fn bits_per_value(&self) -> u64;
467}
468
469pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
470 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
472}
473
474pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
475 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
476}
477
478pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
479 fn create_miniblock_decompressor(
480 &self,
481 description: &CompressiveEncoding,
482 decompression_strategy: &dyn DecompressionStrategy,
483 ) -> Result<Box<dyn MiniBlockDecompressor>>;
484
485 fn create_fixed_per_value_decompressor(
486 &self,
487 description: &CompressiveEncoding,
488 ) -> Result<Box<dyn FixedPerValueDecompressor>>;
489
490 fn create_variable_per_value_decompressor(
491 &self,
492 description: &CompressiveEncoding,
493 ) -> Result<Box<dyn VariablePerValueDecompressor>>;
494
495 fn create_block_decompressor(
496 &self,
497 description: &CompressiveEncoding,
498 ) -> Result<Box<dyn BlockDecompressor>>;
499}
500
/// The built-in, stateless [`DecompressionStrategy`].
#[derive(Default, Debug)]
pub struct DefaultDecompressionStrategy {}
503
504impl DecompressionStrategy for DefaultDecompressionStrategy {
505 fn create_miniblock_decompressor(
506 &self,
507 description: &CompressiveEncoding,
508 decompression_strategy: &dyn DecompressionStrategy,
509 ) -> Result<Box<dyn MiniBlockDecompressor>> {
510 match description.compression.as_ref().unwrap() {
511 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
512 #[cfg(feature = "bitpacking")]
513 Compression::InlineBitpacking(description) => {
514 Ok(Box::new(InlineBitpacking::from_description(description)))
515 }
516 #[cfg(not(feature = "bitpacking"))]
517 Compression::InlineBitpacking(_) => Err(Error::NotSupported {
518 source: "this runtime was not built with bitpacking support".into(),
519 location: location!(),
520 }),
521 Compression::Variable(variable) => {
522 let Compression::Flat(offsets) = variable
523 .offsets
524 .as_ref()
525 .unwrap()
526 .compression
527 .as_ref()
528 .unwrap()
529 else {
530 panic!("Variable compression only supports flat offsets")
531 };
532 Ok(Box::new(BinaryMiniBlockDecompressor::new(
533 offsets.bits_per_value as u8,
534 )))
535 }
536 Compression::Fsst(description) => {
537 let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
538 description.values.as_ref().unwrap(),
539 decompression_strategy,
540 )?;
541 Ok(Box::new(FsstMiniBlockDecompressor::new(
542 description,
543 inner_decompressor,
544 )))
545 }
546 Compression::PackedStruct(description) => Ok(Box::new(
547 PackedStructFixedWidthMiniBlockDecompressor::new(description),
548 )),
549 Compression::FixedSizeList(fsl) => {
550 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
553 }
554 Compression::Rle(rle) => {
555 let Compression::Flat(values) =
556 rle.values.as_ref().unwrap().compression.as_ref().unwrap()
557 else {
558 panic!("RLE compression only supports flat values")
559 };
560 let Compression::Flat(run_lengths) = rle
561 .run_lengths
562 .as_ref()
563 .unwrap()
564 .compression
565 .as_ref()
566 .unwrap()
567 else {
568 panic!("RLE compression only supports flat run lengths")
569 };
570 assert_eq!(
571 run_lengths.bits_per_value, 8,
572 "RLE compression only supports 8-bit run lengths"
573 );
574 Ok(Box::new(RleMiniBlockDecompressor::new(
575 values.bits_per_value,
576 )))
577 }
578 Compression::ByteStreamSplit(bss) => {
579 let Compression::Flat(values) =
580 bss.values.as_ref().unwrap().compression.as_ref().unwrap()
581 else {
582 panic!("ByteStreamSplit compression only supports flat values")
583 };
584 Ok(Box::new(ByteStreamSplitDecompressor::new(
585 values.bits_per_value as usize,
586 )))
587 }
588 Compression::General(general) => {
589 let inner_decompressor = self.create_miniblock_decompressor(
591 general.values.as_ref().ok_or_else(|| {
592 Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
593 })?,
594 decompression_strategy,
595 )?;
596
597 let compression = general.compression.as_ref().ok_or_else(|| {
599 Error::invalid_input("GeneralMiniBlock missing compression config", location!())
600 })?;
601
602 let scheme = compression.scheme().try_into()?;
603
604 let compression_config = crate::encodings::physical::block::CompressionConfig::new(
605 scheme,
606 compression.level,
607 );
608
609 Ok(Box::new(GeneralMiniBlockDecompressor::new(
610 inner_decompressor,
611 compression_config,
612 )))
613 }
614 _ => todo!(),
615 }
616 }
617
618 fn create_fixed_per_value_decompressor(
619 &self,
620 description: &CompressiveEncoding,
621 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
622 match description.compression.as_ref().unwrap() {
623 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
624 Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
625 _ => todo!("fixed-per-value decompressor for {:?}", description),
626 }
627 }
628
629 fn create_variable_per_value_decompressor(
630 &self,
631 description: &CompressiveEncoding,
632 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
633 match description.compression.as_ref().unwrap() {
634 Compression::Variable(variable) => {
635 let Compression::Flat(offsets) = variable
636 .offsets
637 .as_ref()
638 .unwrap()
639 .compression
640 .as_ref()
641 .unwrap()
642 else {
643 panic!("Variable compression only supports flat offsets")
644 };
645 assert!(offsets.bits_per_value < u8::MAX as u64);
646 Ok(Box::new(VariableDecoder::default()))
647 }
648 Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
649 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
650 Box::new(VariableDecoder::default()),
651 ))),
652 Compression::General(ref general) => {
653 Ok(Box::new(CompressedBufferEncoder::from_scheme(
654 general.compression.as_ref().expect_ok()?.scheme(),
655 )?))
656 }
657 _ => todo!("variable-per-value decompressor for {:?}", description),
658 }
659 }
660
661 fn create_block_decompressor(
662 &self,
663 description: &CompressiveEncoding,
664 ) -> Result<Box<dyn BlockDecompressor>> {
665 match description.compression.as_ref().unwrap() {
666 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
667 Compression::Constant(constant) => {
668 let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
669 Ok(Box::new(ConstantDecompressor::new(scalar)))
670 }
671 Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
672 _ => todo!(),
673 }
674 }
675}
676
#[cfg(test)]
mod tests {
    use super::*;
    use crate::buffer::LanceBuffer;
    use crate::data::{BlockInfo, DataBlock};
    use arrow_schema::{DataType, Field as ArrowField};
    use std::collections::HashMap;

    /// Builds a nullable Lance `Field` from an Arrow field with the given name and type.
    fn create_test_field(name: &str, data_type: DataType) -> Field {
        let arrow_field = ArrowField::new(name, data_type, true);
        let mut field = Field::try_from(&arrow_field).unwrap();
        field.id = -1;
        field
    }

    /// Builds a fixed-width block whose values change every `num_values / run_count`
    /// values (at least 1), so the computed run-count statistic is roughly `run_count`.
    /// Statistics are computed before returning.
    fn create_fixed_width_block_with_stats(
        bits_per_value: u64,
        num_values: u64,
        run_count: u64,
    ) -> DataBlock {
        let bytes_per_value = (bits_per_value / 8) as usize;
        let total_bytes = bytes_per_value * num_values as usize;
        let mut data = vec![0u8; total_bytes];

        // NOTE(review): panics with divide-by-zero if run_count == 0.
        let values_per_run = (num_values / run_count).max(1);
        let mut run_value = 0u8;

        for i in 0..num_values as usize {
            if i % values_per_run as usize == 0 {
                // Step by 17 so consecutive runs always hold different values.
                run_value = run_value.wrapping_add(17);
            }
            for j in 0..bytes_per_value {
                let byte_offset = i * bytes_per_value + j;
                if byte_offset < data.len() {
                    data[byte_offset] = run_value.wrapping_add(j as u8);
                }
            }
        }

        let mut block = FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::reinterpret_vec(data),
            num_values,
            block_info: BlockInfo::default(),
        };

        use crate::statistics::ComputeStat;
        block.compute_stat();

        DataBlock::FixedWidth(block)
    }

    /// Builds a fixed-width block where only the first byte of each value is set, to
    /// `i % 256`. Statistics are computed before returning.
    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
        let bytes_per_value = (bits_per_value / 8) as usize;
        let total_bytes = bytes_per_value * num_values as usize;
        let mut data = vec![0u8; total_bytes];

        for i in 0..num_values as usize {
            let byte_offset = i * bytes_per_value;
            if byte_offset < data.len() {
                data[byte_offset] = (i % 256) as u8;
            }
        }

        let mut block = FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::reinterpret_vec(data),
            num_values,
            block_info: BlockInfo::default(),
        };

        use crate::statistics::ComputeStat;
        block.compute_stat();

        DataBlock::FixedWidth(block)
    }

    /// Column-pattern params (`*_id`) with lz4 and a 0.3 RLE threshold should select RLE
    /// for ~100 runs over 1000 values, wrapped in a general compressor.
    #[test]
    fn test_parameter_based_compression() {
        let mut params = CompressionParams::new();

        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.3),
                compression: Some("lz4".to_string()),
                compression_level: None,
                bss: Some(BssMode::Off),
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("user_id", DataType::Int32);

        let data = create_fixed_width_block_with_stats(32, 1000, 100);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        let debug_str = format!("{:?}", compressor);

        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
        assert!(debug_str.contains("RleMiniBlockEncoder"));
    }

    /// Type-level params (keyed by "Int32") apply to any Int32 column: ~50 runs over 1000
    /// values is under the 0.1 threshold, so RLE is chosen.
    #[test]
    fn test_type_level_parameters() {
        let mut params = CompressionParams::new();

        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.1),
                compression: Some("zstd".to_string()),
                compression_level: Some(3),
                bss: Some(BssMode::Off),
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("some_column", DataType::Int32);
        let data = create_fixed_width_block_with_stats(32, 1000, 50);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
    }

    /// `compression: "none"` configured for a column forces the plain value encoder.
    #[test]
    fn test_none_compression() {
        let mut params = CompressionParams::new();

        params.columns.insert(
            "embeddings".to_string(),
            CompressionFieldParams {
                compression: Some("none".to_string()),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("embeddings", DataType::Float32);
        let data = create_fixed_width_block(32, 1000);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
    }

    /// Column-level params override type-level params. Columns without a column entry
    /// fall back to the type-level values.
    #[test]
    fn test_parameter_merge_priority() {
        let mut params = CompressionParams::new();

        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                ..Default::default()
            },
        );

        params.columns.insert(
            "user_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.2),
                compression: Some("zstd".to_string()),
                compression_level: Some(6),
                bss: None,
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        let merged = strategy
            .params
            .get_field_params("user_id", &DataType::Int32);

        assert_eq!(merged.rle_threshold, Some(0.2));
        assert_eq!(merged.compression, Some("zstd".to_string()));
        assert_eq!(merged.compression_level, Some(6));

        let merged = strategy
            .params
            .get_field_params("other_field", &DataType::Int32);
        assert_eq!(merged.rle_threshold, Some(0.5));
        assert_eq!(merged.compression, Some("lz4".to_string()));
        assert_eq!(merged.compression_level, None);
    }

    /// A `log_*` column pattern matches "log_messages" but not "messages_log".
    #[test]
    fn test_pattern_matching() {
        let mut params = CompressionParams::new();

        params.columns.insert(
            "log_*".to_string(),
            CompressionFieldParams {
                compression: Some("zstd".to_string()),
                compression_level: Some(6),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        let merged = strategy
            .params
            .get_field_params("log_messages", &DataType::Utf8);
        assert_eq!(merged.compression, Some("zstd".to_string()));
        assert_eq!(merged.compression_level, Some(6));

        let merged = strategy
            .params
            .get_field_params("messages_log", &DataType::Utf8);
        assert_eq!(merged.compression, None);
    }

    /// Field metadata alone (no params) can request `"none"` compression.
    #[test]
    fn test_legacy_metadata_support() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
        let mut field = create_test_field("some_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
    }

    /// With no configuration and high run counts (1000 / 600 rounds to one value per
    /// run), RLE is not chosen. The result is plain values or bitpacking, depending on
    /// the `bitpacking` feature.
    #[test]
    fn test_default_behavior() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        let field = create_test_field("random_column", DataType::Int32);
        let data = create_fixed_width_block_with_stats(32, 1000, 600);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
    }

    /// A general scheme and level supplied via field metadata produce a general wrapper.
    #[test]
    fn test_field_metadata_compression() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
    }

    /// An RLE threshold from field metadata (0.8) enables RLE for ~100 runs over 1000
    /// values. BSS is switched off so it cannot be chosen first.
    #[test]
    fn test_field_metadata_rle_threshold() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        let mut metadata = HashMap::new();
        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
        metadata.insert(BSS_META_KEY.to_string(), "off".to_string());
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block_with_stats(32, 1000, 100);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("RleMiniBlockEncoder"));
    }

    /// Field metadata (`"none"`) overrides column params (`"lz4"`).
    #[test]
    fn test_field_metadata_override_params() {
        let mut params = CompressionParams::new();
        params.columns.insert(
            "test_column".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.3),
                compression: Some("lz4".to_string()),
                compression_level: None,
                bss: None,
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
    }

    /// Type-level params (lz4) combine with a metadata-only setting (level), still
    /// yielding a general wrapper.
    #[test]
    fn test_field_metadata_mixed_configuration() {
        let mut params = CompressionParams::new();
        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
    }

    /// BSS forced "on" via metadata (with lz4, which BSS requires) is selected for Float32.
    #[test]
    fn test_bss_field_metadata() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        let mut metadata = HashMap::new();
        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
        let arrow_field =
            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
        let field = Field::try_from(&arrow_field).unwrap();

        let data = create_fixed_width_block(32, 100);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("ByteStreamSplitEncoder"));
    }

    /// For Float64, BSS plus lz4 yields a BSS encoder wrapped in a general compressor.
    #[test]
    fn test_bss_with_compression() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        let mut metadata = HashMap::new();
        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
        let arrow_field =
            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
        let field = Field::try_from(&arrow_field).unwrap();

        let data = create_fixed_width_block(64, 100);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
        assert!(debug_str.contains("ByteStreamSplitEncoder"));
    }
}