1use crate::{
20 buffer::LanceBuffer,
21 compression_config::{BssMode, CompressionFieldParams, CompressionParams},
22 constants::{
23 BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
24 },
25 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
26 encodings::{
27 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
28 physical::{
29 binary::{
30 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
31 VariableDecoder, VariableEncoder,
32 },
33 bitpacking::InlineBitpacking,
34 block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
35 byte_stream_split::{
36 should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
37 },
38 constant::ConstantDecompressor,
39 fsst::{
40 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
41 FsstPerValueEncoder,
42 },
43 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
44 packed::{
45 PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
46 },
47 rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
48 value::{ValueDecompressor, ValueEncoder},
49 },
50 },
51 format::{
52 pb21::{compressive_encoding::Compression, CompressiveEncoding},
53 ProtobufUtils21,
54 },
55 statistics::{GetStat, Stat},
56};
57use arrow::{array::AsArray, datatypes::UInt64Type};
58use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
59use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
60use snafu::location;
61use std::str::FromStr;
62
/// Fallback RLE threshold used when a field has no `rle_threshold` configured.
///
/// `try_rle_for_mini_block` selects RLE when
/// `run_count < num_values * threshold`, so lower values make RLE harder to pick.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
66
/// Compresses a whole [`DataBlock`] into a single [`LanceBuffer`].
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
    /// Consumes `data` and returns the compressed buffer, or an error if
    /// compression fails.
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
86
/// Selects the compressor to use for a given field and data block.
///
/// Each method receives the field being written and a block of its data, and
/// returns a boxed compressor (or an error when no compressor can be built).
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
    /// Creates a compressor for an entire block, together with the
    /// [`CompressiveEncoding`] that describes the chosen encoding.
    fn create_block_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;

    /// Creates a [`PerValueCompressor`] for `data`.
    ///
    /// NOTE(review): presumably used by the full-zip layout, since
    /// `PerValueCompressor` is exported from the `fullzip` module — confirm.
    fn create_per_value(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn PerValueCompressor>>;

    /// Creates a [`MiniBlockCompressor`] for `data`.
    fn create_miniblock_compressor(
        &self,
        field: &Field,
        data: &DataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>>;
}
121
/// Default [`CompressionStrategy`] implementation.
///
/// For mini-block compression the configured `params` are looked up by field
/// name and data type and then merged with parameters parsed from the field's
/// metadata.
#[derive(Debug, Default)]
pub struct DefaultCompressionStrategy {
    /// User-configured compression parameters.
    params: CompressionParams,
}
127
128fn try_bss_for_mini_block(
129 data: &FixedWidthDataBlock,
130 params: &CompressionFieldParams,
131) -> Option<Box<dyn MiniBlockCompressor>> {
132 if params.compression.is_none() || params.compression.as_deref() == Some("none") {
135 return None;
136 }
137
138 let mode = params.bss.unwrap_or(BssMode::Auto);
139 if should_use_bss(data, mode) {
141 return Some(Box::new(ByteStreamSplitEncoder::new(
142 data.bits_per_value as usize,
143 )));
144 }
145 None
146}
147
148fn try_rle_for_mini_block(
149 data: &FixedWidthDataBlock,
150 params: &CompressionFieldParams,
151) -> Option<Box<dyn MiniBlockCompressor>> {
152 let bits = data.bits_per_value;
153 if !matches!(bits, 8 | 16 | 32 | 64) {
154 return None;
155 }
156
157 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
158 let threshold = params
159 .rle_threshold
160 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
161
162 if (run_count as f64) < (data.num_values as f64) * threshold {
163 return Some(Box::new(RleMiniBlockEncoder::new()));
164 }
165 None
166}
167
168fn try_bitpack_for_mini_block(data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
169 let bits = data.bits_per_value;
170 if !matches!(bits, 8 | 16 | 32 | 64) {
171 return None;
172 }
173
174 let bit_widths = data.expect_stat(Stat::BitWidth);
175 let widths = bit_widths.as_primitive::<UInt64Type>();
176 let has_all_zeros = widths.values().iter().any(|&w| w == 0);
177 let too_small =
178 widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
179
180 if !has_all_zeros && !too_small {
181 return Some(Box::new(InlineBitpacking::new(bits)));
182 }
183 None
184}
185
186fn maybe_wrap_general_for_mini_block(
187 inner: Box<dyn MiniBlockCompressor>,
188 params: &CompressionFieldParams,
189) -> Result<Box<dyn MiniBlockCompressor>> {
190 match params.compression.as_deref() {
191 None | Some("none") | Some("fsst") => Ok(inner),
192 Some(raw) => {
193 let scheme = CompressionScheme::from_str(raw).map_err(|_| {
194 lance_core::Error::invalid_input(
195 format!("Unknown compression scheme: {raw}"),
196 location!(),
197 )
198 })?;
199 let cfg = CompressionConfig::new(scheme, params.compression_level);
200 Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
201 }
202 }
203}
204
205impl DefaultCompressionStrategy {
206 pub fn new() -> Self {
208 Self::default()
209 }
210
211 pub fn with_params(params: CompressionParams) -> Self {
213 Self { params }
214 }
215
216 fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
218 let mut params = CompressionFieldParams::default();
219
220 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
222 params.compression = Some(compression.clone());
223 }
224
225 if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
227 params.compression_level = level.parse().ok();
228 }
229
230 if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
232 params.rle_threshold = threshold.parse().ok();
233 }
234
235 if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
237 match BssMode::parse(bss_str) {
238 Some(mode) => params.bss = Some(mode),
239 None => {
240 log::warn!("Invalid BSS mode '{}', using default", bss_str);
241 }
242 }
243 }
244
245 params
246 }
247
248 fn build_fixed_width_compressor(
249 &self,
250 params: &CompressionFieldParams,
251 data: &FixedWidthDataBlock,
252 ) -> Result<Box<dyn MiniBlockCompressor>> {
253 if params.compression.as_deref() == Some("none") {
254 return Ok(Box::new(ValueEncoder::default()));
255 }
256
257 let base = try_bss_for_mini_block(data, params)
258 .or_else(|| try_rle_for_mini_block(data, params))
259 .or_else(|| try_bitpack_for_mini_block(data))
260 .unwrap_or_else(|| Box::new(ValueEncoder::default()));
261
262 maybe_wrap_general_for_mini_block(base, params)
263 }
264
265 fn build_variable_width_compressor(
267 &self,
268 params: &CompressionFieldParams,
269 data: &VariableWidthBlock,
270 ) -> Result<Box<dyn MiniBlockCompressor>> {
271 if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
272 return Err(Error::invalid_input(
273 format!(
274 "Variable width compression not supported for {} bit offsets",
275 data.bits_per_offset
276 ),
277 location!(),
278 ));
279 }
280
281 let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
283 let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
284
285 if params.compression.as_deref() == Some("none") {
287 return Ok(Box::new(BinaryMiniBlockEncoder::default()));
288 }
289
290 if params.compression.as_deref() == Some("fsst") {
292 return Ok(Box::new(FsstMiniBlockEncoder::default()));
293 }
294
295 let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
297 >= FSST_LEAST_INPUT_MAX_LENGTH
298 && data_size >= FSST_LEAST_INPUT_SIZE as u64
299 {
300 Box::new(FsstMiniBlockEncoder::default())
301 } else {
302 Box::new(BinaryMiniBlockEncoder::default())
303 };
304
305 if let Some(compression_scheme) = ¶ms.compression {
307 if compression_scheme != "none" && compression_scheme != "fsst" {
308 let scheme: CompressionScheme = compression_scheme.parse()?;
309 let config = CompressionConfig::new(scheme, params.compression_level);
310 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
311 }
312 }
313
314 Ok(base_encoder)
315 }
316}
317
318impl CompressionStrategy for DefaultCompressionStrategy {
319 fn create_miniblock_compressor(
320 &self,
321 field: &Field,
322 data: &DataBlock,
323 ) -> Result<Box<dyn MiniBlockCompressor>> {
324 let mut field_params = self
325 .params
326 .get_field_params(&field.name, &field.data_type());
327
328 let metadata_params = Self::parse_field_metadata(field);
330 field_params.merge(&metadata_params);
331
332 match data {
333 DataBlock::FixedWidth(fixed_width_data) => {
334 self.build_fixed_width_compressor(&field_params, fixed_width_data)
335 }
336 DataBlock::VariableWidth(variable_width_data) => {
337 self.build_variable_width_compressor(&field_params, variable_width_data)
338 }
339 DataBlock::Struct(struct_data_block) => {
340 if struct_data_block
343 .children
344 .iter()
345 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
346 {
347 panic!("packed struct encoding currently only supports fixed-width fields.")
348 }
349 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
350 }
351 DataBlock::FixedSizeList(_) => {
352 Ok(Box::new(ValueEncoder::default()))
360 }
361 _ => Err(Error::NotSupported {
362 source: format!(
363 "Mini-block compression not yet supported for block type {}",
364 data.name()
365 )
366 .into(),
367 location: location!(),
368 }),
369 }
370 }
371
372 fn create_per_value(
373 &self,
374 field: &Field,
375 data: &DataBlock,
376 ) -> Result<Box<dyn PerValueCompressor>> {
377 let field_params = Self::parse_field_metadata(field);
378
379 match data {
380 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
381 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
382 DataBlock::VariableWidth(variable_width) => {
383 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
384 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
385
386 if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
390 return Ok(Box::new(CompressedBufferEncoder::default()));
391 }
392
393 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
394 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
395 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
396
397 let variable_compression = Box::new(VariableEncoder::default());
398
399 if field_params.compression.as_deref() == Some("fsst")
401 || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
402 && data_size >= FSST_LEAST_INPUT_SIZE as u64)
403 {
404 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
405 } else {
406 Ok(variable_compression)
407 }
408 } else {
409 panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
410 }
411 }
412 _ => unreachable!(
413 "Per-value compression not yet supported for block type: {}",
414 data.name()
415 ),
416 }
417 }
418
419 fn create_block_compressor(
420 &self,
421 _field: &Field,
422 data: &DataBlock,
423 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
424 match data {
426 DataBlock::FixedWidth(fixed_width) => {
429 let encoder = Box::new(ValueEncoder::default());
430 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
431 Ok((encoder, encoding))
432 }
433 DataBlock::VariableWidth(variable_width) => {
434 let encoder = Box::new(VariableEncoder::default());
435 let encoding = ProtobufUtils21::variable(
436 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
437 None,
438 );
439 Ok((encoder, encoding))
440 }
441 _ => unreachable!(),
442 }
443 }
444}
445
/// Decompresses mini-block buffers back into a [`DataBlock`].
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `data` into a block; `num_values` is the number of values the
    /// caller expects.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
449
/// Decompresses fixed-width data that was compressed per value.
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `data` into a block; `num_values` is the number of values the
    /// caller expects.
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
    /// Returns the number of bits per value.
    ///
    /// NOTE(review): presumably the width of each compressed value — confirm.
    fn bits_per_value(&self) -> u64;
}
458
/// Decompresses variable-width data that was compressed per value.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `data` into a block.
    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
463
/// Decompresses a single buffer holding a whole compressed block.
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
    /// Decodes `data` into a block; `num_values` is the number of values the
    /// caller expects.
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
467
/// Builds decompressors from a [`CompressiveEncoding`] description.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
    /// Creates a mini-block decompressor for `description`.
    ///
    /// `decompression_strategy` is the strategy used to build decompressors
    /// for nested encodings (the default implementation uses it for the inner
    /// values of FSST).
    fn create_miniblock_decompressor(
        &self,
        description: &CompressiveEncoding,
        decompression_strategy: &dyn DecompressionStrategy,
    ) -> Result<Box<dyn MiniBlockDecompressor>>;

    /// Creates a per-value decompressor for fixed-width data.
    fn create_fixed_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn FixedPerValueDecompressor>>;

    /// Creates a per-value decompressor for variable-width data.
    fn create_variable_per_value_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn VariablePerValueDecompressor>>;

    /// Creates a decompressor for a whole compressed block.
    fn create_block_decompressor(
        &self,
        description: &CompressiveEncoding,
    ) -> Result<Box<dyn BlockDecompressor>>;
}
490
/// Default [`DecompressionStrategy`] implementation; it carries no state.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
493
494impl DecompressionStrategy for DefaultDecompressionStrategy {
495 fn create_miniblock_decompressor(
496 &self,
497 description: &CompressiveEncoding,
498 decompression_strategy: &dyn DecompressionStrategy,
499 ) -> Result<Box<dyn MiniBlockDecompressor>> {
500 match description.compression.as_ref().unwrap() {
501 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
502 Compression::InlineBitpacking(description) => {
503 Ok(Box::new(InlineBitpacking::from_description(description)))
504 }
505 Compression::Variable(variable) => {
506 let Compression::Flat(offsets) = variable
507 .offsets
508 .as_ref()
509 .unwrap()
510 .compression
511 .as_ref()
512 .unwrap()
513 else {
514 panic!("Variable compression only supports flat offsets")
515 };
516 Ok(Box::new(BinaryMiniBlockDecompressor::new(
517 offsets.bits_per_value as u8,
518 )))
519 }
520 Compression::Fsst(description) => {
521 let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
522 description.values.as_ref().unwrap(),
523 decompression_strategy,
524 )?;
525 Ok(Box::new(FsstMiniBlockDecompressor::new(
526 description,
527 inner_decompressor,
528 )))
529 }
530 Compression::PackedStruct(description) => Ok(Box::new(
531 PackedStructFixedWidthMiniBlockDecompressor::new(description),
532 )),
533 Compression::FixedSizeList(fsl) => {
534 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
537 }
538 Compression::Rle(rle) => {
539 let Compression::Flat(values) =
540 rle.values.as_ref().unwrap().compression.as_ref().unwrap()
541 else {
542 panic!("RLE compression only supports flat values")
543 };
544 let Compression::Flat(run_lengths) = rle
545 .run_lengths
546 .as_ref()
547 .unwrap()
548 .compression
549 .as_ref()
550 .unwrap()
551 else {
552 panic!("RLE compression only supports flat run lengths")
553 };
554 assert_eq!(
555 run_lengths.bits_per_value, 8,
556 "RLE compression only supports 8-bit run lengths"
557 );
558 Ok(Box::new(RleMiniBlockDecompressor::new(
559 values.bits_per_value,
560 )))
561 }
562 Compression::ByteStreamSplit(bss) => {
563 let Compression::Flat(values) =
564 bss.values.as_ref().unwrap().compression.as_ref().unwrap()
565 else {
566 panic!("ByteStreamSplit compression only supports flat values")
567 };
568 Ok(Box::new(ByteStreamSplitDecompressor::new(
569 values.bits_per_value as usize,
570 )))
571 }
572 Compression::General(general) => {
573 let inner_decompressor = self.create_miniblock_decompressor(
575 general.values.as_ref().ok_or_else(|| {
576 Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
577 })?,
578 decompression_strategy,
579 )?;
580
581 let compression = general.compression.as_ref().ok_or_else(|| {
583 Error::invalid_input("GeneralMiniBlock missing compression config", location!())
584 })?;
585
586 let scheme = compression.scheme().try_into()?;
587
588 let compression_config = crate::encodings::physical::block::CompressionConfig::new(
589 scheme,
590 compression.level,
591 );
592
593 Ok(Box::new(GeneralMiniBlockDecompressor::new(
594 inner_decompressor,
595 compression_config,
596 )))
597 }
598 _ => todo!(),
599 }
600 }
601
602 fn create_fixed_per_value_decompressor(
603 &self,
604 description: &CompressiveEncoding,
605 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
606 match description.compression.as_ref().unwrap() {
607 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
608 Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
609 _ => todo!("fixed-per-value decompressor for {:?}", description),
610 }
611 }
612
613 fn create_variable_per_value_decompressor(
614 &self,
615 description: &CompressiveEncoding,
616 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
617 match description.compression.as_ref().unwrap() {
618 Compression::Variable(variable) => {
619 let Compression::Flat(offsets) = variable
620 .offsets
621 .as_ref()
622 .unwrap()
623 .compression
624 .as_ref()
625 .unwrap()
626 else {
627 panic!("Variable compression only supports flat offsets")
628 };
629 assert!(offsets.bits_per_value < u8::MAX as u64);
630 Ok(Box::new(VariableDecoder::default()))
631 }
632 Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
633 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
634 Box::new(VariableDecoder::default()),
635 ))),
636 Compression::General(ref general) => {
637 Ok(Box::new(CompressedBufferEncoder::from_scheme(
638 general.compression.as_ref().expect_ok()?.scheme(),
639 )?))
640 }
641 _ => todo!("variable-per-value decompressor for {:?}", description),
642 }
643 }
644
645 fn create_block_decompressor(
646 &self,
647 description: &CompressiveEncoding,
648 ) -> Result<Box<dyn BlockDecompressor>> {
649 match description.compression.as_ref().unwrap() {
650 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
651 Compression::Constant(constant) => {
652 let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
653 Ok(Box::new(ConstantDecompressor::new(scalar)))
654 }
655 Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
656 _ => todo!(),
657 }
658 }
659}
660
#[cfg(test)]
mod tests {
    use super::*;
    use crate::buffer::LanceBuffer;
    use crate::data::{BlockInfo, DataBlock};
    use arrow::datatypes::{DataType, Field as ArrowField};
    use std::collections::HashMap;

    /// Builds a nullable Lance field with id `-1` from an Arrow field.
    fn create_test_field(name: &str, data_type: DataType) -> Field {
        let arrow_field = ArrowField::new(name, data_type, true);
        let mut field = Field::try_from(&arrow_field).unwrap();
        field.id = -1;
        field
    }

    /// Same as `create_test_field`, then replaces the field's metadata with `entries`.
    fn create_test_field_with_metadata(
        name: &str,
        data_type: DataType,
        entries: &[(&str, &str)],
    ) -> Field {
        let metadata: HashMap<String, String> = entries
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();
        let mut field = create_test_field(name, data_type);
        field.metadata = metadata;
        field
    }

    /// Builds a non-nullable field carrying BSS "on" and "lz4" compression metadata.
    fn create_bss_lz4_field(name: &str, data_type: DataType) -> Field {
        let metadata = HashMap::from([
            (BSS_META_KEY.to_string(), "on".to_string()),
            (COMPRESSION_META_KEY.to_string(), "lz4".to_string()),
        ]);
        let arrow_field = ArrowField::new(name, data_type, false).with_metadata(metadata);
        Field::try_from(&arrow_field).unwrap()
    }

    /// Wraps raw bytes in a fixed-width block and computes its statistics.
    fn into_fixed_width_block(bits_per_value: u64, num_values: u64, bytes: Vec<u8>) -> DataBlock {
        use crate::statistics::ComputeStat;

        let mut block = FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::reinterpret_vec(bytes),
            num_values,
            block_info: BlockInfo::default(),
        };
        block.compute_stat();
        DataBlock::FixedWidth(block)
    }

    /// Builds the mini-block compressor for `field`/`data` and returns its debug string.
    fn miniblock_compressor_debug(
        strategy: &DefaultCompressionStrategy,
        field: &Field,
        data: &DataBlock,
    ) -> String {
        let compressor = strategy.create_miniblock_compressor(field, data).unwrap();
        format!("{:?}", compressor)
    }

    /// Builds a block whose values change every `num_values / run_count`
    /// entries (at least every entry), so the data contains repeated runs.
    fn create_fixed_width_block_with_stats(
        bits_per_value: u64,
        num_values: u64,
        run_count: u64,
    ) -> DataBlock {
        let width = (bits_per_value / 8) as usize;
        let count = num_values as usize;
        let mut bytes = vec![0u8; width * count];

        let run_len = (num_values / run_count).max(1) as usize;
        let mut current = 0u8;

        for idx in 0..count {
            // Start of a new run: move to a different value.
            if idx % run_len == 0 {
                current = current.wrapping_add(17);
            }
            for lane in 0..width {
                bytes[idx * width + lane] = current.wrapping_add(lane as u8);
            }
        }

        into_fixed_width_block(bits_per_value, num_values, bytes)
    }

    /// Builds a block whose first byte per value cycles through 0..=255.
    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
        let width = (bits_per_value / 8) as usize;
        let count = num_values as usize;
        let mut bytes = vec![0u8; width * count];

        for idx in 0..count {
            // `get_mut` keeps the original bounds guard (relevant when `width` is 0).
            if let Some(slot) = bytes.get_mut(idx * width) {
                *slot = (idx % 256) as u8;
            }
        }

        into_fixed_width_block(bits_per_value, num_values, bytes)
    }

    #[test]
    fn test_parameter_based_compression() {
        let mut params = CompressionParams::new();
        params.columns.insert(
            String::from("*_id"),
            CompressionFieldParams {
                compression: Some(String::from("lz4")),
                compression_level: None,
                rle_threshold: Some(0.3),
                bss: Some(BssMode::Off),
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("user_id", DataType::Int32);
        let data = create_fixed_width_block_with_stats(32, 1000, 100);

        let debug_str = miniblock_compressor_debug(&strategy, &field, &data);

        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
        assert!(debug_str.contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_type_level_parameters() {
        let mut params = CompressionParams::new();
        params.types.insert(
            String::from("Int32"),
            CompressionFieldParams {
                compression: Some(String::from("zstd")),
                compression_level: Some(3),
                rle_threshold: Some(0.1),
                bss: Some(BssMode::Off),
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("some_column", DataType::Int32);
        let data = create_fixed_width_block_with_stats(32, 1000, 50);

        let debug_str = miniblock_compressor_debug(&strategy, &field, &data);
        assert!(debug_str.contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_none_compression() {
        let mut params = CompressionParams::new();
        params.columns.insert(
            String::from("embeddings"),
            CompressionFieldParams {
                compression: Some(String::from("none")),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("embeddings", DataType::Float32);
        let data = create_fixed_width_block(32, 1000);

        let debug_str = miniblock_compressor_debug(&strategy, &field, &data);
        assert!(debug_str.contains("ValueEncoder"));
    }

    #[test]
    fn test_parameter_merge_priority() {
        let mut params = CompressionParams::new();
        params.types.insert(
            String::from("Int32"),
            CompressionFieldParams {
                compression: Some(String::from("lz4")),
                rle_threshold: Some(0.5),
                ..Default::default()
            },
        );
        params.columns.insert(
            String::from("user_id"),
            CompressionFieldParams {
                compression: Some(String::from("zstd")),
                compression_level: Some(6),
                rle_threshold: Some(0.2),
                bss: None,
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        // Column-level settings win for the matching column.
        let for_column = strategy
            .params
            .get_field_params("user_id", &DataType::Int32);
        assert_eq!(for_column.rle_threshold, Some(0.2));
        assert_eq!(for_column.compression, Some("zstd".to_string()));
        assert_eq!(for_column.compression_level, Some(6));

        // Other columns of the same type only see the type-level settings.
        let for_type = strategy
            .params
            .get_field_params("other_field", &DataType::Int32);
        assert_eq!(for_type.rle_threshold, Some(0.5));
        assert_eq!(for_type.compression, Some("lz4".to_string()));
        assert_eq!(for_type.compression_level, None);
    }

    #[test]
    fn test_pattern_matching() {
        let mut params = CompressionParams::new();
        params.columns.insert(
            String::from("log_*"),
            CompressionFieldParams {
                compression: Some(String::from("zstd")),
                compression_level: Some(6),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        let matching = strategy
            .params
            .get_field_params("log_messages", &DataType::Utf8);
        assert_eq!(matching.compression, Some("zstd".to_string()));
        assert_eq!(matching.compression_level, Some(6));

        let non_matching = strategy
            .params
            .get_field_params("messages_log", &DataType::Utf8);
        assert_eq!(non_matching.compression, None);
    }

    #[test]
    fn test_legacy_metadata_support() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let field = create_test_field_with_metadata(
            "some_column",
            DataType::Int32,
            &[(COMPRESSION_META_KEY, "none")],
        );
        let data = create_fixed_width_block(32, 1000);

        let debug_str = miniblock_compressor_debug(&strategy, &field, &data);
        assert!(debug_str.contains("ValueEncoder"));
    }

    #[test]
    fn test_default_behavior() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let field = create_test_field("random_column", DataType::Int32);
        let data = create_fixed_width_block_with_stats(32, 1000, 600);

        let debug_str = miniblock_compressor_debug(&strategy, &field, &data);
        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
    }

    #[test]
    fn test_field_metadata_compression() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let field = create_test_field_with_metadata(
            "test_column",
            DataType::Int32,
            &[
                (COMPRESSION_META_KEY, "zstd"),
                (COMPRESSION_LEVEL_META_KEY, "6"),
            ],
        );
        let data = create_fixed_width_block(32, 1000);

        let debug_str = miniblock_compressor_debug(&strategy, &field, &data);
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
    }

    #[test]
    fn test_field_metadata_rle_threshold() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let field = create_test_field_with_metadata(
            "test_column",
            DataType::Int32,
            &[(RLE_THRESHOLD_META_KEY, "0.8"), (BSS_META_KEY, "off")],
        );
        let data = create_fixed_width_block_with_stats(32, 1000, 100);

        let debug_str = miniblock_compressor_debug(&strategy, &field, &data);
        assert!(debug_str.contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_field_metadata_override_params() {
        let mut params = CompressionParams::new();
        params.columns.insert(
            String::from("test_column"),
            CompressionFieldParams {
                compression: Some(String::from("lz4")),
                compression_level: None,
                rle_threshold: Some(0.3),
                bss: None,
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        let field = create_test_field_with_metadata(
            "test_column",
            DataType::Int32,
            &[(COMPRESSION_META_KEY, "none")],
        );
        let data = create_fixed_width_block(32, 1000);

        let debug_str = miniblock_compressor_debug(&strategy, &field, &data);
        assert!(debug_str.contains("ValueEncoder"));
    }

    #[test]
    fn test_field_metadata_mixed_configuration() {
        let mut params = CompressionParams::new();
        params.types.insert(
            String::from("Int32"),
            CompressionFieldParams {
                compression: Some(String::from("lz4")),
                rle_threshold: Some(0.5),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        let field = create_test_field_with_metadata(
            "test_column",
            DataType::Int32,
            &[(COMPRESSION_LEVEL_META_KEY, "3")],
        );
        let data = create_fixed_width_block(32, 1000);

        let debug_str = miniblock_compressor_debug(&strategy, &field, &data);
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
    }

    #[test]
    fn test_bss_field_metadata() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let field = create_bss_lz4_field("temperature", DataType::Float32);
        let data = create_fixed_width_block(32, 100);

        let debug_str = miniblock_compressor_debug(&strategy, &field, &data);
        assert!(debug_str.contains("ByteStreamSplitEncoder"));
    }

    #[test]
    fn test_bss_with_compression() {
        let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());

        let field = create_bss_lz4_field("sensor_data", DataType::Float64);
        let data = create_fixed_width_block(64, 100);

        let debug_str = miniblock_compressor_debug(&strategy, &field, &data);
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
        assert!(debug_str.contains("ByteStreamSplitEncoder"));
    }
}