/// RLE threshold applied when a field's metadata carries no
/// `RLE_THRESHOLD_META_KEY` entry.
///
/// The default compression strategy picks run-length encoding for byte-aligned
/// fixed-width data when `run_count < num_values * threshold`.
const DEFAULT_RLE_COMPRESSION_THRESHOLD: f64 = 0.5;
22
23use crate::{
24 buffer::LanceBuffer,
25 data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock},
26 encodings::{
27 logical::primitive::{fullzip::PerValueCompressor, miniblock::MiniBlockCompressor},
28 physical::{
29 binary::{
30 BinaryBlockDecompressor, BinaryMiniBlockDecompressor, BinaryMiniBlockEncoder,
31 VariableDecoder, VariableEncoder,
32 },
33 bitpack::InlineBitpacking,
34 block::{CompressedBufferEncoder, CompressionConfig, CompressionScheme},
35 byte_stream_split::ByteStreamSplitDecompressor,
36 constant::ConstantDecompressor,
37 fsst::{
38 FsstMiniBlockDecompressor, FsstMiniBlockEncoder, FsstPerValueDecompressor,
39 FsstPerValueEncoder,
40 },
41 general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
42 packed::{
43 PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
44 },
45 rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
46 value::{ValueDecompressor, ValueEncoder},
47 },
48 },
49 format::{pb, ProtobufUtils},
50 statistics::{GetStat, Stat},
51};
52
53use arrow::{array::AsArray, datatypes::UInt64Type};
54use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
55use lance_core::{
56 datatypes::{Field, COMPRESSION_META_KEY, RLE_THRESHOLD_META_KEY},
57 Error, Result,
58};
59use snafu::location;
60
61pub trait BlockCompressor: std::fmt::Debug + Send + Sync {
74 fn compress(&self, data: DataBlock) -> Result<LanceBuffer>;
79}
80
81pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
94 fn create_block_compressor(
96 &self,
97 field: &Field,
98 data: &DataBlock,
99 ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)>;
100
101 fn create_per_value(
103 &self,
104 field: &Field,
105 data: &DataBlock,
106 ) -> Result<Box<dyn PerValueCompressor>>;
107
108 fn create_miniblock_compressor(
110 &self,
111 field: &Field,
112 data: &DataBlock,
113 ) -> Result<Box<dyn MiniBlockCompressor>>;
114}
115
/// The stock [`CompressionStrategy`]: a stateless unit type whose choices are
/// driven by the data block's kind, its statistics and the field metadata.
#[derive(Default, Debug)]
pub struct DefaultCompressionStrategy;
118
119impl CompressionStrategy for DefaultCompressionStrategy {
120 fn create_miniblock_compressor(
121 &self,
122 field: &Field,
123 data: &DataBlock,
124 ) -> Result<Box<dyn MiniBlockCompressor>> {
125 match data {
126 DataBlock::FixedWidth(fixed_width_data) => {
127 let is_byte_width_aligned = fixed_width_data.bits_per_value == 8
128 || fixed_width_data.bits_per_value == 16
129 || fixed_width_data.bits_per_value == 32
130 || fixed_width_data.bits_per_value == 64;
131 let bit_widths = data.expect_stat(Stat::BitWidth);
132 let bit_widths = bit_widths.as_primitive::<UInt64Type>();
133 let has_all_zeros = bit_widths.values().iter().any(|v| *v == 0);
136 let too_small = bit_widths.len() == 1
139 && InlineBitpacking::min_size_bytes(bit_widths.value(0)) >= data.data_size();
140
141 if let Some(compression) = field.metadata.get(COMPRESSION_META_KEY) {
142 if compression.as_str() == "none" {
143 return Ok(Box::new(ValueEncoder::default()));
144 }
145 }
146
147 let rle_threshold: f64 = if let Some(value) =
148 field.metadata.get(RLE_THRESHOLD_META_KEY)
149 {
150 value.as_str().parse().map_err(|_| {
151 Error::invalid_input("rle threshold is not a valid float64", location!())
152 })?
153 } else {
154 DEFAULT_RLE_COMPRESSION_THRESHOLD
155 };
156
157 let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
159 let num_values = fixed_width_data.num_values;
160
161 if (run_count as f64) < (num_values as f64) * rle_threshold && is_byte_width_aligned
163 {
164 if fixed_width_data.bits_per_value >= 32 {
165 return Ok(Box::new(GeneralMiniBlockCompressor::new(
166 Box::new(RleMiniBlockEncoder::new()),
167 CompressionConfig::new(CompressionScheme::Lz4, None),
168 )));
169 }
170 return Ok(Box::new(RleMiniBlockEncoder::new()));
171 }
172
173 if !has_all_zeros && !too_small && is_byte_width_aligned {
174 Ok(Box::new(InlineBitpacking::new(
175 fixed_width_data.bits_per_value,
176 )))
177 } else {
178 Ok(Box::new(ValueEncoder::default()))
179 }
180 }
181 DataBlock::VariableWidth(variable_width_data) => {
182 if variable_width_data.bits_per_offset == 32
183 || variable_width_data.bits_per_offset == 64
184 {
185 let data_size =
186 variable_width_data.expect_single_stat::<UInt64Type>(Stat::DataSize);
187 let max_len =
188 variable_width_data.expect_single_stat::<UInt64Type>(Stat::MaxLength);
189
190 if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
191 && data_size >= FSST_LEAST_INPUT_SIZE as u64
192 {
193 Ok(Box::new(FsstMiniBlockEncoder::default()))
194 } else {
195 Ok(Box::new(BinaryMiniBlockEncoder::default()))
196 }
197 } else {
198 todo!(
199 "Implement MiniBlockCompression for VariableWidth DataBlock with {} bit offsets.",
200 variable_width_data.bits_per_offset
201 )
202 }
203 }
204 DataBlock::Struct(struct_data_block) => {
205 if struct_data_block
208 .children
209 .iter()
210 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
211 {
212 panic!("packed struct encoding currently only supports fixed-width fields.")
213 }
214 Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
215 }
216 DataBlock::FixedSizeList(_) => {
217 Ok(Box::new(ValueEncoder::default()))
225 }
226 _ => Err(Error::NotSupported {
227 source: format!(
228 "Mini-block compression not yet supported for block type {}",
229 data.name()
230 )
231 .into(),
232 location: location!(),
233 }),
234 }
235 }
236
237 fn create_per_value(
238 &self,
239 _field: &Field,
240 data: &DataBlock,
241 ) -> Result<Box<dyn PerValueCompressor>> {
242 match data {
243 DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
244 DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
245 DataBlock::VariableWidth(variable_width) => {
246 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
247 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
248
249 if max_len > 32 * 1024 && data_size >= FSST_LEAST_INPUT_SIZE as u64 {
253 return Ok(Box::new(CompressedBufferEncoder::default()));
254 }
255
256 if variable_width.bits_per_offset == 32 || variable_width.bits_per_offset == 64 {
257 let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
258 let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
259
260 let variable_compression = Box::new(VariableEncoder::default());
261
262 if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
263 && data_size >= FSST_LEAST_INPUT_SIZE as u64
264 {
265 Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
266 } else {
267 Ok(variable_compression)
268 }
269 } else {
270 panic!("Does not support MiniBlockCompression for VariableWidth DataBlock with {} bits offsets.", variable_width.bits_per_offset);
271 }
272 }
273 _ => unreachable!(
274 "Per-value compression not yet supported for block type: {}",
275 data.name()
276 ),
277 }
278 }
279
280 fn create_block_compressor(
281 &self,
282 _field: &Field,
283 data: &DataBlock,
284 ) -> Result<(Box<dyn BlockCompressor>, pb::ArrayEncoding)> {
285 match data {
287 DataBlock::FixedWidth(fixed_width) => {
290 let encoder = Box::new(ValueEncoder::default());
291 let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
292 Ok((encoder, encoding))
293 }
294 DataBlock::VariableWidth(variable_width) => {
295 let encoder = Box::new(VariableEncoder::default());
296 let encoding = ProtobufUtils::variable(variable_width.bits_per_offset);
297 Ok((encoder, encoding))
298 }
299 _ => unreachable!(),
300 }
301 }
302}
303
304pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
305 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock>;
306}
307
308pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
309 fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock>;
311 fn bits_per_value(&self) -> u64;
315}
316
317pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
318 fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
320}
321
322pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
323 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
324}
325
326pub trait DecompressionStrategy: std::fmt::Debug + Send + Sync {
327 fn create_miniblock_decompressor(
328 &self,
329 description: &pb::ArrayEncoding,
330 decompression_strategy: &dyn DecompressionStrategy,
331 ) -> Result<Box<dyn MiniBlockDecompressor>>;
332
333 fn create_fixed_per_value_decompressor(
334 &self,
335 description: &pb::ArrayEncoding,
336 ) -> Result<Box<dyn FixedPerValueDecompressor>>;
337
338 fn create_variable_per_value_decompressor(
339 &self,
340 description: &pb::ArrayEncoding,
341 ) -> Result<Box<dyn VariablePerValueDecompressor>>;
342
343 fn create_block_decompressor(
344 &self,
345 description: &pb::ArrayEncoding,
346 ) -> Result<Box<dyn BlockDecompressor>>;
347}
348
/// The stock [`DecompressionStrategy`]: a stateless type that picks a
/// decompressor purely from the protobuf encoding description.
#[derive(Default, Debug)]
pub struct DefaultDecompressionStrategy {}
351
352impl DecompressionStrategy for DefaultDecompressionStrategy {
353 fn create_miniblock_decompressor(
354 &self,
355 description: &pb::ArrayEncoding,
356 decompression_strategy: &dyn DecompressionStrategy,
357 ) -> Result<Box<dyn MiniBlockDecompressor>> {
358 match description.array_encoding.as_ref().unwrap() {
359 pb::array_encoding::ArrayEncoding::Flat(flat) => {
360 Ok(Box::new(ValueDecompressor::from_flat(flat)))
361 }
362 pb::array_encoding::ArrayEncoding::InlineBitpacking(description) => {
363 Ok(Box::new(InlineBitpacking::from_description(description)))
364 }
365 pb::array_encoding::ArrayEncoding::Variable(variable) => Ok(Box::new(
366 BinaryMiniBlockDecompressor::new(variable.bits_per_offset as u8),
367 )),
368 pb::array_encoding::ArrayEncoding::Fsst(description) => {
369 let inner_decompressor = decompression_strategy.create_miniblock_decompressor(
370 description.binary.as_ref().unwrap(),
371 decompression_strategy,
372 )?;
373 Ok(Box::new(FsstMiniBlockDecompressor::new(
374 description,
375 inner_decompressor,
376 )))
377 }
378 pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
379 Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
380 description,
381 )))
382 }
383 pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
384 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
387 }
388 pb::array_encoding::ArrayEncoding::Rle(rle) => {
389 Ok(Box::new(RleMiniBlockDecompressor::new(rle.bits_per_value)))
390 }
391 pb::array_encoding::ArrayEncoding::ByteStreamSplit(bss) => Ok(Box::new(
392 ByteStreamSplitDecompressor::new(bss.bits_per_value as usize),
393 )),
394 pb::array_encoding::ArrayEncoding::GeneralMiniBlock(general) => {
395 let inner_decompressor = self.create_miniblock_decompressor(
397 general.inner.as_ref().ok_or_else(|| {
398 Error::invalid_input("GeneralMiniBlock missing inner encoding", location!())
399 })?,
400 decompression_strategy,
401 )?;
402
403 let compression = general.compression.as_ref().ok_or_else(|| {
405 Error::invalid_input("GeneralMiniBlock missing compression config", location!())
406 })?;
407
408 let scheme = compression.scheme.parse()?;
409
410 let compression_config = crate::encodings::physical::block::CompressionConfig::new(
411 scheme,
412 compression.level,
413 );
414
415 Ok(Box::new(GeneralMiniBlockDecompressor::new(
416 inner_decompressor,
417 compression_config,
418 )))
419 }
420 _ => todo!(),
421 }
422 }
423
424 fn create_fixed_per_value_decompressor(
425 &self,
426 description: &pb::ArrayEncoding,
427 ) -> Result<Box<dyn FixedPerValueDecompressor>> {
428 match description.array_encoding.as_ref().unwrap() {
429 pb::array_encoding::ArrayEncoding::Flat(flat) => {
430 Ok(Box::new(ValueDecompressor::from_flat(flat)))
431 }
432 pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) => {
433 Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
434 }
435 _ => todo!("fixed-per-value decompressor for {:?}", description),
436 }
437 }
438
439 fn create_variable_per_value_decompressor(
440 &self,
441 description: &pb::ArrayEncoding,
442 ) -> Result<Box<dyn VariablePerValueDecompressor>> {
443 match *description.array_encoding.as_ref().unwrap() {
444 pb::array_encoding::ArrayEncoding::Variable(variable) => {
445 assert!(variable.bits_per_offset < u8::MAX as u32);
446 Ok(Box::new(VariableDecoder::default()))
447 }
448 pb::array_encoding::ArrayEncoding::Fsst(ref fsst) => {
449 Ok(Box::new(FsstPerValueDecompressor::new(
450 LanceBuffer::from_bytes(fsst.symbol_table.clone(), 1),
451 Box::new(VariableDecoder::default()),
452 )))
453 }
454 pb::array_encoding::ArrayEncoding::Block(ref block) => Ok(Box::new(
455 CompressedBufferEncoder::from_scheme(&block.scheme)?,
456 )),
457 _ => todo!("variable-per-value decompressor for {:?}", description),
458 }
459 }
460
461 fn create_block_decompressor(
462 &self,
463 description: &pb::ArrayEncoding,
464 ) -> Result<Box<dyn BlockDecompressor>> {
465 match description.array_encoding.as_ref().unwrap() {
466 pb::array_encoding::ArrayEncoding::Flat(flat) => {
467 Ok(Box::new(ValueDecompressor::from_flat(flat)))
468 }
469 pb::array_encoding::ArrayEncoding::Constant(constant) => {
470 let scalar = LanceBuffer::from_bytes(constant.value.clone(), 1);
471 Ok(Box::new(ConstantDecompressor::new(scalar)))
472 }
473 pb::array_encoding::ArrayEncoding::Variable(_) => {
474 Ok(Box::new(BinaryBlockDecompressor::default()))
475 }
476 _ => todo!(),
477 }
478 }
479}