use std::sync::Arc;

use arrow_array::types::{
    Int8Type, Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray, cast::AsArray};
use arrow_buffer::ArrowNativeType;
use arrow_buffer::bit_util::ceil;
use arrow_schema::DataType;
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use log::trace;
use num_traits::{AsPrimitive, PrimInt};

use lance_arrow::DataTypeExt;
use lance_bitpacking::BitPacking;
use lance_core::{Error, Result};

use crate::buffer::LanceBuffer;
use crate::data::BlockInfo;
use crate::data::{DataBlock, FixedWidthDataBlock, NullableDataBlock};
use crate::decoder::{PageScheduler, PrimitivePageDecoder};
use crate::format::ProtobufUtils;
use crate::previous::encoder::{ArrayEncoder, EncodedArray};
use bytemuck::cast_slice;
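// Values are bit-packed in fixed-size chunks of 1024 elements; the last
// (possibly partial) chunk is zero-padded up to the full chunk length.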
const LOG_ELEMS_PER_CHUNK: u8 = 10;
const ELEMS_PER_CHUNK: u64 = 1 << LOG_ELEMS_PER_CHUNK;
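/// Computes the number of bits needed to represent every value in `arrays`,
/// treating the values as non-negative: the bitwise OR of each array's values
/// is aggregated across arrays and the result is the position of the highest
/// set bit, never less than 1.
///
/// Illustrative sketch (not compiled as a doctest): the values 1, 2 and 7 OR
/// together to 7, which needs 3 bits.
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, UInt32Array};
///
/// let arr: ArrayRef = Arc::new(UInt32Array::from(vec![1u32, 2, 7]));
/// assert_eq!(compute_compressed_bit_width_for_non_neg(&[arr]), 3);
/// ```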
pub fn compute_compressed_bit_width_for_non_neg(arrays: &[ArrayRef]) -> u64 {
    debug_assert!(!arrays.is_empty());

    let res;

    match arrays[0].data_type() {
        DataType::UInt8 => {
            let mut global_max: u8 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt8Type>>()
                    .unwrap();
                let array_max = arrow_arith::aggregate::bit_or(primitive_array);
                global_max = global_max.max(array_max.unwrap_or(0));
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int8 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int8Type>>()
                    .unwrap();
                let array_max_width = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
                global_max_width = global_max_width.max(8 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }

        DataType::UInt16 => {
            let mut global_max: u16 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt16Type>>()
                    .unwrap();
                let array_max = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
                global_max = global_max.max(array_max);
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int16 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int16Type>>()
                    .unwrap();
                let array_max_width = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
                global_max_width =
                    global_max_width.max(16 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }

        DataType::UInt32 => {
            let mut global_max: u32 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt32Type>>()
                    .unwrap();
                let array_max = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
                global_max = global_max.max(array_max);
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int32 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int32Type>>()
                    .unwrap();
                let array_max_width = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
                global_max_width =
                    global_max_width.max(32 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }

        DataType::UInt64 => {
            let mut global_max: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt64Type>>()
                    .unwrap();
                let array_max = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
                global_max = global_max.max(array_max);
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int64 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int64Type>>()
                    .unwrap();
                let array_max_width = arrow_arith::aggregate::bit_or(primitive_array).unwrap_or(0);
                global_max_width =
                    global_max_width.max(64 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }
        _ => {
            panic!(
                "BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64"
            );
        }
    };
    res
}
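// Packs a FixedWidthDataBlock of the given primitive type in chunks of
// ELEMS_PER_CHUNK values using `BitPacking::unchecked_pack`; the final
// partial chunk (if any) is first copied into a zero-padded scratch chunk.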
macro_rules! encode_fixed_width {
    ($self:expr, $unpacked:expr, $data_type:ty, $buffer_index:expr) => {{
        let num_chunks = $unpacked.num_values.div_ceil(ELEMS_PER_CHUNK);
        let num_full_chunks = $unpacked.num_values / ELEMS_PER_CHUNK;
        let uncompressed_bit_width = std::mem::size_of::<$data_type>() as u64 * 8;

        let packed_chunk_size = 1024 * $self.compressed_bit_width as usize / uncompressed_bit_width as usize;

        let input_slice = $unpacked.data.borrow_to_typed_slice::<$data_type>();
        let input = input_slice.as_ref();

        let mut output = Vec::with_capacity(num_chunks as usize * packed_chunk_size);

        (0..num_full_chunks).for_each(|i| {
            let start_elem = (i * ELEMS_PER_CHUNK) as usize;

            let output_len = output.len();
            unsafe {
                output.set_len(output_len + packed_chunk_size);
                BitPacking::unchecked_pack(
                    $self.compressed_bit_width,
                    &input[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..packed_chunk_size],
                );
            }
        });

        if num_chunks != num_full_chunks {
            let last_chunk_elem_num = $unpacked.num_values % ELEMS_PER_CHUNK;
            let mut last_chunk = vec![0 as $data_type; ELEMS_PER_CHUNK as usize];
            last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
                &input[$unpacked.num_values as usize - last_chunk_elem_num as usize..],
            );

            let output_len = output.len();
            unsafe {
                output.set_len(output_len + packed_chunk_size);
                BitPacking::unchecked_pack(
                    $self.compressed_bit_width,
                    &last_chunk,
                    &mut output[output_len..][..packed_chunk_size],
                );
            }
        }

        let bitpacked_for_non_neg_buffer_index = *$buffer_index;
        *$buffer_index += 1;

        let encoding = ProtobufUtils::bitpacked_for_non_neg_encoding(
            $self.compressed_bit_width as u64,
            uncompressed_bit_width,
            bitpacked_for_non_neg_buffer_index,
        );
        let packed = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: $self.compressed_bit_width as u64,
            data: LanceBuffer::reinterpret_vec(output),
            num_values: $unpacked.num_values,
            block_info: BlockInfo::new(),
        });

        Result::Ok(EncodedArray {
            data: packed,
            encoding,
        })
    }};
}
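/// Array encoder that bit-packs non-negative integers down to
/// `compressed_bit_width` bits per value, in chunks of ELEMS_PER_CHUNK values.
/// The width is typically obtained from `compute_compressed_bit_width_for_non_neg`.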
#[derive(Debug)]
pub struct BitpackedForNonNegArrayEncoder {
    pub compressed_bit_width: usize,
    pub original_data_type: DataType,
}

impl BitpackedForNonNegArrayEncoder {
    pub fn new(compressed_bit_width: usize, data_type: DataType) -> Self {
        Self {
            compressed_bit_width,
            original_data_type: data_type,
        }
    }
}
impl ArrayEncoder for BitpackedForNonNegArrayEncoder {
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        match data {
            DataBlock::AllNull(_) => {
                let encoding = ProtobufUtils::basic_all_null_encoding();
                Ok(EncodedArray { data, encoding })
            }
            DataBlock::FixedWidth(unpacked) => {
                match data_type {
                    DataType::UInt8 | DataType::Int8 => encode_fixed_width!(self, unpacked, u8, buffer_index),
                    DataType::UInt16 | DataType::Int16 => encode_fixed_width!(self, unpacked, u16, buffer_index),
                    DataType::UInt32 | DataType::Int32 => encode_fixed_width!(self, unpacked, u32, buffer_index),
                    DataType::UInt64 | DataType::Int64 => encode_fixed_width!(self, unpacked, u64, buffer_index),
                    _ => unreachable!("BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64"),
                }
            }
            DataBlock::Nullable(nullable) => {
                let validity_buffer_index = *buffer_index;
                *buffer_index += 1;

                let validity_desc = ProtobufUtils::flat_encoding(
                    1,
                    validity_buffer_index,
                    None,
                );
                let encoded_values: EncodedArray;
                match *nullable.data {
                    DataBlock::FixedWidth(unpacked) => {
                        match data_type {
                            DataType::UInt8 | DataType::Int8 => encoded_values = encode_fixed_width!(self, unpacked, u8, buffer_index)?,
                            DataType::UInt16 | DataType::Int16 => encoded_values = encode_fixed_width!(self, unpacked, u16, buffer_index)?,
                            DataType::UInt32 | DataType::Int32 => encoded_values = encode_fixed_width!(self, unpacked, u32, buffer_index)?,
                            DataType::UInt64 | DataType::Int64 => encoded_values = encode_fixed_width!(self, unpacked, u64, buffer_index)?,
                            _ => unreachable!("BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64"),
                        }
                    }
                    _ => {
                        return Err(Error::invalid_input_source("Bitpacking only supports fixed-width data blocks, nullable data blocks wrapping a fixed-width data block, or all-null data blocks".into()));
                    }
                }
                let encoding =
                    ProtobufUtils::basic_some_null_encoding(validity_desc, encoded_values.encoding);
                let encoded = DataBlock::Nullable(NullableDataBlock {
                    data: Box::new(encoded_values.data),
                    nulls: nullable.nulls,
                    block_info: BlockInfo::new(),
                });
                Ok(EncodedArray {
                    data: encoded,
                    encoding,
                })
            }
            _ => {
                Err(Error::invalid_input_source("Bitpacking only supports fixed-width data blocks, nullable data blocks wrapping a fixed-width data block, or all-null data blocks".into()))
            }
        }
    }
}
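/// Page scheduler for data written by `BitpackedForNonNegArrayEncoder`.
/// Row ranges are translated into chunk-aligned byte ranges (each chunk holds
/// ELEMS_PER_CHUNK values) and adjacent ranges that touch the same chunk are
/// coalesced into a single I/O request.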
#[derive(Debug)]
pub struct BitpackedForNonNegScheduler {
    compressed_bit_width: u64,
    uncompressed_bits_per_value: u64,
    buffer_offset: u64,
}

impl BitpackedForNonNegScheduler {
    pub fn new(
        compressed_bit_width: u64,
        uncompressed_bits_per_value: u64,
        buffer_offset: u64,
    ) -> Self {
        Self {
            compressed_bit_width,
            uncompressed_bits_per_value,
            buffer_offset,
        }
    }

    fn locate_chunk_start(&self, relative_row_num: u64) -> u64 {
        let chunk_size = ELEMS_PER_CHUNK * self.compressed_bit_width / 8;
        self.buffer_offset + (relative_row_num / ELEMS_PER_CHUNK * chunk_size)
    }

    fn locate_chunk_end(&self, relative_row_num: u64) -> u64 {
        let chunk_size = ELEMS_PER_CHUNK * self.compressed_bit_width / 8;
        self.buffer_offset + (relative_row_num / ELEMS_PER_CHUNK * chunk_size) + chunk_size
    }
}
impl PageScheduler for BitpackedForNonNegScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn crate::EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        assert!(!ranges.is_empty());

        let mut byte_ranges = vec![];

        let mut bytes_idx_to_range_indices = vec![];
        let first_byte_range = std::ops::Range {
            start: self.locate_chunk_start(ranges[0].start),
            end: self.locate_chunk_end(ranges[0].end - 1),
        };
        byte_ranges.push(first_byte_range);
        bytes_idx_to_range_indices.push(vec![ranges[0].clone()]);

        for (i, range) in ranges.iter().enumerate().skip(1) {
            let this_start = self.locate_chunk_start(range.start);
            let this_end = self.locate_chunk_end(range.end - 1);

            if this_start == self.locate_chunk_start(ranges[i - 1].end - 1) {
                byte_ranges.last_mut().unwrap().end = this_end;
                bytes_idx_to_range_indices
                    .last_mut()
                    .unwrap()
                    .push(range.clone());
            } else {
                byte_ranges.push(this_start..this_end);
                bytes_idx_to_range_indices.push(vec![range.clone()]);
            }
        }

        trace!(
            "Scheduling I/O for {} ranges spread across byte range {}..{}",
            byte_ranges.len(),
            byte_ranges[0].start,
            byte_ranges.last().unwrap().end
        );

        let bytes = scheduler.submit_request(byte_ranges.clone(), top_level_row);

        let compressed_bit_width = self.compressed_bit_width;
        let uncompressed_bits_per_value = self.uncompressed_bits_per_value;
        let num_rows = ranges.iter().map(|range| range.end - range.start).sum();

        async move {
            let bytes = bytes.await?;
            let decompressed_output = bitpacked_for_non_neg_decode(
                compressed_bit_width,
                uncompressed_bits_per_value,
                &bytes,
                &bytes_idx_to_range_indices,
                num_rows,
            );
            Ok(Box::new(BitpackedForNonNegPageDecoder {
                uncompressed_bits_per_value,
                decompressed_buf: decompressed_output,
            }) as Box<dyn PrimitivePageDecoder>)
        }
        .boxed()
    }
}
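/// Page decoder holding the already-decompressed values; `decode` simply
/// slices the buffer by row offset and length.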
#[derive(Debug)]
struct BitpackedForNonNegPageDecoder {
    uncompressed_bits_per_value: u64,

    decompressed_buf: LanceBuffer,
}
impl PrimitivePageDecoder for BitpackedForNonNegPageDecoder {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
        if ![8, 16, 32, 64].contains(&self.uncompressed_bits_per_value) {
            return Err(Error::invalid_input_source("BitpackedForNonNegPageDecoder only supports an uncompressed_bits_per_value of 8, 16, 32, or 64".into()));
        }

        let elem_size_in_bytes = self.uncompressed_bits_per_value / 8;

        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: self.decompressed_buf.slice_with_length(
                (rows_to_skip * elem_size_in_bytes) as usize,
                (num_rows * elem_size_in_bytes) as usize,
            ),
            bits_per_value: self.uncompressed_bits_per_value,
            num_values: num_rows,
            block_info: BlockInfo::new(),
        }))
    }
}
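// Unpacks chunks of ELEMS_PER_CHUNK values with `BitPacking::unchecked_unpack`
// and copies out only the elements covered by the requested row ranges,
// walking the ranges associated with each I/O buffer in order.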
macro_rules! bitpacked_decode {
    ($uncompressed_type:ty, $compressed_bit_width:expr, $data:expr, $bytes_idx_to_range_indices:expr, $num_rows:expr) => {{
        let mut decompressed: Vec<$uncompressed_type> = Vec::with_capacity($num_rows as usize);
        let packed_chunk_size_in_byte: usize = (ELEMS_PER_CHUNK * $compressed_bit_width) as usize / 8;
        let mut decompress_chunk_buf = vec![0 as $uncompressed_type; ELEMS_PER_CHUNK as usize];

        for (i, bytes) in $data.iter().enumerate() {
            let mut ranges_idx = 0;
            let mut curr_range_start = $bytes_idx_to_range_indices[i][0].start;
            let mut chunk_num = 0;

            while chunk_num * packed_chunk_size_in_byte < bytes.len() {
                let chunk_in_u8: Vec<u8> = bytes[chunk_num * packed_chunk_size_in_byte..]
                    [..packed_chunk_size_in_byte]
                    .to_vec();
                chunk_num += 1;
                let chunk = cast_slice(&chunk_in_u8);
                unsafe {
                    BitPacking::unchecked_unpack(
                        $compressed_bit_width as usize,
                        chunk,
                        &mut decompress_chunk_buf,
                    );
                }

                loop {
                    let elems_after_curr_range_start_in_this_chunk =
                        ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK;
                    if curr_range_start + elems_after_curr_range_start_in_this_chunk
                        <= $bytes_idx_to_range_indices[i][ranges_idx].end
                    {
                        decompressed.extend_from_slice(
                            &decompress_chunk_buf[(curr_range_start % ELEMS_PER_CHUNK) as usize..],
                        );
                        curr_range_start += elems_after_curr_range_start_in_this_chunk;
                        break;
                    } else {
                        let elems_this_range_needed_in_this_chunk =
                            ($bytes_idx_to_range_indices[i][ranges_idx].end - curr_range_start)
                                .min(ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK);
                        decompressed.extend_from_slice(
                            &decompress_chunk_buf[(curr_range_start % ELEMS_PER_CHUNK) as usize..]
                                [..elems_this_range_needed_in_this_chunk as usize],
                        );
                        if curr_range_start + elems_this_range_needed_in_this_chunk
                            == $bytes_idx_to_range_indices[i][ranges_idx].end
                        {
                            ranges_idx += 1;
                            if ranges_idx == $bytes_idx_to_range_indices[i].len() {
                                break;
                            }
                            curr_range_start = $bytes_idx_to_range_indices[i][ranges_idx].start;
                        } else {
                            curr_range_start += elems_this_range_needed_in_this_chunk;
                        }
                    }
                }
            }
        }

        LanceBuffer::reinterpret_vec(decompressed)
    }};
}
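/// Dispatches to `bitpacked_decode!` based on the uncompressed bit width and
/// returns a buffer containing exactly the requested rows.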
fn bitpacked_for_non_neg_decode(
    compressed_bit_width: u64,
    uncompressed_bits_per_value: u64,
    data: &[Bytes],
    bytes_idx_to_range_indices: &[Vec<std::ops::Range<u64>>],
    num_rows: u64,
) -> LanceBuffer {
    match uncompressed_bits_per_value {
        8 => bitpacked_decode!(
            u8,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        16 => bitpacked_decode!(
            u16,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        32 => bitpacked_decode!(
            u32,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        64 => bitpacked_decode!(
            u64,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        _ => unreachable!(
            "bitpacked_for_non_neg_decode only supports 8, 16, 32, 64 uncompressed_bits_per_value"
        ),
    }
}
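/// Result of inspecting an array for bitpacking: the number of bits required
/// per value and whether a sign bit must be preserved.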
#[derive(Debug)]
pub struct BitpackParams {
    pub num_bits: u64,

    pub signed: bool,
}
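/// Determines the bitpacking parameters for a primitive integer array, or
/// `None` if the type is not supported (e.g. floating point).
///
/// Illustrative sketch based on the test below (not compiled as a doctest);
/// with the values `[1, 2, -7]` the magnitude needs 3 bits plus one sign bit:
///
/// ```ignore
/// use arrow_array::Int32Array;
///
/// let params = bitpack_params(&Int32Array::from(vec![1, 2, -7])).unwrap();
/// assert_eq!(params.num_bits, 4);
/// assert!(params.signed);
/// ```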
pub fn bitpack_params(arr: &dyn Array) -> Option<BitpackParams> {
    match arr.data_type() {
        DataType::UInt8 => bitpack_params_for_type::<UInt8Type>(arr.as_primitive()),
        DataType::UInt16 => bitpack_params_for_type::<UInt16Type>(arr.as_primitive()),
        DataType::UInt32 => bitpack_params_for_type::<UInt32Type>(arr.as_primitive()),
        DataType::UInt64 => bitpack_params_for_type::<UInt64Type>(arr.as_primitive()),
        DataType::Int8 => bitpack_params_for_signed_type::<Int8Type>(arr.as_primitive()),
        DataType::Int16 => bitpack_params_for_signed_type::<Int16Type>(arr.as_primitive()),
        DataType::Int32 => bitpack_params_for_signed_type::<Int32Type>(arr.as_primitive()),
        DataType::Int64 => bitpack_params_for_signed_type::<Int64Type>(arr.as_primitive()),
        _ => None,
    }
}
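// For unsigned types the bit width comes from the bitwise OR of all values
// (null slots are ignored by the aggregate), clamped to at least 1 bit.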
fn bitpack_params_for_type<T>(arr: &PrimitiveArray<T>) -> Option<BitpackParams>
where
    T: ArrowPrimitiveType,
    T::Native: PrimInt + AsPrimitive<u64>,
{
    let max = arrow_arith::aggregate::bit_or(arr);
    let num_bits =
        max.map(|max| arr.data_type().byte_width() as u64 * 8 - max.leading_zeros() as u64);

    num_bits
        .map(|num_bits| num_bits.max(1))
        .map(|bits| BitpackParams {
            num_bits: bits,
            signed: false,
        })
}
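// For signed types, count the leading sign-extension bits of each value
// (leading ones for negatives, leading zeros otherwise); if any value is
// negative, one extra bit is reserved for the sign.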
fn bitpack_params_for_signed_type<T>(arr: &PrimitiveArray<T>) -> Option<BitpackParams>
where
    T: ArrowPrimitiveType,
    T::Native: PrimInt + AsPrimitive<i64>,
{
    let mut add_signed_bit = false;
    let mut min_leading_bits: Option<u64> = None;
    for val in arr.iter() {
        if val.is_none() {
            continue;
        }
        let val = val.unwrap();
        if min_leading_bits.is_none() {
            min_leading_bits = Some(u64::MAX);
        }

        if val.to_i64().unwrap() < 0i64 {
            min_leading_bits = min_leading_bits.map(|bits| bits.min(val.leading_ones() as u64));
            add_signed_bit = true;
        } else {
            min_leading_bits = min_leading_bits.map(|bits| bits.min(val.leading_zeros() as u64));
        }
    }

    let mut min_leading_bits = arr.data_type().byte_width() as u64 * 8 - min_leading_bits?;
    if add_signed_bit {
        min_leading_bits += 1;
    }
    let num_bits = min_leading_bits.max(1);
    Some(BitpackParams {
        num_bits,
        signed: add_signed_bit,
    })
}
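/// Bitpacking encoder used by the `previous` encoder path: values are packed
/// contiguously at `num_bits` per value with no chunking; `signed_type`
/// records whether a sign bit must be restored on decode.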
#[derive(Debug)]
pub struct BitpackedArrayEncoder {
    num_bits: u64,
    signed_type: bool,
}

impl BitpackedArrayEncoder {
    pub fn new(num_bits: u64, signed_type: bool) -> Self {
        Self {
            num_bits,
            signed_type,
        }
    }
}
impl ArrayEncoder for BitpackedArrayEncoder {
    fn encode(
        &self,
        data: DataBlock,
        _data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        let dst_bytes_total = ceil(data.num_values() as usize * self.num_bits as usize, 8);

        let mut dst_buffer = vec![0u8; dst_bytes_total];
        let mut dst_idx = 0;
        let mut dst_offset = 0;

        let DataBlock::FixedWidth(unpacked) = data else {
            return Err(Error::invalid_input_source(
                "Bitpacking only supports fixed width data blocks".into(),
            ));
        };

        pack_bits(
            &unpacked.data,
            self.num_bits,
            &mut dst_buffer,
            &mut dst_idx,
            &mut dst_offset,
        );

        let packed = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.num_bits,
            data: LanceBuffer::from(dst_buffer),
            num_values: unpacked.num_values,
            block_info: BlockInfo::new(),
        });

        let bitpacked_buffer_index = *buffer_index;
        *buffer_index += 1;

        let encoding = ProtobufUtils::bitpacked_encoding(
            self.num_bits,
            unpacked.bits_per_value,
            bitpacked_buffer_index,
            self.signed_type,
        );

        Ok(EncodedArray {
            data: packed,
            encoding,
        })
    }
}
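// Bit-by-bit packing loop: for each source value, copy `num_bits` bits into
// the destination, straddling byte boundaries on both sides as needed, then
// advance the source index past any unused high-order bits before the next value.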
fn pack_bits(
    src: &LanceBuffer,
    num_bits: u64,
    dst: &mut [u8],
    dst_idx: &mut usize,
    dst_offset: &mut u8,
) {
    let bit_len = src.len() as u64 * 8;

    let mask = u64::MAX >> (64 - num_bits);

    let mut src_idx = 0;
    while src_idx < src.len() {
        let mut curr_mask = mask;
        let mut curr_src = src[src_idx] & curr_mask as u8;
        let mut src_offset = 0;
        let mut src_bits_written = 0;

        while src_bits_written < num_bits {
            dst[*dst_idx] += (curr_src >> src_offset) << *dst_offset as u64;
            let bits_written = (num_bits - src_bits_written)
                .min(8 - src_offset)
                .min(8 - *dst_offset as u64);
            src_bits_written += bits_written;
            *dst_offset += bits_written as u8;
            src_offset += bits_written;

            if *dst_offset == 8 {
                *dst_idx += 1;
                *dst_offset = 0;
            }

            if src_offset == 8 {
                src_idx += 1;
                src_offset = 0;
                curr_mask >>= 8;
                if src_idx == src.len() {
                    break;
                }
                curr_src = src[src_idx] & curr_mask as u8;
            }
        }

        if bit_len != num_bits {
            let partial_bytes_written = ceil(num_bits as usize, 8);

            let mut to_next_byte = 1;
            if num_bits.is_multiple_of(8) {
                to_next_byte = 0;
            }

            src_idx += src.len() - partial_bytes_written + to_next_byte;
        }
    }
}
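/// Page scheduler for the `BitpackedArrayEncoder` layout. Because values are
/// packed back to back, each row range maps to a byte range plus bit offsets
/// into the first and last byte of that range.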
#[derive(Debug, Clone, Copy)]
pub struct BitpackedScheduler {
    bits_per_value: u64,
    uncompressed_bits_per_value: u64,
    buffer_offset: u64,
    signed: bool,
}

impl BitpackedScheduler {
    pub fn new(
        bits_per_value: u64,
        uncompressed_bits_per_value: u64,
        buffer_offset: u64,
        signed: bool,
    ) -> Self {
        Self {
            bits_per_value,
            uncompressed_bits_per_value,
            buffer_offset,
            signed,
        }
    }
}
impl PageScheduler for BitpackedScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn crate::EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        let mut min = u64::MAX;
        let mut max = 0;

        let mut buffer_bit_start_offsets: Vec<u8> = vec![];
        let mut buffer_bit_end_offsets: Vec<Option<u8>> = vec![];
        let byte_ranges = ranges
            .iter()
            .map(|range| {
                let start_byte_offset = range.start * self.bits_per_value / 8;
                let mut end_byte_offset = range.end * self.bits_per_value / 8;
                if !(range.end * self.bits_per_value).is_multiple_of(8) {
                    end_byte_offset += 1;

                    let end_bit_offset = range.end * self.bits_per_value % 8;
                    buffer_bit_end_offsets.push(Some(end_bit_offset as u8));
                } else {
                    buffer_bit_end_offsets.push(None);
                }

                let start_bit_offset = range.start * self.bits_per_value % 8;
                buffer_bit_start_offsets.push(start_bit_offset as u8);

                let start = self.buffer_offset + start_byte_offset;
                let end = self.buffer_offset + end_byte_offset;
                min = min.min(start);
                max = max.max(end);

                start..end
            })
            .collect::<Vec<_>>();

        trace!(
            "Scheduling I/O for {} ranges spread across byte range {}..{}",
            byte_ranges.len(),
            min,
            max
        );

        let bytes = scheduler.submit_request(byte_ranges, top_level_row);

        let bits_per_value = self.bits_per_value;
        let uncompressed_bits_per_value = self.uncompressed_bits_per_value;
        let signed = self.signed;
        async move {
            let bytes = bytes.await?;
            Ok(Box::new(BitpackedPageDecoder {
                buffer_bit_start_offsets,
                buffer_bit_end_offsets,
                bits_per_value,
                uncompressed_bits_per_value,
                signed,
                data: bytes,
            }) as Box<dyn PrimitivePageDecoder>)
        }
        .boxed()
    }
}
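/// Decoder for contiguously bitpacked pages: unpacks `bits_per_value`-wide
/// values back to `uncompressed_bits_per_value` integers, sign-extending when
/// `signed` is set.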
#[derive(Debug)]
struct BitpackedPageDecoder {
    buffer_bit_start_offsets: Vec<u8>,

    buffer_bit_end_offsets: Vec<Option<u8>>,

    bits_per_value: u64,

    uncompressed_bits_per_value: u64,

    signed: bool,

    data: Vec<Bytes>,
}
impl PrimitivePageDecoder for BitpackedPageDecoder {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
        let num_bytes = self.uncompressed_bits_per_value / 8 * num_rows;
        let mut dest = vec![0; num_bytes as usize];

        debug_assert!(self.bits_per_value <= 64);

        let mut rows_to_skip = rows_to_skip;
        let mut rows_taken = 0;
        let byte_len = self.uncompressed_bits_per_value / 8;
        let mut dst_idx = 0;
        let mask = u64::MAX >> (64 - self.bits_per_value);

        for i in 0..self.data.len() {
            let src = &self.data[i];
            let (mut src_idx, mut src_offset) = match compute_start_offset(
                rows_to_skip,
                src.len(),
                self.bits_per_value,
                self.buffer_bit_start_offsets[i],
                self.buffer_bit_end_offsets[i],
            ) {
                StartOffset::SkipFull(rows_to_skip_here) => {
                    rows_to_skip -= rows_to_skip_here;
                    continue;
                }
                StartOffset::SkipSome(buffer_start_offset) => (
                    buffer_start_offset.index,
                    buffer_start_offset.bit_offset as u64,
                ),
            };

            while src_idx < src.len() && rows_taken < num_rows {
                rows_taken += 1;
                let mut curr_mask = mask;
                let mut curr_src = src[src_idx] & (curr_mask << src_offset) as u8;

                let mut src_bits_written = 0;

                let mut dst_offset = 0;

                let is_negative = is_encoded_item_negative(
                    src,
                    src_idx,
                    src_offset,
                    self.bits_per_value as usize,
                );

                while src_bits_written < self.bits_per_value {
                    dest[dst_idx] += (curr_src >> src_offset) << dst_offset;
                    let bits_written = (self.bits_per_value - src_bits_written)
                        .min(8 - src_offset)
                        .min(8 - dst_offset);
                    src_bits_written += bits_written;
                    dst_offset += bits_written;
                    src_offset += bits_written;
                    curr_mask >>= bits_written;

                    if dst_offset == 8 {
                        dst_idx += 1;
                        dst_offset = 0;
                    }

                    if src_offset == 8 {
                        src_idx += 1;
                        src_offset = 0;
                        if src_idx == src.len() {
                            break;
                        }
                        curr_src = src[src_idx] & curr_mask as u8;
                    }
                }

                let mut negative_padded_current_byte = false;
                if self.signed && is_negative && dst_offset > 0 {
                    negative_padded_current_byte = true;
                    while dst_offset < 8 {
                        dest[dst_idx] |= 1 << dst_offset;
                        dst_offset += 1;
                    }
                }

                if self.uncompressed_bits_per_value != self.bits_per_value {
                    let partial_bytes_written = ceil(self.bits_per_value as usize, 8);

                    let mut to_next_byte = 1;
                    if self.bits_per_value.is_multiple_of(8) {
                        to_next_byte = 0;
                    }
                    let next_dst_idx =
                        dst_idx + byte_len as usize - partial_bytes_written + to_next_byte;

                    if self.signed && is_negative {
                        if !negative_padded_current_byte {
                            dest[dst_idx] = 0xFF;
                        }
                        for i in dest.iter_mut().take(next_dst_idx).skip(dst_idx + 1) {
                            *i = 0xFF;
                        }
                    }

                    dst_idx = next_dst_idx;
                }

                if let Some(buffer_bit_end_offset) = self.buffer_bit_end_offsets[i]
                    && src_idx == src.len() - 1
                    && src_offset >= buffer_bit_end_offset as u64
                {
                    break;
                }
            }
        }

        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(dest),
            bits_per_value: self.uncompressed_bits_per_value,
            num_values: num_rows,
            block_info: BlockInfo::new(),
        }))
    }
}
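// Checks the sign bit of the `num_bits`-wide value that starts at
// (`src_idx`, `src_offset`), i.e. the most significant bit of the encoded item.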
fn is_encoded_item_negative(src: &Bytes, src_idx: usize, src_offset: u64, num_bits: usize) -> bool {
    let mut last_byte_idx = src_idx + ((src_offset as usize + num_bits) / 8);
    let shift_amount = (src_offset as usize + num_bits) % 8;
    let shift_amount = if shift_amount == 0 {
        last_byte_idx -= 1;
        7
    } else {
        shift_amount - 1
    };
    let last_byte = src[last_byte_idx];
    let sign_bit_mask = 1 << shift_amount;
    let sign_bit = last_byte & sign_bit_mask;

    sign_bit > 0
}

#[derive(Debug, PartialEq)]
struct BufferStartOffset {
    index: usize,
    bit_offset: u8,
}

#[derive(Debug, PartialEq)]
enum StartOffset {
    SkipFull(u64),

    SkipSome(BufferStartOffset),
}
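/// Figures out where decoding should begin in a buffer after skipping
/// `rows_to_skip` rows: either the whole buffer is skipped
/// (`StartOffset::SkipFull`, reporting how many rows it held) or decoding
/// starts at a byte index and bit offset (`StartOffset::SkipSome`).
///
/// Illustrative sketch taken from the tests below (not compiled as a
/// doctest): a 5-byte buffer of 5-bit values holds 8 rows, so skipping 10
/// rows skips the entire buffer.
///
/// ```ignore
/// assert_eq!(compute_start_offset(10, 5, 5, 0, None), StartOffset::SkipFull(8));
/// ```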
fn compute_start_offset(
    rows_to_skip: u64,
    buffer_len: usize,
    bits_per_value: u64,
    buffer_start_bit_offset: u8,
    buffer_end_bit_offset: Option<u8>,
) -> StartOffset {
    let rows_in_buffer = rows_in_buffer(
        buffer_len,
        bits_per_value,
        buffer_start_bit_offset,
        buffer_end_bit_offset,
    );
    if rows_to_skip >= rows_in_buffer {
        return StartOffset::SkipFull(rows_in_buffer);
    }

    let start_bit = rows_to_skip * bits_per_value + buffer_start_bit_offset as u64;
    let start_byte = start_bit / 8;

    StartOffset::SkipSome(BufferStartOffset {
        index: start_byte as usize,
        bit_offset: (start_bit % 8) as u8,
    })
}
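// Number of complete values stored in a buffer, accounting for the unused
// bits before the first value and after the last value.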
fn rows_in_buffer(
    buffer_len: usize,
    bits_per_value: u64,
    buffer_start_bit_offset: u8,
    buffer_end_bit_offset: Option<u8>,
) -> u64 {
    let mut bits_in_buffer = (buffer_len * 8) as u64 - buffer_start_bit_offset as u64;

    if let Some(buffer_end_bit_offset) = buffer_end_bit_offset {
        bits_in_buffer -= (8 - buffer_end_bit_offset) as u64;
    }

    bits_in_buffer / bits_per_value
}
#[cfg(test)]
pub mod test {
    use crate::{
        format::pb,
        testing::{ArrayGeneratorProvider, TestCases, check_round_trip_encoding_generated},
        version::LanceFileVersion,
    };

    use super::*;
    use std::{marker::PhantomData, sync::Arc};

    use arrow_array::{
        ArrayRef, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array,
        UInt8Array, UInt16Array, UInt32Array, UInt64Array,
        types::{UInt8Type, UInt16Type},
    };

    use arrow_schema::Field;
    use lance_datagen::{
        ArrayGenerator, ArrayGeneratorExt, RowCount,
        array::{fill, rand_with_distribution},
        gen_batch,
    };
    use rand::distr::Uniform;

    #[test]
    fn test_bitpack_params() {
        fn gen_array(generator: Box<dyn ArrayGenerator>) -> ArrayRef {
            gen_batch()
                .anon_col(generator)
                .into_batch_rows(RowCount::from(10000))
                .unwrap()
                .column(0)
                .clone()
        }

        macro_rules! do_test {
            ($num_bits:expr, $data_type:ident, $null_probability:expr) => {
                let max = 1 << $num_bits - 1;
                let mut arr =
                    gen_array(fill::<$data_type>(max).with_random_nulls($null_probability));

                while arr.null_count() == arr.len() {
                    arr = gen_array(fill::<$data_type>(max).with_random_nulls($null_probability));
                }
                let result = bitpack_params(arr.as_ref());
                assert!(result.is_some());
                assert_eq!($num_bits, result.unwrap().num_bits);
            };
        }

        let test_cases = vec![
            (5u64, 0.0f64),
            (5u64, 0.9f64),
            (1u64, 0.0f64),
            (1u64, 0.5f64),
            (8u64, 0.0f64),
            (8u64, 0.5f64),
        ];

        for (num_bits, null_probability) in &test_cases {
            do_test!(*num_bits, UInt8Type, *null_probability);
            do_test!(*num_bits, UInt16Type, *null_probability);
            do_test!(*num_bits, UInt32Type, *null_probability);
            do_test!(*num_bits, UInt64Type, *null_probability);
        }

        let test_cases = vec![
            (13u64, 0.0f64),
            (13u64, 0.5f64),
            (16u64, 0.0f64),
            (16u64, 0.5f64),
        ];
        for (num_bits, null_probability) in &test_cases {
            do_test!(*num_bits, UInt16Type, *null_probability);
            do_test!(*num_bits, UInt32Type, *null_probability);
            do_test!(*num_bits, UInt64Type, *null_probability);
        }
        let test_cases = vec![
            (25u64, 0.0f64),
            (25u64, 0.5f64),
            (32u64, 0.0f64),
            (32u64, 0.5f64),
        ];
        for (num_bits, null_probability) in &test_cases {
            do_test!(*num_bits, UInt32Type, *null_probability);
            do_test!(*num_bits, UInt64Type, *null_probability);
        }
        let test_cases = vec![
            (48u64, 0.0f64),
            (48u64, 0.5f64),
            (64u64, 0.0f64),
            (64u64, 0.5f64),
        ];
        for (num_bits, null_probability) in &test_cases {
            do_test!(*num_bits, UInt64Type, *null_probability);
        }

        let arr = Float64Array::from_iter_values(vec![0.1, 0.2, 0.3]);
        let result = bitpack_params(&arr);
        assert!(result.is_none());
    }

    #[test]
    fn test_num_compressed_bits_signed_types() {
        let values = Int32Array::from(vec![1, 2, -7]);
        let arr = values;
        let result = bitpack_params(&arr);
        assert!(result.is_some());
        let result = result.unwrap();
        assert_eq!(4, result.num_bits);
        assert!(result.signed);

        let values = Int32Array::from(vec![1, 2, 7]);
        let arr = values;
        let result = bitpack_params(&arr);
        assert!(result.is_some());
        let result = result.unwrap();
        assert_eq!(3, result.num_bits);
        assert!(!result.signed);
    }

    #[test]
    fn test_rows_in_buffer() {
        let test_cases = vec![
            (5usize, 5u64, 0u8, None, 8u64),
            (2, 3, 0, Some(5), 4),
            (2, 3, 7, Some(6), 2),
        ];

        for (
            buffer_len,
            bits_per_value,
            buffer_start_bit_offset,
            buffer_end_bit_offset,
            expected,
        ) in test_cases
        {
            let result = rows_in_buffer(
                buffer_len,
                bits_per_value,
                buffer_start_bit_offset,
                buffer_end_bit_offset,
            );
            assert_eq!(expected, result);
        }
    }

    #[test]
    fn test_compute_start_offset() {
        let result = compute_start_offset(0, 5, 5, 0, None);
        assert_eq!(
            StartOffset::SkipSome(BufferStartOffset {
                index: 0,
                bit_offset: 0
            }),
            result
        );

        let result = compute_start_offset(10, 5, 5, 0, None);
        assert_eq!(StartOffset::SkipFull(8), result);
    }

    #[test_log::test(test)]
    fn test_will_bitpack_allowed_types_when_possible() {
        let test_cases: Vec<(DataType, ArrayRef, u64)> = vec![
            (
                DataType::UInt8,
                Arc::new(UInt8Array::from_iter_values(vec![0, 1, 2, 3, 4, 5])),
                3,
            ),
            (
                DataType::UInt16,
                Arc::new(UInt16Array::from_iter_values(vec![0, 1, 2, 3, 4, 5 << 8])),
                11,
            ),
            (
                DataType::UInt32,
                Arc::new(UInt32Array::from_iter_values(vec![0, 1, 2, 3, 4, 5 << 16])),
                19,
            ),
            (
                DataType::UInt64,
                Arc::new(UInt64Array::from_iter_values(vec![0, 1, 2, 3, 4, 5 << 32])),
                35,
            ),
            (
                DataType::Int8,
                Arc::new(Int8Array::from_iter_values(vec![0, 2, 3, 4, -5])),
                4,
            ),
            (
                DataType::Int8,
                Arc::new(Int8Array::from_iter_values(vec![0, 2, 3, 4, 5])),
                3,
            ),
            (
                DataType::Int16,
                Arc::new(Int16Array::from_iter_values(vec![0, 1, 2, 3, -4, 5 << 8])),
                12,
            ),
            (
                DataType::Int32,
                Arc::new(Int32Array::from_iter_values(vec![0, 1, 2, 3, 4, -5 << 16])),
                20,
            ),
            (
                DataType::Int64,
                Arc::new(Int64Array::from_iter_values(vec![
                    0,
                    1,
                    2,
                    -3,
                    -4,
                    -5 << 32,
                ])),
                36,
            ),
        ];

        for (data_type, arr, bits_per_value) in test_cases {
            let mut buffed_index = 1;
            let params = bitpack_params(arr.as_ref()).unwrap();
            let encoder = BitpackedArrayEncoder {
                num_bits: params.num_bits,
                signed_type: params.signed,
            };
            let data = DataBlock::from_array(arr);
            let result = encoder.encode(data, &data_type, &mut buffed_index).unwrap();

            let data = result.data.as_fixed_width().unwrap();
            assert_eq!(bits_per_value, data.bits_per_value);

            let array_encoding = result.encoding.array_encoding.unwrap();

            match array_encoding {
                pb::array_encoding::ArrayEncoding::Bitpacked(bitpacked) => {
                    assert_eq!(bits_per_value, bitpacked.compressed_bits_per_value);
                    assert_eq!(
                        (data_type.byte_width() * 8) as u64,
                        bitpacked.uncompressed_bits_per_value
                    );
                }
                _ => {
                    panic!("Array did not use bitpacking encoding")
                }
            }
        }

        let test_cases: Vec<(DataType, ArrayRef)> = vec![
            (
                DataType::Float32,
                Arc::new(Float32Array::from_iter_values(vec![0.1, 0.2, 0.3])),
            ),
            (
                DataType::UInt8,
                Arc::new(UInt8Array::from_iter_values(vec![0, 1, 2, 3, 4, 250])),
            ),
            (
                DataType::UInt16,
                Arc::new(UInt16Array::from_iter_values(vec![0, 1, 2, 3, 4, 250 << 8])),
            ),
            (
                DataType::UInt32,
                Arc::new(UInt32Array::from_iter_values(vec![
                    0,
                    1,
                    2,
                    3,
                    4,
                    250 << 24,
                ])),
            ),
            (
                DataType::UInt64,
                Arc::new(UInt64Array::from_iter_values(vec![
                    0,
                    1,
                    2,
                    3,
                    4,
                    250 << 56,
                ])),
            ),
            (
                DataType::Int8,
                Arc::new(Int8Array::from_iter_values(vec![-100])),
            ),
            (
                DataType::Int16,
                Arc::new(Int16Array::from_iter_values(vec![-100 << 8])),
            ),
            (
                DataType::Int32,
                Arc::new(Int32Array::from_iter_values(vec![-100 << 24])),
            ),
            (
                DataType::Int64,
                Arc::new(Int64Array::from_iter_values(vec![-100 << 56])),
            ),
        ];

        for (data_type, arr) in test_cases {
            if let Some(params) = bitpack_params(arr.as_ref()) {
                assert_eq!(params.num_bits, data_type.byte_width() as u64 * 8);
            }
        }
    }

    struct DistributionArrayGeneratorProvider<
        DataType,
        Dist: rand::distr::Distribution<DataType::Native> + Clone + Send + Sync + 'static,
    >
    where
        DataType::Native: Copy + 'static,
        PrimitiveArray<DataType>: From<Vec<DataType::Native>> + 'static,
        DataType: ArrowPrimitiveType,
    {
        phantom: PhantomData<DataType>,
        distribution: Dist,
    }

    impl<DataType, Dist> DistributionArrayGeneratorProvider<DataType, Dist>
    where
        Dist: rand::distr::Distribution<DataType::Native> + Clone + Send + Sync + 'static,
        DataType::Native: Copy + 'static,
        PrimitiveArray<DataType>: From<Vec<DataType::Native>> + 'static,
        DataType: ArrowPrimitiveType,
    {
        fn new(dist: Dist) -> Self {
            Self {
                distribution: dist,
                phantom: Default::default(),
            }
        }
    }

    impl<DataType, Dist> ArrayGeneratorProvider for DistributionArrayGeneratorProvider<DataType, Dist>
    where
        Dist: rand::distr::Distribution<DataType::Native> + Clone + Send + Sync + 'static,
        DataType::Native: Copy + 'static,
        PrimitiveArray<DataType>: From<Vec<DataType::Native>> + 'static,
        DataType: ArrowPrimitiveType,
    {
        fn provide(&self) -> Box<dyn ArrayGenerator> {
            rand_with_distribution::<DataType, Dist>(self.distribution.clone())
        }

        fn copy(&self) -> Box<dyn ArrayGeneratorProvider> {
            Box::new(Self {
                phantom: self.phantom,
                distribution: self.distribution.clone(),
            })
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_bitpack_primitive() {
        let bitpacked_test_cases: &Vec<(DataType, Box<dyn ArrayGeneratorProvider>)> = &vec![
            (
                DataType::UInt32,
                Box::new(
                    DistributionArrayGeneratorProvider::<UInt32Type, Uniform<u32>>::new(
                        Uniform::new(0, 19).unwrap(),
                    ),
                ),
            ),
            (
                DataType::UInt32,
                Box::new(
                    DistributionArrayGeneratorProvider::<UInt32Type, Uniform<u32>>::new(
                        Uniform::new(5 << 7, 6 << 7).unwrap(),
                    ),
                ),
            ),
            (
                DataType::UInt64,
                Box::new(
                    DistributionArrayGeneratorProvider::<UInt64Type, Uniform<u64>>::new(
                        Uniform::new(5 << 42, 6 << 42).unwrap(),
                    ),
                ),
            ),
            (
                DataType::UInt8,
                Box::new(
                    DistributionArrayGeneratorProvider::<UInt8Type, Uniform<u8>>::new(
                        Uniform::new(0, 19).unwrap(),
                    ),
                ),
            ),
            (
                DataType::UInt64,
                Box::new(
                    DistributionArrayGeneratorProvider::<UInt64Type, Uniform<u64>>::new(
                        Uniform::new(129, 259).unwrap(),
                    ),
                ),
            ),
            (
                DataType::UInt32,
                Box::new(
                    DistributionArrayGeneratorProvider::<UInt32Type, Uniform<u32>>::new(
                        Uniform::new(200, 250).unwrap(),
                    ),
                ),
            ),
            (
                DataType::UInt64,
                Box::new(
                    DistributionArrayGeneratorProvider::<UInt64Type, Uniform<u64>>::new(
                        Uniform::new(1, 3).unwrap(),
                    ),
                ),
            ),
            (
                DataType::UInt32,
                Box::new(
                    DistributionArrayGeneratorProvider::<UInt32Type, Uniform<u32>>::new(
                        Uniform::new(200 << 8, 250 << 8).unwrap(),
                    ),
                ),
            ),
            (
                DataType::UInt64,
                Box::new(
                    DistributionArrayGeneratorProvider::<UInt64Type, Uniform<u64>>::new(
                        Uniform::new(200 << 16, 250 << 16).unwrap(),
                    ),
                ),
            ),
            (
                DataType::UInt32,
                Box::new(
                    DistributionArrayGeneratorProvider::<UInt32Type, Uniform<u32>>::new(
                        Uniform::new(0, 1).unwrap(),
                    ),
                ),
            ),
            (
                DataType::Int16,
                Box::new(
                    DistributionArrayGeneratorProvider::<Int16Type, Uniform<i16>>::new(
                        Uniform::new(-5, 5).unwrap(),
                    ),
                ),
            ),
            (
                DataType::Int64,
                Box::new(
                    DistributionArrayGeneratorProvider::<Int64Type, Uniform<i64>>::new(
                        Uniform::new(-(5 << 42), 6 << 42).unwrap(),
                    ),
                ),
            ),
            (
                DataType::Int32,
                Box::new(
                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
                        Uniform::new(-(5 << 7), 6 << 7).unwrap(),
                    ),
                ),
            ),
            (
                DataType::Int32,
                Box::new(
                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
                        Uniform::new(-19, 19).unwrap(),
                    ),
                ),
            ),
            (
                DataType::Int32,
                Box::new(
                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
                        Uniform::new(-120, 120).unwrap(),
                    ),
                ),
            ),
            (
                DataType::Int32,
                Box::new(
                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
                        Uniform::new(-120 << 8, 120 << 8).unwrap(),
                    ),
                ),
            ),
            (
                DataType::Int32,
                Box::new(
                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
                        Uniform::new(10, 20).unwrap(),
                    ),
                ),
            ),
            (
                DataType::Int32,
                Box::new(
                    DistributionArrayGeneratorProvider::<Int32Type, Uniform<i32>>::new(
                        Uniform::new(0, 1).unwrap(),
                    ),
                ),
            ),
        ];

        for (data_type, array_gen_provider) in bitpacked_test_cases {
            let field = Field::new("", data_type.clone(), false);
            let test_cases = TestCases::basic().with_min_file_version(LanceFileVersion::V2_1);
            check_round_trip_encoding_generated(field, array_gen_provider.copy(), test_cases).await;
        }
    }
}