1#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::InlineBitpacking;
21use crate::{
22 buffer::LanceBuffer,
23 compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24 constants::{
25 BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26 },
27 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28 encodings::{
29 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30 physical::{
31 binary::{
32 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33 VariableDecoder, VariableEncoder,
34 },
35 block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
36 byte_stream_split::{
37 should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
38 },
39 constant::ConstantDecompressor,
40 fsst::{
41 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
42 FsstPerValueEncoder,
43 },
44 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
45 packed::{
46 PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
47 },
48 rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
49 value::{ValueDecompressor, ValueEncoder},
50 },
51 },
52 format::{
53 pb21::{compressive_encoding::Compression, CompressiveEncoding},
54 ProtobufUtils21,
55 },
56 statistics::{GetStat, Stat},
57};
58
59use arrow_array::types::UInt64Type;
60use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
61use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
62use snafu::location;
63use std::str::FromStr;
64
/// Fallback RLE threshold used when a field does not configure `rle_threshold`.
///
/// RLE is selected for a fixed-width mini-block when
/// `run_count < num_values * threshold`.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
68
/// Compresses a whole [`DataBlock`] into a single buffer.
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Consumes `data` and returns the compressed result as one [`LanceBuffer`].
    ///
    /// # Errors
    ///
    /// Returns an error if the implementation fails to compress the block.
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
88
/// Chooses the compressor to use for a field's data.
///
/// Each method receives the field and the block about to be written and
/// returns a boxed compressor for one of the three compression styles
/// (block, per-value, mini-block).
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Creates a block compressor for `data`.
    ///
    /// Returns the compressor together with a [`CompressiveEncoding`]
    /// (presumably the description a reader needs to pick the matching
    /// decompressor; confirm with callers).
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;

    /// Creates a per-value compressor for `data`.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Creates a mini-block compressor for `data`.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
123
/// Default [`CompressionStrategy`].
///
/// Selects encoders from the block's statistics, the configured
/// [`CompressionParams`], and settings found in the field's metadata.
#[derive(Debug, Default)]
pub struct DefaultCompressionStrategy {
    /// Configured parameters; resolved per field via `get_field_params`.
    params: CompressionParams,
}
129
130fn try_bss_for_mini_block(
131 data: &FixedWidthDataBlock,
132 params: &CompressionFieldParams,
133) -> Option<Box<dyn MiniBlockCompressor>> {
134 if params.compression.is_none() || params.compression.as_deref() == Some("none") {
137 return None;
138 }
139
140 let mode = params.bss.unwrap_or(BssMode::Auto);
141 if should_use_bss(data, mode) {
143 return Some(Box::new(ByteStreamSplitEncoder::new(
144 data.bits_per_value as usize,
145 )));
146 }
147 None
148}
149
150fn try_rle_for_mini_block(
151 data: &FixedWidthDataBlock,
152 params: &CompressionFieldParams,
153) -> Option<Box<dyn MiniBlockCompressor>> {
154 let bits = data.bits_per_value;
155 if !matches!(bits, 8 | 16 | 32 | 64) {
156 return None;
157 }
158
159 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
160 let threshold = params
161 .rle_threshold
162 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
163
164 if (run_count as f64) < (data.num_values as f64) * threshold {
165 return Some(Box::new(RleMiniBlockEncoder::new()));
166 }
167 None
168}
169
170fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
171 #[cfg(feature = "bitpacking")]
172 {
173 use arrow_array::cast::AsArray;
174
175 let bits = _data.bits_per_value;
176 if !matches!(bits, 8 | 16 | 32 | 64) {
177 return None;
178 }
179
180 let bit_widths = _data.expect_stat(Stat::BitWidth);
181 let widths = bit_widths.as_primitive::<UInt64Type>();
182 let has_all_zeros = widths.values().contains(&0);
183 let too_small = widths.len() == 1
184 && InlineBitpacking::min_size_bytes(widths.value(0)) >= _data.data_size();
185
186 if !has_all_zeros && !too_small {
187 return Some(Box::new(InlineBitpacking::new(bits)));
188 }
189 None
190 }
191 #[cfg(not(feature = "bitpacking"))]
192 {
193 None
194 }
195}
196
197fn maybe_wrap_general_for_mini_block(
198 inner: Box<dyn MiniBlockCompressor>,
199 params: &CompressionFieldParams,
200) -> Result<Box<dyn MiniBlockCompressor>> {
201 match params.compression.as_deref() {
202 None | Some("none") | Some("fsst") => Ok(inner),
203 Some(raw) => {
204 let scheme = CompressionScheme::from_str(raw).map_err(|_| {
205 lance_core::Error::invalid_input(
206 format!("Unknown compression scheme: {raw}"),
207 location!(),
208 )
209 })?;
210 let cfg = CompressionConfig::new(scheme, params.compression_level);
211 Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
212 }
213 }
214}
215
216impl DefaultCompressionStrategy {
217 pub fn new() -> Self {
219 Self::default()
220 }
221
222 pub fn with_params(params: CompressionParams) -> Self {
224 Self { params }
225 }
226
227 fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
229 let mut params = CompressionFieldParams::default();
230
231 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
233 params.compression = Some(compression.clone());
234 }
235
236 if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
238 params.compression_level = level.parse().ok();
239 }
240
241 if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
243 params.rle_threshold = threshold.parse().ok();
244 }
245
246 if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
248 match BssMode::parse(bss_str) {
249 Some(mode) => params.bss = Some(mode),
250 None => {
251 log::warn!("Invalid BSS mode '{}', using default", bss_str);
252 }
253 }
254 }
255
256 params
257 }
258
259 fn build_fixed_width_compressor(
260 &self,
261 params: &CompressionFieldParams,
262 data: &FixedWidthDataBlock,
263 ) -> Result<Box<dyn MiniBlockCompressor>> {
264 if params.compression.as_deref() == Some("none") {
265 return Ok(Box::new(ValueEncoder::default()));
266 }
267
268 let base = try_bss_for_mini_block(data, params)
269 .or_else(|| try_rle_for_mini_block(data, params))
270 .or_else(|| try_bitpack_for_mini_block(data))
271 .unwrap_or_else(|| Box::new(ValueEncoder::default()));
272
273 maybe_wrap_general_for_mini_block(base, params)
274 }
275
276 fn build_variable_width_compressor(
278 &self,
279 params: &CompressionFieldParams,
280 data: &VariableWidthBlock,
281 ) -> Result<Box<dyn MiniBlockCompressor>> {
282 if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
283 return Err(Error::invalid_input(
284 format!(
285 "Variable width compression not supported for {} bit offsets",
286 data.bits_per_offset
287 ),
288 location!(),
289 ));
290 }
291
292 let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
294 let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
295
296 if params.compression.as_deref() == Some("none") {
298 return Ok(Box::new(BinaryMiniBlockEncoder::default()));
299 }
300
301 if params.compression.as_deref() == Some("fsst") {
303 return Ok(Box::new(FsstMiniBlockEncoder::default()));
304 }
305
306 let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
308 >= FSST_LEAST_INPUT_MAX_LENGTH
309 && data_size >= FSST_LEAST_INPUT_SIZE as u64
310 {
311 Box::new(FsstMiniBlockEncoder::default())
312 } else {
313 Box::new(BinaryMiniBlockEncoder::default())
314 };
315
316 if let Some(compression_scheme) = ¶ms.compression {
318 if compression_scheme != "none" && compression_scheme != "fsst" {
319 let scheme: CompressionScheme = compression_scheme.parse()?;
320 let config = CompressionConfig::new(scheme, params.compression_level);
321 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
322 }
323 }
324
325 Ok(base_encoder)
326 }
327}
328
329impl CompressionStrategy for DefaultCompressionStrategy {
330 fn create_miniblock_compressor(
331 &self,
332 field: &Field,
333 data: &DataBlock,
334 ) -> Result<Box<dyn MiniBlockCompressor>> {
335 let mut field_params = self
336 .params
337 .get_field_params(&field.name, &field.data_type());
338
339 let metadata_params = Self::parse_field_metadata(field);
341 field_params.merge(&metadata_params);
342
343 match data {
344 DataBlock::FixedWidth(fixed_width_data) => {
345 self.build_fixed_width_compressor(&field_params, fixed_width_data)
346 }
347 DataBlock::VariableWidth(variable_width_data) => {
348 self.build_variable_width_compressor(&field_params, variable_width_data)
349 }
350 DataBlock::Struct(struct_data_block) => {
351 if struct_data_block
354 .children
355 .iter()
356 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
357 {
358 panic!("packed struct encoding currently only supports fixed-width fields.")
359 }
360 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
361 }
362 DataBlock::FixedSizeList(_) => {
363 Ok(Box::new(ValueEncoder::default()))
371 }
372 _ => Err(Error::NotSupported {
373 source: format!(
374 "Mini-block compression not yet supported for block type {}",
375 data.name()
376 )
377 .into(),
378 location: location!(),
379 }),
380 }
381 }
382
383 fn create_per_value(
384 &self,
385 field: &Field,
386 data: &DataBlock,
387 ) -> Result<Box<dyn PerValueCompressor>> {
388 let field_params = Self::parse_field_metadata(field);
389
390 match data {
391 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
392 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
393 DataBlock::VariableWidth(variable_width) => {
394 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
395 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
396
397 if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
401 return Ok(Box::new(CompressedBufferEncoder::default()));
402 }
403
404 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
405 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
406 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
407
408 let variable_compression = Box::new(VariableEncoder::default());
409
410 if field_params.compression.as_deref() == Some("fsst")
412 || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
413 && data_size >= FSST_LEAST_INPUT_SIZE as u64)
414 {
415 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
416 } else {
417 Ok(variable_compression)
418 }
419 } else {
420 panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
421 }
422 }
423 _ => unreachable!(
424 "Per-value compression not yet supported for block type: {}",
425 data.name()
426 ),
427 }
428 }
429
430 fn create_block_compressor(
431 &self,
432 _field: &Field,
433 data: &DataBlock,
434 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
435 match data {
437 DataBlock::FixedWidth(fixed_width) => {
440 let encoder = Box::new(ValueEncoder::default());
441 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
442 Ok((encoder, encoding))
443 }
444 DataBlock::VariableWidth(variable_width) => {
445 let encoder = Box::new(VariableEncoder::default());
446 let encoding = ProtobufUtils21::variable(
447 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
448 None,
449 );
450 Ok((encoder, encoding))
451 }
452 _ => unreachable!(),
453 }
454 }
455}
456
/// Decompresses mini-block data back into a [`DataBlock`].
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `data`, a list of buffers holding `num_values` values.
    ///
    /// NOTE(review): presumably the buffers are the ones emitted by the
    /// matching mini-block compressor; confirm against implementors.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
460
/// Decompresses per-value compressed data that arrives as a fixed-width block.
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `data`, which holds `num_values` values, into a [`DataBlock`].
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// Returns a bit width for this decompressor.
    ///
    /// NOTE(review): presumably the width of each compressed value; confirm
    /// against implementors.
    fn bits_per_value(&self) -> u64;
}
469
/// Decompresses per-value compressed data that arrives as a variable-width block.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `data` into a [`DataBlock`].
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
474
/// Decompresses a single buffer back into a [`DataBlock`].
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decompresses `data`, which holds `num_values` values, into a [`DataBlock`].
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
478
/// Builds decompressors from a [`CompressiveEncoding`] description.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Creates a mini-block decompressor for `description`.
    ///
    /// `decompression_strategy` is the strategy to use when a decompressor
    /// for a nested encoding has to be created.
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Creates a fixed-width per-value decompressor for `description`.
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Creates a variable-width per-value decompressor for `description`.
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Creates a block decompressor for `description`.
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
501
/// Default [`DecompressionStrategy`]; stateless, it maps each encoding
/// description to the matching built-in decompressor.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
504
505impl DecompressionStrategy for DefaultDecompressionStrategy {
506 fn create_miniblock_decompressor(
507 &self,
508 description: &CompressiveEncoding,
509 decompression_strategy: &dyn DecompressionStrategy,
510 ) -> Result<Box<dyn MiniBlockDecompressor>> {
511 match description.compression.as_ref().unwrap() {
512 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
513 #[cfg(feature = "bitpacking")]
514 Compression::InlineBitpacking(description) => {
515 Ok(Box::new(InlineBitpacking::from_description(description)))
516 }
517 #[cfg(not(feature = "bitpacking"))]
518 Compression::InlineBitpacking(_) => Err(Error::NotSupported {
519 source: "this runtime was not built with bitpacking support".into(),
520 location: location!(),
521 }),
522 Compression::Variable(variable) => {
523 let Compression::Flat(offsets) = variable
524 .offsets
525 .as_ref()
526 .unwrap()
527 .compression
528 .as_ref()
529 .unwrap()
530 else {
531 panic!("Variable compression only supports flat offsets")
532 };
533 Ok(Box::new(BinaryMiniBlockDecompressor::new(
534 offsets.bits_per_value as u8,
535 )))
536 }
537 Compression::Fsst(description) => {
538 let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
539 description.values.as_ref().unwrap(),
540 decompression_strategy,
541 )?;
542 Ok(Box::new(FsstMiniBlockDecompressor::new(
543 description,
544 inner_decompressor,
545 )))
546 }
547 Compression::PackedStruct(description) => Ok(Box::new(
548 PackedStructFixedWidthMiniBlockDecompressor::new(description),
549 )),
550 Compression::FixedSizeList(fsl) => {
551 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
554 }
555 Compression::Rle(rle) => {
556 let Compression::Flat(values) =
557 rle.values.as_ref().unwrap().compression.as_ref().unwrap()
558 else {
559 panic!("RLE compression only supports flat values")
560 };
561 let Compression::Flat(run_lengths) = rle
562 .run_lengths
563 .as_ref()
564 .unwrap()
565 .compression
566 .as_ref()
567 .unwrap()
568 else {
569 panic!("RLE compression only supports flat run lengths")
570 };
571 assert_eq!(
572 run_lengths.bits_per_value, 8,
573 "RLE compression only supports 8-bit run lengths"
574 );
575 Ok(Box::new(RleMiniBlockDecompressor::new(
576 values.bits_per_value,
577 )))
578 }
579 Compression::ByteStreamSplit(bss) => {
580 let Compression::Flat(values) =
581 bss.values.as_ref().unwrap().compression.as_ref().unwrap()
582 else {
583 panic!("ByteStreamSplit compression only supports flat values")
584 };
585 Ok(Box::new(ByteStreamSplitDecompressor::new(
586 values.bits_per_value as usize,
587 )))
588 }
589 Compression::General(general) => {
590 let inner_decompressor = self.create_miniblock_decompressor(
592 general.values.as_ref().ok_or_else(|| {
593 Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
594 })?,
595 decompression_strategy,
596 )?;
597
598 let compression = general.compression.as_ref().ok_or_else(|| {
600 Error::invalid_input("GeneralMiniBlock missing compression config", location!())
601 })?;
602
603 let scheme = compression.scheme().try_into()?;
604
605 let compression_config = crate::encodings::physical::block::CompressionConfig::new(
606 scheme,
607 compression.level,
608 );
609
610 Ok(Box::new(GeneralMiniBlockDecompressor::new(
611 inner_decompressor,
612 compression_config,
613 )))
614 }
615 _ => todo!(),
616 }
617 }
618
619 fn create_fixed_per_value_decompressor(
620 &self,
621 description: &CompressiveEncoding,
622 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
623 match description.compression.as_ref().unwrap() {
624 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
625 Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
626 _ => todo!("fixed-per-value decompressor for {:?}", description),
627 }
628 }
629
630 fn create_variable_per_value_decompressor(
631 &self,
632 description: &CompressiveEncoding,
633 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
634 match description.compression.as_ref().unwrap() {
635 Compression::Variable(variable) => {
636 let Compression::Flat(offsets) = variable
637 .offsets
638 .as_ref()
639 .unwrap()
640 .compression
641 .as_ref()
642 .unwrap()
643 else {
644 panic!("Variable compression only supports flat offsets")
645 };
646 assert!(offsets.bits_per_value < u8::MAX as u64);
647 Ok(Box::new(VariableDecoder::default()))
648 }
649 Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
650 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
651 Box::new(VariableDecoder::default()),
652 ))),
653 Compression::General(ref general) => {
654 Ok(Box::new(CompressedBufferEncoder::from_scheme(
655 general.compression.as_ref().expect_ok()?.scheme(),
656 )?))
657 }
658 _ => todo!("variable-per-value decompressor for {:?}", description),
659 }
660 }
661
662 fn create_block_decompressor(
663 &self,
664 description: &CompressiveEncoding,
665 ) -> Result<Box<dyn BlockDecompressor>> {
666 match description.compression.as_ref().unwrap() {
667 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
668 Compression::Constant(constant) => {
669 let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
670 Ok(Box::new(ConstantDecompressor::new(scalar)))
671 }
672 Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
673 _ => todo!(),
674 }
675 }
676}
677
#[cfg(test)]
mod tests {
    use super::*;
    use crate::buffer::LanceBuffer;
    use crate::data::{BlockInfo, DataBlock};
    use arrow_schema::{DataType, Field as ArrowField};
    use std::collections::HashMap;

    /// Builds a nullable test field with the given name and type.
    fn create_test_field(name: &str, data_type: DataType) -> Field {
        let arrow_field = ArrowField::new(name, data_type, true);
        let mut field = Field::try_from(&arrow_field).unwrap();
        field.id = -1;
        field
    }

    /// Builds an owned metadata map from borrowed key/value pairs.
    fn metadata_from(entries: &[(&str, &str)]) -> HashMap<String, String> {
        entries
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Builds a test field whose metadata is exactly `entries`.
    fn create_test_field_with_metadata(
        name: &str,
        data_type: DataType,
        entries: &[(&str, &str)],
    ) -> Field {
        let mut field = create_test_field(name, data_type);
        field.metadata = metadata_from(entries);
        field
    }

    /// Wraps raw bytes in a fixed-width block and computes its statistics.
    fn into_block_with_stats(bits_per_value: u64, num_values: u64, bytes: Vec<u8>) -> DataBlock {
        use crate::statistics::ComputeStat;

        let mut block = FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::reinterpret_vec(bytes),
            num_values,
            block_info: BlockInfo::default(),
        };
        block.compute_stat();

        DataBlock::FixedWidth(block)
    }

    /// Builds a fixed-width block laid out as runs of repeated values.
    ///
    /// The run length is `num_values / run_count` (at least 1); each new run
    /// advances the base byte by 17, and byte `j` of a value is `base + j`.
    fn create_fixed_width_block_with_stats(
        bits_per_value: u64,
        num_values: u64,
        run_count: u64,
    ) -> DataBlock {
        let value_width = (bits_per_value / 8) as usize;
        let mut bytes = vec![0u8; value_width * num_values as usize];

        let run_len = (num_values / run_count).max(1) as usize;
        let mut base = 0u8;

        for idx in 0..num_values as usize {
            if idx % run_len == 0 {
                base = base.wrapping_add(17);
            }
            let start = idx * value_width;
            for (offset, byte) in bytes[start..start + value_width].iter_mut().enumerate() {
                *byte = base.wrapping_add(offset as u8);
            }
        }

        into_block_with_stats(bits_per_value, num_values, bytes)
    }

    /// Builds a fixed-width block whose first byte per value is `index % 256`.
    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
        let value_width = (bits_per_value / 8) as usize;
        let mut bytes = vec![0u8; value_width * num_values as usize];

        for idx in 0..num_values as usize {
            if let Some(first_byte) = bytes.get_mut(idx * value_width) {
                *first_byte = (idx % 256) as u8;
            }
        }

        into_block_with_stats(bits_per_value, num_values, bytes)
    }

    /// Creates the mini-block compressor and returns its `Debug` output.
    fn describe_compressor(
        strategy: &DefaultCompressionStrategy,
        field: &Field,
        data: &DataBlock,
    ) -> String {
        let compressor = strategy.create_miniblock_compressor(field, data).unwrap();
        format!("{:?}", compressor)
    }

    #[test]
    fn test_parameter_based_compression() {
        let column_params = CompressionFieldParams {
            rle_threshold: Some(0.3),
            compression: Some("lz4".to_string()),
            compression_level: None,
            bss: Some(BssMode::Off),
        };
        let mut params = CompressionParams::new();
        params.columns.insert("*_id".to_string(), column_params);

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("user_id", DataType::Int32);
        let block = create_fixed_width_block_with_stats(32, 1000, 100);

        let description = describe_compressor(&strategy, &field, &block);
        assert!(description.contains("GeneralMiniBlockCompressor"));
        assert!(description.contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_type_level_parameters() {
        let type_params = CompressionFieldParams {
            rle_threshold: Some(0.1),
            compression: Some("zstd".to_string()),
            compression_level: Some(3),
            bss: Some(BssMode::Off),
        };
        let mut params = CompressionParams::new();
        params.types.insert("Int32".to_string(), type_params);

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("some_column", DataType::Int32);
        let block = create_fixed_width_block_with_stats(32, 1000, 50);

        let description = describe_compressor(&strategy, &field, &block);
        assert!(description.contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_none_compression() {
        let column_params = CompressionFieldParams {
            compression: Some("none".to_string()),
            ..Default::default()
        };
        let mut params = CompressionParams::new();
        params.columns.insert("embeddings".to_string(), column_params);

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("embeddings", DataType::Float32);
        let block = create_fixed_width_block(32, 1000);

        let description = describe_compressor(&strategy, &field, &block);
        assert!(description.contains("ValueEncoder"));
    }

    #[test]
    fn test_parameter_merge_priority() {
        let type_params = CompressionFieldParams {
            rle_threshold: Some(0.5),
            compression: Some("lz4".to_string()),
            ..Default::default()
        };
        let column_params = CompressionFieldParams {
            rle_threshold: Some(0.2),
            compression: Some("zstd".to_string()),
            compression_level: Some(6),
            bss: None,
        };

        let mut params = CompressionParams::new();
        params.types.insert("Int32".to_string(), type_params);
        params.columns.insert("user_id".to_string(), column_params);

        let strategy = DefaultCompressionStrategy::with_params(params);

        let for_column = strategy
            .params
            .get_field_params("user_id", &DataType::Int32);
        assert_eq!(for_column.rle_threshold, Some(0.2));
        assert_eq!(for_column.compression, Some("zstd".to_string()));
        assert_eq!(for_column.compression_level, Some(6));

        let for_type = strategy
            .params
            .get_field_params("other_field", &DataType::Int32);
        assert_eq!(for_type.rle_threshold, Some(0.5));
        assert_eq!(for_type.compression, Some("lz4".to_string()));
        assert_eq!(for_type.compression_level, None);
    }

    #[test]
    fn test_pattern_matching() {
        let pattern_params = CompressionFieldParams {
            compression: Some("zstd".to_string()),
            compression_level: Some(6),
            ..Default::default()
        };
        let mut params = CompressionParams::new();
        params.columns.insert("log_*".to_string(), pattern_params);

        let strategy = DefaultCompressionStrategy::with_params(params);

        let matching = strategy
            .params
            .get_field_params("log_messages", &DataType::Utf8);
        assert_eq!(matching.compression, Some("zstd".to_string()));
        assert_eq!(matching.compression_level, Some(6));

        let non_matching = strategy
            .params
            .get_field_params("messages_log", &DataType::Utf8);
        assert_eq!(non_matching.compression, None);
    }

    #[test]
    fn test_legacy_metadata_support() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let field = create_test_field_with_metadata(
            "some_column",
            DataType::Int32,
            &[(COMPRESSION_META_KEY, "none")],
        );
        let block = create_fixed_width_block(32, 1000);

        let description = describe_compressor(&strategy, &field, &block);
        assert!(description.contains("ValueEncoder"));
    }

    #[test]
    fn test_default_behavior() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let field = create_test_field("random_column", DataType::Int32);
        let block = create_fixed_width_block_with_stats(32, 1000, 600);

        let description = describe_compressor(&strategy, &field, &block);
        assert!(description.contains("ValueEncoder") || description.contains("InlineBitpacking"));
    }

    #[test]
    fn test_field_metadata_compression() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let field = create_test_field_with_metadata(
            "test_column",
            DataType::Int32,
            &[
                (COMPRESSION_META_KEY, "zstd"),
                (COMPRESSION_LEVEL_META_KEY, "6"),
            ],
        );
        let block = create_fixed_width_block(32, 1000);

        let description = describe_compressor(&strategy, &field, &block);
        assert!(description.contains("GeneralMiniBlockCompressor"));
    }

    #[test]
    fn test_field_metadata_rle_threshold() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let field = create_test_field_with_metadata(
            "test_column",
            DataType::Int32,
            &[(RLE_THRESHOLD_META_KEY, "0.8"), (BSS_META_KEY, "off")],
        );
        let block = create_fixed_width_block_with_stats(32, 1000, 100);

        let description = describe_compressor(&strategy, &field, &block);
        assert!(description.contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_field_metadata_override_params() {
        let column_params = CompressionFieldParams {
            rle_threshold: Some(0.3),
            compression: Some("lz4".to_string()),
            compression_level: None,
            bss: None,
        };
        let mut params = CompressionParams::new();
        params
            .columns
            .insert("test_column".to_string(), column_params);

        let strategy = DefaultCompressionStrategy::with_params(params);

        let field = create_test_field_with_metadata(
            "test_column",
            DataType::Int32,
            &[(COMPRESSION_META_KEY, "none")],
        );
        let block = create_fixed_width_block(32, 1000);

        let description = describe_compressor(&strategy, &field, &block);
        assert!(description.contains("ValueEncoder"));
    }

    #[test]
    fn test_field_metadata_mixed_configuration() {
        let type_params = CompressionFieldParams {
            rle_threshold: Some(0.5),
            compression: Some("lz4".to_string()),
            ..Default::default()
        };
        let mut params = CompressionParams::new();
        params.types.insert("Int32".to_string(), type_params);

        let strategy = DefaultCompressionStrategy::with_params(params);

        let field = create_test_field_with_metadata(
            "test_column",
            DataType::Int32,
            &[(COMPRESSION_LEVEL_META_KEY, "3")],
        );
        let block = create_fixed_width_block(32, 1000);

        let description = describe_compressor(&strategy, &field, &block);
        assert!(description.contains("GeneralMiniBlockCompressor"));
    }

    #[test]
    fn test_bss_field_metadata() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let metadata = metadata_from(&[(BSS_META_KEY, "on"), (COMPRESSION_META_KEY, "lz4")]);
        let arrow_field =
            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
        let field = Field::try_from(&arrow_field).unwrap();

        let block = create_fixed_width_block(32, 100);

        let description = describe_compressor(&strategy, &field, &block);
        assert!(description.contains("ByteStreamSplitEncoder"));
    }

    #[test]
    fn test_bss_with_compression() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let metadata = metadata_from(&[(BSS_META_KEY, "on"), (COMPRESSION_META_KEY, "lz4")]);
        let arrow_field =
            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
        let field = Field::try_from(&arrow_field).unwrap();

        let block = create_fixed_width_block(64, 100);

        let description = describe_compressor(&strategy, &field, &block);
        assert!(description.contains("GeneralMiniBlockCompressor"));
        assert!(description.contains("ByteStreamSplitEncoder"));
    }
}