lance_encoding/encodings/physical/bitpacking.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Bitpacking encodings
//!
//! These encodings look for unused higher order bits and discard them.  For example, if we
//! have a u32 array and all values are between 0 and 5000 then we only need 13 bits to store
//! each value.  The encoding will discard the upper 19 bits and only store 13 bits.
//!
//! This is a simple encoding that works well for data that has a small range.
//!
//! In order to decode the values we need to know the bit width of the values.  This can be stored
//! inline with the data (miniblock) or in the encoding description, out of line (full zip).
//!
//! The encoding is transparent because the output has a fixed width (just like the input) and
//! we can easily jump to the correct value.
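//!
//! As an illustrative sketch (not the exact wire layout), packing the u32 values
//! `[1, 5, 2]` at a bit width of 3 keeps only the low three bits of each value:
//!
//! ```text
//! 001 101 010   <- 3 bits per value instead of 32
//! ```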

use arrow_array::types::UInt64Type;
use arrow_array::{Array, PrimitiveArray};
use arrow_buffer::ArrowNativeType;
use bytemuck::{cast_slice, AnyBitPattern};
use byteorder::{ByteOrder, LittleEndian};
use lance_bitpacking::BitPacking;
use snafu::location;

use lance_core::{Error, Result};

use crate::buffer::LanceBuffer;
use crate::compression::{
    BlockCompressor, BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor,
};
use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
};
use crate::format::pb21::CompressiveEncoding;
use crate::format::{pb21, ProtobufUtils21};
use crate::statistics::{GetStat, Stat};

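// Values are bitpacked in chunks of 2^10 = 1024 values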
const LOG_ELEMS_PER_CHUNK: u8 = 10;
const ELEMS_PER_CHUNK: u64 = 1 << LOG_ELEMS_PER_CHUNK;

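/// Bitpacking compressor/decompressor that stores the compressed bit width inline
/// with the data in each chunk (the miniblock strategy described in the module docs)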
#[derive(Debug, Default)]
pub struct InlineBitpacking {
    uncompressed_bit_width: u64,
}

impl InlineBitpacking {
    pub fn new(uncompressed_bit_width: u64) -> Self {
        Self {
            uncompressed_bit_width,
        }
    }

    pub fn from_description(description: &pb21::InlineBitpacking) -> Self {
        Self {
            uncompressed_bit_width: description.uncompressed_bits_per_value,
        }
    }

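    /// The minimum number of bytes a full chunk (1024 values) occupies at the given bit width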
    pub fn min_size_bytes(bit_width: u64) -> u64 {
        (ELEMS_PER_CHUNK * bit_width).div_ceil(8)
    }

    /// Bitpacks a FixedWidthDataBlock into compressed chunks of 1024 values
    ///
    /// Each chunk can have a different bit width
    ///
    /// Each chunk has the compressed bit width stored inline in the chunk itself.
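    ///
    /// Each chunk is laid out as a single word of type `T` holding the bit width,
    /// followed by the packed values themselves.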
    fn bitpack_chunked<T: ArrowNativeType + BitPacking>(
        data: FixedWidthDataBlock,
    ) -> MiniBlockCompressed {
        let data_buffer = data.data.borrow_to_typed_slice::<T>();
        let data_buffer = data_buffer.as_ref();

        let bit_widths = data.expect_stat(Stat::BitWidth);
        let bit_widths_array = bit_widths
            .as_any()
            .downcast_ref::<PrimitiveArray<UInt64Type>>()
            .unwrap();

        let (packed_chunk_sizes, total_size) = bit_widths_array
            .values()
            .iter()
            .map(|&bit_width| {
                let chunk_size = ((ELEMS_PER_CHUNK * bit_width) / data.bits_per_value) as usize;
                (chunk_size, chunk_size + 1)
            })
            .fold(
                (Vec::with_capacity(bit_widths_array.len()), 0),
                |(mut sizes, total), (size, inc)| {
                    sizes.push(size);
                    (sizes, total + inc)
                },
            );

        let mut output: Vec<T> = Vec::with_capacity(total_size);
        let mut chunks = Vec::with_capacity(bit_widths_array.len());

        for (i, packed_chunk_size) in packed_chunk_sizes
            .iter()
            .enumerate()
            .take(bit_widths_array.len() - 1)
        {
            let start_elem = i * ELEMS_PER_CHUNK as usize;
            let bit_width = bit_widths_array.value(i) as usize;
            output.push(T::from_usize(bit_width).unwrap());
            let output_len = output.len();
            unsafe {
                output.set_len(output_len + *packed_chunk_size);
                BitPacking::unchecked_pack(
                    bit_width,
                    &data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..*packed_chunk_size],
                );
            }
            chunks.push(MiniBlockChunk {
                buffer_sizes: vec![((1 + *packed_chunk_size) * std::mem::size_of::<T>()) as u16],
                log_num_values: LOG_ELEMS_PER_CHUNK,
            });
        }

        // Handle the last chunk
        let last_chunk_elem_num = if data.num_values % ELEMS_PER_CHUNK == 0 {
            ELEMS_PER_CHUNK
        } else {
            data.num_values % ELEMS_PER_CHUNK
        };
        let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
        last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
            &data_buffer[data.num_values as usize - last_chunk_elem_num as usize..],
        );
        let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as usize;
        output.push(T::from_usize(bit_width).unwrap());
        let output_len = output.len();
        unsafe {
            output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]);
            BitPacking::unchecked_pack(
                bit_width,
                &last_chunk,
                &mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]],
            );
        }
        chunks.push(MiniBlockChunk {
            buffer_sizes: vec![
                ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) * std::mem::size_of::<T>())
                    as u16,
            ],
            log_num_values: 0,
        });

        MiniBlockCompressed {
            data: vec![LanceBuffer::reinterpret_vec(output)],
            chunks,
            num_values: data.num_values,
        }
    }

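    /// Compresses the block into bitpacked miniblock chunks, dispatching on the
    /// word size, and returns the matching encoding description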
    fn chunk_data(&self, data: FixedWidthDataBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        assert!(data.bits_per_value % 8 == 0);
        assert_eq!(data.bits_per_value, self.uncompressed_bit_width);
        let bits_per_value = data.bits_per_value;
        let compressed = match bits_per_value {
            8 => Self::bitpack_chunked::<u8>(data),
            16 => Self::bitpack_chunked::<u16>(data),
            32 => Self::bitpack_chunked::<u32>(data),
            64 => Self::bitpack_chunked::<u64>(data),
            _ => unreachable!(),
        };
        (
            compressed,
            ProtobufUtils21::inline_bitpacking(
                bits_per_value,
                // TODO: Could potentially compress the data here
                None,
            ),
        )
    }

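    /// Decompresses a single bitpacked chunk, reading the bit width from the
    /// first word of the chunk and unpacking the remaining words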
    fn unchunk<T: ArrowNativeType + BitPacking + AnyBitPattern>(
        data: LanceBuffer,
        num_values: u64,
    ) -> Result<DataBlock> {
        assert!(data.len() >= 8);
        assert!(num_values <= ELEMS_PER_CHUNK);

        // The uncompressed bit width is implied by the word size `T`
        let uncompressed_bit_width = std::mem::size_of::<T>() * 8;
        let mut decompressed = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];

        // Copy the chunk so the packed words are properly aligned for the cast below
        let chunk_in_u8: Vec<u8> = data.to_vec();
        let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<T>()];
        let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<T>());
        let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<T>()..]);

        // The bit-packed chunk should be exactly (bit_width_value * ELEMS_PER_CHUNK / 8) bytes
        assert!(std::mem::size_of_val(chunk) == (bit_width_value * ELEMS_PER_CHUNK) as usize / 8);

        unsafe {
            BitPacking::unchecked_unpack(bit_width_value as usize, chunk, &mut decompressed);
        }

        decompressed.truncate(num_values as usize);
        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(decompressed),
            bits_per_value: uncompressed_bit_width as u64,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}

impl MiniBlockCompressor for InlineBitpacking {
    fn compress(&self, chunk: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match chunk {
            DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with InlineBitpacking",
                    chunk.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

impl BlockCompressor for InlineBitpacking {
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
        let fixed_width = data.as_fixed_width().unwrap();
        let (chunked, _) = self.chunk_data(fixed_width);
        Ok(chunked.data.into_iter().next().unwrap())
    }
}

impl MiniBlockDecompressor for InlineBitpacking {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        assert_eq!(data.len(), 1);
        let data = data.into_iter().next().unwrap();
        match self.uncompressed_bit_width {
            8 => Self::unchunk::<u8>(data, num_values),
            16 => Self::unchunk::<u16>(data, num_values),
            32 => Self::unchunk::<u32>(data, num_values),
            64 => Self::unchunk::<u64>(data, num_values),
            _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
        }
    }
}

impl BlockDecompressor for InlineBitpacking {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        match self.uncompressed_bit_width {
            8 => Self::unchunk::<u8>(data, num_values),
            16 => Self::unchunk::<u16>(data, num_values),
            32 => Self::unchunk::<u32>(data, num_values),
            64 => Self::unchunk::<u64>(data, num_values),
            _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
        }
    }
}

/// Bitpacks a FixedWidthDataBlock with a given bit width
///
/// This function is simpler because it does not do any chunking, but it is slightly
/// less efficient.  The compressed bits per value is constant across the entire buffer.
///
/// Note: even though we are not strictly "chunking" we are still operating on chunks of
/// 1024 values because that's what the bitpacking primitives expect.  They just don't
/// have a unique bit width for each chunk.
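///
/// For example, packing u32 words at a bit width of 12 gives
/// `words_per_chunk = ceil(1024 * 12 / 32) = 384` packed words per chunk.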
fn bitpack_out_of_line<T: ArrowNativeType + BitPacking>(
    data: FixedWidthDataBlock,
    bit_width: usize,
) -> LanceBuffer {
    let data_buffer = data.data.borrow_to_typed_slice::<T>();
    let data_buffer = data_buffer.as_ref();

    let num_chunks = data_buffer.len().div_ceil(ELEMS_PER_CHUNK as usize);
    let last_chunk_is_runt = data_buffer.len() % ELEMS_PER_CHUNK as usize != 0;
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bit_width).div_ceil(data.bits_per_value as usize);
    #[allow(clippy::uninit_vec)]
    let mut output: Vec<T> = Vec::with_capacity(num_chunks * words_per_chunk);
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(num_chunks * words_per_chunk);
    }

    let num_whole_chunks = if last_chunk_is_runt {
        num_chunks - 1
    } else {
        num_chunks
    };

    // Simple case for complete chunks
    for i in 0..num_whole_chunks {
        let input_start = i * ELEMS_PER_CHUNK as usize;
        let input_end = input_start + ELEMS_PER_CHUNK as usize;
        let output_start = i * words_per_chunk;
        let output_end = output_start + words_per_chunk;
        unsafe {
            BitPacking::unchecked_pack(
                bit_width,
                &data_buffer[input_start..input_end],
                &mut output[output_start..output_end],
            );
        }
    }

    if !last_chunk_is_runt {
        return LanceBuffer::reinterpret_vec(output);
    }

    // If we get here then the last chunk needs to be padded with zeros
    let remaining_items = data_buffer.len() % ELEMS_PER_CHUNK as usize;
    let last_chunk_start = num_whole_chunks * ELEMS_PER_CHUNK as usize;

    let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
    last_chunk[..remaining_items].clone_from_slice(&data_buffer[last_chunk_start..]);
    let output_start = num_whole_chunks * words_per_chunk;
    unsafe {
        BitPacking::unchecked_pack(bit_width, &last_chunk, &mut output[output_start..]);
    }

    LanceBuffer::reinterpret_vec(output)
}

/// Unpacks a FixedWidthDataBlock that has been bitpacked with a constant bit width
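///
/// The input must consist of whole chunks (the final chunk is zero-padded during
/// compression), so the compressed word count is always a multiple of the chunk size.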
fn unpack_out_of_line<T: ArrowNativeType + BitPacking>(
    data: FixedWidthDataBlock,
    num_values: usize,
    bits_per_value: usize,
) -> FixedWidthDataBlock {
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bits_per_value).div_ceil(data.bits_per_value as usize);
    let compressed_words = data.data.borrow_to_typed_slice::<T>();

    let num_chunks = data.num_values as usize / words_per_chunk;
    debug_assert_eq!(data.num_values as usize % words_per_chunk, 0);

    // This is slightly larger than num_values because the last chunk has some padding; we truncate at the end
    #[allow(clippy::uninit_vec)]
    let mut decompressed = Vec::with_capacity(num_chunks * ELEMS_PER_CHUNK as usize);
    #[allow(clippy::uninit_vec)]
    unsafe {
        decompressed.set_len(num_chunks * ELEMS_PER_CHUNK as usize);
    }

    for chunk_idx in 0..num_chunks {
        let input_start = chunk_idx * words_per_chunk;
        let input_end = input_start + words_per_chunk;
        let output_start = chunk_idx * ELEMS_PER_CHUNK as usize;
        let output_end = output_start + ELEMS_PER_CHUNK as usize;
        unsafe {
            BitPacking::unchecked_unpack(
                bits_per_value,
                &compressed_words[input_start..input_end],
                &mut decompressed[output_start..output_end],
            );
        }
    }

    decompressed.truncate(num_values);

    FixedWidthDataBlock {
        data: LanceBuffer::reinterpret_vec(decompressed),
        bits_per_value: data.bits_per_value,
        num_values: num_values as u64,
        block_info: BlockInfo::new(),
    }
}

/// A transparent compressor that bit packs data
///
/// In order for the encoding to be transparent we must have a fixed bit width
/// across the entire array.  Chunking within the buffer is not supported.  This
/// means that we will be slightly less efficient than something like the mini-block
/// approach.
///
/// WARNING: DO NOT USE YET.
///
/// This was an interesting experiment but it can't be used as a per-value compressor
/// at the moment.  The resulting data IS transparent but it's not quite so simple.  We
/// compress in blocks of 1024 and each block has a fixed size but also has some padding.
///
/// In other words, if we try the simple math to access the item at index `i` we will be
/// out of luck because `bits_per_value * i` is not the location.  What we need is something
/// like:
///
/// ```ignore
/// let chunk_idx = i / 1024;
/// let chunk_offset = i % 1024;
/// bits_per_chunk * chunk_idx + bits_per_value * chunk_offset
/// ```
///
/// However, this logic isn't expressible with the per-value traits we have today.  We can
/// enhance these traits should we need to support it at some point in the future.
#[derive(Debug)]
pub struct OutOfLineBitpacking {
    compressed_bit_width: usize,
}

impl PerValueCompressor for OutOfLineBitpacking {
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
        let fixed_width = data.as_fixed_width().unwrap();
        let num_values = fixed_width.num_values;
        let word_size = fixed_width.bits_per_value;
        let compressed = match word_size {
            8 => bitpack_out_of_line::<u8>(fixed_width, self.compressed_bit_width),
            16 => bitpack_out_of_line::<u16>(fixed_width, self.compressed_bit_width),
            32 => bitpack_out_of_line::<u32>(fixed_width, self.compressed_bit_width),
            64 => bitpack_out_of_line::<u64>(fixed_width, self.compressed_bit_width),
            _ => panic!("Bitpacking word size must be 8, 16, 32, or 64"),
        };
        let compressed = FixedWidthDataBlock {
            data: compressed,
            bits_per_value: self.compressed_bit_width as u64,
            num_values,
            block_info: BlockInfo::new(),
        };
        let encoding = ProtobufUtils21::out_of_line_bitpacking(
            word_size,
            // TODO: Are there any other transparent encodings that could be used on the bitpacked
            // output?
            ProtobufUtils21::flat(
                self.compressed_bit_width as u64,
                // TODO: Could potentially compress the data here
                None,
            ),
        );
        Ok((PerValueDataBlock::Fixed(compressed), encoding))
    }
}

impl FixedPerValueDecompressor for OutOfLineBitpacking {
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock> {
        let unpacked = match data.bits_per_value {
            8 => unpack_out_of_line::<u8>(data, num_values as usize, self.compressed_bit_width),
            16 => unpack_out_of_line::<u16>(data, num_values as usize, self.compressed_bit_width),
            32 => unpack_out_of_line::<u32>(data, num_values as usize, self.compressed_bit_width),
            64 => unpack_out_of_line::<u64>(data, num_values as usize, self.compressed_bit_width),
            _ => panic!("Bitpacking word size must be 8, 16, 32, or 64"),
        };
        Ok(DataBlock::FixedWidth(unpacked))
    }

    fn bits_per_value(&self) -> u64 {
        self.compressed_bit_width as u64
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{Array, Int64Array, Int8Array};
    use arrow_schema::DataType;

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_miniblock_bitpack() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        let arrays = vec![
            Arc::new(Int8Array::from(vec![100; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![16; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![-1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![5; 1])) as Arc<dyn Array>,
        ];
        check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;

        for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] {
            let int64_arrays = vec![
                Int64Array::from(vec![3; 1024]),
                Int64Array::from(vec![8; 1024]),
                Int64Array::from(vec![16; 1024]),
                Int64Array::from(vec![100; 1024]),
                Int64Array::from(vec![512; 1024]),
                Int64Array::from(vec![1000; 1024]),
                Int64Array::from(vec![2000; 1024]),
                Int64Array::from(vec![-1; 10]),
            ];

            let mut arrays = vec![];
            for int64_array in int64_arrays {
                arrays.push(arrow_cast::cast(&int64_array, &data_type).unwrap());
            }

            check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_bitpack_encoding_verification() {
        use arrow_array::Int32Array;

        // Verify that varied small values select the inline bitpacking encoding
        let test_cases = TestCases::default()
            .with_expected_encoding("inline_bitpacking")
            .with_file_version(LanceFileVersion::V2_1);

        // Generate varied small values; constant runs would be captured by RLE instead
        let mut values = Vec::new();
        for i in 0..2048 {
            values.push(i % 16); // Values 0-15, varied enough to avoid RLE
        }

        let arrays = vec![Arc::new(Int32Array::from(values)) as Arc<dyn Array>];

        // Explicitly disable BSS to ensure bitpacking is tested
        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());

        check_round_trip_encoding_of_data(arrays, &test_cases, metadata).await;
    }
}