lance_encoding/encodings/physical/bitpack.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Bitpacking encodings
5//!
6//! These encodings look for unused higher order bits and discard them.  For example, if we
7//! have a u32 array and all values are between 0 and 5000 then we only need 12 bits to store
8//! each value.  The encoding will discard the upper 20 bits and only store 12 bits.
9//!
10//! This is a simple encoding that works well for data that has a small range.
11//!
12//! In order to decode the values we need to know the bit width of the values.  This can be stored
13//! inline with the data (miniblock) or in the encoding description, out of line (full zip).
14//!
15//! The encoding is transparent because the output has a fixed width (just like the input) and
16//! we can easily jump to the correct value.
17
18use arrow::datatypes::UInt64Type;
19use arrow_array::{Array, PrimitiveArray};
20use arrow_buffer::ArrowNativeType;
21use byteorder::{ByteOrder, LittleEndian};
22use snafu::location;
23
24use lance_core::{Error, Result};
25
26use crate::buffer::LanceBuffer;
27use crate::compression::{
28    BlockCompressor, BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor,
29};
30use crate::compression_algo::fastlanes::BitPacking;
31use crate::data::BlockInfo;
32use crate::data::{DataBlock, FixedWidthDataBlock};
33use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
34use crate::encodings::logical::primitive::miniblock::{
35    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
36};
37use crate::format::{pb, ProtobufUtils};
38use crate::statistics::{GetStat, Stat};
39use bytemuck::{cast_slice, AnyBitPattern};
40
// Each bitpacked chunk holds 2^10 = 1024 values; the fastlanes packing
// primitives operate on blocks of exactly 1024 elements.
const LOG_ELEMS_PER_CHUNK: u8 = 10;
const ELEMS_PER_CHUNK: u64 = 1 << LOG_ELEMS_PER_CHUNK;
43
/// Bitpacking codec where each 1024-value chunk stores its own bit width
/// inline (as the first word of the chunk) ahead of the packed data.
#[derive(Debug, Default)]
pub struct InlineBitpacking {
    // Bit width of the uncompressed values (8, 16, 32, or 64)
    uncompressed_bit_width: u64,
}
48
impl InlineBitpacking {
    /// Creates a codec for values with the given uncompressed bit width.
    pub fn new(uncompressed_bit_width: u64) -> Self {
        Self {
            uncompressed_bit_width,
        }
    }

    /// Reconstructs the codec from its protobuf encoding description.
    pub fn from_description(description: &pb::InlineBitpacking) -> Self {
        Self {
            uncompressed_bit_width: description.uncompressed_bits_per_value,
        }
    }

    /// Number of bytes one full 1024-value chunk occupies when packed at
    /// `bit_width` (not counting the inline bit-width word).
    pub fn min_size_bytes(bit_width: u64) -> u64 {
        (ELEMS_PER_CHUNK * bit_width).div_ceil(8)
    }

    /// Bitpacks a FixedWidthDataBlock into compressed chunks of 1024 values
    ///
    /// Each chunk can have a different bit width
    ///
    /// Each chunk has the compressed bit width stored inline in the chunk itself.
    fn bitpack_chunked<T: ArrowNativeType + BitPacking>(
        mut data: FixedWidthDataBlock,
    ) -> MiniBlockCompressed {
        let data_buffer = data.data.borrow_to_typed_slice::<T>();
        let data_buffer = data_buffer.as_ref();

        // Per-chunk bit widths were computed earlier by the statistics pass
        let bit_widths = data.expect_stat(Stat::BitWidth);
        let bit_widths_array = bit_widths
            .as_any()
            .downcast_ref::<PrimitiveArray<UInt64Type>>()
            .unwrap();

        // Packed size (in words of T) of each chunk; the +1 accounts for the
        // inline bit-width word that precedes each chunk's data
        let (packed_chunk_sizes, total_size) = bit_widths_array
            .values()
            .iter()
            .map(|&bit_width| {
                let chunk_size = ((1024 * bit_width) / data.bits_per_value) as usize;
                (chunk_size, chunk_size + 1)
            })
            .fold(
                (Vec::with_capacity(bit_widths_array.len()), 0),
                |(mut sizes, total), (size, inc)| {
                    sizes.push(size);
                    (sizes, total + inc)
                },
            );

        let mut output: Vec<T> = Vec::with_capacity(total_size);
        let mut chunks = Vec::with_capacity(bit_widths_array.len());

        // Pack every chunk except the last (the last may be partial and is
        // handled separately below)
        for (i, packed_chunk_size) in packed_chunk_sizes
            .iter()
            .enumerate()
            .take(bit_widths_array.len() - 1)
        {
            let start_elem = i * ELEMS_PER_CHUNK as usize;
            let bit_width = bit_widths_array.value(i) as usize;
            // Write the bit width inline, then the packed values right after it
            output.push(T::from_usize(bit_width).unwrap());
            let output_len = output.len();
            // SAFETY: `with_capacity(total_size)` reserved room for every chunk,
            // and `unchecked_pack` fully initializes the `packed_chunk_size`
            // words exposed by `set_len`
            unsafe {
                output.set_len(output_len + *packed_chunk_size);
                BitPacking::unchecked_pack(
                    bit_width,
                    &data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..*packed_chunk_size],
                );
            }
            chunks.push(MiniBlockChunk {
                buffer_sizes: vec![((1 + *packed_chunk_size) * std::mem::size_of::<T>()) as u16],
                log_num_values: LOG_ELEMS_PER_CHUNK,
            });
        }

        // Handle the last chunk
        let last_chunk_elem_num = if data.num_values % ELEMS_PER_CHUNK == 0 {
            1024
        } else {
            data.num_values % ELEMS_PER_CHUNK
        };
        // Zero-pad the final (possibly partial) chunk because the packing
        // primitives always consume exactly 1024 values
        let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
        last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
            &data_buffer[data.num_values as usize - last_chunk_elem_num as usize..],
        );
        let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as usize;
        output.push(T::from_usize(bit_width).unwrap());
        let output_len = output.len();
        // SAFETY: same invariant as above; the reserved capacity covers this
        // final chunk as well
        unsafe {
            output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]);
            BitPacking::unchecked_pack(
                bit_width,
                &last_chunk,
                &mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]],
            );
        }
        // log_num_values == 0 marks the trailing chunk holding the remaining values
        chunks.push(MiniBlockChunk {
            buffer_sizes: vec![
                ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) * std::mem::size_of::<T>())
                    as u16,
            ],
            log_num_values: 0,
        });

        MiniBlockCompressed {
            data: vec![LanceBuffer::reinterpret_vec(output)],
            chunks,
            num_values: data.num_values,
        }
    }

    /// Dispatches [`Self::bitpack_chunked`] on the word size and builds the
    /// protobuf encoding description.
    fn chunk_data(
        &self,
        data: FixedWidthDataBlock,
    ) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) {
        assert!(data.bits_per_value % 8 == 0);
        assert_eq!(data.bits_per_value, self.uncompressed_bit_width);
        let bits_per_value = data.bits_per_value;
        let compressed = match bits_per_value {
            8 => Self::bitpack_chunked::<u8>(data),
            16 => Self::bitpack_chunked::<u16>(data),
            32 => Self::bitpack_chunked::<u32>(data),
            64 => Self::bitpack_chunked::<u64>(data),
            _ => unreachable!(),
        };
        (compressed, ProtobufUtils::inline_bitpacking(bits_per_value))
    }

    /// Decompresses a single chunk (up to 1024 values) of bitpacked values.
    ///
    /// The chunk's bit width is read from the first word of `data`.
    fn unchunk<T: ArrowNativeType + BitPacking + AnyBitPattern>(
        data: LanceBuffer,
        num_values: u64,
    ) -> Result<DataBlock> {
        assert!(data.len() >= 8);
        assert!(num_values <= ELEMS_PER_CHUNK);

        let uncompressed_bit_width = std::mem::size_of::<T>() * 8;
        let mut decompressed = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];

        // Copy for memory alignment
        let chunk_in_u8: Vec<u8> = data.to_vec();
        // The first word holds the little-endian bit width of the packed values
        let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<T>()];
        let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<T>());
        let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<T>()..]);

        // The bit-packed chunk should have number of bytes (bit_width_value * ELEMS_PER_CHUNK / 8)
        assert!(std::mem::size_of_val(chunk) == (bit_width_value * ELEMS_PER_CHUNK) as usize / 8);

        // SAFETY: the assert above guarantees `chunk` holds exactly one full
        // packed chunk, and `decompressed` has room for all 1024 output values
        unsafe {
            BitPacking::unchecked_unpack(bit_width_value as usize, chunk, &mut decompressed);
        }

        // The chunk is always padded to 1024 values; drop the padding
        decompressed.truncate(num_values as usize);
        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(decompressed),
            bits_per_value: uncompressed_bit_width as u64,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}
210
211impl MiniBlockCompressor for InlineBitpacking {
212    fn compress(
213        &self,
214        chunk: DataBlock,
215    ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
216        match chunk {
217            DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)),
218            _ => Err(Error::InvalidInput {
219                source: format!(
220                    "Cannot compress a data block of type {} with BitpackMiniBlockEncoder",
221                    chunk.name()
222                )
223                .into(),
224                location: location!(),
225            }),
226        }
227    }
228}
229
230impl BlockCompressor for InlineBitpacking {
231    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
232        let fixed_width = data.as_fixed_width().unwrap();
233        let (chunked, _) = self.chunk_data(fixed_width);
234        Ok(chunked.data.into_iter().next().unwrap())
235    }
236}
237
238impl MiniBlockDecompressor for InlineBitpacking {
239    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
240        assert_eq!(data.len(), 1);
241        let data = data.into_iter().next().unwrap();
242        match self.uncompressed_bit_width {
243            8 => Self::unchunk::<u8>(data, num_values),
244            16 => Self::unchunk::<u16>(data, num_values),
245            32 => Self::unchunk::<u32>(data, num_values),
246            64 => Self::unchunk::<u64>(data, num_values),
247            _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
248        }
249    }
250}
251
252impl BlockDecompressor for InlineBitpacking {
253    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
254        match self.uncompressed_bit_width {
255            8 => Self::unchunk::<u8>(data, num_values),
256            16 => Self::unchunk::<u16>(data, num_values),
257            32 => Self::unchunk::<u32>(data, num_values),
258            64 => Self::unchunk::<u64>(data, num_values),
259            _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
260        }
261    }
262}
263
/// Bitpacks a FixedWidthDataBlock with a given bit width
///
/// This function is simpler as it does not do any chunking, but slightly less efficient.
/// The compressed bits per value is constant across the entire buffer.
///
/// Note: even though we are not strictly "chunking" we are still operating on chunks of
/// 1024 values because that's what the bitpacking primitives expect.  They just don't
/// have a unique bit width for each chunk.
fn bitpack_out_of_line<T: ArrowNativeType + BitPacking>(
    mut data: FixedWidthDataBlock,
    bit_width: usize,
) -> LanceBuffer {
    let data_buffer = data.data.borrow_to_typed_slice::<T>();
    let data_buffer = data_buffer.as_ref();

    let num_chunks = data_buffer.len().div_ceil(ELEMS_PER_CHUNK as usize);
    let last_chunk_is_runt = data_buffer.len() % ELEMS_PER_CHUNK as usize != 0;
    // Output words (of T) per packed chunk; constant because bit_width is constant
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bit_width).div_ceil(data.bits_per_value as usize);
    // SAFETY of the uninitialized length: every word exposed by `set_len` is
    // fully written by the `unchecked_pack` calls below (whole chunks in the
    // loop, and the padded runt chunk at the end)
    #[allow(clippy::uninit_vec)]
    let mut output: Vec<T> = Vec::with_capacity(num_chunks * words_per_chunk);
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(num_chunks * words_per_chunk);
    }

    let num_whole_chunks = if last_chunk_is_runt {
        num_chunks - 1
    } else {
        num_chunks
    };

    // Simple case for complete chunks
    for i in 0..num_whole_chunks {
        let input_start = i * ELEMS_PER_CHUNK as usize;
        let input_end = input_start + ELEMS_PER_CHUNK as usize;
        let output_start = i * words_per_chunk;
        let output_end = output_start + words_per_chunk;
        // SAFETY: both slices have the exact sizes `unchecked_pack` expects for
        // a full 1024-value chunk at `bit_width`
        unsafe {
            BitPacking::unchecked_pack(
                bit_width,
                &data_buffer[input_start..input_end],
                &mut output[output_start..output_end],
            );
        }
    }

    if !last_chunk_is_runt {
        return LanceBuffer::reinterpret_vec(output);
    }

    // If we get here then the last chunk needs to be padded with zeros
    let remaining_items = data_buffer.len() % ELEMS_PER_CHUNK as usize;
    let last_chunk_start = num_whole_chunks * ELEMS_PER_CHUNK as usize;

    let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
    last_chunk[..remaining_items].clone_from_slice(&data_buffer[last_chunk_start..]);
    let output_start = num_whole_chunks * words_per_chunk;
    // SAFETY: `last_chunk` is a full zero-padded 1024-value chunk and the
    // remaining `words_per_chunk` output words were reserved above
    unsafe {
        BitPacking::unchecked_pack(bit_width, &last_chunk, &mut output[output_start..]);
    }

    LanceBuffer::reinterpret_vec(output)
}
328
/// Unpacks a FixedWidthDataBlock that has been bitpacked with a constant bit width
///
/// `num_values` is the logical value count (the last chunk's zero padding is
/// truncated away); `bits_per_value` is the packed width of each value.
fn unpack_out_of_line<T: ArrowNativeType + BitPacking>(
    mut data: FixedWidthDataBlock,
    num_values: usize,
    bits_per_value: usize,
) -> FixedWidthDataBlock {
    // Words (of T) per packed chunk, mirroring the layout in `bitpack_out_of_line`
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bits_per_value).div_ceil(data.bits_per_value as usize);
    let compressed_words = data.data.borrow_to_typed_slice::<T>();

    // The compressed buffer is always a whole number of chunks
    let num_chunks = data.num_values as usize / words_per_chunk;
    debug_assert_eq!(data.num_values as usize % words_per_chunk, 0);

    // This is slightly larger than num_values because the last chunk has some padding, we will truncate at the end
    // SAFETY of the uninitialized length: every element exposed by `set_len` is
    // fully written by the `unchecked_unpack` calls in the loop below
    #[allow(clippy::uninit_vec)]
    let mut decompressed = Vec::with_capacity(num_chunks * ELEMS_PER_CHUNK as usize);
    #[allow(clippy::uninit_vec)]
    unsafe {
        decompressed.set_len(num_chunks * ELEMS_PER_CHUNK as usize);
    }

    for chunk_idx in 0..num_chunks {
        let input_start = chunk_idx * words_per_chunk;
        let input_end = input_start + words_per_chunk;
        let output_start = chunk_idx * ELEMS_PER_CHUNK as usize;
        let output_end = output_start + ELEMS_PER_CHUNK as usize;
        // SAFETY: both slices have the exact sizes `unchecked_unpack` expects
        // for one full chunk at `bits_per_value`
        unsafe {
            BitPacking::unchecked_unpack(
                bits_per_value,
                &compressed_words[input_start..input_end],
                &mut decompressed[output_start..output_end],
            );
        }
    }

    // Drop the zero padding from the final chunk
    decompressed.truncate(num_values);

    FixedWidthDataBlock {
        data: LanceBuffer::reinterpret_vec(decompressed),
        bits_per_value: data.bits_per_value,
        num_values: num_values as u64,
        block_info: BlockInfo::new(),
    }
}
373
374/// A transparent compressor that bit packs data
375///
376/// In order for the encoding to be transparent we must have a fixed bit width
377/// across the entire array.  Chunking within the buffer is not supported.  This
378/// means that we will be slightly less efficient than something like the mini-block
379/// approach.
380///
381/// WARNING: DO NOT USE YET.
382///
383/// This was an interesting experiment but it can't be used as a per-value compressor
384/// at the moment.  The resulting data IS transparent but it's not quite so simple.  We
385/// compress in blocks of 1024 and each block has a fixed size but also has some padding.
386///
387/// In other words, if we try the simple math to access the item at index `i` we will be
388/// out of luck because `bits_per_value * i` is not the location.  What we need is something
389/// like:
390///
391/// ```ignore
392/// let chunk_idx = i / 1024;
393/// let chunk_offset = i % 1024;
394/// bits_per_chunk * chunk_idx + bits_per_value * chunk_offset
395/// ```
396///
397/// However, this logic isn't expressible with the per-value traits we have today.  We can
398/// enhance these traits should we need to support it at some point in the future.
#[derive(Debug)]
pub struct OutOfLineBitpacking {
    // Bit width every value is packed to; stored out-of-line in the encoding
    // description rather than inline with the data
    compressed_bit_width: usize,
}
403
404impl PerValueCompressor for OutOfLineBitpacking {
405    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
406        let fixed_width = data.as_fixed_width().unwrap();
407        let num_values = fixed_width.num_values;
408        let word_size = fixed_width.bits_per_value;
409        let compressed = match word_size {
410            8 => bitpack_out_of_line::<u8>(fixed_width, self.compressed_bit_width),
411            16 => bitpack_out_of_line::<u16>(fixed_width, self.compressed_bit_width),
412            32 => bitpack_out_of_line::<u32>(fixed_width, self.compressed_bit_width),
413            64 => bitpack_out_of_line::<u64>(fixed_width, self.compressed_bit_width),
414            _ => panic!("Bitpacking word size must be 8,16,32,64"),
415        };
416        let compressed = FixedWidthDataBlock {
417            data: compressed,
418            bits_per_value: self.compressed_bit_width as u64,
419            num_values,
420            block_info: BlockInfo::new(),
421        };
422        let encoding =
423            ProtobufUtils::out_of_line_bitpacking(word_size, self.compressed_bit_width as u64);
424        Ok((PerValueDataBlock::Fixed(compressed), encoding))
425    }
426}
427
impl FixedPerValueDecompressor for OutOfLineBitpacking {
    // Decompresses a constant-bit-width buffer produced by `bitpack_out_of_line`.
    //
    // NOTE(review): the match below selects the word type from
    // `data.bits_per_value`, but `PerValueCompressor::compress` above stores
    // the *compressed* bit width (not the uncompressed word size) in that
    // field.  Confirm the caller rewrites `bits_per_value` to the word size
    // before reaching here -- consistent with the "DO NOT USE YET" warning on
    // this struct.
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock> {
        let unpacked = match data.bits_per_value {
            8 => unpack_out_of_line::<u8>(data, num_values as usize, self.compressed_bit_width),
            16 => unpack_out_of_line::<u16>(data, num_values as usize, self.compressed_bit_width),
            32 => unpack_out_of_line::<u32>(data, num_values as usize, self.compressed_bit_width),
            64 => unpack_out_of_line::<u64>(data, num_values as usize, self.compressed_bit_width),
            _ => panic!("Bitpacking word size must be 8,16,32,64"),
        };
        Ok(DataBlock::FixedWidth(unpacked))
    }

    // Width of each value as stored on disk (the compressed width, since this
    // encoding is transparent)
    fn bits_per_value(&self) -> u64 {
        self.compressed_bit_width as u64
    }
}
444
#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{Array, Int64Array, Int8Array};
    use arrow_schema::DataType;

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Metadata map that forces the bitpacking compression scheme.
    fn bitpack_metadata() -> HashMap<String, String> {
        HashMap::from([(
            "lance-encoding:compression".to_string(),
            "bitpacking".to_string(),
        )])
    }

    #[test_log::test(tokio::test)]
    async fn test_miniblock_bitpack() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        // Int8 round trip: several full chunks (including a negative one) plus
        // a one-element runt chunk.
        let int8_arrays: Vec<Arc<dyn Array>> = vec![
            Arc::new(Int8Array::from(vec![100; 1024])),
            Arc::new(Int8Array::from(vec![1; 1024])),
            Arc::new(Int8Array::from(vec![16; 1024])),
            Arc::new(Int8Array::from(vec![-1; 1024])),
            Arc::new(Int8Array::from(vec![5; 1])),
        ];
        check_round_trip_encoding_of_data(int8_arrays, &test_cases, bitpack_metadata()).await;

        // Repeat with the same values cast to each wider integer type.
        for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] {
            let int64_arrays = vec![
                Int64Array::from(vec![3; 1024]),
                Int64Array::from(vec![8; 1024]),
                Int64Array::from(vec![16; 1024]),
                Int64Array::from(vec![100; 1024]),
                Int64Array::from(vec![512; 1024]),
                Int64Array::from(vec![1000; 1024]),
                Int64Array::from(vec![2000; 1024]),
                Int64Array::from(vec![-1; 10]),
            ];

            let arrays = int64_arrays
                .iter()
                .map(|arr| arrow_cast::cast(arr, &data_type).unwrap())
                .collect::<Vec<_>>();

            check_round_trip_encoding_of_data(arrays, &test_cases, bitpack_metadata()).await;
        }
    }
}