1#[cfg(feature = "bitpacking")]
20use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
21use crate::{
22 buffer::LanceBuffer,
23 compression_config::{BssMode, CompressionFieldParams, CompressionParams},
24 constants::{
25 BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
26 },
27 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
28 encodings::{
29 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
30 physical::{
31 binary::{
32 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
33 VariableDecoder, VariableEncoder,
34 },
35 block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
36 byte_stream_split::{
37 should_use_bss, ByteStreamSplitDecompressor, ByteStreamSplitEncoder,
38 },
39 constant::ConstantDecompressor,
40 fsst::{
41 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
42 FsstPerValueEncoder,
43 },
44 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
45 packed::{
46 PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
47 },
48 rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
49 value::{ValueDecompressor, ValueEncoder},
50 },
51 },
52 format::{
53 pb21::{compressive_encoding::Compression, CompressiveEncoding},
54 ProtobufUtils21,
55 },
56 statistics::{GetStat, Stat},
57};
58
59use arrow_array::{cast::AsArray, types::UInt64Type};
60use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
61use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
62use snafu::location;
63use std::str::FromStr;
64
/// Fallback RLE threshold used by `try_rle_for_mini_block` when a field has no
/// `rle_threshold` configured: RLE is selected for a fixed-width block when
/// `run_count < num_values * DEFAULT_RLE_COMPRESSION_THRESHOLD`.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5_f64;
68
69pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
82 fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
87}
88
89pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
102 fn create_block_compressor(
104 &self,
105 field: &Field,
106 data: &DataBlock,
107 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;
108
109 fn create_per_value(
111 &self,
112 field: &Field,
113 data: &DataBlock,
114 ) -> Result<Box<dyn PerValueCompressor>>;
115
116 fn create_miniblock_compressor(
118 &self,
119 field: &Field,
120 data: &DataBlock,
121 ) -> Result<Box<dyn MiniBlockCompressor>>;
122}
123
124#[derive(Debug, Default)]
125pub struct DefaultCompressionStrategy {
126 params: CompressionParams,
128}
129
130fn try_bss_for_mini_block(
131 data: &FixedWidthDataBlock,
132 params: &CompressionFieldParams,
133) -> Option<Box<dyn MiniBlockCompressor>> {
134 if params.compression.is_none() || params.compression.as_deref() == Some("none") {
137 return None;
138 }
139
140 let mode = params.bss.unwrap_or(BssMode::Auto);
141 if should_use_bss(data, mode) {
143 return Some(Box::new(ByteStreamSplitEncoder::new(
144 data.bits_per_value as usize,
145 )));
146 }
147 None
148}
149
150fn try_rle_for_mini_block(
151 data: &FixedWidthDataBlock,
152 params: &CompressionFieldParams,
153) -> Option<Box<dyn MiniBlockCompressor>> {
154 let bits = data.bits_per_value;
155 if !matches!(bits, 8 | 16 | 32 | 64) {
156 return None;
157 }
158
159 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
160 let threshold = params
161 .rle_threshold
162 .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
163
164 if (run_count as f64) < (data.num_values as f64) * threshold {
165 return Some(Box::new(RleMiniBlockEncoder::new()));
166 }
167 None
168}
169
170fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
171 #[cfg(feature = "bitpacking")]
172 {
173 use arrow_array::cast::AsArray;
174
175 let bits = _data.bits_per_value;
176 if !matches!(bits, 8 | 16 | 32 | 64) {
177 return None;
178 }
179
180 let bit_widths = _data.expect_stat(Stat::BitWidth);
181 let widths = bit_widths.as_primitive::<UInt64Type>();
182 let too_small = widths.len() == 1
183 && InlineBitpacking::min_size_bytes(widths.value(0)) >= _data.data_size();
184
185 if !too_small {
186 return Some(Box::new(InlineBitpacking::new(bits)));
187 }
188 None
189 }
190 #[cfg(not(feature = "bitpacking"))]
191 {
192 None
193 }
194}
195
196fn try_bitpack_for_block(
197 data: &FixedWidthDataBlock,
198) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
199 let bits = data.bits_per_value;
200 if !matches!(bits, 8 | 16 | 32 | 64) {
201 return None;
202 }
203
204 let bit_widths = data.expect_stat(Stat::BitWidth);
205 let widths = bit_widths.as_primitive::<UInt64Type>();
206 let has_all_zeros = widths.values().contains(&0);
207 let max_bit_width = *widths.values().iter().max().unwrap();
208
209 let too_small =
210 widths.len() == 1 && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
211
212 if has_all_zeros || too_small {
213 return None;
214 }
215
216 if data.num_values <= 1024 {
217 let compressor = Box::new(InlineBitpacking::new(bits));
218 let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
219 Some((compressor, encoding))
220 } else {
221 let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
222 let encoding = ProtobufUtils21::out_of_line_bitpacking(
223 bits,
224 ProtobufUtils21::flat(max_bit_width, None),
225 );
226 Some((compressor, encoding))
227 }
228}
229
230fn maybe_wrap_general_for_mini_block(
231 inner: Box<dyn MiniBlockCompressor>,
232 params: &CompressionFieldParams,
233) -> Result<Box<dyn MiniBlockCompressor>> {
234 match params.compression.as_deref() {
235 None | Some("none") | Some("fsst") => Ok(inner),
236 Some(raw) => {
237 let scheme = CompressionScheme::from_str(raw).map_err(|_| {
238 lance_core::Error::invalid_input(
239 format!("Unknown compression scheme: {raw}"),
240 location!(),
241 )
242 })?;
243 let cfg = CompressionConfig::new(scheme, params.compression_level);
244 Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
245 }
246 }
247}
248
249impl DefaultCompressionStrategy {
250 pub fn new() -> Self {
252 Self::default()
253 }
254
255 pub fn with_params(params: CompressionParams) -> Self {
257 Self { params }
258 }
259
260 fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
262 let mut params = CompressionFieldParams::default();
263
264 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
266 params.compression = Some(compression.clone());
267 }
268
269 if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
271 params.compression_level = level.parse().ok();
272 }
273
274 if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
276 params.rle_threshold = threshold.parse().ok();
277 }
278
279 if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
281 match BssMode::parse(bss_str) {
282 Some(mode) => params.bss = Some(mode),
283 None => {
284 log::warn!("Invalid BSS mode '{}', using default", bss_str);
285 }
286 }
287 }
288
289 params
290 }
291
292 fn build_fixed_width_compressor(
293 &self,
294 params: &CompressionFieldParams,
295 data: &FixedWidthDataBlock,
296 ) -> Result<Box<dyn MiniBlockCompressor>> {
297 if params.compression.as_deref() == Some("none") {
298 return Ok(Box::new(ValueEncoder::default()));
299 }
300
301 let base = try_bss_for_mini_block(data, params)
302 .or_else(|| try_rle_for_mini_block(data, params))
303 .or_else(|| try_bitpack_for_mini_block(data))
304 .unwrap_or_else(|| Box::new(ValueEncoder::default()));
305
306 maybe_wrap_general_for_mini_block(base, params)
307 }
308
309 fn build_variable_width_compressor(
311 &self,
312 params: &CompressionFieldParams,
313 data: &VariableWidthBlock,
314 ) -> Result<Box<dyn MiniBlockCompressor>> {
315 if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
316 return Err(Error::invalid_input(
317 format!(
318 "Variable width compression not supported for {} bit offsets",
319 data.bits_per_offset
320 ),
321 location!(),
322 ));
323 }
324
325 let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
327 let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
328
329 if params.compression.as_deref() == Some("none") {
331 return Ok(Box::new(BinaryMiniBlockEncoder::default()));
332 }
333
334 if params.compression.as_deref() == Some("fsst") {
336 return Ok(Box::new(FsstMiniBlockEncoder::default()));
337 }
338
339 let mut base_encoder: Box<dyn MiniBlockCompressor> = if max_len
341 >= FSST_LEAST_INPUT_MAX_LENGTH
342 && data_size >= FSST_LEAST_INPUT_SIZE as u64
343 {
344 Box::new(FsstMiniBlockEncoder::default())
345 } else {
346 Box::new(BinaryMiniBlockEncoder::default())
347 };
348
349 if let Some(compression_scheme) = ¶ms.compression {
351 if compression_scheme != "none" && compression_scheme != "fsst" {
352 let scheme: CompressionScheme = compression_scheme.parse()?;
353 let config = CompressionConfig::new(scheme, params.compression_level);
354 base_encoder = Box::new(GeneralMiniBlockCompressor::new(base_encoder, config));
355 }
356 }
357
358 Ok(base_encoder)
359 }
360}
361
362impl CompressionStrategy for DefaultCompressionStrategy {
363 fn create_miniblock_compressor(
364 &self,
365 field: &Field,
366 data: &DataBlock,
367 ) -> Result<Box<dyn MiniBlockCompressor>> {
368 let mut field_params = self
369 .params
370 .get_field_params(&field.name, &field.data_type());
371
372 let metadata_params = Self::parse_field_metadata(field);
374 field_params.merge(&metadata_params);
375
376 match data {
377 DataBlock::FixedWidth(fixed_width_data) => {
378 self.build_fixed_width_compressor(&field_params, fixed_width_data)
379 }
380 DataBlock::VariableWidth(variable_width_data) => {
381 self.build_variable_width_compressor(&field_params, variable_width_data)
382 }
383 DataBlock::Struct(struct_data_block) => {
384 if struct_data_block
387 .children
388 .iter()
389 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
390 {
391 panic!("packed struct encoding currently only supports fixed-width fields.")
392 }
393 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
394 }
395 DataBlock::FixedSizeList(_) => {
396 Ok(Box::new(ValueEncoder::default()))
404 }
405 _ => Err(Error::NotSupported {
406 source: format!(
407 "Mini-block compression not yet supported for block type {}",
408 data.name()
409 )
410 .into(),
411 location: location!(),
412 }),
413 }
414 }
415
416 fn create_per_value(
417 &self,
418 field: &Field,
419 data: &DataBlock,
420 ) -> Result<Box<dyn PerValueCompressor>> {
421 let field_params = Self::parse_field_metadata(field);
422
423 match data {
424 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
425 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
426 DataBlock::VariableWidth(variable_width) => {
427 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
428 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
429
430 let per_value_requested =
435 if let Some(compression) = field_params.compression.as_deref() {
436 compression != "none" && compression != "fsst"
437 } else {
438 false
439 };
440
441 if (max_len > 32 * 1024 || per_value_requested)
442 && data_size >= FSST_LEAST_INPUT_SIZE as u64
443 {
444 return Ok(Box::new(CompressedBufferEncoder::default()));
445 }
446
447 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
448 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
449 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
450
451 let variable_compression = Box::new(VariableEncoder::default());
452
453 if field_params.compression.as_deref() == Some("fsst")
455 || (max_len >= FSST_LEAST_INPUT_MAX_LENGTH
456 && data_size >= FSST_LEAST_INPUT_SIZE as u64)
457 {
458 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
459 } else {
460 Ok(variable_compression)
461 }
462 } else {
463 panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
464 }
465 }
466 _ => unreachable!(
467 "Per-value compression not yet supported for block type: {}",
468 data.name()
469 ),
470 }
471 }
472
473 fn create_block_compressor(
474 &self,
475 field: &Field,
476 data: &DataBlock,
477 ) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
478 match data {
479 DataBlock::FixedWidth(fixed_width) => {
482 if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
483 return Ok((compressor, encoding));
484 }
485
486 let encoder = Box::new(ValueEncoder::default());
488 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
489 Ok((encoder, encoding))
490 }
491 DataBlock::VariableWidth(variable_width) => {
492 let encoder = Box::new(VariableEncoder::default());
493 let encoding = ProtobufUtils21::variable(
494 ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
495 None,
496 );
497 Ok((encoder, encoding))
498 }
499 _ => todo!(
500 "block compressor for field {:?} and block type {:?}",
501 field,
502 data.name()
503 ),
504 }
505 }
506}
507
508pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
509 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
510}
511
512pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
513 fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
515 fn bits_per_value(&self) -> u64;
519}
520
521pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
522 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
524}
525
526pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
527 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
528}
529
530pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
531 fn create_miniblock_decompressor(
532 &self,
533 description: &CompressiveEncoding,
534 decompression_strategy: &dyn DecompressionStrategy,
535 ) -> Result<Box<dyn MiniBlockDecompressor>>;
536
537 fn create_fixed_per_value_decompressor(
538 &self,
539 description: &CompressiveEncoding,
540 ) -> Result<Box<dyn FixedPerValueDecompressor>>;
541
542 fn create_variable_per_value_decompressor(
543 &self,
544 description: &CompressiveEncoding,
545 ) -> Result<Box<dyn VariablePerValueDecompressor>>;
546
547 fn create_block_decompressor(
548 &self,
549 description: &CompressiveEncoding,
550 ) -> Result<Box<dyn BlockDecompressor>>;
551}
552
/// The built-in [`DecompressionStrategy`]. It is stateless.
#[derive(Default, Debug)]
pub struct DefaultDecompressionStrategy {}
555
556impl DecompressionStrategy for DefaultDecompressionStrategy {
557 fn create_miniblock_decompressor(
558 &self,
559 description: &CompressiveEncoding,
560 decompression_strategy: &dyn DecompressionStrategy,
561 ) -> Result<Box<dyn MiniBlockDecompressor>> {
562 match description.compression.as_ref().unwrap() {
563 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
564 #[cfg(feature = "bitpacking")]
565 Compression::InlineBitpacking(description) => {
566 Ok(Box::new(InlineBitpacking::from_description(description)))
567 }
568 #[cfg(not(feature = "bitpacking"))]
569 Compression::InlineBitpacking(_) => Err(Error::NotSupported {
570 source: "this runtime was not built with bitpacking support".into(),
571 location: location!(),
572 }),
573 Compression::Variable(variable) => {
574 let Compression::Flat(offsets) = variable
575 .offsets
576 .as_ref()
577 .unwrap()
578 .compression
579 .as_ref()
580 .unwrap()
581 else {
582 panic!("Variable compression only supports flat offsets")
583 };
584 Ok(Box::new(BinaryMiniBlockDecompressor::new(
585 offsets.bits_per_value as u8,
586 )))
587 }
588 Compression::Fsst(description) => {
589 let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
590 description.values.as_ref().unwrap(),
591 decompression_strategy,
592 )?;
593 Ok(Box::new(FsstMiniBlockDecompressor::new(
594 description,
595 inner_decompressor,
596 )))
597 }
598 Compression::PackedStruct(description) => Ok(Box::new(
599 PackedStructFixedWidthMiniBlockDecompressor::new(description),
600 )),
601 Compression::FixedSizeList(fsl) => {
602 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
605 }
606 Compression::Rle(rle) => {
607 let Compression::Flat(values) =
608 rle.values.as_ref().unwrap().compression.as_ref().unwrap()
609 else {
610 panic!("RLE compression only supports flat values")
611 };
612 let Compression::Flat(run_lengths) = rle
613 .run_lengths
614 .as_ref()
615 .unwrap()
616 .compression
617 .as_ref()
618 .unwrap()
619 else {
620 panic!("RLE compression only supports flat run lengths")
621 };
622 assert_eq!(
623 run_lengths.bits_per_value, 8,
624 "RLE compression only supports 8-bit run lengths"
625 );
626 Ok(Box::new(RleMiniBlockDecompressor::new(
627 values.bits_per_value,
628 )))
629 }
630 Compression::ByteStreamSplit(bss) => {
631 let Compression::Flat(values) =
632 bss.values.as_ref().unwrap().compression.as_ref().unwrap()
633 else {
634 panic!("ByteStreamSplit compression only supports flat values")
635 };
636 Ok(Box::new(ByteStreamSplitDecompressor::new(
637 values.bits_per_value as usize,
638 )))
639 }
640 Compression::General(general) => {
641 let inner_decompressor = self.create_miniblock_decompressor(
643 general.values.as_ref().ok_or_else(|| {
644 Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
645 })?,
646 decompression_strategy,
647 )?;
648
649 let compression = general.compression.as_ref().ok_or_else(|| {
651 Error::invalid_input("GeneralMiniBlock missing compression config", location!())
652 })?;
653
654 let scheme = compression.scheme().try_into()?;
655
656 let compression_config = crate::encodings::physical::block::CompressionConfig::new(
657 scheme,
658 compression.level,
659 );
660
661 Ok(Box::new(GeneralMiniBlockDecompressor::new(
662 inner_decompressor,
663 compression_config,
664 )))
665 }
666 _ => todo!(),
667 }
668 }
669
670 fn create_fixed_per_value_decompressor(
671 &self,
672 description: &CompressiveEncoding,
673 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
674 match description.compression.as_ref().unwrap() {
675 Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
676 constant
677 .value
678 .as_ref()
679 .map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
680 ))),
681 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
682 Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
683 _ => todo!("fixed-per-value decompressor for {:?}", description),
684 }
685 }
686
687 fn create_variable_per_value_decompressor(
688 &self,
689 description: &CompressiveEncoding,
690 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
691 match description.compression.as_ref().unwrap() {
692 Compression::Variable(variable) => {
693 let Compression::Flat(offsets) = variable
694 .offsets
695 .as_ref()
696 .unwrap()
697 .compression
698 .as_ref()
699 .unwrap()
700 else {
701 panic!("Variable compression only supports flat offsets")
702 };
703 assert!(offsets.bits_per_value < u8::MAX as u64);
704 Ok(Box::new(VariableDecoder::default()))
705 }
706 Compression::Fsst(ref fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
707 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
708 Box::new(VariableDecoder::default()),
709 ))),
710 Compression::General(ref general) => {
711 Ok(Box::new(CompressedBufferEncoder::from_scheme(
712 general.compression.as_ref().expect_ok()?.scheme(),
713 )?))
714 }
715 _ => todo!("variable-per-value decompressor for {:?}", description),
716 }
717 }
718
719 fn create_block_decompressor(
720 &self,
721 description: &CompressiveEncoding,
722 ) -> Result<Box<dyn BlockDecompressor>> {
723 match description.compression.as_ref().unwrap() {
724 Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
725 InlineBitpacking::from_description(inline_bitpacking),
726 )),
727 Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
728 Compression::Constant(constant) => {
729 let scalar = constant
730 .value
731 .as_ref()
732 .map(|v| LanceBuffer::from_bytes(v.clone(), 1));
733 Ok(Box::new(ConstantDecompressor::new(scalar)))
734 }
735 Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
736 Compression::OutOfLineBitpacking(out_of_line) => {
737 let compressed_bit_width = match out_of_line
739 .values
740 .as_ref()
741 .unwrap()
742 .compression
743 .as_ref()
744 .unwrap()
745 {
746 Compression::Flat(flat) => flat.bits_per_value,
747 _ => {
748 return Err(Error::InvalidInput {
749 location: location!(),
750 source: "OutOfLineBitpacking values must use Flat encoding".into(),
751 })
752 }
753 };
754 Ok(Box::new(OutOfLineBitpacking::new(
755 compressed_bit_width,
756 out_of_line.uncompressed_bits_per_value,
757 )))
758 }
759 _ => todo!(),
760 }
761 }
762}
763
#[cfg(test)]
mod tests {
    // Tests for DefaultCompressionStrategy parameter resolution and encoder
    // selection. Most tests inspect the `Debug` output of the chosen compressor
    // to see which encoders were composed.
    use super::*;
    use crate::buffer::LanceBuffer;
    use crate::data::{BlockInfo, DataBlock};
    use arrow_schema::{DataType, Field as ArrowField};
    use std::collections::HashMap;

    /// Builds a nullable Lance `Field` named `name` with type `data_type` and id -1.
    fn create_test_field(name: &str, data_type: DataType) -> Field {
        let arrow_field = ArrowField::new(name, data_type, true);
        let mut field = Field::try_from(&arrow_field).unwrap();
        field.id = -1;
        field
    }

    /// Builds a fixed-width block whose values change every
    /// `num_values / run_count` values (at least every value), then computes
    /// its statistics.
    ///
    /// Because of integer division, the resulting number of runs can exceed
    /// `run_count`. For example, 1000 values with `run_count = 600` start a new
    /// run on every value.
    fn create_fixed_width_block_with_stats(
        bits_per_value: u64,
        num_values: u64,
        run_count: u64,
    ) -> DataBlock {
        // Presumably called with byte-aligned widths; `bits_per_value / 8` truncates.
        let bytes_per_value = (bits_per_value / 8) as usize;
        let total_bytes = bytes_per_value * num_values as usize;
        let mut data = vec![0u8; total_bytes];

        let values_per_run = (num_values / run_count).max(1);
        let mut run_value = 0u8;

        for i in 0..num_values as usize {
            if i % values_per_run as usize == 0 {
                // Start a new run whose bytes differ from the previous run.
                run_value = run_value.wrapping_add(17);
            }
            for j in 0..bytes_per_value {
                // Every byte depends only on the run value and the byte index,
                // so all values inside one run are identical.
                let byte_offset = i * bytes_per_value + j;
                if byte_offset < data.len() {
                    data[byte_offset] = run_value.wrapping_add(j as u8);
                }
            }
        }

        let mut block = FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::reinterpret_vec(data),
            num_values,
            block_info: BlockInfo::default(),
        };

        // Populate the statistics (run count, bit width, ...) used by the strategy.
        use crate::statistics::ComputeStat;
        block.compute_stat();

        DataBlock::FixedWidth(block)
    }

    /// Builds a fixed-width block where only the first byte of value `i` is
    /// set, to `i % 256`; all other bytes are zero. Statistics are computed.
    fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
        let bytes_per_value = (bits_per_value / 8) as usize;
        let total_bytes = bytes_per_value * num_values as usize;
        let mut data = vec![0u8; total_bytes];

        for i in 0..num_values as usize {
            let byte_offset = i * bytes_per_value;
            if byte_offset < data.len() {
                data[byte_offset] = (i % 256) as u8;
            }
        }

        let mut block = FixedWidthDataBlock {
            bits_per_value,
            data: LanceBuffer::reinterpret_vec(data),
            num_values,
            block_info: BlockInfo::default(),
        };

        use crate::statistics::ComputeStat;
        block.compute_stat();

        DataBlock::FixedWidth(block)
    }

    #[test]
    fn test_parameter_based_compression() {
        let mut params = CompressionParams::new();

        // Column rule that matches field names ending in "_id".
        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.3),
                compression: Some("lz4".to_string()),
                compression_level: None,
                // BSS is tried before RLE, so turn it off to let RLE be chosen.
                bss: Some(BssMode::Off),
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("user_id", DataType::Int32);

        // 1000 values in runs of 10: 100 runs < 1000 * 0.3.
        let data = create_fixed_width_block_with_stats(32, 1000, 100);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        let debug_str = format!("{:?}", compressor);

        // lz4 wraps the RLE encoder in a general-purpose compressor.
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
        assert!(debug_str.contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_type_level_parameters() {
        let mut params = CompressionParams::new();

        // Rule keyed by data type rather than column name.
        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.1),
                compression: Some("zstd".to_string()),
                compression_level: Some(3),
                bss: Some(BssMode::Off),
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("some_column", DataType::Int32);
        // 50 runs < 1000 * 0.1, so RLE should be selected.
        let data = create_fixed_width_block_with_stats(32, 1000, 50);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        assert!(format!("{:?}", compressor).contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_none_compression() {
        let mut params = CompressionParams::new();

        // "none" disables all encodings for this column.
        params.columns.insert(
            "embeddings".to_string(),
            CompressionFieldParams {
                compression: Some("none".to_string()),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);
        let field = create_test_field("embeddings", DataType::Float32);
        let data = create_fixed_width_block(32, 1000);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
    }

    #[test]
    fn test_parameter_merge_priority() {
        let mut params = CompressionParams::new();

        // Type-level rule...
        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                ..Default::default()
            },
        );

        // ...and a column-level rule for one specific column.
        params.columns.insert(
            "user_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.2),
                compression: Some("zstd".to_string()),
                compression_level: Some(6),
                bss: None,
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        // The column rule wins for the matching column.
        let merged = strategy
            .params
            .get_field_params("user_id", &DataType::Int32);

        assert_eq!(merged.rle_threshold, Some(0.2));
        assert_eq!(merged.compression, Some("zstd".to_string()));
        assert_eq!(merged.compression_level, Some(6));

        // Other Int32 columns fall back to the type rule.
        let merged = strategy
            .params
            .get_field_params("other_field", &DataType::Int32);
        assert_eq!(merged.rle_threshold, Some(0.5));
        assert_eq!(merged.compression, Some("lz4".to_string()));
        assert_eq!(merged.compression_level, None);
    }

    #[test]
    fn test_pattern_matching() {
        let mut params = CompressionParams::new();

        // Prefix pattern.
        params.columns.insert(
            "log_*".to_string(),
            CompressionFieldParams {
                compression: Some("zstd".to_string()),
                compression_level: Some(6),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        let merged = strategy
            .params
            .get_field_params("log_messages", &DataType::Utf8);
        assert_eq!(merged.compression, Some("zstd".to_string()));
        assert_eq!(merged.compression_level, Some(6));

        // "log" as a suffix does not match the prefix pattern.
        let merged = strategy
            .params
            .get_field_params("messages_log", &DataType::Utf8);
        assert_eq!(merged.compression, None);
    }

    #[test]
    fn test_legacy_metadata_support() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        // Compression requested through field metadata rather than params.
        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
        let mut field = create_test_field("some_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
    }

    #[test]
    fn test_default_behavior() {
        // No rules configured at all.
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        let field = create_test_field("random_column", DataType::Int32);
        // 1000 / 600 == 1, so every value starts a new run and RLE does not
        // qualify under the default threshold.
        let data = create_fixed_width_block_with_stats(32, 1000, 600);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        let debug_str = format!("{:?}", compressor);
        // Either plain values or bitpacking, depending on the build and stats.
        assert!(debug_str.contains("ValueEncoder") || debug_str.contains("InlineBitpacking"));
    }

    #[test]
    fn test_field_metadata_compression() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        // Scheme and level both supplied via metadata.
        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_META_KEY.to_string(), "zstd".to_string());
        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string());
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
    }

    #[test]
    fn test_field_metadata_rle_threshold() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        // RLE threshold and BSS mode supplied via metadata.
        let mut metadata = HashMap::new();
        metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
        metadata.insert(BSS_META_KEY.to_string(), "off".to_string());
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        // 100 runs < 1000 * 0.8.
        let data = create_fixed_width_block_with_stats(32, 1000, 100);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("RleMiniBlockEncoder"));
    }

    #[test]
    fn test_field_metadata_override_params() {
        let mut params = CompressionParams::new();
        params.columns.insert(
            "test_column".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.3),
                compression: Some("lz4".to_string()),
                compression_level: None,
                bss: None,
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        // Metadata "none" takes priority over the column's lz4 rule.
        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_META_KEY.to_string(), "none".to_string());
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        assert!(format!("{:?}", compressor).contains("ValueEncoder"));
    }

    #[test]
    fn test_field_metadata_mixed_configuration() {
        let mut params = CompressionParams::new();
        params.types.insert(
            "Int32".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                ..Default::default()
            },
        );

        let strategy = DefaultCompressionStrategy::with_params(params);

        // Metadata sets only the level; the type rule's scheme still applies.
        let mut metadata = HashMap::new();
        metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string());
        let mut field = create_test_field("test_column", DataType::Int32);
        field.metadata = metadata;

        let data = create_fixed_width_block(32, 1000);
        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();

        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
    }

    #[test]
    fn test_bss_field_metadata() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        // BSS forced on. BSS is only considered when a compression scheme is set.
        let mut metadata = HashMap::new();
        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
        let arrow_field =
            ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
        let field = Field::try_from(&arrow_field).unwrap();

        let data = create_fixed_width_block(32, 100);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        let debug_str = format!("{:?}", compressor);
        assert!(debug_str.contains("ByteStreamSplitEncoder"));
    }

    #[test]
    fn test_bss_with_compression() {
        let params = CompressionParams::new();
        let strategy = DefaultCompressionStrategy::with_params(params);

        let mut metadata = HashMap::new();
        metadata.insert(BSS_META_KEY.to_string(), "on".to_string());
        metadata.insert(COMPRESSION_META_KEY.to_string(), "lz4".to_string());
        let arrow_field =
            ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
        let field = Field::try_from(&arrow_field).unwrap();

        let data = create_fixed_width_block(64, 100);

        let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
        let debug_str = format!("{:?}", compressor);
        // The BSS encoder is wrapped in the lz4 general-purpose compressor.
        assert!(debug_str.contains("GeneralMiniBlockCompressor"));
        assert!(debug_str.contains("ByteStreamSplitEncoder"));
    }
}