1use crate::{
20 buffer::LanceBuffer,
21 compression_config::{CompressionFieldParams, CompressionParams},
22 constants::{COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY},
23 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
24 encodings::{
25 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
26 physical::{
27 binary::{
28 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
29 VariableDecoder, VariableEncoder,
30 },
31 bitpack::InlineBitpacking,
32 block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
33 byte_stream_split::ByteStreamSplitDecompressor,
34 constant::ConstantDecompressor,
35 fsst::{
36 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
37 FsstPerValueEncoder,
38 },
39 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
40 packed::{
41 PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
42 },
43 rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
44 value::{ValueDecompressor, ValueEncoder},
45 },
46 },
47 format::{pb, ProtobufUtils},
48 statistics::{GetStat, Stat},
49};
50use arrow::{array::AsArray, datatypes::UInt64Type};
51use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
52use lance_core::{datatypes::Field, Error, Result};
53use snafu::location;
54
/// Default run-length-encoding threshold for fixed-width mini-blocks.
///
/// RLE is chosen when the page's `RunCount` statistic is below
/// `num_values * threshold` and the value width is byte-aligned. A field can
/// override it via `CompressionFieldParams::rle_threshold` or the
/// `RLE_THRESHOLD_META_KEY` field metadata entry.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
58
/// Compresses an entire [`DataBlock`] into a single buffer.
///
/// Instances are returned by [`CompressionStrategy::create_block_compressor`]
/// together with the `pb::ArrayEncoding` that describes the encoded output.
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Consume `data` and return its encoded bytes.
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
78
/// Chooses a compressor for a page of data.
///
/// Each method receives the schema `field` being written and the `data` block
/// for the page, and returns a compressor suited to one structural layout.
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Create a compressor that encodes the whole block into one buffer, along
    /// with the `pb::ArrayEncoding` describing that buffer.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)>;

    /// Create a [`PerValueCompressor`] for `data`.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Create a [`MiniBlockCompressor`] for `data`.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
113
/// Built-in [`CompressionStrategy`].
///
/// Compression choices are looked up in `params` by field name and data type.
/// When building mini-block compressors, compression settings found in the
/// field's metadata are merged on top and take precedence where they are set.
#[derive(Debug, Default)]
pub struct DefaultCompressionStrategy {
    /// Column- and type-level compression configuration.
    params: CompressionParams,
}
119
120impl DefaultCompressionStrategy {
121 pub fn new() -> Self {
123 Self::default()
124 }
125
126 pub fn with_params(params: CompressionParams) -> Self {
128 Self { params }
129 }
130
131 fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
133 let mut params = CompressionFieldParams::default();
134
135 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
137 params.compression = Some(compression.clone());
138 }
139
140 if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
142 params.compression_level = level.parse().ok();
143 }
144
145 if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
147 params.rle_threshold = threshold.parse().ok();
148 }
149
150 params
151 }
152
153 fn build_fixed_width_compressor(
155 &self,
156 params: &CompressionFieldParams,
157 data: &FixedWidthDataBlock,
158 ) -> Result<Box<dyn MiniBlockCompressor>> {
159 let bits_per_value = data.bits_per_value;
160 let is_byte_aligned = bits_per_value == 8
161 || bits_per_value == 16
162 || bits_per_value == 32
163 || bits_per_value == 64;
164
165 let bit_widths = data.expect_stat(Stat::BitWidth);
167 let bit_widths = bit_widths.as_primitive::<UInt64Type>();
168 let has_all_zeros = bit_widths.values().iter().any(|v| *v == 0);
169 let too_small = bit_widths.len() == 1
170 && InlineBitpacking::min_size_bytes(bit_widths.value(0)) >= data.data_size();
171
172 if params.compression.as_deref() == Some("none") {
174 return Ok(Box::new(ValueEncoder::default()));
175 }
176
177 let mut base_encoder: Box<dyn MiniBlockCompressor> = {
179 let rle_threshold = params
181 .rle_threshold
182 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
183
184 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
185 let num_values = data.num_values;
186
187 if (run_count as f64) < (num_values as f64) * rle_threshold && is_byte_aligned {
188 Box::new(RleMiniBlockEncoder::new())
189 } else if !has_all_zeros && !too_small && is_byte_aligned {
190 Box::new(InlineBitpacking::new(bits_per_value))
192 } else {
193 Box::new(ValueEncoder::default())
195 }
196 };
197
198 if let Some(compression_scheme) = ¶ms.compression {
200 if compression_scheme != "none" && compression_scheme != "fsst" {
201 let scheme: CompressionScheme = compression_scheme.parse()?;
202 let config = CompressionConfig::new(scheme, params.compression_level);
203 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
204 }
205 }
206
207 Ok(base_encoder)
208 }
209
210 fn build_variable_width_compressor(
212 &self,
213 params: &CompressionFieldParams,
214 data: &VariableWidthBlock,
215 ) -> Result<Box<dyn MiniBlockCompressor>> {
216 if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
217 return Err(Error::invalid_input(
218 format!(
219 "Variable width compression not supported for {} bit offsets",
220 data.bits_per_offset
221 ),
222 location!(),
223 ));
224 }
225
226 let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
228 let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
229
230 if params.compression.as_deref() == Some("none") {
232 return Ok(Box::new(BinaryMiniBlockEncoder::default()));
233 }
234
235 if params.compression.as_deref() == Some("fsst") {
237 return Ok(Box::new(FsstMiniBlockEncoder::default()));
238 }
239
240 let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
242 >= FSST_LEAST_INPUT_MAX_LENGTH
243 && data_size >= FSST_LEAST_INPUT_SIZE as u64
244 {
245 Box::new(FsstMiniBlockEncoder::default())
246 } else {
247 Box::new(BinaryMiniBlockEncoder::default())
248 };
249
250 if let Some(compression_scheme) = ¶ms.compression {
252 if compression_scheme != "none" && compression_scheme != "fsst" {
253 let scheme: CompressionScheme = compression_scheme.parse()?;
254 let config = CompressionConfig::new(scheme, params.compression_level);
255 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
256 }
257 }
258
259 Ok(base_encoder)
260 }
261}
262
263impl CompressionStrategy for DefaultCompressionStrategy {
264 fn create_miniblock_compressor(
265 &self,
266 field: &Field,
267 data: &DataBlock,
268 ) -> Result<Box<dyn MiniBlockCompressor>> {
269 let mut field_params = self
270 .params
271 .get_field_params(&field.name, &field.data_type());
272
273 let metadata_params = Self::parse_field_metadata(field);
275 field_params.merge(&metadata_params);
276
277 match data {
278 DataBlock::FixedWidth(fixed_width_data) => {
279 self.build_fixed_width_compressor(&field_params, fixed_width_data)
280 }
281 DataBlock::VariableWidth(variable_width_data) => {
282 self.build_variable_width_compressor(&field_params, variable_width_data)
283 }
284 DataBlock::Struct(struct_data_block) => {
285 if struct_data_block
288 .children
289 .iter()
290 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
291 {
292 panic!("packed struct encoding currently only supports fixed-width fields.")
293 }
294 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
295 }
296 DataBlock::FixedSizeList(_) => {
297 Ok(Box::new(ValueEncoder::default()))
305 }
306 _ => Err(Error::NotSupported {
307 source: format!(
308 "Mini-block compression not yet supported for block type {}",
309 data.name()
310 )
311 .into(),
312 location: location!(),
313 }),
314 }
315 }
316
317 fn create_per_value(
318 &self,
319 field: &Field,
320 data: &DataBlock,
321 ) -> Result<Box<dyn PerValueCompressor>> {
322 let field_params = Self::parse_field_metadata(field);
323
324 match data {
325 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
326 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
327 DataBlock::VariableWidth(variable_width) => {
328 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
329 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
330
331 if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
335 return Ok(Box::new(CompressedBufferEncoder::default()));
336 }
337
338 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
339 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
340 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
341
342 let variable_compression = Box::new(VariableEncoder::default());
343
344 if field_params.compression.as_deref() == Some("fsst")
346 || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
347 && data_size >= FSST_LEAST_INPUT_SIZE as u64)
348 {
349 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
350 } else {
351 Ok(variable_compression)
352 }
353 } else {
354 panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
355 }
356 }
357 _ => unreachable!(
358 "Per-value compression not yet supported for block type: {}",
359 data.name()
360 ),
361 }
362 }
363
364 fn create_block_compressor(
365 &self,
366 _field: &Field,
367 data: &DataBlock,
368 ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)> {
369 match data {
371 DataBlock::FixedWidth(fixed_width) => {
374 let encoder = Box::new(ValueEncoder::default());
375 let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
376 Ok((encoder, encoding))
377 }
378 DataBlock::VariableWidth(variable_width) => {
379 let encoder = Box::new(VariableEncoder::default());
380 let encoding = ProtobufUtils::variable(variable_width.bits_per_offset);
381 Ok((encoder, encoding))
382 }
383 _ => unreachable!(),
384 }
385 }
386}
387
/// Decodes mini-block encoded buffers back into a [`DataBlock`].
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decode `num_values` values from the encoded `data` buffers.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
391
/// Decompressor for per-value data delivered as a [`FixedWidthDataBlock`].
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decode `num_values` values from `data`.
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// Width in bits of each value. NOTE(review): presumably the width of the
    /// encoded input values — confirm against implementations.
    fn bits_per_value(&self) -> u64;
}
400
/// Decompressor for per-value data delivered as a [`VariableWidthBlock`].
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decode the values held in `data`.
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
405
/// Decodes a single encoded buffer into a [`DataBlock`].
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decode `num_values` values from the encoded buffer `data`.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
409
/// Maps a serialized `pb::ArrayEncoding` description to the decompressor that
/// can read data written with that encoding.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Create a decompressor for a mini-block encoded page.
    ///
    /// `decompression_strategy` is the strategy used to build decompressors for
    /// nested (inner) encodings, such as the binary encoding wrapped by FSST.
    fn create_miniblock_decompressor(
        &self,
        description: &pb::ArrayEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Create a decompressor for per-value data stored as fixed-width values.
    fn create_fixed_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Create a decompressor for per-value data stored as variable-width values.
    fn create_variable_per_value_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Create a decompressor for a block-compressed buffer.
    fn create_block_decompressor(
        &self,
        description: &pb::ArrayEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
432
/// Built-in [`DecompressionStrategy`] that maps each supported
/// `pb::ArrayEncoding` variant to its decompressor.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
435
436impl DecompressionStrategy for DefaultDecompressionStrategy {
437 fn create_miniblock_decompressor(
438 &self,
439 description: &pb::ArrayEncoding,
440 decompression_strategy: &dyn DecompressionStrategy,
441 ) -> Result<Box<dyn MiniBlockDecompressor>> {
442 match description.array_encoding.as_ref().unwrap() {
443 pb::array_encoding::ArrayEncoding::Flat(flat) => {
444 Ok(Box::new(ValueDecompressor::from_flat(flat)))
445 }
446 pb::array_encoding::ArrayEncoding::InlineBitpacking(description) => {
447 Ok(Box::new(InlineBitpacking::from_description(description)))
448 }
449 pb::array_encoding::ArrayEncoding::Variable(variable) => Ok(Box::new(
450 BinaryMiniBlockDecompressor::new(variable.bits_per_offset as u8),
451 )),
452 pb::array_encoding::ArrayEncoding::Fsst(description) => {
453 let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
454 description.binary.as_ref().unwrap(),
455 decompression_strategy,
456 )?;
457 Ok(Box::new(FsstMiniBlockDecompressor::new(
458 description,
459 inner_decompressor,
460 )))
461 }
462 pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
463 Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
464 description,
465 )))
466 }
467 pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
468 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
471 }
472 pb::array_encoding::ArrayEncoding::Rle(rle) => {
473 Ok(Box::new(RleMiniBlockDecompressor::new(rle.bits_per_value)))
474 }
475 pb::array_encoding::ArrayEncoding::ByteStreamSplit(bss) => Ok(Box::new(
476 ByteStreamSplitDecompressor::new(bss.bits_per_value as usize),
477 )),
478 pb::array_encoding::ArrayEncoding::GeneralMiniBlock(general) => {
479 let inner_decompressor = self.create_miniblock_decompressor(
481 general.inner.as_ref().ok_or_else(|| {
482 Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
483 })?,
484 decompression_strategy,
485 )?;
486
487 let compression = general.compression.as_ref().ok_or_else(|| {
489 Error::invalid_input("GeneralMiniBlock missing compression config", location!())
490 })?;
491
492 let scheme = compression.scheme.parse()?;
493
494 let compression_config = crate::encodings::physical::block::CompressionConfig::new(
495 scheme,
496 compression.level,
497 );
498
499 Ok(Box::new(GeneralMiniBlockDecompressor::new(
500 inner_decompressor,
501 compression_config,
502 )))
503 }
504 _ => todo!(),
505 }
506 }
507
508 fn create_fixed_per_value_decompressor(
509 &self,
510 description: &pb::ArrayEncoding,
511 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
512 match description.array_encoding.as_ref().unwrap() {
513 pb::array_encoding::ArrayEncoding::Flat(flat) => {
514 Ok(Box::new(ValueDecompressor::from_flat(flat)))
515 }
516 pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
517 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
518 }
519 _ => todo!("fixed-per-value decompressor for {:?}", description),
520 }
521 }
522
523 fn create_variable_per_value_decompressor(
524 &self,
525 description: &pb::ArrayEncoding,
526 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
527 match *description.array_encoding.as_ref().unwrap() {
528 pb::array_encoding::ArrayEncoding::Variable(variable) => {
529 assert!(variable.bits_per_offset < u8::MAX as u32);
530 Ok(Box::new(VariableDecoder::default()))
531 }
532 pb::array_encoding::ArrayEncoding::Fsst(ref fsst) => {
533 Ok(Box::new(FsstPerValueDecompressor::new(
534 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
535 Box::new(VariableDecoder::default()),
536 )))
537 }
538 pb::array_encoding::ArrayEncoding::Block(ref block) => Ok(Box::new(
539 CompressedBufferEncoder::from_scheme(&block.scheme)?,
540 )),
541 _ => todo!("variable-per-value decompressor for {:?}", description),
542 }
543 }
544
545 fn create_block_decompressor(
546 &self,
547 description: &pb::ArrayEncoding,
548 ) -> Result<Box<dyn BlockDecompressor>> {
549 match description.array_encoding.as_ref().unwrap() {
550 pb::array_encoding::ArrayEncoding::Flat(flat) => {
551 Ok(Box::new(ValueDecompressor::from_flat(flat)))
552 }
553 pb::array_encoding::ArrayEncoding::Constant(constant) => {
554 let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
555 Ok(Box::new(ConstantDecompressor::new(scalar)))
556 }
557 pb::array_encoding::ArrayEncoding::Variable(_) => {
558 Ok(Box::new(BinaryBlockDecompressor::default()))
559 }
560 _ => todo!(),
561 }
562 }
563}
564
#[cfg(test)]
mod tests {
    use super::*;
    use crate::buffer::LanceBuffer;
    use crate::data::{BlockInfo, DataBlock};
    use arrow::array::{ArrayRef, UInt64Array};
    use arrow::datatypes::{DataType, Field as ArrowField};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Convert a nullable Arrow field into a lance [`Field`] with a placeholder id.
    fn create_test_field(name: &str, data_type: DataType) -> Field {
        let mut field = Field::try_from(&ArrowField::new(name, data_type, true)).unwrap();
        field.id = -1;
        field
    }

    /// Build an `Int32` test field whose metadata contains exactly `entries`.
    fn field_with_metadata(name: &str, entries: &[(&str, &str)]) -> Field {
        let mut field = create_test_field(name, DataType::Int32);
        field.metadata = entries
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect::<HashMap<String, String>>();
        field
    }

    /// Zero-filled fixed-width block with `BitWidth` and `RunCount` statistics pre-populated.
    fn create_fixed_width_block_with_stats(
        bits_per_value: u64,
        num_values: u64,
        run_count: u64,
    ) -> DataBlock {
        let byte_len = (bits_per_value * num_values / 8) as usize;
        let block = FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::reinterpret_vec(vec![0u8; byte_len]),
            num_values,
            block_info: BlockInfo::default(),
        };

        // Seed the block's statistics map under a single write lock.
        {
            let mut stats = block.block_info.0.write().unwrap();
            stats.insert(
                Stat::BitWidth,
                Arc::new(UInt64Array::from(vec![bits_per_value])) as ArrayRef,
            );
            stats.insert(
                Stat::RunCount,
                Arc::new(UInt64Array::from(vec![run_count])) as ArrayRef,
            );
        }

        DataBlock::FixedWidth(block)
    }

    /// Fixed-width block whose run count is a quarter of its value count.
    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
        let run_count = num_values / 4;
        create_fixed_width_block_with_stats(bits_per_value, num_values, run_count)
    }

    /// Ask `strategy` for a mini-block compressor and return its `Debug` rendering.
    fn chosen_miniblock(
        strategy: &DefaultCompressionStrategy,
        field: &Field,
        data: &DataBlock,
    ) -> String {
        let compressor = strategy.create_miniblock_compressor(field, data).unwrap();
        format!("{:?}", compressor)
    }

    #[test]
    fn test_parameter_based_compression() {
        let id_settings = CompressionFieldParams {
            rle_threshold: Some(0.3),
            compression: Some("lz4".to_string()),
            compression_level: None,
        };
        let mut params = CompressionParams::new();
        params.columns.insert("*_id".to_string(), id_settings);

        let strategy = DefaultCompressionStrategy::with_params(params);
        let chosen = chosen_miniblock(
            &strategy,
            &create_test_field("user_id", DataType::Int32),
            &create_fixed_width_block(32, 1000),
        );
        assert!(chosen.contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_type_level_parameters() {
        let int_settings = CompressionFieldParams {
            rle_threshold: Some(0.1),
            compression: Some("zstd".to_string()),
            compression_level: Some(3),
        };
        let mut params = CompressionParams::new();
        params.types.insert("Int32".to_string(), int_settings);

        let strategy = DefaultCompressionStrategy::with_params(params);
        let chosen = chosen_miniblock(
            &strategy,
            &create_test_field("some_column", DataType::Int32),
            &create_fixed_width_block_with_stats(32, 1000, 50),
        );
        assert!(chosen.contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_none_compression() {
        let mut params = CompressionParams::new();
        params.columns.insert(
            "embeddings".to_string(),
            CompressionFieldParams {
                compression: Some("none".to_string()),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let chosen = chosen_miniblock(
            &strategy,
            &create_test_field("embeddings", DataType::Float32),
            &create_fixed_width_block(32, 1000),
        );
        assert!(chosen.contains("ValueEncoder"));
    }

    #[test]
    fn test_parameter_merge_priority() {
        let mut params = CompressionParams::new();
        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                ..Default::default()
            },
        );
        params.columns.insert(
            "user_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.2),
                compression: Some("zstd".to_string()),
                compression_level: Some(6),
            },
        );
        let strategy = DefaultCompressionStrategy::with_params(params);

        // A column-level entry wins over the type-level entry.
        let column_hit = strategy
            .params
            .get_field_params("user_id", &DataType::Int32);
        assert_eq!(column_hit.rle_threshold, Some(0.2));
        assert_eq!(column_hit.compression, Some("zstd".to_string()));
        assert_eq!(column_hit.compression_level, Some(6));

        // Without a column match, only the type-level entry applies.
        let type_hit = strategy
            .params
            .get_field_params("other_field", &DataType::Int32);
        assert_eq!(type_hit.rle_threshold, Some(0.5));
        assert_eq!(type_hit.compression, Some("lz4".to_string()));
        assert_eq!(type_hit.compression_level, None);
    }

    #[test]
    fn test_pattern_matching() {
        let mut params = CompressionParams::new();
        params.columns.insert(
            "log_*".to_string(),
            CompressionFieldParams {
                compression: Some("zstd".to_string()),
                compression_level: Some(6),
                ..Default::default()
            },
        );
        let strategy = DefaultCompressionStrategy::with_params(params);

        let prefixed = strategy
            .params
            .get_field_params("log_messages", &DataType::Utf8);
        assert_eq!(prefixed.compression, Some("zstd".to_string()));
        assert_eq!(prefixed.compression_level, Some(6));

        let suffixed = strategy
            .params
            .get_field_params("messages_log", &DataType::Utf8);
        assert_eq!(suffixed.compression, None);
    }

    #[test]
    fn test_legacy_metadata_support() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
        let field = field_with_metadata("some_column", &[(COMPRESSION_META_KEY, "none")]);

        let chosen = chosen_miniblock(&strategy, &field, &create_fixed_width_block(32, 1000));
        assert!(chosen.contains("ValueEncoder"));
    }

    #[test]
    fn test_default_behavior() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        // 600 runs in 1000 values is above the default 0.5 threshold, so RLE is not picked.
        let chosen = chosen_miniblock(
            &strategy,
            &create_test_field("random_column", DataType::Int32),
            &create_fixed_width_block_with_stats(32, 1000, 600),
        );
        assert!(chosen.contains("ValueEncoder") || chosen.contains("InlineBitpacking"));
    }

    #[test]
    fn test_field_metadata_compression() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
        let field = field_with_metadata(
            "test_column",
            &[
                (COMPRESSION_META_KEY, "zstd"),
                (COMPRESSION_LEVEL_META_KEY, "6"),
            ],
        );

        let chosen = chosen_miniblock(&strategy, &field, &create_fixed_width_block(32, 1000));
        assert!(chosen.contains("GeneralMiniBlockCompressor"));
    }

    #[test]
    fn test_field_metadata_rle_threshold() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
        let field = field_with_metadata("test_column", &[(RLE_THRESHOLD_META_KEY, "0.8")]);

        let chosen = chosen_miniblock(
            &strategy,
            &field,
            &create_fixed_width_block_with_stats(32, 1000, 700),
        );
        assert!(chosen.contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_field_metadata_override_params() {
        let mut params = CompressionParams::new();
        params.columns.insert(
            "test_column".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.3),
                compression: Some("lz4".to_string()),
                compression_level: None,
            },
        );
        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = field_with_metadata("test_column", &[(COMPRESSION_META_KEY, "none")]);

        let chosen = chosen_miniblock(&strategy, &field, &create_fixed_width_block(32, 1000));
        assert!(chosen.contains("ValueEncoder"));
    }

    #[test]
    fn test_field_metadata_mixed_configuration() {
        let mut params = CompressionParams::new();
        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                ..Default::default()
            },
        );
        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = field_with_metadata("test_column", &[(COMPRESSION_LEVEL_META_KEY, "3")]);

        let chosen = chosen_miniblock(&strategy, &field, &create_fixed_width_block(32, 1000));
        assert!(chosen.contains("GeneralMiniBlockCompressor"));
    }
}