Skip to main content

lance_encoding/previous/encodings/physical/
bitpack.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_array::types::{
7    Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
8};
9use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray, cast::AsArray};
10use arrow_buffer::ArrowNativeType;
11use arrow_buffer::bit_util::ceil;
12use arrow_schema::DataType;
13use bytes::Bytes;
14use futures::future::{BoxFuture, FutureExt};
15use log::trace;
16use num_traits::{AsPrimitive, PrimInt};
17
18use lance_arrow::DataTypeExt;
19use lance_bitpacking::BitPacking;
20use lance_core::{Error, Result};
21
22use crate::buffer::LanceBuffer;
23use crate::data::BlockInfo;
24use crate::data::{DataBlock, FixedWidthDataBlock, NullableDataBlock};
25use crate::decoder::{PageScheduler, PrimitivePageDecoder};
26use crate::format::ProtobufUtils;
27use crate::previous::encoder::{ArrayEncoder, EncodedArray};
28use bytemuck::cast_slice;
29
// Base-2 logarithm of the number of values that make up one bitpacked chunk.
const LOG_ELEMS_PER_CHUNK: u8 = 10;
// Number of values in one bitpacked chunk: `1 << LOG_ELEMS_PER_CHUNK`, i.e. 1024.
const ELEMS_PER_CHUNK: u64 = 1 << LOG_ELEMS_PER_CHUNK;
32
33// Compute the compressed_bit_width for a given array of integers
34// todo: compute all statistics before encoding
35// todo: see how to use rust macro to rewrite this function
36pub fn compute_compressed_bit_width_for_non_neg(arrays: &[ArrayRef]) -> u64 {
37    debug_assert!(!arrays.is_empty());
38
39    let res;
40
41    match arrays[0].data_type() {
42        DataType::UInt8 => {
43            let mut global_max: u8 = 0;
44            for array in arrays {
45                let primitive_array = array
46                    .as_any()
47                    .downcast_ref::<PrimitiveArray<UInt8Type>>()
48                    .unwrap();
49                let array_max = arrow_arith::aggregate::bit_or(primitive_array);
50                global_max = global_max.max(array_max.unwrap_or(0));
51            }
52            let num_bits =
53                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
54            // we will have constant encoding later
55            if num_bits == 0 {
56                res = 1;
57            } else {
58                res = num_bits;
59            }
60        }
61
62        DataType::Int8 => {
63            let mut global_max_width: u64 = 0;
64            for array in arrays {
65                let primitive_array = array
66                    .as_any()
67                    .downcast_ref::<PrimitiveArray<Int8Type>>()
68                    .unwrap();
69                let array_max_width = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
70                global_max_width = global_max_width.max(8 - array_max_width.leading_zeros() as u64);
71            }
72            if global_max_width == 0 {
73                res = 1;
74            } else {
75                res = global_max_width;
76            }
77        }
78
79        DataType::UInt16 => {
80            let mut global_max: u16 = 0;
81            for array in arrays {
82                let primitive_array = array
83                    .as_any()
84                    .downcast_ref::<PrimitiveArray<UInt16Type>>()
85                    .unwrap();
86                let array_max = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
87                global_max = global_max.max(array_max);
88            }
89            let num_bits =
90                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
91            if num_bits == 0 {
92                res = 1;
93            } else {
94                res = num_bits;
95            }
96        }
97
98        DataType::Int16 => {
99            let mut global_max_width: u64 = 0;
100            for array in arrays {
101                let primitive_array = array
102                    .as_any()
103                    .downcast_ref::<PrimitiveArray<Int16Type>>()
104                    .unwrap();
105                let array_max_width = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
106                global_max_width =
107                    global_max_width.max(16 - array_max_width.leading_zeros() as u64);
108            }
109            if global_max_width == 0 {
110                res = 1;
111            } else {
112                res = global_max_width;
113            }
114        }
115
116        DataType::UInt32 => {
117            let mut global_max: u32 = 0;
118            for array in arrays {
119                let primitive_array = array
120                    .as_any()
121                    .downcast_ref::<PrimitiveArray<UInt32Type>>()
122                    .unwrap();
123                let array_max = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
124                global_max = global_max.max(array_max);
125            }
126            let num_bits =
127                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
128            if num_bits == 0 {
129                res = 1;
130            } else {
131                res = num_bits;
132            }
133        }
134
135        DataType::Int32 => {
136            let mut global_max_width: u64 = 0;
137            for array in arrays {
138                let primitive_array = array
139                    .as_any()
140                    .downcast_ref::<PrimitiveArray<Int32Type>>()
141                    .unwrap();
142                let array_max_width = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
143                global_max_width =
144                    global_max_width.max(32 - array_max_width.leading_zeros() as u64);
145            }
146            if global_max_width == 0 {
147                res = 1;
148            } else {
149                res = global_max_width;
150            }
151        }
152
153        DataType::UInt64 => {
154            let mut global_max: u64 = 0;
155            for array in arrays {
156                let primitive_array = array
157                    .as_any()
158                    .downcast_ref::<PrimitiveArray<UInt64Type>>()
159                    .unwrap();
160                let array_max = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
161                global_max = global_max.max(array_max);
162            }
163            let num_bits =
164                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
165            if num_bits == 0 {
166                res = 1;
167            } else {
168                res = num_bits;
169            }
170        }
171
172        DataType::Int64 => {
173            let mut global_max_width: u64 = 0;
174            for array in arrays {
175                let primitive_array = array
176                    .as_any()
177                    .downcast_ref::<PrimitiveArray<Int64Type>>()
178                    .unwrap();
179                let array_max_width = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
180                global_max_width =
181                    global_max_width.max(64 - array_max_width.leading_zeros() as u64);
182            }
183            if global_max_width == 0 {
184                res = 1;
185            } else {
186                res = global_max_width;
187            }
188        }
189        _ => {
190            panic!(
191                "BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64"
192            );
193        }
194    };
195    res
196}
197
/// Bitpack integers using the fastlanes algorithm.
///
/// The input is sliced into chunks of `ELEMS_PER_CHUNK` (1024) integers and bitpacked chunk by
/// chunk. When the number of values is not a multiple of 1024 the last chunk is padded with
/// zeros; this is fine because the number of rows is recorded alongside the packed data.
///
/// Arguments:
/// * `$self` - the `BitpackedForNonNegArrayEncoder`; only `compressed_bit_width` is read.
/// * `$unpacked` - the `FixedWidthDataBlock` holding the values to pack.
/// * `$data_type` - the unsigned word type of the values: `u8`, `u16`, `u32` or `u64`.
/// * `$buffer_index` - a `&mut u32`; its current value becomes the buffer index recorded in
///   the encoding description and it is then incremented by one.
///
/// Evaluates to a `Result<EncodedArray>` holding the packed block and its encoding description.
macro_rules! encode_fixed_width {
    ($self:expr, $unpacked:expr, $data_type:ty, $buffer_index:expr) => {{
        let num_chunks = $unpacked.num_values.div_ceil(ELEMS_PER_CHUNK);
        let num_full_chunks = $unpacked.num_values / ELEMS_PER_CHUNK;
        let uncompressed_bit_width = std::mem::size_of::<$data_type>() as u64 * 8;

        // The output words have the same type as the input words, for example packing u16
        // values produces a Vec<u16>. This is the number of such words per packed chunk.
        let packed_chunk_size = ELEMS_PER_CHUNK as usize * $self.compressed_bit_width as usize
            / uncompressed_bit_width as usize;

        let input_slice = $unpacked.data.borrow_to_typed_slice::<$data_type>();
        let input = input_slice.as_ref();

        // Zero-initialise the whole output up front. `Vec::set_len` may only expose elements
        // that are already initialised, so growing an uninitialised buffer and handing it out
        // as `&mut [T]` is not sound.
        let mut output = vec![0 as $data_type; num_chunks as usize * packed_chunk_size];

        // Pack every chunk that is completely filled with input values.
        for chunk_idx in 0..num_full_chunks as usize {
            let start_elem = chunk_idx * ELEMS_PER_CHUNK as usize;
            let output_start = chunk_idx * packed_chunk_size;

            // SAFETY: the input slice holds exactly ELEMS_PER_CHUNK values and the output slice
            // exactly `packed_chunk_size` words, the sizes computed above for this bit width.
            // NOTE(review): confirm these are the only requirements of `unchecked_pack`.
            unsafe {
                BitPacking::unchecked_pack(
                    $self.compressed_bit_width,
                    &input[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_start..][..packed_chunk_size],
                );
            }
        }

        // Pack the trailing partial chunk, if any, after padding it with zeros.
        if num_chunks != num_full_chunks {
            let last_chunk_elem_num = ($unpacked.num_values % ELEMS_PER_CHUNK) as usize;
            let mut last_chunk = vec![0 as $data_type; ELEMS_PER_CHUNK as usize];
            last_chunk[..last_chunk_elem_num].copy_from_slice(
                &input[$unpacked.num_values as usize - last_chunk_elem_num..],
            );

            let output_start = num_full_chunks as usize * packed_chunk_size;
            // SAFETY: same sizes as for the full chunks above; `last_chunk` is a full
            // ELEMS_PER_CHUNK-long, zero-padded copy of the remaining values.
            unsafe {
                BitPacking::unchecked_pack(
                    $self.compressed_bit_width,
                    &last_chunk,
                    &mut output[output_start..][..packed_chunk_size],
                );
            }
        }

        let bitpacked_for_non_neg_buffer_index = *$buffer_index;
        *$buffer_index += 1;

        let encoding = ProtobufUtils::bitpacked_for_non_neg_encoding(
            $self.compressed_bit_width as u64,
            uncompressed_bit_width,
            bitpacked_for_non_neg_buffer_index,
        );
        let packed = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: $self.compressed_bit_width as u64,
            data: LanceBuffer::reinterpret_vec(output),
            num_values: $unpacked.num_values,
            block_info: BlockInfo::new(),
        });

        Result::Ok(EncodedArray {
            data: packed,
            encoding,
        })
    }};
}
273
274#[derive(Debug)]
275pub struct BitpackedForNonNegArrayEncoder {
276    pub compressed_bit_width: usize,
277    pub original_data_type: DataType,
278}
279
280impl BitpackedForNonNegArrayEncoder {
281    pub fn new(compressed_bit_width: usize, data_type: DataType) -> Self {
282        Self {
283            compressed_bit_width,
284            original_data_type: data_type,
285        }
286    }
287}
288
289impl ArrayEncoder for BitpackedForNonNegArrayEncoder {
290    fn encode(
291        &self,
292        data: DataBlock,
293        data_type: &DataType,
294        buffer_index: &mut u32,
295    ) -> Result<EncodedArray> {
296        match data {
297            DataBlock::AllNull(_) => {
298                let encoding = ProtobufUtils::basic_all_null_encoding();
299                Ok(EncodedArray { data, encoding })
300            }
301            DataBlock::FixedWidth(unpacked) => {
302                match data_type {
303                    DataType::UInt8 | DataType::Int8 => encode_fixed_width!(self, unpacked, u8, buffer_index),
304                    DataType::UInt16 | DataType::Int16 => encode_fixed_width!(self, unpacked, u16, buffer_index),
305                    DataType::UInt32 | DataType::Int32 => encode_fixed_width!(self, unpacked, u32, buffer_index),
306                    DataType::UInt64 | DataType::Int64 => encode_fixed_width!(self, unpacked, u64, buffer_index),
307                    _ => unreachable!("BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64"),
308                }
309            }
310            DataBlock::Nullable(nullable) => {
311                let validity_buffer_index = *buffer_index;
312                *buffer_index += 1;
313
314                let validity_desc = ProtobufUtils::flat_encoding(
315                    1,
316                    validity_buffer_index,
317                    /*compression=*/ None,
318                );
319                let encoded_values: EncodedArray;
320                match *nullable.data {
321                    DataBlock::FixedWidth(unpacked) => {
322                        match data_type {
323                            DataType::UInt8 | DataType::Int8 => encoded_values = encode_fixed_width!(self, unpacked, u8, buffer_index)?,
324                            DataType::UInt16 | DataType::Int16 => encoded_values = encode_fixed_width!(self, unpacked, u16, buffer_index)?,
325                            DataType::UInt32 | DataType::Int32 => encoded_values = encode_fixed_width!(self, unpacked, u32, buffer_index)?,
326                            DataType::UInt64 | DataType::Int64 => encoded_values = encode_fixed_width!(self, unpacked, u64, buffer_index)?,
327                            _ => unreachable!("BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64"),
328                        }
329                    }
330                    _ => {
331                        return Err(Error::invalid_input_source("Bitpacking only supports fixed width data blocks or a nullable data block with fixed width data block inside or a all null data block".into()));
332                    }
333                }
334                let encoding =
335                    ProtobufUtils::basic_some_null_encoding(validity_desc, encoded_values.encoding);
336                let encoded = DataBlock::Nullable(NullableDataBlock {
337                    data: Box::new(encoded_values.data),
338                    nulls: nullable.nulls,
339                    block_info: BlockInfo::new(),
340                });
341                Ok(EncodedArray {
342                    data: encoded,
343                    encoding,
344                })
345            }
346            _ => {
347                Err(Error::invalid_input_source("Bitpacking only supports fixed width data blocks or a nullable data block with fixed width data block inside or a all null data block".into()))
348            }
349        }
350    }
351}
352
353#[derive(Debug)]
354pub struct BitpackedForNonNegScheduler {
355    compressed_bit_width: u64,
356    uncompressed_bits_per_value: u64,
357    buffer_offset: u64,
358}
359
360impl BitpackedForNonNegScheduler {
361    pub fn new(
362        compressed_bit_width: u64,
363        uncompressed_bits_per_value: u64,
364        buffer_offset: u64,
365    ) -> Self {
366        Self {
367            compressed_bit_width,
368            uncompressed_bits_per_value,
369            buffer_offset,
370        }
371    }
372
373    fn locate_chunk_start(&self, relative_row_num: u64) -> u64 {
374        let chunk_size = ELEMS_PER_CHUNK * self.compressed_bit_width / 8;
375        self.buffer_offset + (relative_row_num / ELEMS_PER_CHUNK * chunk_size)
376    }
377
378    fn locate_chunk_end(&self, relative_row_num: u64) -> u64 {
379        let chunk_size = ELEMS_PER_CHUNK * self.compressed_bit_width / 8;
380        self.buffer_offset + (relative_row_num / ELEMS_PER_CHUNK * chunk_size) + chunk_size
381    }
382}
383
384impl PageScheduler for BitpackedForNonNegScheduler {
385    fn schedule_ranges(
386        &self,
387        ranges: &[std::ops::Range<u64>],
388        scheduler: &Arc<dyn crate::EncodingsIo>,
389        top_level_row: u64,
390    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
391        assert!(!ranges.is_empty());
392
393        let mut byte_ranges = vec![];
394
395        // map one bytes to multiple ranges, one bytes has at least one range corresponding to it
396        let mut bytes_idx_to_range_indices = vec![];
397        let first_byte_range = std::ops::Range {
398            start: self.locate_chunk_start(ranges[0].start),
399            end: self.locate_chunk_end(ranges[0].end - 1),
400        }; // the ranges are half-open
401        byte_ranges.push(first_byte_range);
402        bytes_idx_to_range_indices.push(vec![ranges[0].clone()]);
403
404        for (i, range) in ranges.iter().enumerate().skip(1) {
405            let this_start = self.locate_chunk_start(range.start);
406            let this_end = self.locate_chunk_end(range.end - 1);
407
408            // when the current range start is in the same chunk as the previous range's end, we colaesce this two bytes ranges
409            // when the current range start is not in the same chunk as the previous range's end, we create a new bytes range
410            if this_start == self.locate_chunk_start(ranges[i - 1].end - 1) {
411                byte_ranges.last_mut().unwrap().end = this_end;
412                bytes_idx_to_range_indices
413                    .last_mut()
414                    .unwrap()
415                    .push(range.clone());
416            } else {
417                byte_ranges.push(this_start..this_end);
418                bytes_idx_to_range_indices.push(vec![range.clone()]);
419            }
420        }
421
422        trace!(
423            "Scheduling I/O for {} ranges spread across byte range {}..{}",
424            byte_ranges.len(),
425            byte_ranges[0].start,
426            byte_ranges.last().unwrap().end
427        );
428
429        let bytes = scheduler.submit_request(byte_ranges.clone(), top_level_row);
430
431        // copy the necessary data from `self` to move into the async block
432        let compressed_bit_width = self.compressed_bit_width;
433        let uncompressed_bits_per_value = self.uncompressed_bits_per_value;
434        let num_rows = ranges.iter().map(|range| range.end - range.start).sum();
435
436        async move {
437            let bytes = bytes.await?;
438            let decompressed_output = bitpacked_for_non_neg_decode(
439                compressed_bit_width,
440                uncompressed_bits_per_value,
441                &bytes,
442                &bytes_idx_to_range_indices,
443                num_rows,
444            );
445            Ok(Box::new(BitpackedForNonNegPageDecoder {
446                uncompressed_bits_per_value,
447                decompressed_buf: decompressed_output,
448            }) as Box<dyn PrimitivePageDecoder>)
449        }
450        .boxed()
451    }
452}
453
454#[derive(Debug)]
455struct BitpackedForNonNegPageDecoder {
456    // number of bits in the uncompressed value. E.g. this will be 32 for DataType::UInt32
457    uncompressed_bits_per_value: u64,
458
459    decompressed_buf: LanceBuffer,
460}
461
462impl PrimitivePageDecoder for BitpackedForNonNegPageDecoder {
463    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
464        if ![8, 16, 32, 64].contains(&self.uncompressed_bits_per_value) {
465            return Err(Error::invalid_input_source("BitpackedForNonNegPageDecoder should only has uncompressed_bits_per_value of 8, 16, 32, or 64".into()));
466        }
467
468        let elem_size_in_bytes = self.uncompressed_bits_per_value / 8;
469
470        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
471            data: self.decompressed_buf.slice_with_length(
472                (rows_to_skip * elem_size_in_bytes) as usize,
473                (num_rows * elem_size_in_bytes) as usize,
474            ),
475            bits_per_value: self.uncompressed_bits_per_value,
476            num_values: num_rows,
477            block_info: BlockInfo::new(),
478        }))
479    }
480}
481
// Unpack the requested rows from a set of packed byte buffers.
//
// Arguments:
//  - $uncompressed_type: word type of the unpacked values (u8, u16, u32 or u64).
//  - $compressed_bit_width: number of bits each value occupies in the packed data.
//  - $data: the packed byte buffers; each one is consumed in whole packed chunks.
//  - $bytes_idx_to_range_indices: for the i-th buffer in $data, the row ranges (half-open)
//    that have to be extracted from it, in order.
//  - $num_rows: only used to size the output vector up front.
//
// Every chunk of a buffer is unpacked into a scratch buffer of ELEMS_PER_CHUNK values and the
// parts covered by the requested ranges are appended to the output. Positions inside a chunk
// are derived from the row number modulo ELEMS_PER_CHUNK.
//
// Evaluates to a LanceBuffer holding the selected values back to back.
macro_rules! bitpacked_decode {
    ($uncompressed_type:ty, $compressed_bit_width:expr, $data:expr, $bytes_idx_to_range_indices:expr, $num_rows:expr) => {{
        let mut decompressed: Vec<$uncompressed_type> = Vec::with_capacity($num_rows as usize);
        // size of one packed chunk in bytes
        let packed_chunk_size_in_byte: usize = (ELEMS_PER_CHUNK * $compressed_bit_width) as usize / 8;
        // scratch space that receives one unpacked chunk at a time
        let mut decompress_chunk_buf = vec![0 as $uncompressed_type; ELEMS_PER_CHUNK as usize];

        for (i, bytes) in $data.iter().enumerate() {
            // index of the row range currently being served from this buffer
            let mut ranges_idx = 0;
            // next row that still has to be emitted for the current range
            let mut curr_range_start = $bytes_idx_to_range_indices[i][0].start;
            let mut chunk_num = 0;

            while chunk_num * packed_chunk_size_in_byte < bytes.len() {
                // Copy for memory alignment
                // TODO: This copy should not be needed
                // NOTE(review): a Vec<u8> only guarantees byte alignment by type; `cast_slice`
                // to a wider word type presumably relies on the allocator's alignment -- confirm.
                let chunk_in_u8: Vec<u8> = bytes[chunk_num * packed_chunk_size_in_byte..]
                    [..packed_chunk_size_in_byte]
                    .to_vec();
                chunk_num += 1;
                let chunk = cast_slice(&chunk_in_u8);
                unsafe {
                    BitPacking::unchecked_unpack(
                        $compressed_bit_width as usize,
                        chunk,
                        &mut decompress_chunk_buf,
                    );
                }

                // Emit every part of the unpacked chunk that is covered by a requested range.
                loop {
                    // Case 1: All the elements after (curr_range_start % ELEMS_PER_CHUNK) inside this chunk are needed.
                    let elems_after_curr_range_start_in_this_chunk =
                        ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK;
                    if curr_range_start + elems_after_curr_range_start_in_this_chunk
                        <= $bytes_idx_to_range_indices[i][ranges_idx].end
                    {
                        decompressed.extend_from_slice(
                            &decompress_chunk_buf[(curr_range_start % ELEMS_PER_CHUNK) as usize..],
                        );
                        curr_range_start += elems_after_curr_range_start_in_this_chunk;
                        // the rest of this chunk has been consumed; move on to the next chunk
                        break;
                    } else {
                        // Case 2: Only part of the elements after (curr_range_start % ELEMS_PER_CHUNK) inside this chunk are needed.
                        let elems_this_range_needed_in_this_chunk =
                            ($bytes_idx_to_range_indices[i][ranges_idx].end - curr_range_start)
                                .min(ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK);
                        decompressed.extend_from_slice(
                            &decompress_chunk_buf[(curr_range_start % ELEMS_PER_CHUNK) as usize..]
                                [..elems_this_range_needed_in_this_chunk as usize],
                        );
                        if curr_range_start + elems_this_range_needed_in_this_chunk
                            == $bytes_idx_to_range_indices[i][ranges_idx].end
                        {
                            // the current range is finished; continue with the next range
                            // served by this buffer, if there is one
                            ranges_idx += 1;
                            if ranges_idx == $bytes_idx_to_range_indices[i].len() {
                                break;
                            }
                            curr_range_start = $bytes_idx_to_range_indices[i][ranges_idx].start;
                        } else {
                            curr_range_start += elems_this_range_needed_in_this_chunk;
                        }
                    }
                }
            }
        }

        LanceBuffer::reinterpret_vec(decompressed)
    }};
}
549
550fn bitpacked_for_non_neg_decode(
551    compressed_bit_width: u64,
552    uncompressed_bits_per_value: u64,
553    data: &[Bytes],
554    bytes_idx_to_range_indices: &[Vec<std::ops::Range<u64>>],
555    num_rows: u64,
556) -> LanceBuffer {
557    match uncompressed_bits_per_value {
558        8 => bitpacked_decode!(
559            u8,
560            compressed_bit_width,
561            data,
562            bytes_idx_to_range_indices,
563            num_rows
564        ),
565        16 => bitpacked_decode!(
566            u16,
567            compressed_bit_width,
568            data,
569            bytes_idx_to_range_indices,
570            num_rows
571        ),
572        32 => bitpacked_decode!(
573            u32,
574            compressed_bit_width,
575            data,
576            bytes_idx_to_range_indices,
577            num_rows
578        ),
579        64 => bitpacked_decode!(
580            u64,
581            compressed_bit_width,
582            data,
583            bytes_idx_to_range_indices,
584            num_rows
585        ),
586        _ => unreachable!(
587            "bitpacked_for_non_neg_decode only supports 8, 16, 32, 64 uncompressed_bits_per_value"
588        ),
589    }
590}
591
// Parameters chosen for bitpacking an array, as computed by `bitpack_params`.
#[derive(Debug)]
pub struct BitpackParams {
    // number of bits to use for each packed value (at least 1)
    pub num_bits: u64,

    // true when a sign bit was included, i.e. the array contains at least one negative value
    pub signed: bool,
}
598
599// Compute the number of bits to use for each item, if this array can be encoded using
600// bitpacking encoding. Returns `None` if the type or array data is not supported.
601pub fn bitpack_params(arr: &dyn Array) -> Option<BitpackParams> {
602    match arr.data_type() {
603        DataType::UInt8 => bitpack_params_for_type::<UInt8Type>(arr.as_primitive()),
604        DataType::UInt16 => bitpack_params_for_type::<UInt16Type>(arr.as_primitive()),
605        DataType::UInt32 => bitpack_params_for_type::<UInt32Type>(arr.as_primitive()),
606        DataType::UInt64 => bitpack_params_for_type::<UInt64Type>(arr.as_primitive()),
607        DataType::Int8 => bitpack_params_for_signed_type::<Int8Type>(arr.as_primitive()),
608        DataType::Int16 => bitpack_params_for_signed_type::<Int16Type>(arr.as_primitive()),
609        DataType::Int32 => bitpack_params_for_signed_type::<Int32Type>(arr.as_primitive()),
610        DataType::Int64 => bitpack_params_for_signed_type::<Int64Type>(arr.as_primitive()),
611        // TODO -- eventually we could support temporal types as well
612        _ => None,
613    }
614}
615
616// Compute the number bits to use for bitpacking generically.
617// returns None if the array is empty or all nulls
618fn bitpack_params_for_type<T>(arr: &PrimitiveArray<T>) -> Option<BitpackParams>
619where
620    T: ArrowPrimitiveType,
621    T::Native: PrimInt + AsPrimitive<u64>,
622{
623    let max = arrow_arith::aggregate::bit_or(arr);
624    let num_bits =
625        max.map(|max| arr.data_type().byte_width() as u64 * 8 - max.leading_zeros() as u64);
626
627    // we can't bitpack into 0 bits, so the minimum is 1
628    num_bits
629        .map(|num_bits| num_bits.max(1))
630        .map(|bits| BitpackParams {
631            num_bits: bits,
632            signed: false,
633        })
634}
635
636/// determine the minimum number of bits that can be used to represent
637/// an array of signed values. It includes all the significant bits for
638/// the value + plus 1 bit to represent the sign. If there are no negative values
639/// then it will not add a signed bit
640fn bitpack_params_for_signed_type<T>(arr: &PrimitiveArray<T>) -> Option<BitpackParams>
641where
642    T: ArrowPrimitiveType,
643    T::Native: PrimInt + AsPrimitive<i64>,
644{
645    let mut add_signed_bit = false;
646    let mut min_leading_bits: Option<u64> = None;
647    for val in arr.iter() {
648        if val.is_none() {
649            continue;
650        }
651        let val = val.unwrap();
652        if min_leading_bits.is_none() {
653            min_leading_bits = Some(u64::MAX);
654        }
655
656        if val.to_i64().unwrap() < 0i64 {
657            min_leading_bits = min_leading_bits.map(|bits| bits.min(val.leading_ones() as u64));
658            add_signed_bit = true;
659        } else {
660            min_leading_bits = min_leading_bits.map(|bits| bits.min(val.leading_zeros() as u64));
661        }
662    }
663
664    let mut min_leading_bits = arr.data_type().byte_width() as u64 * 8 - min_leading_bits?;
665    if add_signed_bit {
666        // Need extra sign bit
667        min_leading_bits += 1;
668    }
669    // cannot bitpack into <1 bit
670    let num_bits = min_leading_bits.max(1);
671    Some(BitpackParams {
672        num_bits,
673        signed: add_signed_bit,
674    })
675}
/// Array encoder that packs every value into a fixed number of bits, back to back.
#[derive(Debug)]
pub struct BitpackedArrayEncoder {
    /// Number of bits each packed value occupies.
    num_bits: u64,
    /// Whether the values are of a signed type; recorded in the encoding description.
    signed_type: bool,
}

impl BitpackedArrayEncoder {
    /// Creates an encoder packing values into `num_bits` bits each.
    pub fn new(num_bits: u64, signed_type: bool) -> Self {
        Self {
            signed_type,
            num_bits,
        }
    }
}
690
691impl ArrayEncoder for BitpackedArrayEncoder {
692    fn encode(
693        &self,
694        data: DataBlock,
695        _data_type: &DataType,
696        buffer_index: &mut u32,
697    ) -> Result<EncodedArray> {
698        // calculate the total number of bytes we need to allocate for the destination.
699        // this will be the number of items in the source array times the number of bits.
700        let dst_bytes_total = ceil(data.num_values() as usize * self.num_bits as usize, 8);
701
702        let mut dst_buffer = vec![0u8; dst_bytes_total];
703        let mut dst_idx = 0;
704        let mut dst_offset = 0;
705
706        let DataBlock::FixedWidth(unpacked) = data else {
707            return Err(Error::invalid_input_source(
708                "Bitpacking only supports fixed width data blocks".into(),
709            ));
710        };
711
712        pack_bits(
713            &unpacked.data,
714            self.num_bits,
715            &mut dst_buffer,
716            &mut dst_idx,
717            &mut dst_offset,
718        );
719
720        let packed = DataBlock::FixedWidth(FixedWidthDataBlock {
721            bits_per_value: self.num_bits,
722            data: LanceBuffer::from(dst_buffer),
723            num_values: unpacked.num_values,
724            block_info: BlockInfo::new(),
725        });
726
727        let bitpacked_buffer_index = *buffer_index;
728        *buffer_index += 1;
729
730        let encoding = ProtobufUtils::bitpacked_encoding(
731            self.num_bits,
732            unpacked.bits_per_value,
733            bitpacked_buffer_index,
734            self.signed_type,
735        );
736
737        Ok(EncodedArray {
738            data: packed,
739            encoding,
740        })
741    }
742}
743
/// Appends the low `num_bits` bits of a value read from `src` to the bitpacked output
/// in `dst`.
///
/// * `src` - bytes of the unpacked data; the first byte read supplies the lowest bits
/// * `num_bits` - packed width of a value. The mask is built with `64 - num_bits`, so
///   this has to be in `1..=64`
/// * `dst` - output bytes. Bits are accumulated with `+=`, so the bytes being written
///   are presumably expected to start out zeroed (the allocation is not visible here)
/// * `dst_idx` / `dst_offset` - write cursor (byte index and bit within that byte);
///   both are advanced in place as bits are written
///
/// NOTE(review): the skip at the bottom of the outer loop advances `src_idx` by
/// `src.len()`, the length of the *whole* buffer, not the byte width of a single
/// value. The visible caller passes the entire unpacked buffer, so after the first
/// value `src_idx` lands at or past `src.len()` and the loop ends. This looks like it
/// should use the per-value byte width instead — confirm and fix together with the
/// caller (it would need an extra parameter).
744fn pack_bits(
745    src: &LanceBuffer,
746    num_bits: u64,
747    dst: &mut [u8],
748    dst_idx: &mut usize,
749    dst_offset: &mut u8,
750) {
751    let bit_len = src.len() as u64 * 8;
752
       // selects the low `num_bits` bits; consumed one byte at a time below
753    let mask = u64::MAX >> (64 - num_bits);
754
755    let mut src_idx = 0;
756    while src_idx < src.len() {
757        let mut curr_mask = mask;
758        let mut curr_src = src[src_idx] & curr_mask as u8;
759        let mut src_offset = 0;
760        let mut src_bits_written = 0;
761
762        while src_bits_written < num_bits {
               // drop the source bits already consumed, then line the rest up with the
               // first free bit of the current destination byte
763            dst[*dst_idx] += (curr_src >> src_offset) << *dst_offset as u64;
               // limited by: bits still owed for this value, bits left in the source
               // byte, and bits left in the destination byte
764            let bits_written = (num_bits - src_bits_written)
765                .min(8 - src_offset)
766                .min(8 - *dst_offset as u64);
767            src_bits_written += bits_written;
768            *dst_offset += bits_written as u8;
769            src_offset += bits_written;
770
771            if *dst_offset == 8 {
772                *dst_idx += 1;
773                *dst_offset = 0;
774            }
775
776            if src_offset == 8 {
777                src_idx += 1;
778                src_offset = 0;
                   // the next source byte is covered by the next 8 bits of the mask
779                curr_mask >>= 8;
780                if src_idx == src.len() {
781                    break;
782                }
783                curr_src = src[src_idx] & curr_mask as u8;
784            }
785        }
786
787        // advance source_offset to the next byte if we're not at the end..
788        // note that we don't need to do this if we wrote the full number of bits
789        // because source index would have been advanced by the inner loop above
790        if bit_len != num_bits {
791            let partial_bytes_written = ceil(num_bits as usize, 8);
792
793            // we also want to move to the next location in src, unless we wrote something
794            // byte-aligned in which case the logic above would have already advanced
795            let mut to_next_byte = 1;
796            if num_bits.is_multiple_of(8) {
797                to_next_byte = 0;
798            }
799
               // NOTE(review): `src.len()` is the full buffer length — see function docs
800            src_idx += src.len() - partial_bytes_written + to_next_byte;
801        }
802    }
803}
804
805// A physical scheduler for bitpacked buffers.
//
// Translates requested row ranges into byte ranges of the packed buffer and hands the
// fetched bytes to a `BitpackedPageDecoder`.
806#[derive(Debug, Clone, Copy)]
807pub struct BitpackedScheduler {
       // number of bits each value occupies in the packed buffer
808    bits_per_value: u64,
       // number of bits each value occupies once decoded
809    uncompressed_bits_per_value: u64,
       // added to every computed byte offset when building the I/O ranges
810    buffer_offset: u64,
       // forwarded to the decoder, which uses the top packed bit as a sign bit when set
811    signed: bool,
812}
813
814impl BitpackedScheduler {
       /// Creates a scheduler for a bitpacked buffer.
       ///
       /// * `bits_per_value` - packed width of each value, in bits
       /// * `uncompressed_bits_per_value` - width of each value after decoding, in bits
       /// * `buffer_offset` - offset added to every byte range that gets scheduled
       /// * `signed` - whether decoded values should be sign-extended
815    pub fn new(
816        bits_per_value: u64,
817        uncompressed_bits_per_value: u64,
818        buffer_offset: u64,
819        signed: bool,
820    ) -> Self {
821        Self {
822            bits_per_value,
823            uncompressed_bits_per_value,
824            buffer_offset,
825            signed,
826        }
827    }
828}
829
830impl PageScheduler for BitpackedScheduler {
831    fn schedule_ranges(
832        &self,
833        ranges: &[std::ops::Range<u64>],
834        scheduler: &Arc<dyn crate::EncodingsIo>,
835        top_level_row: u64,
836    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
837        let mut min = u64::MAX;
838        let mut max = 0;
839
840        let mut buffer_bit_start_offsets: Vec<u8> = vec![];
841        let mut buffer_bit_end_offsets: Vec<Option<u8>> = vec![];
842        let byte_ranges = ranges
843            .iter()
844            .map(|range| {
845                let start_byte_offset = range.start * self.bits_per_value / 8;
846                let mut end_byte_offset = range.end * self.bits_per_value / 8;
847                if !(range.end * self.bits_per_value).is_multiple_of(8) {
848                    // If the end of the range is not byte-aligned, we need to read one more byte
849                    end_byte_offset += 1;
850
851                    let end_bit_offset = range.end * self.bits_per_value % 8;
852                    buffer_bit_end_offsets.push(Some(end_bit_offset as u8));
853                } else {
854                    buffer_bit_end_offsets.push(None);
855                }
856
857                let start_bit_offset = range.start * self.bits_per_value % 8;
858                buffer_bit_start_offsets.push(start_bit_offset as u8);
859
860                let start = self.buffer_offset + start_byte_offset;
861                let end = self.buffer_offset + end_byte_offset;
862                min = min.min(start);
863                max = max.max(end);
864
865                start..end
866            })
867            .collect::<Vec<_>>();
868
869        trace!(
870            "Scheduling I/O for {} ranges spread across byte range {}..{}",
871            byte_ranges.len(),
872            min,
873            max
874        );
875
876        let bytes = scheduler.submit_request(byte_ranges, top_level_row);
877
878        let bits_per_value = self.bits_per_value;
879        let uncompressed_bits_per_value = self.uncompressed_bits_per_value;
880        let signed = self.signed;
881        async move {
882            let bytes = bytes.await?;
883            Ok(Box::new(BitpackedPageDecoder {
884                buffer_bit_start_offsets,
885                buffer_bit_end_offsets,
886                bits_per_value,
887                uncompressed_bits_per_value,
888                signed,
889                data: bytes,
890            }) as Box<dyn PrimitivePageDecoder>)
891        }
892        .boxed()
893    }
894}
895
// Decoder over the buffers fetched for one `schedule_ranges` call. The two offset
// vectors are indexed in parallel with `data`.
896#[derive(Debug)]
897struct BitpackedPageDecoder {
898    // bit offsets of the first value within each buffer
899    buffer_bit_start_offsets: Vec<u8>,
900
901    // bit offset, within the last byte of each buffer, at which the last value ends
902    // (`None` when the last value ends on a byte boundary). E.g. for the row range 0..2
903    // with 5 bits per value this is [Some(2)]: only the low 2 bits of the last byte
       // belong to the range and the remaining bits of that byte shouldn't be decoded.
904    buffer_bit_end_offsets: Vec<Option<u8>>,
905
906    // the number of bits used to represent a compressed value. E.g. if the max value
907    // in the page was 7 (0b111), then this will be 3
908    bits_per_value: u64,
909
910    // number of bits in the uncompressed value. E.g. this will be 32 for u32
911    uncompressed_bits_per_value: u64,
912
913    // whether or not to use the msb as a sign bit during decoding
914    signed: bool,
915
       // the fetched bytes, one entry per scheduled byte range
916    data: Vec<Bytes>,
917}
918
919impl PrimitivePageDecoder for BitpackedPageDecoder {
       /// Unpacks `num_rows` values from the fetched buffers after skipping the first
       /// `rows_to_skip` values.
       ///
       /// Every packed value (`bits_per_value` bits) is written, low bits first, into
       /// `uncompressed_bits_per_value / 8` bytes of a zero-initialized output. If
       /// `signed` is set and the highest packed bit of a value is 1, the unused high
       /// bits of its output slot are filled with 1s.
       ///
       /// The returned block always reports `num_rows` values; if the buffers run out
       /// before that many values were unpacked, the remaining output bytes stay zero.
       ///
       /// NOTE(review): the mask is built with `64 - self.bits_per_value`, so a
       /// `bits_per_value` of 0 would overflow the shift; presumably the encoder never
       /// produces that — confirm.
920    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
921        let num_bytes = self.uncompressed_bits_per_value / 8 * num_rows;
922        let mut dest = vec![0; num_bytes as usize];
923
924        // current maximum supported bits per value = 64
925        debug_assert!(self.bits_per_value <= 64);
926
927        let mut rows_to_skip = rows_to_skip;
928        let mut rows_taken = 0;
929        let byte_len = self.uncompressed_bits_per_value / 8;
930        let mut dst_idx = 0; // index for current byte being written to destination buffer
931
932        // create bit mask for source bits
933        let mask = u64::MAX >> (64 - self.bits_per_value);
934
           // walk the fetched buffers in order; `i` also indexes the per-buffer offsets
935        for i in 0..self.data.len() {
936            let src = &self.data[i];
937            let (mut src_idx, mut src_offset) = match compute_start_offset(
938                rows_to_skip,
939                src.len(),
940                self.bits_per_value,
941                self.buffer_bit_start_offsets[i],
942                self.buffer_bit_end_offsets[i],
943            ) {
944                StartOffset::SkipFull(rows_to_skip_here) => {
945                    rows_to_skip -= rows_to_skip_here;
946                    continue;
947                }
948                StartOffset::SkipSome(buffer_start_offset) => (
949                    buffer_start_offset.index,
950                    buffer_start_offset.bit_offset as u64,
951                ),
952            };
953
954            while src_idx < src.len() && rows_taken < num_rows {
955                rows_taken += 1;
956                let mut curr_mask = mask; // copy mask
957
958                // current source byte being written to destination
959                let mut curr_src = src[src_idx] & (curr_mask << src_offset) as u8;
960
961                // how many bits from the current source value have been written to destination
962                let mut src_bits_written = 0;
963
964                // the offset within the current destination byte to write to
965                let mut dst_offset = 0;
966
                   // looked up before the cursors move, while they still point at the
                   // start of this value
967                let is_negative = is_encoded_item_negative(
968                    src,
969                    src_idx,
970                    src_offset,
971                    self.bits_per_value as usize,
972                );
973
974                while src_bits_written < self.bits_per_value {
975                    // write bits from current source byte into destination
976                    dest[dst_idx] += (curr_src >> src_offset) << dst_offset;
977                    let bits_written = (self.bits_per_value - src_bits_written)
978                        .min(8 - src_offset)
979                        .min(8 - dst_offset);
980                    src_bits_written += bits_written;
981                    dst_offset += bits_written;
982                    src_offset += bits_written;
983                    curr_mask >>= bits_written;
984
985                    if dst_offset == 8 {
986                        dst_idx += 1;
987                        dst_offset = 0;
988                    }
989
990                    if src_offset == 8 {
991                        src_idx += 1;
992                        src_offset = 0;
993                        if src_idx == src.len() {
994                            break;
995                        }
996                        curr_src = src[src_idx] & curr_mask as u8;
997                    }
998                }
999
1000                // if the type is signed, need to pad out the rest of the byte with 1s
1001                let mut negative_padded_current_byte = false;
1002                if self.signed && is_negative && dst_offset > 0 {
1003                    negative_padded_current_byte = true;
1004                    while dst_offset < 8 {
1005                        dest[dst_idx] |= 1 << dst_offset;
1006                        dst_offset += 1;
1007                    }
1008                }
1009
1010                // advance destination offset to the next location
1011                // note that we don't need to do this if we wrote the full number of bits
1012                // because source index would have been advanced by the inner loop above
1013                if self.uncompressed_bits_per_value != self.bits_per_value {
1014                    let partial_bytes_written = ceil(self.bits_per_value as usize, 8);
1015
1016                    // we also want to move one location to the next location in destination,
1017                    // unless we wrote something byte-aligned in which case the logic above
1018                    // would have already advanced dst_idx
1019                    let mut to_next_byte = 1;
1020                    if self.bits_per_value.is_multiple_of(8) {
1021                        to_next_byte = 0;
1022                    }
1023                    let next_dst_idx =
1024                        dst_idx + byte_len as usize - partial_bytes_written + to_next_byte;
1025
1026                    // pad remaining bytes with 1 for negative signed numbers
1027                    if self.signed && is_negative {
1028                        if !negative_padded_current_byte {
1029                            dest[dst_idx] = 0xFF;
1030                        }
1031                        for i in dest.iter_mut().take(next_dst_idx).skip(dst_idx + 1) {
1032                            *i = 0xFF;
1033                        }
1034                    }
1035
1036                    dst_idx = next_dst_idx;
1037                }
1038
1039                // If we've reached the last byte, there may be some extra bits from the
1040                // next value outside the range. We don't want to be taking those.
1041                if let Some(buffer_bit_end_offset) = self.buffer_bit_end_offsets[i]
1042                    && src_idx == src.len() - 1
1043                    && src_offset >= buffer_bit_end_offset as u64
1044                {
1045                    break;
1046                }
1047            }
1048        }
1049
1050        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
1051            data: LanceBuffer::from(dest),
1052            bits_per_value: self.uncompressed_bits_per_value,
1053            num_values: num_rows,
1054            block_info: BlockInfo::new(),
1055        }))
1056    }
1057}
1058
/// Reports whether the bitpacked value starting at byte `src_idx`, bit `src_offset` of
/// `src` has its highest bit set, i.e. whether it is negative when the packed value is
/// interpreted as a signed integer of `num_bits` bits.
///
/// * `src` - the packed bytes (a `&Bytes` coerces to `&[u8]` at the call site)
/// * `src_idx` - index of the byte containing the value's first (lowest) bit
/// * `src_offset` - position of that first bit within the byte, counted from the
///   least-significant bit
/// * `num_bits` - packed width of the value
///
/// # Panics
///
/// Panics if the byte holding the sign bit lies outside `src`.
fn is_encoded_item_negative(src: &[u8], src_idx: usize, src_offset: u64, num_bits: usize) -> bool {
    // one past the value's last bit, measured from the start of byte `src_idx`
    let end_bit = src_offset as usize + num_bits;

    // the sign bit is the bit just before `end_bit`; when the value ends exactly on a
    // byte boundary that is the top bit of the preceding byte
    let (sign_byte_idx, sign_bit_idx) = match end_bit % 8 {
        0 => (src_idx + end_bit / 8 - 1, 7),
        bits_into_last_byte => (src_idx + end_bit / 8, bits_into_last_byte - 1),
    };

    src[sign_byte_idx] & (1u8 << sign_bit_idx) != 0
}
1074
// Position inside a buffer at which decoding should start.
1075#[derive(Debug, PartialEq)]
1076struct BufferStartOffset {
        // index of the byte that holds the first bit to decode
1077    index: usize,
        // position of that first bit within the byte
1078    bit_offset: u8,
1079}
1080
// Result of `compute_start_offset`: either the whole buffer is skipped or decoding
// starts somewhere inside it.
1081#[derive(Debug, PartialEq)]
1082enum StartOffset {
1083    // skip the full buffer. The value is how many rows are skipped
1084    // by skipping the full buffer (e.g., # rows in buffer)
1085    SkipFull(u64),
1086
1087    // skip to some start offset in the buffer
1088    SkipSome(BufferStartOffset),
1089}
1090
1091/// compute how far ahead in this buffer should we skip ahead and start reading
1092///
1093/// * `rows_to_skip` - how many rows to skip
1094/// * `buffer_len` - length buf buffer (in bytes)
1095/// * `bits_per_value` - number of bits used to represent a single bitpacked value
1096/// * `buffer_start_bit_offset` - offset of the start of the first value within the
1097///   buffer's  first byte
1098/// * `buffer_end_bit_offset` - end bit of the last value within the buffer. Can be
1099///   `None` if the end of the last value is byte aligned with end of buffer.
1100fn compute_start_offset(
1101    rows_to_skip: u64,
1102    buffer_len: usize,
1103    bits_per_value: u64,
1104    buffer_start_bit_offset: u8,
1105    buffer_end_bit_offset: Option<u8>,
1106) -> StartOffset {
1107    let rows_in_buffer = rows_in_buffer(
1108        buffer_len,
1109        bits_per_value,
1110        buffer_start_bit_offset,
1111        buffer_end_bit_offset,
1112    );
1113    if rows_to_skip >= rows_in_buffer {
1114        return StartOffset::SkipFull(rows_in_buffer);
1115    }
1116
1117    let start_bit = rows_to_skip * bits_per_value + buffer_start_bit_offset as u64;
1118    let start_byte = start_bit / 8;
1119
1120    StartOffset::SkipSome(BufferStartOffset {
1121        index: start_byte as usize,
1122        bit_offset: (start_bit % 8) as u8,
1123    })
1124}
1125
/// Counts how many complete bitpacked values a buffer holds.
///
/// The usable bits are the buffer's total bits minus the unused bits before the first
/// value (`buffer_start_bit_offset`) and, when the last value does not end on a byte
/// boundary, minus the unused bits after it (`8 - buffer_end_bit_offset`).
fn rows_in_buffer(
    buffer_len: usize,
    bits_per_value: u64,
    buffer_start_bit_offset: u8,
    buffer_end_bit_offset: Option<u8>,
) -> u64 {
    let total_bits = (buffer_len * 8) as u64;
    let leading_unused = buffer_start_bit_offset as u64;
    let trailing_unused = buffer_end_bit_offset.map_or(0, |end_offset| (8 - end_offset) as u64);

    (total_bits - leading_unused - trailing_unused) / bits_per_value
}
1143
1144#[cfg(test)]
1145pub mod test {
1146    use crate::{
1147        format::pb,
1148        testing::{ArrayGeneratorProvider, TestCases, check_round_trip_encoding_generated},
1149        version::LanceFileVersion,
1150    };
1151
1152    use super::*;
1153    use std::{marker::PhantomData, sync::Arc};
1154
1155    use arrow_array::{
1156        ArrayRef, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
1157        UInt8Array, UInt16Array, UInt32Array, UInt64Array,
1158        types::{UInt8Type, UInt16Type},
1159    };
1160
1161    use arrow_schema::Field;
1162    use lance_datagen::{
1163        ArrayGenerator, ArrayGeneratorExt, RowCount,
1164        array::{fill, rand_with_distribution},
1165        gen_batch,
1166    };
1167    use rand::distr::Uniform;
1168
        // Checks that `bitpack_params` reports the expected bit width for unsigned
        // arrays (with and without nulls) and returns `None` for a float array.
1169    #[test]
1170    fn test_bitpack_params() {
            // generates a single 10000-row column from the given generator
1171        fn gen_array(generator: Box<dyn ArrayGenerator>) -> ArrayRef {
1172            gen_batch()
1173                .anon_col(generator)
1174                .into_batch_rows(RowCount::from(10000))
1175                .unwrap()
1176                .column(0)
1177                .clone()
1178        }
1179
1180        macro_rules! do_test {
1181            ($num_bits:expr, $data_type:ident, $null_probability:expr) => {
                    // `-` binds tighter than `<<`, so this is `1 << (num_bits - 1)`:
                    // the smallest value that needs exactly `num_bits` bits
1182                let max = 1 << $num_bits - 1;
1183                let mut arr =
1184                    gen_array(fill::<$data_type>(max).with_random_nulls($null_probability));
1185
1186                // ensure we don't randomly generate all nulls, that won't work
1187                while arr.null_count() == arr.len() {
1188                    arr = gen_array(fill::<$data_type>(max).with_random_nulls($null_probability));
1189                }
1190                let result = bitpack_params(arr.as_ref());
1191                assert!(result.is_some());
1192                assert_eq!($num_bits, result.unwrap().num_bits);
1193            };
1194        }
1195
            // (expected num_bits, null probability); widths that fit every unsigned type
1196        let test_cases = vec![
1197            (5u64, 0.0f64),
1198            (5u64, 0.9f64),
1199            (1u64, 0.0f64),
1200            (1u64, 0.5f64),
1201            (8u64, 0.0f64),
1202            (8u64, 0.5f64),
1203        ];
1204
1205        for (num_bits, null_probability) in &test_cases {
1206            do_test!(*num_bits, UInt8Type, *null_probability);
1207            do_test!(*num_bits, UInt16Type, *null_probability);
1208            do_test!(*num_bits, UInt32Type, *null_probability);
1209            do_test!(*num_bits, UInt64Type, *null_probability);
1210        }
1211
1212        // do some test cases that will only work on larger types
1213        let test_cases = vec![
1214            (13u64, 0.0f64),
1215            (13u64, 0.5f64),
1216            (16u64, 0.0f64),
1217            (16u64, 0.5f64),
1218        ];
1219        for (num_bits, null_probability) in &test_cases {
1220            do_test!(*num_bits, UInt16Type, *null_probability);
1221            do_test!(*num_bits, UInt32Type, *null_probability);
1222            do_test!(*num_bits, UInt64Type, *null_probability);
1223        }
1224        let test_cases = vec![
1225            (25u64, 0.0f64),
1226            (25u64, 0.5f64),
1227            (32u64, 0.0f64),
1228            (32u64, 0.5f64),
1229        ];
1230        for (num_bits, null_probability) in &test_cases {
1231            do_test!(*num_bits, UInt32Type, *null_probability);
1232            do_test!(*num_bits, UInt64Type, *null_probability);
1233        }
1234        let test_cases = vec![
1235            (48u64, 0.0f64),
1236            (48u64, 0.5f64),
1237            (64u64, 0.0f64),
1238            (64u64, 0.5f64),
1239        ];
1240        for (num_bits, null_probability) in &test_cases {
1241            do_test!(*num_bits, UInt64Type, *null_probability);
1242        }
1243
1244        // test that it returns None for datatypes that don't support bitpacking
1245        let arr = Float64Array::from_iter_values(vec![0.1, 0.2, 0.3]);
1246        let result = bitpack_params(&arr);
1247        assert!(result.is_none());
1248    }
1249
1250    #[test]
1251    fn test_num_compressed_bits_signed_types() {
1252        let values = Int32Array::from(vec![1, 2, -7]);
1253        let arr = values;
1254        let result = bitpack_params(&arr);
1255        assert!(result.is_some());
1256        let result = result.unwrap();
1257        assert_eq!(4, result.num_bits);
1258        assert!(result.signed);
1259
1260        // check that it doesn't add a sign bit if it doesn't need to
1261        let values = Int32Array::from(vec![1, 2, 7]);
1262        let arr = values;
1263        let result = bitpack_params(&arr);
1264        assert!(result.is_some());
1265        let result = result.unwrap();
1266        assert_eq!(3, result.num_bits);
1267        assert!(!result.signed);
1268    }
1269
1270    #[test]
1271    fn test_rows_in_buffer() {
1272        let test_cases = vec![
1273            (5usize, 5u64, 0u8, None, 8u64),
1274            (2, 3, 0, Some(5), 4),
1275            (2, 3, 7, Some(6), 2),
1276        ];
1277
1278        for (
1279            buffer_len,
1280            bits_per_value,
1281            buffer_start_bit_offset,
1282            buffer_end_bit_offset,
1283            expected,
1284        ) in test_cases
1285        {
1286            let result = rows_in_buffer(
1287                buffer_len,
1288                bits_per_value,
1289                buffer_start_bit_offset,
1290                buffer_end_bit_offset,
1291            );
1292            assert_eq!(expected, result);
1293        }
1294    }
1295
1296    #[test]
1297    fn test_compute_start_offset() {
1298        let result = compute_start_offset(0, 5, 5, 0, None);
1299        assert_eq!(
1300            StartOffset::SkipSome(BufferStartOffset {
1301                index: 0,
1302                bit_offset: 0
1303            }),
1304            result
1305        );
1306
1307        let result = compute_start_offset(10, 5, 5, 0, None);
1308        assert_eq!(StartOffset::SkipFull(8), result);
1309    }
1310
        // First half: encodes small arrays with `BitpackedArrayEncoder` and checks the
        // packed width reported by both the data block and the protobuf encoding.
        // Second half: for arrays that need the full width (or an unsupported type),
        // checks that any params returned by `bitpack_params` use the full type width.
1311    #[test_log::test(test)]
1312    fn test_will_bitpack_allowed_types_when_possible() {
            // (data type, input array, expected packed bits per value)
1313        let test_cases: Vec<(DataType, ArrayRef, u64)> = vec![
1314            (
1315                DataType::UInt8,
1316                Arc::new(UInt8Array::from_iter_values(vec![0, 1, 2, 3, 4, 5])),
1317                3, // bits per value
1318            ),
1319            (
1320                DataType::UInt16,
1321                Arc::new(UInt16Array::from_iter_values(vec![0, 1, 2, 3, 4, 5 << 8])),
1322                11,
1323            ),
1324            (
1325                DataType::UInt32,
1326                Arc::new(UInt32Array::from_iter_values(vec![0, 1, 2, 3, 4, 5 << 16])),
1327                19,
1328            ),
1329            (
1330                DataType::UInt64,
1331                Arc::new(UInt64Array::from_iter_values(vec![0, 1, 2, 3, 4, 5 << 32])),
1332                35,
1333            ),
1334            (
1335                DataType::Int8,
1336                Arc::new(Int8Array::from_iter_values(vec![0, 2, 3, 4, -5])),
1337                4,
1338            ),
1339            (
1340                // check it will not pack with signed bit if all values of signed type are positive
1341                DataType::Int8,
1342                Arc::new(Int8Array::from_iter_values(vec![0, 2, 3, 4, 5])),
1343                3,
1344            ),
1345            (
1346                DataType::Int16,
1347                Arc::new(Int16Array::from_iter_values(vec![0, 1, 2, 3, -4, 5 << 8])),
1348                12,
1349            ),
1350            (
1351                DataType::Int32,
1352                Arc::new(Int32Array::from_iter_values(vec![0, 1, 2, 3, 4, -5 << 16])),
1353                20,
1354            ),
1355            (
1356                DataType::Int64,
1357                Arc::new(Int64Array::from_iter_values(vec![
1358                    0,
1359                    1,
1360                    2,
1361                    -3,
1362                    -4,
1363                    -5 << 32,
1364                ])),
1365                36,
1366            ),
1367        ];
1368
1369        for (data_type, arr, bits_per_value) in test_cases {
                // buffer index handed to the encoder (the name is a typo for "buffer_index")
1370            let mut buffed_index = 1;
1371            let params = bitpack_params(arr.as_ref()).unwrap();
1372            let encoder = BitpackedArrayEncoder {
1373                num_bits: params.num_bits,
1374                signed_type: params.signed,
1375            };
1376            let data = DataBlock::from_array(arr);
1377            let result = encoder.encode(data, &data_type, &mut buffed_index).unwrap();
1378
1379            let data = result.data.as_fixed_width().unwrap();
1380            assert_eq!(bits_per_value, data.bits_per_value);
1381
1382            let array_encoding = result.encoding.array_encoding.unwrap();
1383
1384            match array_encoding {
1385                pb::array_encoding::ArrayEncoding::Bitpacked(bitpacked) => {
1386                    assert_eq!(bits_per_value, bitpacked.compressed_bits_per_value);
1387                    assert_eq!(
1388                        (data_type.byte_width() * 8) as u64,
1389                        bitpacked.uncompressed_bits_per_value
1390                    );
1391                }
1392                _ => {
1393                    panic!("Array did not use bitpacking encoding")
1394                }
1395            }
1396        }
1397
1398        // check it will otherwise use flat encoding
1399        let test_cases: Vec<(DataType, ArrayRef)> = vec![
1400            // it should use flat encoding for datatypes that don't support bitpacking
1401            (
1402                DataType::Float32,
1403                Arc::new(Float32Array::from_iter_values(vec![0.1, 0.2, 0.3])),
1404            ),
1405            // it should still use flat encoding if bitpacked encoding would be packed
1406            // into the full byte range
1407            (
1408                DataType::UInt8,
1409                Arc::new(UInt8Array::from_iter_values(vec![0, 1, 2, 3, 4, 250])),
1410            ),
1411            (
1412                DataType::UInt16,
1413                Arc::new(UInt16Array::from_iter_values(vec![0, 1, 2, 3, 4, 250 << 8])),
1414            ),
1415            (
1416                DataType::UInt32,
1417                Arc::new(UInt32Array::from_iter_values(vec![
1418                    0,
1419                    1,
1420                    2,
1421                    3,
1422                    4,
1423                    250 << 24,
1424                ])),
1425            ),
1426            (
1427                DataType::UInt64,
1428                Arc::new(UInt64Array::from_iter_values(vec![
1429                    0,
1430                    1,
1431                    2,
1432                    3,
1433                    4,
1434                    250 << 56,
1435                ])),
1436            ),
1437            (
1438                DataType::Int8,
1439                Arc::new(Int8Array::from_iter_values(vec![-100])),
1440            ),
1441            (
1442                DataType::Int16,
1443                Arc::new(Int16Array::from_iter_values(vec![-100 << 8])),
1444            ),
1445            (
1446                DataType::Int32,
1447                Arc::new(Int32Array::from_iter_values(vec![-100 << 24])),
1448            ),
1449            (
1450                DataType::Int64,
1451                Arc::new(Int64Array::from_iter_values(vec![-100 << 56])),
1452            ),
1453        ];
1454
1455        for (data_type, arr) in test_cases {
                // `None` is accepted here; only returned params are checked, and they
                // must use the full width of the type
1456            if let Some(params) = bitpack_params(arr.as_ref()) {
1457                assert_eq!(params.num_bits, data_type.byte_width() as u64 * 8);
1458            }
1459        }
1460    }
1461
        // Test helper: an `ArrayGeneratorProvider` whose generators draw values of the
        // Arrow primitive type `DataType` from the distribution `Dist`.
1462    struct DistributionArrayGeneratorProvider<
1463        DataType,
1464        Dist: rand::distr::Distribution<DataType::Native> + Clone + Send + Sync + 'static,
1465    >
1466    where
1467        DataType::Native: Copy + 'static,
1468        PrimitiveArray<DataType>: From<Vec<DataType::Native>> + 'static,
1469        DataType: ArrowPrimitiveType,
1470    {
            // ties the provider to `DataType` without storing a value of that type
1471        phantom: PhantomData<DataType>,
            // cloned for every generator handed out
1472        distribution: Dist,
1473    }
1474
1475    impl<DataType, Dist> DistributionArrayGeneratorProvider<DataType, Dist>
1476    where
1477        Dist: rand::distr::Distribution<DataType::Native> + Clone + Send + Sync + 'static,
1478        DataType::Native: Copy + 'static,
1479        PrimitiveArray<DataType>: From<Vec<DataType::Native>> + 'static,
1480        DataType: ArrowPrimitiveType,
1481    {
1482        fn new(dist: Dist) -> Self {
1483            Self {
1484                distribution: dist,
1485                phantom: Default::default(),
1486            }
1487        }
1488    }
1489
1490    impl<DataType, Dist> ArrayGeneratorProvider for DistributionArrayGeneratorProvider<DataType, Dist>
1491    where
1492        Dist: rand::distr::Distribution<DataType::Native> + Clone + Send + Sync + 'static,
1493        DataType::Native: Copy + 'static,
1494        PrimitiveArray<DataType>: From<Vec<DataType::Native>> + 'static,
1495        DataType: ArrowPrimitiveType,
1496    {
1497        fn provide(&self) -> Box<dyn ArrayGenerator> {
1498            rand_with_distribution::<DataType, Dist>(self.distribution.clone())
1499        }
1500
1501        fn copy(&self) -> Box<dyn ArrayGeneratorProvider> {
1502            Box::new(Self {
1503                phantom: self.phantom,
1504                distribution: self.distribution.clone(),
1505            })
1506        }
1507    }
1508
1509    #[test_log::test(tokio::test)]
1510    async fn test_bitpack_primitive() {
1511        let bitpacked_test_cases: &Vec<(DataType, Box<dyn ArrayGeneratorProvider>)> = &vec![
1512            // check less than one byte for multi-byte type
1513            (
1514                DataType::UInt32,
1515                Box::new(
1516                    DistributionArrayGeneratorProvider::<UInt32Type, Uniform<u32>>::new(
1517                        Uniform::new(0, 19).unwrap(),
1518                    ),
1519                ),
1520            ),
1521            // // check that more than one byte for multi-byte type
1522            (
1523                DataType::UInt32,
1524                Box::new(
1525                    DistributionArrayGeneratorProvider::<UInt32Type, Uniform<u32>>::new(
1526                        Uniform::new(5 << 7, 6 << 7).unwrap(),
1527                    ),
1528                ),
1529            ),
1530            (
1531                DataType::UInt64,
1532                Box::new(
1533                    DistributionArrayGeneratorProvider::<UInt64Type, Uniform<u64>>::new(
1534                        Uniform::new(5 << 42, 6 << 42).unwrap(),
1535                    ),
1536                ),
1537            ),
1538            // check less than one byte for single-byte type
1539            (
1540                DataType::UInt8,
1541                Box::new(
1542                    DistributionArrayGeneratorProvider::<UInt8Type, Uniform<u8>>::new(
1543                        Uniform::new(0, 19).unwrap(),
1544                    ),
1545                ),
1546            ),
1547            // check less than one byte for single-byte type
1548            (
1549                DataType::UInt64,
1550                Box::new(
1551                    DistributionArrayGeneratorProvider::<UInt64Type, Uniform<u64>>::new(
1552                        Uniform::new(129, 259).unwrap(),
1553                    ),
1554                ),
1555            ),
1556            // check byte aligned for single byte
1557            (
1558                DataType::UInt32,
1559                Box::new(
1560                    DistributionArrayGeneratorProvider::<UInt32Type, Uniform<u32>>::new(
1561                        // this range should always give 8 bits
1562                        Uniform::new(200, 250).unwrap(),
1563                    ),
1564                ),
1565            ),
1566            // check where the num_bits divides evenly into the bit length of the type
1567            (
1568                DataType::UInt64,
1569                Box::new(
1570                    DistributionArrayGeneratorProvider::<UInt64Type, Uniform<u64>>::new(
1571                        Uniform::new(1, 3).unwrap(), // 2 bits
1572                    ),
1573                ),
1574            ),
1575            // check byte aligned for multiple bytes
1576            (
1577                DataType::UInt32,
1578                Box::new(
1579                    DistributionArrayGeneratorProvider::<UInt32Type, Uniform<u32>>::new(
1580                        // this range should always always give 16 bits
1581                        Uniform::new(200 << 8, 250 << 8).unwrap(),
1582                    ),
1583                ),
1584            ),
1585            // check byte aligned where the num bits doesn't divide evenly into the byte length
1586            (
1587                DataType::UInt64,
1588                Box::new(
1589                    DistributionArrayGeneratorProvider::<UInt64Type, Uniform<u64>>::new(
1590                        // this range should always give 24 bits
1591                        Uniform::new(200 << 16, 250 << 16).unwrap(),
1592                    ),
1593                ),
1594            ),
1595            // check that we can still encode an all-0 array
1596            (
1597                DataType::UInt32,
1598                Box::new(
1599                    DistributionArrayGeneratorProvider::<UInt32Type, Uniform<u32>>::new(
1600                        Uniform::new(0, 1).unwrap(),
1601                    ),
1602                ),
1603            ),
1604            // check for signed types
1605            (
1606                DataType::Int16,
1607                Box::new(
1608                    DistributionArrayGeneratorProvider::<Int16Type, Uniform<i16>>::new(
1609                        Uniform::new(-5, 5).unwrap(),
1610                    ),
1611                ),
1612            ),
1613            (
1614                DataType::Int64,
1615                Box::new(
1616                    DistributionArrayGeneratorProvider::<Int64Type, Uniform<i64>>::new(
1617                        Uniform::new(-(5 << 42), 6 << 42).unwrap(),
1618                    ),
1619                ),
1620            ),
1621            (
1622                DataType::Int32,
1623                Box::new(
1624                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
1625                        Uniform::new(-(5 << 7), 6 << 7).unwrap(),
1626                    ),
1627                ),
1628            ),
1629            // check signed where packed to < 1 byte for multi-byte type
1630            (
1631                DataType::Int32,
1632                Box::new(
1633                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
1634                        Uniform::new(-19, 19).unwrap(),
1635                    ),
1636                ),
1637            ),
1638            // check signed byte aligned to single byte
1639            (
1640                DataType::Int32,
1641                Box::new(
1642                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
1643                        // this range should always give 8 bits
1644                        Uniform::new(-120, 120).unwrap(),
1645                    ),
1646                ),
1647            ),
1648            // check signed byte aligned to multiple bytes
1649            (
1650                DataType::Int32,
1651                Box::new(
1652                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
1653                        // this range should always give 16 bits
1654                        Uniform::new(-120 << 8, 120 << 8).unwrap(),
1655                    ),
1656                ),
1657            ),
1658            // check that it works for all positive integers even if type is signed
1659            (
1660                DataType::Int32,
1661                Box::new(
1662                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
1663                        Uniform::new(10, 20).unwrap(),
1664                    ),
1665                ),
1666            ),
1667            // check that all 0 works for signed type
1668            (
1669                DataType::Int32,
1670                Box::new(
1671                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
1672                        Uniform::new(0, 1).unwrap(),
1673                    ),
1674                ),
1675            ),
1676        ];
1677
1678        for (data_type, array_gen_provider) in bitpacked_test_cases {
1679            let field = Field::new("", data_type.clone(), false);
1680            let test_cases = TestCases::basic().with_min_file_version(LanceFileVersion::V2_1);
1681            check_round_trip_encoding_generated(field, array_gen_provider.copy(), test_cases).await;
1682        }
1683    }
1684}