lance_encoding/encodings/physical/bitpack_fastlanes.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use arrow::datatypes::{
    Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow_array::{Array, PrimitiveArray};
use arrow_buffer::ArrowNativeType;
use arrow_schema::DataType;
use byteorder::{ByteOrder, LittleEndian};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use log::trace;
use snafu::location;

use lance_arrow::DataTypeExt;
use lance_core::{Error, Result};

use crate::buffer::LanceBuffer;
use crate::compression_algo::fastlanes::BitPacking;
use crate::data::BlockInfo;
use crate::data::{DataBlock, FixedWidthDataBlock, NullableDataBlock};
use crate::decoder::{
    BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor, PageScheduler,
    PrimitivePageDecoder,
};
use crate::encoder::{
    ArrayEncoder, BlockCompressor, EncodedArray, MiniBlockChunk, MiniBlockCompressed,
    MiniBlockCompressor, PerValueCompressor, PerValueDataBlock,
};
use crate::format::{pb, ProtobufUtils};
use crate::statistics::{GetStat, Stat};
use arrow::array::ArrayRef;
use bytemuck::{cast_slice, AnyBitPattern};

const LOG_ELEMS_PER_CHUNK: u8 = 10;
const ELEMS_PER_CHUNK: u64 = 1 << LOG_ELEMS_PER_CHUNK;

// Compute the compressed_bit_width for a given slice of integer arrays
// todo: compute all statistics before encoding
// todo: see how to use a rust macro to rewrite this function
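// For example, if the largest value across all input arrays is 5 (0b101), every value fits in
// 3 bits, so the result is 3. An all-zero input would yield a width of 0, which the code below
// clamps to a minimum of 1 bit.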
pub fn compute_compressed_bit_width_for_non_neg(arrays: &[ArrayRef]) -> u64 {
    debug_assert!(!arrays.is_empty());

    let res;

    match arrays[0].data_type() {
        DataType::UInt8 => {
            let mut global_max: u8 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt8Type>>()
                    .unwrap();
                let array_max = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max = global_max.max(array_max);
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            // we will have constant encoding later
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int8 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int8Type>>()
                    .unwrap();
                let array_max_width = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max_width = global_max_width.max(8 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }

        DataType::UInt16 => {
            let mut global_max: u16 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt16Type>>()
                    .unwrap();
                let array_max = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max = global_max.max(array_max);
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int16 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int16Type>>()
                    .unwrap();
                let array_max_width = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max_width =
                    global_max_width.max(16 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }

        DataType::UInt32 => {
            let mut global_max: u32 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt32Type>>()
                    .unwrap();
                let array_max = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max = global_max.max(array_max);
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int32 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int32Type>>()
                    .unwrap();
                let array_max_width = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max_width =
                    global_max_width.max(32 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }

        DataType::UInt64 => {
            let mut global_max: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt64Type>>()
                    .unwrap();
                let array_max = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max = global_max.max(array_max);
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int64 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int64Type>>()
                    .unwrap();
                let array_max_width = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max_width =
                    global_max_width.max(64 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }
        _ => {
            panic!("BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64");
        }
    };
    res
}

// Bitpack integers using the FastLanes algorithm. The input is sliced into chunks of 1024
// integers and bitpacked chunk by chunk. When the input is not a multiple of 1024, the last
// chunk is padded with zeros; this is fine because we also know the number of rows we have.
// Here `self` is a borrow of BitpackedForNonNegArrayEncoder, `unpacked` is a mutable borrow of
// FixedWidthDataBlock, and `data_type` can be one of u8, u16, u32, or u64.
// `buffer_index` is a mutable borrow of u32, indicating the buffer index of the output
// EncodedArray.
// It outputs a FastLanes-bitpacked EncodedArray.
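// For example, with 2500 u32 values and compressed_bit_width = 10: num_chunks is 3 (two full
// chunks of 1024 values plus one chunk padded from 452 to 1024 values), and each packed chunk
// occupies 1024 * 10 / 32 = 320 u32 words in the output.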
macro_rules! encode_fixed_width {
    ($self:expr, $unpacked:expr, $data_type:ty, $buffer_index:expr) => {{
        let num_chunks = $unpacked.num_values.div_ceil(ELEMS_PER_CHUNK);
        let num_full_chunks = $unpacked.num_values / ELEMS_PER_CHUNK;
        let uncompressed_bit_width = std::mem::size_of::<$data_type>() as u64 * 8;

        // The output vector type matches the input type; for example, u16 input produces a Vec<u16>.
        let packed_chunk_size = 1024 * $self.compressed_bit_width as usize / uncompressed_bit_width as usize;

        let input_slice = $unpacked.data.borrow_to_typed_slice::<$data_type>();
        let input = input_slice.as_ref();

        let mut output = Vec::with_capacity(num_chunks as usize * packed_chunk_size);

        // Loop over all full chunks.
        (0..num_full_chunks).for_each(|i| {
            let start_elem = (i * ELEMS_PER_CHUNK) as usize;

            let output_len = output.len();
            unsafe {
                output.set_len(output_len + packed_chunk_size);
                BitPacking::unchecked_pack(
                    $self.compressed_bit_width,
                    &input[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..packed_chunk_size],
                );
            }
        });

        // Pack the final, partial chunk (padded with zeros), if any.
        if num_chunks != num_full_chunks {
            let last_chunk_elem_num = $unpacked.num_values % ELEMS_PER_CHUNK;
            let mut last_chunk = vec![0 as $data_type; ELEMS_PER_CHUNK as usize];
            last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
                &input[$unpacked.num_values as usize - last_chunk_elem_num as usize..],
            );

            let output_len = output.len();
            unsafe {
                output.set_len(output_len + packed_chunk_size);
                BitPacking::unchecked_pack(
                    $self.compressed_bit_width,
                    &last_chunk,
                    &mut output[output_len..][..packed_chunk_size],
                );
            }
        }

        let bitpacked_for_non_neg_buffer_index = *$buffer_index;
        *$buffer_index += 1;

        let encoding = ProtobufUtils::bitpacked_for_non_neg_encoding(
            $self.compressed_bit_width as u64,
            uncompressed_bit_width,
            bitpacked_for_non_neg_buffer_index,
        );
        let packed = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: $self.compressed_bit_width as u64,
            data: LanceBuffer::reinterpret_vec(output),
            num_values: $unpacked.num_values,
            block_info: BlockInfo::new(),
        });

        Result::Ok(EncodedArray {
            data: packed,
            encoding,
        })
    }};
}

#[derive(Debug)]
pub struct BitpackedForNonNegArrayEncoder {
    pub compressed_bit_width: usize,
    pub original_data_type: DataType,
}

impl BitpackedForNonNegArrayEncoder {
    pub fn new(compressed_bit_width: usize, data_type: DataType) -> Self {
        Self {
            compressed_bit_width,
            original_data_type: data_type,
        }
    }
}

impl ArrayEncoder for BitpackedForNonNegArrayEncoder {
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        match data {
            DataBlock::AllNull(_) => {
                let encoding = ProtobufUtils::basic_all_null_encoding();
                Ok(EncodedArray { data, encoding })
            }
            DataBlock::FixedWidth(mut unpacked) => {
                match data_type {
                    DataType::UInt8 | DataType::Int8 => encode_fixed_width!(self, unpacked, u8, buffer_index),
                    DataType::UInt16 | DataType::Int16 => encode_fixed_width!(self, unpacked, u16, buffer_index),
                    DataType::UInt32 | DataType::Int32 => encode_fixed_width!(self, unpacked, u32, buffer_index),
                    DataType::UInt64 | DataType::Int64 => encode_fixed_width!(self, unpacked, u64, buffer_index),
                    _ => unreachable!("BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64"),
                }
            }
            DataBlock::Nullable(nullable) => {
                let validity_buffer_index = *buffer_index;
                *buffer_index += 1;

                let validity_desc = ProtobufUtils::flat_encoding(
                    1,
                    validity_buffer_index,
                    /*compression=*/ None,
                );
                let encoded_values: EncodedArray;
                match *nullable.data {
                    DataBlock::FixedWidth(mut unpacked) => {
                        match data_type {
                            DataType::UInt8 | DataType::Int8 => encoded_values = encode_fixed_width!(self, unpacked, u8, buffer_index)?,
                            DataType::UInt16 | DataType::Int16 => encoded_values = encode_fixed_width!(self, unpacked, u16, buffer_index)?,
                            DataType::UInt32 | DataType::Int32 => encoded_values = encode_fixed_width!(self, unpacked, u32, buffer_index)?,
                            DataType::UInt64 | DataType::Int64 => encoded_values = encode_fixed_width!(self, unpacked, u64, buffer_index)?,
                            _ => unreachable!("BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64"),
                        }
                    }
                    _ => {
                        return Err(Error::InvalidInput {
                            source: "Bitpacking only supports fixed width data blocks, nullable data blocks with fixed width data inside, or all-null data blocks".into(),
                            location: location!(),
                        });
                    }
                }
                let encoding =
                    ProtobufUtils::basic_some_null_encoding(validity_desc, encoded_values.encoding);
                let encoded = DataBlock::Nullable(NullableDataBlock {
                    data: Box::new(encoded_values.data),
                    nulls: nullable.nulls,
                    block_info: BlockInfo::new(),
                });
                Ok(EncodedArray {
                    data: encoded,
                    encoding,
                })
            }
            _ => {
                Err(Error::InvalidInput {
                    source: "Bitpacking only supports fixed width data blocks, nullable data blocks with fixed width data inside, or all-null data blocks".into(),
                    location: location!(),
                })
            }
        }
    }
}

#[derive(Debug)]
pub struct BitpackedForNonNegScheduler {
    compressed_bit_width: u64,
    uncompressed_bits_per_value: u64,
    buffer_offset: u64,
}

impl BitpackedForNonNegScheduler {
    pub fn new(
        compressed_bit_width: u64,
        uncompressed_bits_per_value: u64,
        buffer_offset: u64,
    ) -> Self {
        Self {
            compressed_bit_width,
            uncompressed_bits_per_value,
            buffer_offset,
        }
    }

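    // Each packed chunk of 1024 values occupies a fixed number of bytes:
    // ELEMS_PER_CHUNK * compressed_bit_width / 8. For example, with a compressed bit width of
    // 10, a chunk is 1024 * 10 / 8 = 1280 bytes, so row 2050 lives in chunk 2 and its chunk
    // starts at buffer_offset + 2 * 1280.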
    fn locate_chunk_start(&self, relative_row_num: u64) -> u64 {
        let chunk_size = ELEMS_PER_CHUNK * self.compressed_bit_width / 8;
        self.buffer_offset + (relative_row_num / ELEMS_PER_CHUNK * chunk_size)
    }

    fn locate_chunk_end(&self, relative_row_num: u64) -> u64 {
        let chunk_size = ELEMS_PER_CHUNK * self.compressed_bit_width / 8;
        self.buffer_offset + (relative_row_num / ELEMS_PER_CHUNK * chunk_size) + chunk_size
    }
}

impl PageScheduler for BitpackedForNonNegScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn crate::EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        assert!(!ranges.is_empty());

        let mut byte_ranges = vec![];

        // Map each byte range to the row ranges it covers; every byte range corresponds to at
        // least one row range.
        let mut bytes_idx_to_range_indices = vec![];
        let first_byte_range = std::ops::Range {
            start: self.locate_chunk_start(ranges[0].start),
            end: self.locate_chunk_end(ranges[0].end - 1),
        }; // the ranges are half-open
        byte_ranges.push(first_byte_range);
        bytes_idx_to_range_indices.push(vec![ranges[0].clone()]);

        for (i, range) in ranges.iter().enumerate().skip(1) {
            let this_start = self.locate_chunk_start(range.start);
            let this_end = self.locate_chunk_end(range.end - 1);

            // If the current range starts in the same chunk where the previous range ends, we
            // coalesce the two byte ranges; otherwise we start a new byte range.
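            // For example, with ELEMS_PER_CHUNK = 1024 and requested row ranges 0..100 and
            // 200..300, both ranges land in chunk 0, so the second range is folded into the
            // first byte range and a single I/O request covers both.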
            if this_start == self.locate_chunk_start(ranges[i - 1].end - 1) {
                byte_ranges.last_mut().unwrap().end = this_end;
                bytes_idx_to_range_indices
                    .last_mut()
                    .unwrap()
                    .push(range.clone());
            } else {
                byte_ranges.push(this_start..this_end);
                bytes_idx_to_range_indices.push(vec![range.clone()]);
            }
        }

        trace!(
            "Scheduling I/O for {} ranges spread across byte range {}..{}",
            byte_ranges.len(),
            byte_ranges[0].start,
            byte_ranges.last().unwrap().end
        );

        let bytes = scheduler.submit_request(byte_ranges.clone(), top_level_row);

        // copy the necessary data from `self` to move into the async block
        let compressed_bit_width = self.compressed_bit_width;
        let uncompressed_bits_per_value = self.uncompressed_bits_per_value;
        let num_rows = ranges.iter().map(|range| range.end - range.start).sum();

        async move {
            let bytes = bytes.await?;
            let decompressed_output = bitpacked_for_non_neg_decode(
                compressed_bit_width,
                uncompressed_bits_per_value,
                &bytes,
                &bytes_idx_to_range_indices,
                num_rows,
            );
            Ok(Box::new(BitpackedForNonNegPageDecoder {
                uncompressed_bits_per_value,
                decompressed_buf: decompressed_output,
            }) as Box<dyn PrimitivePageDecoder>)
        }
        .boxed()
    }
}

#[derive(Debug)]
struct BitpackedForNonNegPageDecoder {
    // number of bits in the uncompressed value. E.g. this will be 32 for DataType::UInt32
    uncompressed_bits_per_value: u64,

    decompressed_buf: LanceBuffer,
}

impl PrimitivePageDecoder for BitpackedForNonNegPageDecoder {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
        if ![8, 16, 32, 64].contains(&self.uncompressed_bits_per_value) {
            return Err(Error::InvalidInput {
                source: "BitpackedForNonNegPageDecoder should only have an uncompressed_bits_per_value of 8, 16, 32, or 64".into(),
                location: location!(),
            });
        }

        let elem_size_in_bytes = self.uncompressed_bits_per_value / 8;

        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: self.decompressed_buf.slice_with_length(
                (rows_to_skip * elem_size_in_bytes) as usize,
                (num_rows * elem_size_in_bytes) as usize,
            ),
            bits_per_value: self.uncompressed_bits_per_value,
            num_values: num_rows,
            block_info: BlockInfo::new(),
        }))
    }
}

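// Decompresses the requested row ranges from FastLanes-bitpacked chunks. Each coalesced byte
// range is walked chunk by chunk; within a chunk, either the entire tail of the chunk belongs
// to the current row range (Case 1 below), or only a prefix of it does (Case 2), in which case
// we may advance to the next requested range within the same chunk.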
macro_rules! bitpacked_decode {
    ($uncompressed_type:ty, $compressed_bit_width:expr, $data:expr, $bytes_idx_to_range_indices:expr, $num_rows:expr) => {{
        let mut decompressed: Vec<$uncompressed_type> = Vec::with_capacity($num_rows as usize);
        let packed_chunk_size_in_byte: usize = (ELEMS_PER_CHUNK * $compressed_bit_width) as usize / 8;
        let mut decompress_chunk_buf = vec![0 as $uncompressed_type; ELEMS_PER_CHUNK as usize];

        for (i, bytes) in $data.iter().enumerate() {
            let mut ranges_idx = 0;
            let mut curr_range_start = $bytes_idx_to_range_indices[i][0].start;
            let mut chunk_num = 0;

            while chunk_num * packed_chunk_size_in_byte < bytes.len() {
                // Copy for memory alignment
                // TODO: This copy should not be needed
                let chunk_in_u8: Vec<u8> = bytes[chunk_num * packed_chunk_size_in_byte..]
                    [..packed_chunk_size_in_byte]
                    .to_vec();
                chunk_num += 1;
                let chunk = cast_slice(&chunk_in_u8);
                unsafe {
                    BitPacking::unchecked_unpack(
                        $compressed_bit_width as usize,
                        chunk,
                        &mut decompress_chunk_buf,
                    );
                }

                loop {
                    // Case 1: All the elements after (curr_range_start % ELEMS_PER_CHUNK) inside this chunk are needed.
                    let elems_after_curr_range_start_in_this_chunk =
                        ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK;
                    if curr_range_start + elems_after_curr_range_start_in_this_chunk
                        <= $bytes_idx_to_range_indices[i][ranges_idx].end
                    {
                        decompressed.extend_from_slice(
                            &decompress_chunk_buf[(curr_range_start % ELEMS_PER_CHUNK) as usize..],
                        );
                        curr_range_start += elems_after_curr_range_start_in_this_chunk;
                        break;
                    } else {
                        // Case 2: Only part of the elements after (curr_range_start % ELEMS_PER_CHUNK) inside this chunk are needed.
                        let elems_this_range_needed_in_this_chunk =
                            ($bytes_idx_to_range_indices[i][ranges_idx].end - curr_range_start)
                                .min(ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK);
                        decompressed.extend_from_slice(
                            &decompress_chunk_buf[(curr_range_start % ELEMS_PER_CHUNK) as usize..]
                                [..elems_this_range_needed_in_this_chunk as usize],
                        );
                        if curr_range_start + elems_this_range_needed_in_this_chunk
                            == $bytes_idx_to_range_indices[i][ranges_idx].end
                        {
                            ranges_idx += 1;
                            if ranges_idx == $bytes_idx_to_range_indices[i].len() {
                                break;
                            }
                            curr_range_start = $bytes_idx_to_range_indices[i][ranges_idx].start;
                        } else {
                            curr_range_start += elems_this_range_needed_in_this_chunk;
                        }
                    }
                }
            }
        }

        LanceBuffer::reinterpret_vec(decompressed)
    }};
}

fn bitpacked_for_non_neg_decode(
    compressed_bit_width: u64,
    uncompressed_bits_per_value: u64,
    data: &[Bytes],
    bytes_idx_to_range_indices: &[Vec<std::ops::Range<u64>>],
    num_rows: u64,
) -> LanceBuffer {
    match uncompressed_bits_per_value {
        8 => bitpacked_decode!(
            u8,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        16 => bitpacked_decode!(
            u16,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        32 => bitpacked_decode!(
            u32,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        64 => bitpacked_decode!(
            u64,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        _ => unreachable!(
            "bitpacked_for_non_neg_decode only supports 8, 16, 32, 64 uncompressed_bits_per_value"
        ),
    }
}

#[derive(Debug, Default)]
pub struct InlineBitpacking {
    uncompressed_bit_width: u64,
}

impl InlineBitpacking {
    pub fn new(uncompressed_bit_width: u64) -> Self {
        Self {
            uncompressed_bit_width,
        }
    }

    pub fn from_description(description: &pb::InlineBitpacking) -> Self {
        Self {
            uncompressed_bit_width: description.uncompressed_bits_per_value,
        }
    }

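    /// Minimum number of bytes a single bitpacked chunk can occupy. For example, a bit width
    /// of 3 gives (1024 * 3).div_ceil(8) = 384 bytes.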
    pub fn min_size_bytes(bit_width: u64) -> u64 {
        (ELEMS_PER_CHUNK * bit_width).div_ceil(8)
    }

    /// Bitpacks a FixedWidthDataBlock into compressed chunks of 1024 values
    ///
    /// Each chunk can have a different bit width
    ///
    /// Each chunk has the compressed bit width stored inline in the chunk itself.
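    ///
    /// The layout of each compressed chunk is the bit width (stored as one word of `T`)
    /// followed by the packed words. For example, `u16` values packed to 5 bits produce one
    /// `u16` holding the value 5, then 1024 * 5 / 16 = 320 packed `u16` words.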
    fn bitpack_chunked<T: ArrowNativeType + BitPacking>(
        mut data: FixedWidthDataBlock,
    ) -> MiniBlockCompressed {
        let data_buffer = data.data.borrow_to_typed_slice::<T>();
        let data_buffer = data_buffer.as_ref();

        let bit_widths = data.expect_stat(Stat::BitWidth);
        let bit_widths_array = bit_widths
            .as_any()
            .downcast_ref::<PrimitiveArray<UInt64Type>>()
            .unwrap();

        let (packed_chunk_sizes, total_size) = bit_widths_array
            .values()
            .iter()
            .map(|&bit_width| {
                let chunk_size = ((1024 * bit_width) / data.bits_per_value) as usize;
                (chunk_size, chunk_size + 1)
            })
            .fold(
                (Vec::with_capacity(bit_widths_array.len()), 0),
                |(mut sizes, total), (size, inc)| {
                    sizes.push(size);
                    (sizes, total + inc)
                },
            );

        let mut output: Vec<T> = Vec::with_capacity(total_size);
        let mut chunks = Vec::with_capacity(bit_widths_array.len());

        for (i, packed_chunk_size) in packed_chunk_sizes
            .iter()
            .enumerate()
            .take(bit_widths_array.len() - 1)
        {
            let start_elem = i * ELEMS_PER_CHUNK as usize;
            let bit_width = bit_widths_array.value(i) as usize;
            output.push(T::from_usize(bit_width).unwrap());
            let output_len = output.len();
            unsafe {
                output.set_len(output_len + *packed_chunk_size);
                BitPacking::unchecked_pack(
                    bit_width,
                    &data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..*packed_chunk_size],
                );
            }
            chunks.push(MiniBlockChunk {
                buffer_sizes: vec![((1 + *packed_chunk_size) * std::mem::size_of::<T>()) as u16],
                log_num_values: LOG_ELEMS_PER_CHUNK,
            });
        }

        // Handle the last chunk
        let last_chunk_elem_num = if data.num_values % ELEMS_PER_CHUNK == 0 {
            1024
        } else {
            data.num_values % ELEMS_PER_CHUNK
        };
        let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
        last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
            &data_buffer[data.num_values as usize - last_chunk_elem_num as usize..],
        );
        let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as usize;
        output.push(T::from_usize(bit_width).unwrap());
        let output_len = output.len();
        unsafe {
            output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]);
            BitPacking::unchecked_pack(
                bit_width,
                &last_chunk,
                &mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]],
            );
        }
        chunks.push(MiniBlockChunk {
            buffer_sizes: vec![
                ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) * std::mem::size_of::<T>())
                    as u16,
            ],
            log_num_values: 0,
        });

        MiniBlockCompressed {
            data: vec![LanceBuffer::reinterpret_vec(output)],
            chunks,
            num_values: data.num_values,
        }
    }

    fn chunk_data(
        &self,
        data: FixedWidthDataBlock,
    ) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) {
        assert!(data.bits_per_value % 8 == 0);
        assert_eq!(data.bits_per_value, self.uncompressed_bit_width);
        let bits_per_value = data.bits_per_value;
        let compressed = match bits_per_value {
            8 => Self::bitpack_chunked::<u8>(data),
            16 => Self::bitpack_chunked::<u16>(data),
            32 => Self::bitpack_chunked::<u32>(data),
            64 => Self::bitpack_chunked::<u64>(data),
            _ => unreachable!(),
        };
        (compressed, ProtobufUtils::inline_bitpacking(bits_per_value))
    }

    fn unchunk<T: ArrowNativeType + BitPacking + AnyBitPattern>(
        data: LanceBuffer,
        num_values: u64,
    ) -> Result<DataBlock> {
        assert!(data.len() >= 8);
        assert!(num_values <= ELEMS_PER_CHUNK);

        // This function decompresses a single chunk (1024 values) of bitpacked values.
        let uncompressed_bit_width = std::mem::size_of::<T>() * 8;
        let mut decompressed = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];

        // Copy for memory alignment
        let chunk_in_u8: Vec<u8> = data.to_vec();
        let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<T>()];
        let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<T>());
        let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<T>()..]);

        // The bit-packed chunk should occupy exactly (bit_width_value * ELEMS_PER_CHUNK / 8) bytes
        assert!(std::mem::size_of_val(chunk) == (bit_width_value * ELEMS_PER_CHUNK) as usize / 8);

        unsafe {
            BitPacking::unchecked_unpack(bit_width_value as usize, chunk, &mut decompressed);
        }

        decompressed.truncate(num_values as usize);
        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(decompressed),
            bits_per_value: uncompressed_bit_width as u64,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}

impl MiniBlockCompressor for InlineBitpacking {
    fn compress(
        &self,
        chunk: DataBlock,
    ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
        match chunk {
            DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with BitpackMiniBlockEncoder",
                    chunk.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

impl BlockCompressor for InlineBitpacking {
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
        let fixed_width = data.as_fixed_width().unwrap();
        let (chunked, _) = self.chunk_data(fixed_width);
        Ok(chunked.data.into_iter().next().unwrap())
    }
}

impl MiniBlockDecompressor for InlineBitpacking {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        assert_eq!(data.len(), 1);
        let data = data.into_iter().next().unwrap();
        match self.uncompressed_bit_width {
            8 => Self::unchunk::<u8>(data, num_values),
            16 => Self::unchunk::<u16>(data, num_values),
            32 => Self::unchunk::<u32>(data, num_values),
            64 => Self::unchunk::<u64>(data, num_values),
            _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
        }
    }
}

impl BlockDecompressor for InlineBitpacking {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        match self.uncompressed_bit_width {
            8 => Self::unchunk::<u8>(data, num_values),
            16 => Self::unchunk::<u16>(data, num_values),
            32 => Self::unchunk::<u32>(data, num_values),
            64 => Self::unchunk::<u64>(data, num_values),
            _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
        }
    }
}

/// Bitpacks a FixedWidthDataBlock with a given bit width
///
/// This function is simpler because it does not do any chunking, but it is slightly less
/// efficient. The compressed bits per value is constant across the entire buffer.
///
/// Note: even though we are not strictly "chunking" we still operate on chunks of
/// 1024 values because that's what the bitpacking primitives expect.  They just don't
/// have a unique bit width for each chunk.
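///
/// For example, `u32` values packed to 7 bits yield (1024 * 7).div_ceil(32) = 224 `u32` words
/// per chunk, with no per-chunk bit-width header.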
fn bitpack_out_of_line<T: ArrowNativeType + BitPacking>(
    mut data: FixedWidthDataBlock,
    bit_width: usize,
) -> LanceBuffer {
    let data_buffer = data.data.borrow_to_typed_slice::<T>();
    let data_buffer = data_buffer.as_ref();

    let num_chunks = data_buffer.len().div_ceil(ELEMS_PER_CHUNK as usize);
    let last_chunk_is_runt = data_buffer.len() % ELEMS_PER_CHUNK as usize != 0;
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bit_width).div_ceil(data.bits_per_value as usize);
    #[allow(clippy::uninit_vec)]
    let mut output: Vec<T> = Vec::with_capacity(num_chunks * words_per_chunk);
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(num_chunks * words_per_chunk);
    }

    let num_whole_chunks = if last_chunk_is_runt {
        num_chunks - 1
    } else {
        num_chunks
    };

    // Simple case for complete chunks
    for i in 0..num_whole_chunks {
        let input_start = i * ELEMS_PER_CHUNK as usize;
        let input_end = input_start + ELEMS_PER_CHUNK as usize;
        let output_start = i * words_per_chunk;
        let output_end = output_start + words_per_chunk;
        unsafe {
            BitPacking::unchecked_pack(
                bit_width,
                &data_buffer[input_start..input_end],
                &mut output[output_start..output_end],
            );
        }
    }

    if !last_chunk_is_runt {
        return LanceBuffer::reinterpret_vec(output);
    }

    // If we get here then the last chunk needs to be padded with zeros
    let remaining_items = data_buffer.len() % ELEMS_PER_CHUNK as usize;
    let last_chunk_start = num_whole_chunks * ELEMS_PER_CHUNK as usize;

    let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
    last_chunk[..remaining_items].clone_from_slice(&data_buffer[last_chunk_start..]);
    let output_start = num_whole_chunks * words_per_chunk;
    unsafe {
        BitPacking::unchecked_pack(bit_width, &last_chunk, &mut output[output_start..]);
    }

    LanceBuffer::reinterpret_vec(output)
}

/// Unpacks a FixedWidthDataBlock that has been bitpacked with a constant bit width
fn unpack_out_of_line<T: ArrowNativeType + BitPacking>(
    mut data: FixedWidthDataBlock,
    num_values: usize,
    bits_per_value: usize,
) -> FixedWidthDataBlock {
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bits_per_value).div_ceil(data.bits_per_value as usize);
    let compressed_words = data.data.borrow_to_typed_slice::<T>();

    let num_chunks = data.num_values as usize / words_per_chunk;
    debug_assert_eq!(data.num_values as usize % words_per_chunk, 0);

    // This is slightly larger than num_values because the last chunk has some padding; we
    // truncate at the end
    #[allow(clippy::uninit_vec)]
    let mut decompressed = Vec::with_capacity(num_chunks * ELEMS_PER_CHUNK as usize);
    #[allow(clippy::uninit_vec)]
    unsafe {
        decompressed.set_len(num_chunks * ELEMS_PER_CHUNK as usize);
    }

    for chunk_idx in 0..num_chunks {
        let input_start = chunk_idx * words_per_chunk;
        let input_end = input_start + words_per_chunk;
        let output_start = chunk_idx * ELEMS_PER_CHUNK as usize;
        let output_end = output_start + ELEMS_PER_CHUNK as usize;
        unsafe {
            BitPacking::unchecked_unpack(
                bits_per_value,
                &compressed_words[input_start..input_end],
                &mut decompressed[output_start..output_end],
            );
        }
    }

    decompressed.truncate(num_values);

    FixedWidthDataBlock {
        data: LanceBuffer::reinterpret_vec(decompressed),
        bits_per_value: data.bits_per_value,
        num_values: num_values as u64,
        block_info: BlockInfo::new(),
    }
}

/// A transparent compressor that bit packs data
///
/// In order for the encoding to be transparent we must have a fixed bit width
/// across the entire array.  Chunking within the buffer is not supported.  This
/// means that we will be slightly less efficient than something like the mini-block
/// approach.
///
/// WARNING: DO NOT USE YET.
///
/// This was an interesting experiment but it can't be used as a per-value compressor
/// at the moment.  The resulting data IS transparent but it's not quite so simple.  We
/// compress in blocks of 1024 and each block has a fixed size but also has some padding.
///
/// In other words, if we try the simple math to access the item at index `i` we will be
/// out of luck because `bits_per_value * i` is not the location.  What we need is something
/// like:
///
/// ```ignore
/// let chunk_idx = i / 1024;
/// let chunk_offset = i % 1024;
/// bits_per_chunk * chunk_idx + bits_per_value * chunk_offset
/// ```
///
/// However, this logic isn't expressible with the per-value traits we have today.  We can
/// enhance these traits should we need to support it at some point in the future.
#[derive(Debug)]
pub struct OutOfLineBitpacking {
    compressed_bit_width: usize,
}

impl PerValueCompressor for OutOfLineBitpacking {
    fn compress(
        &self,
        data: DataBlock,
    ) -> Result<(crate::encoder::PerValueDataBlock, pb::ArrayEncoding)> {
        let fixed_width = data.as_fixed_width().unwrap();
        let num_values = fixed_width.num_values;
        let word_size = fixed_width.bits_per_value;
        let compressed = match word_size {
            8 => bitpack_out_of_line::<u8>(fixed_width, self.compressed_bit_width),
            16 => bitpack_out_of_line::<u16>(fixed_width, self.compressed_bit_width),
            32 => bitpack_out_of_line::<u32>(fixed_width, self.compressed_bit_width),
            64 => bitpack_out_of_line::<u64>(fixed_width, self.compressed_bit_width),
            _ => panic!("Bitpacking word size must be 8,16,32,64"),
        };
        let compressed = FixedWidthDataBlock {
            data: compressed,
            bits_per_value: self.compressed_bit_width as u64,
            num_values,
            block_info: BlockInfo::new(),
        };
        let encoding =
            ProtobufUtils::out_of_line_bitpacking(word_size, self.compressed_bit_width as u64);
        Ok((PerValueDataBlock::Fixed(compressed), encoding))
    }
}

impl FixedPerValueDecompressor for OutOfLineBitpacking {
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock> {
        let unpacked = match data.bits_per_value {
            8 => unpack_out_of_line::<u8>(data, num_values as usize, self.compressed_bit_width),
            16 => unpack_out_of_line::<u16>(data, num_values as usize, self.compressed_bit_width),
            32 => unpack_out_of_line::<u32>(data, num_values as usize, self.compressed_bit_width),
            64 => unpack_out_of_line::<u64>(data, num_values as usize, self.compressed_bit_width),
            _ => panic!("Bitpacking word size must be 8,16,32,64"),
        };
        Ok(DataBlock::FixedWidth(unpacked))
    }

    fn bits_per_value(&self) -> u64 {
        self.compressed_bit_width as u64
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{Array, Int64Array, Int8Array};
    use arrow_schema::DataType;

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_miniblock_bitpack() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        let arrays = vec![
            Arc::new(Int8Array::from(vec![100; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![16; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![-1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![5; 1])) as Arc<dyn Array>,
        ];
        check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;

        for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] {
            let int64_arrays = vec![
                Int64Array::from(vec![3; 1024]),
                Int64Array::from(vec![8; 1024]),
                Int64Array::from(vec![16; 1024]),
                Int64Array::from(vec![100; 1024]),
                Int64Array::from(vec![512; 1024]),
                Int64Array::from(vec![1000; 1024]),
                Int64Array::from(vec![2000; 1024]),
                Int64Array::from(vec![-1; 10]),
            ];

            let mut arrays = vec![];
            for int64_array in int64_arrays {
                arrays.push(arrow_cast::cast(&int64_array, &data_type).unwrap());
            }
            check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;
        }
    }
}