#[cfg(feature = "bitpacking")]
use crate::encodings::physical::bitpacking::{InlineBitpacking, OutOfLineBitpacking};
use crate::{
buffer::LanceBuffer,
compression_config::{BssMode, CompressionFieldParams, CompressionParams},
constants::{
BSS_META_KEY, COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY,
},
data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
encodings::{
logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
physical::{
binary::{
BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
VariableDecoder, VariableEncoder,
},
block::{
CompressedBufferEncoder, CompressionConfig, CompressionScheme,
GeneralBlockDecompressor,
},
byte_stream_split::{
ByteStreamSplitDecompressor, ByteStreamSplitEncoder, should_use_bss,
},
constant::ConstantDecompressor,
fsst::{
FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
FsstPerValueEncoder,
},
general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
packed::{
PackedStructFixedWidthMiniBlockDecompressor,
PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
VariablePackedStructFieldKind,
},
rle::{RleDecompressor, RleEncoder},
value::{ValueDecompressor, ValueEncoder},
},
},
format::{
ProtobufUtils21,
pb21::{CompressiveEncoding, compressive_encoding::Compression},
},
statistics::{GetStat, Stat},
version::LanceFileVersion,
};
use arrow_array::{cast::AsArray, types::UInt64Type};
use arrow_schema::DataType;
use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
use lance_core::{Error, Result, datatypes::Field, error::LanceOptionExt};
use std::{str::FromStr, sync::Arc};
/// Run-count / value-count ratio used by `try_rle_for_block` when no
/// `rle_threshold` is configured: RLE is chosen only if
/// `run_count < num_values * threshold`.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
/// Block size in bytes (32 KiB) above which `try_general_compression` applies the
/// default general-purpose compressor when no scheme is configured (Lance 2.2+ only).
const MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION: u64 = 32 * 1024;
/// Compresses an entire [`DataBlock`] into a single [`LanceBuffer`].
pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
/// Compresses `data` (taken by value) and returns the compressed bytes.
fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
}
/// Chooses compressors for a field's data in each of the supported layouts
/// (whole block, per value, and mini-block).
pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
/// Returns a block compressor for `data` together with the encoding
/// description that must be recorded so the block can be decompressed.
fn create_block_compressor(
&self,
field: &Field,
data: &DataBlock,
) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)>;
/// Returns a compressor that encodes `data` value by value.
fn create_per_value(
&self,
field: &Field,
data: &DataBlock,
) -> Result<Box<dyn PerValueCompressor>>;
/// Returns a compressor for the mini-block layout.
fn create_miniblock_compressor(
&self,
field: &Field,
data: &DataBlock,
) -> Result<Box<dyn MiniBlockCompressor>>;
}
/// Built-in [`CompressionStrategy`] driven by user [`CompressionParams`],
/// per-field metadata, and the target Lance file version.
#[derive(Debug, Default, Clone)]
pub struct DefaultCompressionStrategy {
/// Column- and type-level compression parameters supplied by the user.
params: CompressionParams,
/// Target file version; several encodings are only enabled for newer versions.
version: LanceFileVersion,
}
/// Returns a byte-stream-split encoder for `data` when BSS is enabled and judged useful.
///
/// BSS is only considered when a general-purpose compression scheme other than
/// `"none"` is configured (presumably because splitting bytes alone does not
/// shrink the data; the follow-up compressor does). The BSS mode defaults to
/// [`BssMode::Auto`] when unset.
fn try_bss_for_mini_block(
    data: &FixedWidthDataBlock,
    params: &CompressionFieldParams,
) -> Option<Box<dyn MiniBlockCompressor>> {
    let general_configured = !matches!(params.compression.as_deref(), None | Some("none"));
    if !general_configured {
        return None;
    }
    let bss_mode = params.bss.unwrap_or(BssMode::Auto);
    should_use_bss(data, bss_mode).then(|| {
        Box::new(ByteStreamSplitEncoder::new(data.bits_per_value as usize))
            as Box<dyn MiniBlockCompressor>
    })
}
/// Returns an RLE mini-block encoder when run-length encoding is estimated to pay off.
///
/// Only 8/16/32/64-bit values are eligible. When the user set `rle_threshold`,
/// the run count must also be below `num_values * threshold`. The size estimate
/// assumes one `(value, 1-byte run length)` pair per run plus one extra pair
/// every 255 values (runs are split at 255). When bitpacking is compiled in and
/// its estimate beats RLE, `None` is returned so bitpacking can be chosen.
fn try_rle_for_mini_block(
    data: &FixedWidthDataBlock,
    params: &CompressionFieldParams,
) -> Option<Box<dyn MiniBlockCompressor>> {
    let bits = data.bits_per_value;
    if !matches!(bits, 8 | 16 | 32 | 64) {
        return None;
    }
    let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
    let num_values = data.num_values;

    // Without an explicit threshold we rely solely on the size estimate below.
    if let Some(threshold) = params.rle_threshold {
        let below_threshold = (run_count as f64) < (num_values as f64) * threshold;
        if !below_threshold {
            return None;
        }
    }

    let value_bytes = u128::from(bits / 8);
    let pair_count = run_count.saturating_add(num_values / 255).min(num_values);
    let plain_size = u128::from(num_values) * value_bytes;
    let rle_size = u128::from(pair_count) * (value_bytes + 1);
    if rle_size >= plain_size {
        return None;
    }

    #[cfg(feature = "bitpacking")]
    {
        if let Some(packed_size) = estimate_inline_bitpacking_bytes(data)
            && u128::from(packed_size) < rle_size
        {
            return None;
        }
    }

    Some(Box::new(RleEncoder::new()))
}
/// Returns an RLE block compressor and its encoding description when the data
/// has few enough runs.
///
/// Requires Lance 2.2+ and 8/16/32/64-bit values. RLE is chosen when
/// `run_count < num_values * threshold`, where the threshold comes from
/// `params.rle_threshold` or [`DEFAULT_RLE_COMPRESSION_THRESHOLD`]. The recorded
/// encoding uses flat values of the input width and flat 8-bit run lengths.
fn try_rle_for_block(
    data: &FixedWidthDataBlock,
    version: LanceFileVersion,
    params: &CompressionFieldParams,
) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
    let bits = data.bits_per_value;
    let eligible = version >= LanceFileVersion::V2_2 && matches!(bits, 8 | 16 | 32 | 64);
    if !eligible {
        return None;
    }
    let limit = params
        .rle_threshold
        .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);
    let runs = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
    let worthwhile = (runs as f64) < (data.num_values as f64) * limit;
    worthwhile.then(|| {
        let description = ProtobufUtils21::rle(
            ProtobufUtils21::flat(bits, None),
            ProtobufUtils21::flat(8, None),
        );
        (
            Box::new(RleEncoder::new()) as Box<dyn BlockCompressor>,
            description,
        )
    })
}
/// Returns an inline-bitpacking mini-block encoder when the bitpacking size
/// estimate is smaller than the raw data; always `None` when the crate is built
/// without the `bitpacking` feature.
fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
    #[cfg(feature = "bitpacking")]
    {
        estimate_inline_bitpacking_bytes(_data).map(|_| {
            Box::new(InlineBitpacking::new(_data.bits_per_value)) as Box<dyn MiniBlockCompressor>
        })
    }
    #[cfg(not(feature = "bitpacking"))]
    {
        None
    }
}
/// Estimates the byte size of `data` after inline bitpacking.
///
/// Each entry of the `BitWidth` statistic is treated as one 1024-value chunk
/// costing one extra word (presumably the chunk header) plus
/// `1024 * bit_width / bits` packed words, where a word is the input value width.
/// Returns `None` for unsupported widths, empty blocks, estimates that are not
/// smaller than the raw data, or estimates that do not fit in `u64`.
#[cfg(feature = "bitpacking")]
fn estimate_inline_bitpacking_bytes(data: &FixedWidthDataBlock) -> Option<u64> {
    let bits = data.bits_per_value;
    if !matches!(bits, 8 | 16 | 32 | 64) || data.num_values == 0 {
        return None;
    }
    let width_stat = data.expect_stat(Stat::BitWidth);
    let chunk_widths = width_stat.as_primitive::<UInt64Type>();
    let word_bits = u128::from(bits);
    let word_count = chunk_widths.values().iter().fold(0u128, |acc, &width| {
        let packed = (1024u128 * u128::from(width)) / word_bits;
        acc.saturating_add(1u128.saturating_add(packed))
    });
    let estimate = word_count.saturating_mul(u128::from(bits / 8));
    if estimate >= data.data_size() as u128 {
        return None;
    }
    u64::try_from(estimate).ok()
}
/// Returns a bitpacking block compressor and its encoding description, if applicable.
///
/// Only 8/16/32/64-bit values are considered. Bitpacking is skipped when the
/// `BitWidth` statistic is empty, when any chunk has bit width 0, or when a
/// single chunk's minimum packed size is not smaller than the raw data. Blocks of
/// at most 1024 values use inline bitpacking; larger blocks use out-of-line
/// bitpacking at the maximum observed bit width.
///
/// Always `None` when the crate is built without the `bitpacking` feature
/// (the bitpacking types are only imported under that feature).
fn try_bitpack_for_block(
    data: &FixedWidthDataBlock,
) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
    #[cfg(feature = "bitpacking")]
    {
        let bits = data.bits_per_value;
        if !matches!(bits, 8 | 16 | 32 | 64) {
            return None;
        }
        let bit_widths = data.expect_stat(Stat::BitWidth);
        let widths = bit_widths.as_primitive::<UInt64Type>();
        // An empty statistic (no chunks) previously panicked on `unwrap()`.
        let max_bit_width = *widths.values().iter().max()?;
        let has_all_zeros = widths.values().contains(&0);
        let too_small = widths.len() == 1
            && InlineBitpacking::min_size_bytes(widths.value(0)) >= data.data_size();
        if has_all_zeros || too_small {
            return None;
        }
        if data.num_values <= 1024 {
            let compressor = Box::new(InlineBitpacking::new(bits));
            let encoding = ProtobufUtils21::inline_bitpacking(bits, None);
            Some((compressor, encoding))
        } else {
            let compressor = Box::new(OutOfLineBitpacking::new(max_bit_width, bits));
            let encoding = ProtobufUtils21::out_of_line_bitpacking(
                bits,
                ProtobufUtils21::flat(max_bit_width, None),
            );
            Some((compressor, encoding))
        }
    }
    #[cfg(not(feature = "bitpacking"))]
    {
        let _ = data;
        None
    }
}
fn maybe_wrap_general_for_mini_block(
inner: Box<dyn MiniBlockCompressor>,
params: &CompressionFieldParams,
) -> Result<Box<dyn MiniBlockCompressor>> {
match params.compression.as_deref() {
None | Some("none") | Some("fsst") => Ok(inner),
Some(raw) => {
let scheme = CompressionScheme::from_str(raw)
.map_err(|_| Error::invalid_input(format!("Unknown compression scheme: {raw}")))?;
let cfg = CompressionConfig::new(scheme, params.compression_level);
Ok(Box::new(GeneralMiniBlockCompressor::new(inner, cfg)))
}
}
}
/// Chooses a general-purpose buffer compressor for a block, if any.
///
/// Returns `Ok(None)` when compression is `"none"` or the version predates 2.2.
/// An explicitly configured scheme is parsed (errors propagate) and used with the
/// configured level. Without a configured scheme, blocks larger than
/// [`MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION`] use the default encoder.
fn try_general_compression(
    version: LanceFileVersion,
    field_params: &CompressionFieldParams,
    data: &DataBlock,
) -> Result<Option<(Box<dyn BlockCompressor>, CompressionConfig)>> {
    let requested = field_params.compression.as_deref();
    if requested == Some("none") || version < LanceFileVersion::V2_2 {
        return Ok(None);
    }
    match requested {
        Some(scheme_name) => {
            let scheme: CompressionScheme = scheme_name.parse()?;
            let config = CompressionConfig::new(scheme, field_params.compression_level);
            let encoder = Box::new(CompressedBufferEncoder::try_new(config)?);
            Ok(Some((encoder, config)))
        }
        None if data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION => {
            let encoder = Box::new(CompressedBufferEncoder::default());
            let config = encoder.compressor.config();
            Ok(Some((encoder, config)))
        }
        None => Ok(None),
    }
}
impl DefaultCompressionStrategy {
    /// Creates a strategy with default parameters and the default file version.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a strategy from user-supplied parameters, targeting the default
    /// file version.
    pub fn with_params(params: CompressionParams) -> Self {
        Self {
            params,
            version: LanceFileVersion::default(),
        }
    }

    /// Sets the Lance file version the strategy targets (builder style).
    pub fn with_version(mut self, version: LanceFileVersion) -> Self {
        self.version = version;
        self
    }

    /// Reads per-field compression overrides from the field's metadata.
    ///
    /// Unparseable values are dropped with a warning, so bad metadata falls back
    /// to the defaults instead of failing. A `minichunk_size` of 32 KiB or more is
    /// ignored (with a warning) for files of version 2.1 or older.
    fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams {
        let mut params = CompressionFieldParams::default();
        if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
            params.compression = Some(compression.clone());
        }
        if let Some(level) = field.metadata.get(COMPRESSION_LEVEL_META_KEY) {
            params.compression_level = level.parse().ok();
            if params.compression_level.is_none() {
                log::warn!("Invalid compression level '{}', using default", level);
            }
        }
        if let Some(threshold) = field.metadata.get(RLE_THRESHOLD_META_KEY) {
            params.rle_threshold = threshold.parse().ok();
            if params.rle_threshold.is_none() {
                log::warn!("Invalid RLE threshold '{}', using default", threshold);
            }
        }
        if let Some(bss_str) = field.metadata.get(BSS_META_KEY) {
            match BssMode::parse(bss_str) {
                Some(mode) => params.bss = Some(mode),
                None => {
                    log::warn!("Invalid BSS mode '{}', using default", bss_str);
                }
            }
        }
        if let Some(minichunk_size_str) = field
            .metadata
            .get(super::constants::MINICHUNK_SIZE_META_KEY)
        {
            if let Ok(minichunk_size) = minichunk_size_str.parse::<i64>() {
                if minichunk_size >= 32 * 1024 && *version <= LanceFileVersion::V2_1 {
                    log::warn!(
                        "minichunk_size '{}' too large for version '{}', using default",
                        minichunk_size,
                        version
                    );
                } else {
                    params.minichunk_size = Some(minichunk_size);
                }
            } else {
                log::warn!("Invalid minichunk_size '{}', skipping", minichunk_size_str);
            }
        }
        params
    }

    /// Builds the mini-block compressor for fixed-width data.
    ///
    /// `"none"` yields a plain value encoder. Otherwise the first applicable of
    /// BSS, RLE, and bitpacking is used (falling back to plain values), and the
    /// result is wrapped with general compression when a scheme is configured.
    fn build_fixed_width_compressor(
        &self,
        params: &CompressionFieldParams,
        data: &FixedWidthDataBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>> {
        if params.compression.as_deref() == Some("none") {
            return Ok(Box::new(ValueEncoder::default()));
        }
        let base = try_bss_for_mini_block(data, params)
            .or_else(|| try_rle_for_mini_block(data, params))
            .or_else(|| try_bitpack_for_mini_block(data))
            .unwrap_or_else(|| Box::new(ValueEncoder::default()));
        maybe_wrap_general_for_mini_block(base, params)
    }

    /// Builds the mini-block compressor for variable-width data.
    ///
    /// Only 32- and 64-bit offsets are supported. `"none"` yields the plain binary
    /// encoder. FSST is used when requested, or automatically for non-binary types
    /// whose max length and total size reach the FSST minimums. Any other
    /// configured scheme wraps the result in general compression.
    fn build_variable_width_compressor(
        &self,
        field: &Field,
        data: &VariableWidthBlock,
    ) -> Result<Box<dyn MiniBlockCompressor>> {
        let params = self.get_merged_field_params(field);
        let compression = params.compression.as_deref();
        if data.bits_per_offset != 32 && data.bits_per_offset != 64 {
            return Err(Error::invalid_input(format!(
                "Variable width compression not supported for {} bit offsets",
                data.bits_per_offset
            )));
        }
        // Checked before reading statistics: the "none" path does not need them.
        if compression == Some("none") {
            return Ok(Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size)));
        }
        let data_size = data.expect_single_stat::<UInt64Type>(Stat::DataSize);
        let max_len = data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
        let use_fsst = compression == Some("fsst")
            || (compression.is_none()
                && !matches!(field.data_type(), DataType::Binary | DataType::LargeBinary)
                && max_len >= FSST_LEAST_INPUT_MAX_LENGTH
                && data_size >= FSST_LEAST_INPUT_SIZE as u64);
        let base_encoder: Box<dyn MiniBlockCompressor> = if use_fsst {
            Box::new(FsstMiniBlockEncoder::new(params.minichunk_size))
        } else {
            Box::new(BinaryMiniBlockEncoder::new(params.minichunk_size))
        };
        // Same wrapping rule as the fixed-width path ("none"/"fsst" leave it as is).
        maybe_wrap_general_for_mini_block(base_encoder, &params)
    }

    /// Returns the user parameters for `field` with its metadata overrides merged in.
    fn get_merged_field_params(&self, field: &Field) -> CompressionFieldParams {
        let mut field_params = self
            .params
            .get_field_params(&field.name, &field.data_type());
        let metadata_params = Self::parse_field_metadata(field, &self.version);
        field_params.merge(&metadata_params);
        field_params
    }
}
impl CompressionStrategy for DefaultCompressionStrategy {
fn create_miniblock_compressor(
&self,
field: &Field,
data: &DataBlock,
) -> Result<Box<dyn MiniBlockCompressor>> {
match data {
DataBlock::FixedWidth(fixed_width_data) => {
let field_params = self.get_merged_field_params(field);
self.build_fixed_width_compressor(&field_params, fixed_width_data)
}
DataBlock::VariableWidth(variable_width_data) => {
self.build_variable_width_compressor(field, variable_width_data)
}
DataBlock::Struct(struct_data_block) => {
if struct_data_block.has_variable_width_child() {
return Err(Error::invalid_input(
"Packed struct mini-block encoding supports only fixed-width children",
));
}
Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
}
DataBlock::FixedSizeList(_) => {
Ok(Box::new(ValueEncoder::default()))
}
_ => Err(Error::not_supported_source(
format!(
"Mini-block compression not yet supported for block type {}",
data.name()
)
.into(),
)),
}
}
fn create_per_value(
&self,
field: &Field,
data: &DataBlock,
) -> Result<Box<dyn PerValueCompressor>> {
let field_params = self.get_merged_field_params(field);
match data {
DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
DataBlock::Struct(struct_block) => {
if field.children.len() != struct_block.children.len() {
return Err(Error::invalid_input(
"Struct field metadata does not match data block children",
));
}
let has_variable_child = struct_block.has_variable_width_child();
if has_variable_child {
if self.version < LanceFileVersion::V2_2 {
return Err(Error::not_supported_source("Variable packed struct encoding requires Lance file version 2.2 or later".into()));
}
Ok(Box::new(PackedStructVariablePerValueEncoder::new(
self.clone(),
field.children.clone(),
)))
} else {
Err(Error::invalid_input(
"Packed struct per-value compression should not be used for fixed-width-only structs",
))
}
}
DataBlock::VariableWidth(variable_width) => {
let compression = field_params.compression.as_deref();
if compression == Some("none") {
return Ok(Box::new(VariableEncoder::default()));
}
let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
let per_value_requested =
compression.is_some_and(|compression| compression != "fsst");
if (max_len > 32 * 1024 || per_value_requested)
&& data_size >= FSST_LEAST_INPUT_SIZE as u64
{
return Ok(Box::new(CompressedBufferEncoder::default()));
}
if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
let variable_compression = Box::new(VariableEncoder::default());
let use_fsst = compression == Some("fsst")
|| (compression.is_none()
&& !matches!(
field.data_type(),
DataType::Binary | DataType::LargeBinary
)
&& max_len >= FSST_LEAST_INPUT_MAX_LENGTH
&& data_size >= FSST_LEAST_INPUT_SIZE as u64);
if use_fsst {
Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
} else {
Ok(variable_compression)
}
} else {
panic!(
"Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.",
variable_width.bits_per_offset
);
}
}
_ => unreachable!(
"Per-value compression not yet supported for block type: {}",
data.name()
),
}
}
fn create_block_compressor(
&self,
field: &Field,
data: &DataBlock,
) -> Result<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
let field_params = self.get_merged_field_params(field);
match data {
DataBlock::FixedWidth(fixed_width) => {
if let Some((compressor, encoding)) =
try_rle_for_block(fixed_width, self.version, &field_params)
{
return Ok((compressor, encoding));
}
if let Some((compressor, encoding)) = try_bitpack_for_block(fixed_width) {
return Ok((compressor, encoding));
}
if let Some((compressor, config)) =
try_general_compression(self.version, &field_params, data)?
{
let encoding = ProtobufUtils21::wrapped(
config,
ProtobufUtils21::flat(fixed_width.bits_per_value, None),
)?;
return Ok((compressor, encoding));
}
let encoder = Box::new(ValueEncoder::default());
let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
Ok((encoder, encoding))
}
DataBlock::VariableWidth(variable_width) => {
if let Some((compressor, config)) =
try_general_compression(self.version, &field_params, data)?
{
let encoding = ProtobufUtils21::wrapped(
config,
ProtobufUtils21::variable(
ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
None,
),
)?;
return Ok((compressor, encoding));
}
let encoder = Box::new(VariableEncoder::default());
let encoding = ProtobufUtils21::variable(
ProtobufUtils21::flat(variable_width.bits_per_offset as u64, None),
None,
);
Ok((encoder, encoding))
}
_ => unreachable!(),
}
}
}
/// Decompresses data written in the mini-block layout.
pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
/// Decodes `num_values` values from the given buffers.
fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
}
/// Decompresses per-value encoded data whose values have a fixed width.
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
/// Decodes `num_values` values from the fixed-width input block.
fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
/// Width in bits of each encoded value (presumably as stored; confirm with implementors).
fn bits_per_value(&self) -> u64;
}
/// Decompresses per-value encoded data with variable-width values.
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
/// Decodes the values contained in the variable-width input block.
fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
}
/// Decompresses a block that was compressed as a single buffer.
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
/// Decodes `num_values` values from `data`.
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
}
/// Maps recorded encoding descriptions back to decompressors for each layout.
pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
/// Builds a mini-block decompressor for `description`. `decompression_strategy`
/// lets nested encodings be resolved through a (possibly different) strategy;
/// the default implementation uses it for FSST's inner values.
fn create_miniblock_decompressor(
&self,
description: &CompressiveEncoding,
decompression_strategy: &dyn DecompressionStrategy,
) -> Result<Box<dyn MiniBlockDecompressor>>;
/// Builds a decompressor for fixed-width per-value data.
fn create_fixed_per_value_decompressor(
&self,
description: &CompressiveEncoding,
) -> Result<Box<dyn FixedPerValueDecompressor>>;
/// Builds a decompressor for variable-width per-value data.
fn create_variable_per_value_decompressor(
&self,
description: &CompressiveEncoding,
) -> Result<Box<dyn VariablePerValueDecompressor>>;
/// Builds a decompressor for a whole compressed block.
fn create_block_decompressor(
&self,
description: &CompressiveEncoding,
) -> Result<Box<dyn BlockDecompressor>>;
}
/// Built-in, stateless [`DecompressionStrategy`] covering the encodings produced
/// by the default compression strategy.
#[derive(Debug, Default)]
pub struct DefaultDecompressionStrategy {}
impl DecompressionStrategy for DefaultDecompressionStrategy {
fn create_miniblock_decompressor(
&self,
description: &CompressiveEncoding,
decompression_strategy: &dyn DecompressionStrategy,
) -> Result<Box<dyn MiniBlockDecompressor>> {
match description.compression.as_ref().unwrap() {
Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
#[cfg(feature = "bitpacking")]
Compression::InlineBitpacking(description) => {
Ok(Box::new(InlineBitpacking::from_description(description)))
}
#[cfg(not(feature = "bitpacking"))]
Compression::InlineBitpacking(_) => Err(Error::not_supported_source(
"this runtime was not built with bitpacking support".into(),
)),
Compression::Variable(variable) => {
let Compression::Flat(offsets) = variable
.offsets
.as_ref()
.unwrap()
.compression
.as_ref()
.unwrap()
else {
panic!("Variable compression only supports flat offsets")
};
Ok(Box::new(BinaryMiniBlockDecompressor::new(
offsets.bits_per_value as u8,
)))
}
Compression::Fsst(description) => {
let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
description.values.as_ref().unwrap(),
decompression_strategy,
)?;
Ok(Box::new(FsstMiniBlockDecompressor::new(
description,
inner_decompressor,
)))
}
Compression::PackedStruct(description) => Ok(Box::new(
PackedStructFixedWidthMiniBlockDecompressor::new(description),
)),
Compression::VariablePackedStruct(_) => Err(Error::not_supported_source(
"variable packed struct decoding is not yet implemented".into(),
)),
Compression::FixedSizeList(fsl) => {
Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
}
Compression::Rle(rle) => {
let bits_per_value = validate_rle_compression(rle)?;
Ok(Box::new(RleDecompressor::new(bits_per_value)))
}
Compression::ByteStreamSplit(bss) => {
let Compression::Flat(values) =
bss.values.as_ref().unwrap().compression.as_ref().unwrap()
else {
panic!("ByteStreamSplit compression only supports flat values")
};
Ok(Box::new(ByteStreamSplitDecompressor::new(
values.bits_per_value as usize,
)))
}
Compression::General(general) => {
let inner_decompressor = self.create_miniblock_decompressor(
general.values.as_ref().ok_or_else(|| {
Error::invalid_input("GeneralMiniBlock missing inner encoding")
})?,
decompression_strategy,
)?;
let compression = general.compression.as_ref().ok_or_else(|| {
Error::invalid_input("GeneralMiniBlock missing compression config")
})?;
let scheme = compression.scheme().try_into()?;
let compression_config = CompressionConfig::new(scheme, compression.level);
Ok(Box::new(GeneralMiniBlockDecompressor::new(
inner_decompressor,
compression_config,
)))
}
_ => todo!(),
}
}
fn create_fixed_per_value_decompressor(
&self,
description: &CompressiveEncoding,
) -> Result<Box<dyn FixedPerValueDecompressor>> {
match description.compression.as_ref().unwrap() {
Compression::Constant(constant) => Ok(Box::new(ConstantDecompressor::new(
constant
.value
.as_ref()
.map(|v| LanceBuffer::from_bytes(v.clone(), 1)),
))),
Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
Compression::FixedSizeList(fsl) => Ok(Box::new(ValueDecompressor::from_fsl(fsl))),
_ => todo!("fixed-per-value decompressor for {:?}", description),
}
}
fn create_variable_per_value_decompressor(
&self,
description: &CompressiveEncoding,
) -> Result<Box<dyn VariablePerValueDecompressor>> {
match description.compression.as_ref().unwrap() {
Compression::Variable(variable) => {
let Compression::Flat(offsets) = variable
.offsets
.as_ref()
.unwrap()
.compression
.as_ref()
.unwrap()
else {
panic!("Variable compression only supports flat offsets")
};
assert!(offsets.bits_per_value < u8::MAX as u64);
Ok(Box::new(VariableDecoder::default()))
}
Compression::Fsst(fsst) => Ok(Box::new(FsstPerValueDecompressor::new(
LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
Box::new(VariableDecoder::default()),
))),
Compression::General(general) => Ok(Box::new(CompressedBufferEncoder::from_scheme(
general.compression.as_ref().expect_ok()?.scheme(),
)?)),
Compression::VariablePackedStruct(description) => {
let mut fields = Vec::with_capacity(description.fields.len());
for field in &description.fields {
let value_encoding = field.value.as_ref().ok_or_else(|| {
Error::invalid_input("VariablePackedStruct field is missing value encoding")
})?;
let decoder = match field.layout.as_ref().ok_or_else(|| {
Error::invalid_input("VariablePackedStruct field is missing layout details")
})? {
crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
bits_per_value,
) => {
let decompressor =
self.create_fixed_per_value_decompressor(value_encoding)?;
VariablePackedStructFieldDecoder {
kind: VariablePackedStructFieldKind::Fixed {
bits_per_value: *bits_per_value,
decompressor: Arc::from(decompressor),
},
}
}
crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
bits_per_length,
) => {
let decompressor =
self.create_variable_per_value_decompressor(value_encoding)?;
VariablePackedStructFieldDecoder {
kind: VariablePackedStructFieldKind::Variable {
bits_per_length: *bits_per_length,
decompressor: Arc::from(decompressor),
},
}
}
};
fields.push(decoder);
}
Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
fields,
)))
}
_ => todo!("variable-per-value decompressor for {:?}", description),
}
}
fn create_block_decompressor(
&self,
description: &CompressiveEncoding,
) -> Result<Box<dyn BlockDecompressor>> {
match description.compression.as_ref().unwrap() {
Compression::InlineBitpacking(inline_bitpacking) => Ok(Box::new(
InlineBitpacking::from_description(inline_bitpacking),
)),
Compression::Flat(flat) => Ok(Box::new(ValueDecompressor::from_flat(flat))),
Compression::Constant(constant) => {
let scalar = constant
.value
.as_ref()
.map(|v| LanceBuffer::from_bytes(v.clone(), 1));
Ok(Box::new(ConstantDecompressor::new(scalar)))
}
Compression::Variable(_) => Ok(Box::new(BinaryBlockDecompressor::default())),
Compression::FixedSizeList(fsl) => {
Ok(Box::new(ValueDecompressor::from_fsl(fsl.as_ref())))
}
Compression::OutOfLineBitpacking(out_of_line) => {
let compressed_bit_width = match out_of_line
.values
.as_ref()
.unwrap()
.compression
.as_ref()
.unwrap()
{
Compression::Flat(flat) => flat.bits_per_value,
_ => {
return Err(Error::invalid_input_source(
"OutOfLineBitpacking values must use Flat encoding".into(),
));
}
};
Ok(Box::new(OutOfLineBitpacking::new(
compressed_bit_width,
out_of_line.uncompressed_bits_per_value,
)))
}
Compression::General(general) => {
let inner_desc = general
.values
.as_ref()
.ok_or_else(|| {
Error::invalid_input("General compression missing inner encoding")
})?
.as_ref();
let inner_decompressor = self.create_block_decompressor(inner_desc)?;
let compression = general.compression.as_ref().ok_or_else(|| {
Error::invalid_input("General compression missing compression config")
})?;
let scheme = compression.scheme().try_into()?;
let config = CompressionConfig::new(scheme, compression.level);
let general_decompressor =
GeneralBlockDecompressor::try_new(inner_decompressor, config)?;
Ok(Box::new(general_decompressor))
}
Compression::Rle(rle) => {
let bits_per_value = validate_rle_compression(rle)?;
Ok(Box::new(RleDecompressor::new(bits_per_value)))
}
_ => todo!(),
}
}
}
fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<u64> {
let values = rle
.values
.as_ref()
.ok_or_else(|| Error::invalid_input("RLE compression missing values encoding"))?;
let run_lengths = rle
.run_lengths
.as_ref()
.ok_or_else(|| Error::invalid_input("RLE compression missing run lengths encoding"))?;
let values = values
.compression
.as_ref()
.ok_or_else(|| Error::invalid_input("RLE compression missing values compression"))?;
let Compression::Flat(values) = values else {
return Err(Error::invalid_input(
"RLE compression only supports flat values",
));
};
let run_lengths = run_lengths
.compression
.as_ref()
.ok_or_else(|| Error::invalid_input("RLE compression missing run lengths compression"))?;
let Compression::Flat(run_lengths) = run_lengths else {
return Err(Error::invalid_input(
"RLE compression only supports flat run lengths",
));
};
if run_lengths.bits_per_value != 8 {
return Err(Error::invalid_input(format!(
"RLE compression only supports 8-bit run lengths, got {}",
run_lengths.bits_per_value
)));
}
Ok(values.bits_per_value)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::buffer::LanceBuffer;
use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
use crate::statistics::ComputeStat;
use crate::testing::extract_array_encoding_chain;
use arrow_schema::{DataType, Field as ArrowField};
use std::collections::HashMap;
fn create_test_field(name: &str, data_type: DataType) -> Field {
    // Build a nullable Lance field from an Arrow field; -1 presumably marks an
    // id that has not been assigned by a schema.
    let mut lance_field = Field::try_from(&ArrowField::new(name, data_type, true)).unwrap();
    lance_field.id = -1;
    lance_field
}
/// Builds a fixed-width block made of roughly `run_count` runs of equal values,
/// then computes its statistics.
///
/// Each run gets a new base byte (stepping by 17), and byte `j` of every value is
/// `base + j`. A `run_count` of 0 is treated as 1 instead of dividing by zero.
fn create_fixed_width_block_with_stats(
    bits_per_value: u64,
    num_values: u64,
    run_count: u64,
) -> DataBlock {
    let bytes_per_value = (bits_per_value / 8) as usize;
    let mut data = vec![0u8; bytes_per_value * num_values as usize];
    let values_per_run = (num_values / run_count.max(1)).max(1) as usize;
    let mut run_value = 0u8;
    for i in 0..num_values as usize {
        if i % values_per_run == 0 {
            run_value = run_value.wrapping_add(17);
        }
        let start = i * bytes_per_value;
        for (j, byte) in data[start..start + bytes_per_value].iter_mut().enumerate() {
            *byte = run_value.wrapping_add(j as u8);
        }
    }
    let mut block = FixedWidthDataBlock {
        bits_per_value,
        data: LanceBuffer::reinterpret_vec(data),
        num_values,
        block_info: BlockInfo::default(),
    };
    block.compute_stat();
    DataBlock::FixedWidth(block)
}
/// Builds a fixed-width block whose i-th value has first byte `i % 256` and
/// all other bytes zero, then computes its statistics.
fn create_fixed_width_block(bits_per_value: u64, num_values: u64) -> DataBlock {
    let stride = (bits_per_value / 8) as usize;
    let mut bytes = vec![0u8; stride * num_values as usize];
    for i in 0..num_values as usize {
        if let Some(first_byte) = bytes.get_mut(i * stride) {
            *first_byte = (i % 256) as u8;
        }
    }
    let mut block = FixedWidthDataBlock {
        bits_per_value,
        data: LanceBuffer::reinterpret_vec(bytes),
        num_values,
        block_info: BlockInfo::default(),
    };
    block.compute_stat();
    DataBlock::FixedWidth(block)
}
/// Builds a variable-width block with 32- or 64-bit offsets and computes its
/// statistics.
///
/// Value lengths vary around `avg_value_size` (by -4..=+3, at least 1 and at
/// most `2 * avg_value_size`), or are all 1 when `avg_value_size` is 0. Byte `j`
/// of value `i` is `(i % 256) + j` (wrapping).
fn create_variable_width_block(
    bits_per_offset: u8,
    num_values: u64,
    avg_value_size: usize,
) -> DataBlock {
    let value_len = |i: u64| -> usize {
        if avg_value_size == 0 {
            1
        } else {
            ((avg_value_size as i64 + (i as i64 % 8) - 4).max(1) as usize)
                .min(avg_value_size * 2)
        }
    };
    let offsets: Vec<i64> = std::iter::once(0i64)
        .chain((0..num_values).scan(0i64, |end, i| {
            *end += value_len(i) as i64;
            Some(*end)
        }))
        .collect();

    let mut data = vec![0u8; *offsets.last().unwrap() as usize];
    for (i, bounds) in offsets.windows(2).enumerate() {
        let fill = (i % 256) as u8;
        let (start, end) = (bounds[0] as usize, bounds[1] as usize);
        for (j, byte) in data[start..end].iter_mut().enumerate() {
            *byte = fill.wrapping_add(j as u8);
        }
    }

    let offsets_buffer = match bits_per_offset {
        32 => LanceBuffer::reinterpret_vec(offsets.iter().map(|&o| o as i32).collect::<Vec<i32>>()),
        64 => LanceBuffer::reinterpret_vec(offsets),
        _ => panic!("Unsupported bits_per_offset: {}", bits_per_offset),
    };
    let mut block = VariableWidthBlock {
        data: LanceBuffer::from(data),
        offsets: offsets_buffer,
        bits_per_offset,
        num_values,
        block_info: BlockInfo::default(),
    };
    block.compute_stat();
    DataBlock::VariableWidth(block)
}
/// Builds a 4096-value block (32-bit offsets) whose average value length is
/// 16 bytes above `FSST_LEAST_INPUT_MAX_LENGTH`.
fn create_fsst_candidate_variable_width_block() -> DataBlock {
    let avg_len = FSST_LEAST_INPUT_MAX_LENGTH as usize + 16;
    create_variable_width_block(32, 4096, avg_len)
}
#[test]
fn test_parameter_based_compression() {
    // Column-pattern params for "*_id": RLE threshold 0.3, lz4, BSS off.
    let id_params = CompressionFieldParams {
        rle_threshold: Some(0.3),
        compression: Some("lz4".to_string()),
        compression_level: None,
        bss: Some(BssMode::Off),
        minichunk_size: None,
    };
    let mut params = CompressionParams::new();
    params.columns.insert("*_id".to_string(), id_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    // 1000 int32 values in 100 runs: expect RLE wrapped in general compression.
    let field = create_test_field("user_id", DataType::Int32);
    let data = create_fixed_width_block_with_stats(32, 1000, 100);
    let chosen = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", chosen);
    assert!(debug_str.contains("GeneralMiniBlockCompressor"));
    assert!(debug_str.contains("RleEncoder"));
}
#[test]
fn test_type_level_parameters() {
    // Type-level params for every Int32 column: RLE threshold 0.1, zstd level 3, BSS off.
    let int32_params = CompressionFieldParams {
        rle_threshold: Some(0.1),
        compression: Some("zstd".to_string()),
        compression_level: Some(3),
        bss: Some(BssMode::Off),
        minichunk_size: None,
    };
    let mut params = CompressionParams::new();
    params.types.insert("Int32".to_string(), int32_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    // 1000 values in 50 runs stay under the 0.1 threshold, so RLE is picked.
    let field = create_test_field("some_column", DataType::Int32);
    let data = create_fixed_width_block_with_stats(32, 1000, 50);
    let chosen = strategy.create_miniblock_compressor(&field, &data).unwrap();
    assert!(format!("{:?}", chosen).contains("RleEncoder"));
}
#[test]
#[cfg(feature = "bitpacking")]
fn test_low_cardinality_prefers_bitpacking_over_rle() {
    let strategy = DefaultCompressionStrategy::new();
    let field = create_test_field("int_score", DataType::Int64);

    // 64 runs of length 4 cycling through 3, 4, 5 -> 256 small u64 values.
    let values: Vec<u64> = (0..64u64)
        .flat_map(|run_idx| std::iter::repeat_n(3 + run_idx % 3, 4))
        .collect();
    let mut block = FixedWidthDataBlock {
        bits_per_value: 64,
        data: LanceBuffer::reinterpret_vec(values),
        num_values: 256,
        block_info: BlockInfo::default(),
    };
    block.compute_stat();
    let data = DataBlock::FixedWidth(block);

    let chosen = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", chosen);
    assert!(
        debug_str.contains("InlineBitpacking"),
        "expected InlineBitpacking, got: {debug_str}"
    );
    assert!(
        !debug_str.contains("RleEncoder"),
        "expected RLE to be skipped when bitpacking is smaller, got: {debug_str}"
    );
}
/// Asserts that `encoding` describes an uncompressed array.
///
/// The encoding chain must be exactly `["flat"]` for fixed-width data, or
/// `["variable", "flat"]` when `variable` is true.
fn check_uncompressed_encoding(encoding: &CompressiveEncoding, variable: bool) {
    let expected: &[&str] = if variable {
        &["variable", "flat"]
    } else {
        &["flat"]
    };
    let chain = extract_array_encoding_chain(encoding);
    let actual: Vec<&str> = chain.iter().map(|step| step.as_str()).collect();
    assert_eq!(actual, expected);
}
#[test]
fn test_none_compression() {
    // `compression = "none"` configured for a column must leave both fixed-
    // and variable-width data uncompressed on the miniblock and per-value paths.
    let none_params = CompressionFieldParams {
        compression: Some("none".to_string()),
        ..Default::default()
    };
    let mut params = CompressionParams::new();
    params.columns.insert("embeddings".to_string(), none_params);
    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("embeddings", DataType::Float32);
    let fixed = create_fixed_width_block(32, 1000);
    let variable = create_variable_width_block(32, 10, 32 * 1024);

    // Miniblock path: the inputs are still needed afterwards, so compress clones.
    for (input, is_variable) in [(&fixed, false), (&variable, true)] {
        let miniblock = strategy.create_miniblock_compressor(&field, input).unwrap();
        let (_block, encoding) = miniblock.compress(input.clone()).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }

    // Per-value path: the inputs are consumed here.
    for (input, is_variable) in [(fixed, false), (variable, true)] {
        let per_value = strategy.create_per_value(&field, &input).unwrap();
        let (_block, encoding) = per_value.compress(input).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }
}
#[test]
fn test_field_metadata_none_compression() {
    // `compression = "none"` supplied through Arrow field metadata (with no
    // CompressionParams) must leave fixed- and variable-width data uncompressed
    // on both the miniblock and per-value paths.
    let metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), "none".to_string())]);
    let arrow_field =
        ArrowField::new("simple_col", DataType::Binary, true).with_metadata(metadata);
    let field = Field::try_from(&arrow_field).unwrap();
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
    let fixed = create_fixed_width_block(32, 1000);
    let variable = create_variable_width_block(32, 10, 32 * 1024);

    for (input, is_variable) in [(&fixed, false), (&variable, true)] {
        let miniblock = strategy.create_miniblock_compressor(&field, input).unwrap();
        let (_block, encoding) = miniblock.compress(input.clone()).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }

    for (input, is_variable) in [(fixed, false), (variable, true)] {
        let per_value = strategy.create_per_value(&field, &input).unwrap();
        let (_block, encoding) = per_value.compress(input).unwrap();
        check_uncompressed_encoding(&encoding, is_variable);
    }
}
#[test]
fn test_auto_fsst_disabled_for_binary_fields() {
    // With default settings, FSST must not be picked automatically for a
    // Binary field, even on FSST-friendly input.
    let strategy = DefaultCompressionStrategy::new();
    let field = create_test_field("bytes", DataType::Binary);
    let input = create_fsst_candidate_variable_width_block();
    let expect_encoder = |debug: &str, name: &str| {
        assert!(debug.contains(name), "expected {name}, got: {debug}");
    };
    let reject_encoder = |debug: &str, name: &str| {
        assert!(!debug.contains(name), "did not expect {name}, got: {debug}");
    };

    let miniblock = strategy.create_miniblock_compressor(&field, &input).unwrap();
    let miniblock_debug = format!("{:?}", miniblock);
    expect_encoder(&miniblock_debug, "BinaryMiniBlockEncoder");
    reject_encoder(&miniblock_debug, "FsstMiniBlockEncoder");

    let per_value = strategy.create_per_value(&field, &input).unwrap();
    let per_value_debug = format!("{:?}", per_value);
    expect_encoder(&per_value_debug, "VariableEncoder");
    reject_encoder(&per_value_debug, "FsstPerValueEncoder");
}
#[test]
fn test_auto_fsst_still_enabled_for_utf8_fields() {
    // Utf8 fields keep automatic FSST selection on FSST-friendly input.
    let strategy = DefaultCompressionStrategy::new();
    let field = create_test_field("text", DataType::Utf8);
    let input = create_fsst_candidate_variable_width_block();
    let expect_encoder = |debug: &str, name: &str| {
        assert!(debug.contains(name), "expected {name}, got: {debug}");
    };

    let miniblock = strategy.create_miniblock_compressor(&field, &input).unwrap();
    expect_encoder(&format!("{:?}", miniblock), "FsstMiniBlockEncoder");

    let per_value = strategy.create_per_value(&field, &input).unwrap();
    expect_encoder(&format!("{:?}", per_value), "FsstPerValueEncoder");
}
#[test]
fn test_explicit_fsst_still_supported_for_binary_fields() {
    // An explicit `compression = "fsst"` column setting must select the FSST
    // encoders for a Binary field, on both the miniblock and per-value paths.
    let fsst_params = CompressionFieldParams {
        compression: Some("fsst".to_string()),
        ..Default::default()
    };
    let mut params = CompressionParams::new();
    params.columns.insert("bytes".to_string(), fsst_params);
    let strategy = DefaultCompressionStrategy::with_params(params);
    let field = create_test_field("bytes", DataType::Binary);
    let input = create_fsst_candidate_variable_width_block();
    let expect_encoder = |debug: &str, name: &str| {
        assert!(debug.contains(name), "expected {name}, got: {debug}");
    };

    let miniblock = strategy.create_miniblock_compressor(&field, &input).unwrap();
    expect_encoder(&format!("{:?}", miniblock), "FsstMiniBlockEncoder");

    let per_value = strategy.create_per_value(&field, &input).unwrap();
    expect_encoder(&format!("{:?}", per_value), "FsstPerValueEncoder");
}
#[test]
fn test_parameter_merge_priority() {
    // Column-level params must take precedence over type-level params for the
    // matching column. Other columns of the same type fall back to the
    // type-level values.
    let type_params = CompressionFieldParams {
        rle_threshold: Some(0.5),
        compression: Some("lz4".to_string()),
        ..Default::default()
    };
    let column_params = CompressionFieldParams {
        rle_threshold: Some(0.2),
        compression: Some("zstd".to_string()),
        compression_level: Some(6),
        bss: None,
        minichunk_size: None,
    };
    let mut params = CompressionParams::new();
    params.types.insert("Int32".to_string(), type_params);
    params.columns.insert("user_id".to_string(), column_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    let overridden = strategy
        .params
        .get_field_params("user_id", &DataType::Int32);
    assert_eq!(overridden.rle_threshold, Some(0.2));
    assert_eq!(overridden.compression, Some("zstd".to_string()));
    assert_eq!(overridden.compression_level, Some(6));

    let fallback = strategy
        .params
        .get_field_params("other_field", &DataType::Int32);
    assert_eq!(fallback.rle_threshold, Some(0.5));
    assert_eq!(fallback.compression, Some("lz4".to_string()));
    assert_eq!(fallback.compression_level, None);
}
#[test]
fn test_pattern_matching() {
    // The column pattern "log_*" should match "log_messages" but not
    // "messages_log".
    let log_params = CompressionFieldParams {
        compression: Some("zstd".to_string()),
        compression_level: Some(6),
        ..Default::default()
    };
    let mut params = CompressionParams::new();
    params.columns.insert("log_*".to_string(), log_params);
    let strategy = DefaultCompressionStrategy::with_params(params);
    let configured = &strategy.params;

    let matched = configured.get_field_params("log_messages", &DataType::Utf8);
    assert_eq!(matched.compression, Some("zstd".to_string()));
    assert_eq!(matched.compression_level, Some(6));

    let unmatched = configured.get_field_params("messages_log", &DataType::Utf8);
    assert_eq!(unmatched.compression, None);
}
#[test]
fn test_legacy_metadata_support() {
    // `compression = "none"` in field metadata, with empty CompressionParams,
    // is expected to yield a chain containing a ValueEncoder.
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
    let mut field = create_test_field("some_column", DataType::Int32);
    field.metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), "none".to_string())]);
    let data = create_fixed_width_block(32, 1000);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let description = format!("{:?}", compressor);
    assert!(description.contains("ValueEncoder"));
}
#[test]
fn test_default_behavior() {
    // With no params or metadata, the test accepts either a plain ValueEncoder
    // or InlineBitpacking for this Int32 block.
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
    let field = create_test_field("random_column", DataType::Int32);
    let data = create_fixed_width_block_with_stats(32, 1000, 600);
    let description = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &data).unwrap()
    );
    let acceptable = ["ValueEncoder", "InlineBitpacking"];
    assert!(acceptable.iter().any(|&name| description.contains(name)));
}
#[test]
fn test_field_metadata_compression() {
    // A zstd codec and level supplied through field metadata should wrap the
    // miniblock chain in a GeneralMiniBlockCompressor.
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([
        (COMPRESSION_META_KEY.to_string(), "zstd".to_string()),
        (COMPRESSION_LEVEL_META_KEY.to_string(), "6".to_string()),
    ]);
    let data = create_fixed_width_block(32, 1000);
    let description = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &data).unwrap()
    );
    assert!(description.contains("GeneralMiniBlockCompressor"));
}
#[test]
fn test_field_metadata_rle_threshold() {
    // An RLE threshold supplied through field metadata (together with BSS set
    // to "off" the same way) should lead to an RleEncoder in the miniblock chain.
    let params = CompressionParams::new();
    let strategy = DefaultCompressionStrategy::with_params(params);
    let mut metadata = HashMap::new();
    metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "0.8".to_string());
    metadata.insert(BSS_META_KEY.to_string(), "off".to_string());
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = metadata;
    let data = create_fixed_width_block_with_stats(32, 1000, 100);
    let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap();
    let debug_str = format!("{:?}", compressor);
    assert!(debug_str.contains("RleEncoder"));
}
#[test]
fn test_field_metadata_override_params() {
    // Field metadata requesting "none" must win over column-level
    // CompressionParams that request lz4 for the same column.
    let column_params = CompressionFieldParams {
        rle_threshold: Some(0.3),
        compression: Some("lz4".to_string()),
        compression_level: None,
        bss: None,
        minichunk_size: None,
    };
    let mut params = CompressionParams::new();
    params.columns.insert("test_column".to_string(), column_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([(COMPRESSION_META_KEY.to_string(), "none".to_string())]);
    let data = create_fixed_width_block(32, 1000);
    let description = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &data).unwrap()
    );
    assert!(description.contains("ValueEncoder"));
}
#[test]
fn test_field_metadata_mixed_configuration() {
    // Type-level params (lz4) combined with a compression level taken from
    // field metadata should still produce a GeneralMiniBlockCompressor.
    let type_params = CompressionFieldParams {
        rle_threshold: Some(0.5),
        compression: Some("lz4".to_string()),
        ..Default::default()
    };
    let mut params = CompressionParams::new();
    params.types.insert("Int32".to_string(), type_params);
    let strategy = DefaultCompressionStrategy::with_params(params);

    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([(COMPRESSION_LEVEL_META_KEY.to_string(), "3".to_string())]);
    let data = create_fixed_width_block(32, 1000);
    let description = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &data).unwrap()
    );
    assert!(description.contains("GeneralMiniBlockCompressor"));
}
#[test]
fn test_bss_field_metadata() {
    // BSS forced "on" through Arrow field metadata for a Float32 column should
    // put a ByteStreamSplitEncoder in the miniblock chain.
    let metadata = HashMap::from([
        (BSS_META_KEY.to_string(), "on".to_string()),
        (COMPRESSION_META_KEY.to_string(), "lz4".to_string()),
    ]);
    let arrow_field =
        ArrowField::new("temperature", DataType::Float32, false).with_metadata(metadata);
    let field = Field::try_from(&arrow_field).unwrap();
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
    let data = create_fixed_width_block(32, 100);
    let description = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &data).unwrap()
    );
    assert!(description.contains("ByteStreamSplitEncoder"));
}
#[test]
fn test_bss_with_compression() {
    // BSS "on" plus lz4 for a Float64 column: the chain should contain both a
    // GeneralMiniBlockCompressor and a ByteStreamSplitEncoder.
    let metadata = HashMap::from([
        (BSS_META_KEY.to_string(), "on".to_string()),
        (COMPRESSION_META_KEY.to_string(), "lz4".to_string()),
    ]);
    let arrow_field =
        ArrowField::new("sensor_data", DataType::Float64, false).with_metadata(metadata);
    let field = Field::try_from(&arrow_field).unwrap();
    let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new());
    let data = create_fixed_width_block(64, 100);
    let description = format!(
        "{:?}",
        strategy.create_miniblock_compressor(&field, &data).unwrap()
    );
    for expected in ["GeneralMiniBlockCompressor", "ByteStreamSplitEncoder"] {
        assert!(description.contains(expected));
    }
}
#[test]
#[cfg(any(feature = "lz4", feature = "zstd"))]
fn test_general_block_decompression_fixed_width_v2_2() {
    // Round-trips a fixed-width block through the V2.2 general block compressor
    // and the matching decompressor. The decoded block must be byte-identical.
    let mut params = CompressionParams::new();
    params.columns.insert(
        "dict_values".to_string(),
        CompressionFieldParams {
            // lz4 is preferred when both codecs are compiled in.
            compression: Some(if cfg!(feature = "lz4") { "lz4" } else { "zstd" }.to_string()),
            ..Default::default()
        },
    );
    // Use the builder, consistent with the other version-specific tests.
    let strategy =
        DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_2);
    let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
    // Presumably 24 bits per value, i.e. 3 bytes to match FixedSizeBinary(3).
    let data = create_fixed_width_block(24, 1024);
    let DataBlock::FixedWidth(expected_block) = &data else {
        panic!("expected fixed width block");
    };

    let (compressor, encoding) = strategy
        .create_block_compressor(&field, &data)
        .expect("general compression should be selected");
    match encoding.compression.as_ref() {
        Some(Compression::General(_)) => {}
        other => panic!("expected general compression, got {:?}", other),
    }

    // `data` is still borrowed by `expected_block`, so compress a clone.
    let compressed_buffer = compressor
        .compress(data.clone())
        .expect("write path general compression should succeed");
    let decompressor = DefaultDecompressionStrategy::default()
        .create_block_decompressor(&encoding)
        .expect("general block decompressor should be created");
    let decoded = decompressor
        .decompress(compressed_buffer, expected_block.num_values)
        .expect("decompression should succeed");

    let DataBlock::FixedWidth(block) = decoded else {
        panic!("expected fixed width block");
    };
    assert_eq!(block.bits_per_value, expected_block.bits_per_value);
    assert_eq!(block.num_values, expected_block.num_values);
    assert_eq!(block.data.as_ref(), expected_block.data.as_ref());
}
#[test]
#[cfg(any(feature = "lz4", feature = "zstd"))]
fn test_general_compression_not_selected_for_v2_1_even_if_requested() {
    // Even with an explicit lz4/zstd request, the V2.1 strategy must not pick
    // the general block compression scheme.
    let codec = if cfg!(feature = "lz4") { "lz4" } else { "zstd" };
    let mut params = CompressionParams::new();
    params.columns.insert(
        "dict_values".to_string(),
        CompressionFieldParams {
            compression: Some(codec.to_string()),
            ..Default::default()
        },
    );
    let strategy =
        DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_1);
    let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
    let data = create_fixed_width_block(24, 1024);

    let (_compressor, encoding) = strategy
        .create_block_compressor(&field, &data)
        .expect("block compressor selection should succeed");
    let picked_general = matches!(encoding.compression.as_ref(), Some(Compression::General(_)));
    assert!(
        !picked_general,
        "general compression should not be selected for V2.1"
    );
}
#[test]
fn test_none_compression_disables_auto_general_block_compression() {
    // A block larger than MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION must not get
    // general compression when the column explicitly requests "none".
    let none_params = CompressionFieldParams {
        compression: Some("none".to_string()),
        ..Default::default()
    };
    let mut params = CompressionParams::new();
    params.columns.insert("dict_values".to_string(), none_params);
    let strategy =
        DefaultCompressionStrategy::with_params(params).with_version(LanceFileVersion::V2_2);
    let field = create_test_field("dict_values", DataType::FixedSizeBinary(3));
    let data = create_fixed_width_block(24, 20_000);

    let large_enough = data.data_size() > MIN_BLOCK_SIZE_FOR_GENERAL_COMPRESSION;
    assert!(
        large_enough,
        "test requires block size above automatic general compression threshold"
    );

    let (_compressor, encoding) = strategy
        .create_block_compressor(&field, &data)
        .expect("block compressor selection should succeed");
    let picked_general = matches!(encoding.compression.as_ref(), Some(Compression::General(_)));
    assert!(
        !picked_general,
        "compression=none should disable automatic block general compression"
    );
}
#[test]
fn test_rle_block_used_for_version_v2_2() {
let field = create_test_field("test_repdef", DataType::UInt16);
let num_values = 1000u64;
let mut data = Vec::with_capacity(num_values as usize);
for i in 0..10 {
for _ in 0..100 {
data.push(i as u16);
}
}
let mut block = FixedWidthDataBlock {
bits_per_value: 16,
data: LanceBuffer::reinterpret_vec(data),
num_values,
block_info: BlockInfo::default(),
};
block.compute_stat();
let data_block = DataBlock::FixedWidth(block);
let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
.with_version(LanceFileVersion::V2_2);
let (compressor, _) = strategy
.create_block_compressor(&field, &data_block)
.unwrap();
let debug_str = format!("{:?}", compressor);
assert!(debug_str.contains("RleEncoder"));
}
#[test]
fn test_rle_block_not_used_for_version_v2_1() {
let field = create_test_field("test_repdef", DataType::UInt16);
let num_values = 1000u64;
let mut data = Vec::with_capacity(num_values as usize);
for i in 0..10 {
for _ in 0..100 {
data.push(i as u16);
}
}
let mut block = FixedWidthDataBlock {
bits_per_value: 16,
data: LanceBuffer::reinterpret_vec(data),
num_values,
block_info: BlockInfo::default(),
};
block.compute_stat();
let data_block = DataBlock::FixedWidth(block);
let strategy = DefaultCompressionStrategy::with_params(CompressionParams::new())
.with_version(LanceFileVersion::V2_1);
let (compressor, _) = strategy
.create_block_compressor(&field, &data_block)
.unwrap();
let debug_str = format!("{:?}", compressor);
assert!(
!debug_str.contains("RleEncoder"),
"RLE should not be used for V2.1"
);
}
}