use std::sync::Arc;

use arrow::datatypes::{
    Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use arrow_array::{Array, PrimitiveArray};
use arrow_buffer::ArrowNativeType;
use arrow_schema::DataType;
use byteorder::{ByteOrder, LittleEndian};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use log::trace;
use snafu::location;

use lance_arrow::DataTypeExt;
use lance_core::{Error, Result};

use crate::buffer::LanceBuffer;
use crate::compression_algo::fastlanes::BitPacking;
use crate::data::BlockInfo;
use crate::data::{DataBlock, FixedWidthDataBlock, NullableDataBlock};
use crate::decoder::{
    BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor, PageScheduler,
    PrimitivePageDecoder,
};
use crate::encoder::{
    ArrayEncoder, BlockCompressor, EncodedArray, MiniBlockChunk, MiniBlockCompressed,
    MiniBlockCompressor, PerValueCompressor, PerValueDataBlock,
};
use crate::format::{pb, ProtobufUtils};
use crate::statistics::{GetStat, Stat};
use arrow::array::ArrayRef;
use bytemuck::{cast_slice, AnyBitPattern};

const LOG_ELEMS_PER_CHUNK: u8 = 10;
const ELEMS_PER_CHUNK: u64 = 1 << LOG_ELEMS_PER_CHUNK;

/// Computes the bit width needed to bitpack `arrays` of non-negative integers: the number
/// of bits required to hold the largest value seen across all of the arrays, with a floor
/// of one bit. If a negative value is present, its sign bit forces the result up to the
/// full width of the type, which effectively disables compression.
pub fn compute_compressed_bit_width_for_non_neg(arrays: &[ArrayRef]) -> u64 {
    debug_assert!(!arrays.is_empty());

    let res;

    match arrays[0].data_type() {
        DataType::UInt8 => {
            let mut global_max: u8 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt8Type>>()
                    .unwrap();
                let array_max = arrow::compute::bit_or(primitive_array);
                global_max = global_max.max(array_max.unwrap_or(0));
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int8 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int8Type>>()
                    .unwrap();
                let array_max_width = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max_width = global_max_width.max(8 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }

        DataType::UInt16 => {
            let mut global_max: u16 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt16Type>>()
                    .unwrap();
                let array_max = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max = global_max.max(array_max);
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int16 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int16Type>>()
                    .unwrap();
                let array_max_width = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max_width =
                    global_max_width.max(16 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }

        DataType::UInt32 => {
            let mut global_max: u32 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt32Type>>()
                    .unwrap();
                let array_max = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max = global_max.max(array_max);
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int32 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int32Type>>()
                    .unwrap();
                let array_max_width = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max_width =
                    global_max_width.max(32 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }

        DataType::UInt64 => {
            let mut global_max: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<UInt64Type>>()
                    .unwrap();
                let array_max = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max = global_max.max(array_max);
            }
            let num_bits =
                arrays[0].data_type().byte_width() as u64 * 8 - global_max.leading_zeros() as u64;
            if num_bits == 0 {
                res = 1;
            } else {
                res = num_bits;
            }
        }

        DataType::Int64 => {
            let mut global_max_width: u64 = 0;
            for array in arrays {
                let primitive_array = array
                    .as_any()
                    .downcast_ref::<PrimitiveArray<Int64Type>>()
                    .unwrap();
                let array_max_width = arrow::compute::bit_or(primitive_array).unwrap_or(0);
                global_max_width =
                    global_max_width.max(64 - array_max_width.leading_zeros() as u64);
            }
            if global_max_width == 0 {
                res = 1;
            } else {
                res = global_max_width;
            }
        }
        _ => {
            panic!("BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64");
        }
    };
    res
}
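
// Illustrative sketch added for exposition (not part of the original test suite): the
// helper above reports the number of bits needed to hold the largest value across the
// input arrays, with a floor of one bit. The module and test names are placeholders.
#[cfg(test)]
mod compute_bit_width_sketch {
    use std::sync::Arc;

    use arrow_array::{ArrayRef, UInt32Array};

    use super::compute_compressed_bit_width_for_non_neg;

    #[test]
    fn bit_width_of_small_values() {
        // 7 = 0b111, so three bits are enough for every value in this array.
        let arrays = vec![Arc::new(UInt32Array::from(vec![1u32, 2, 3, 7])) as ArrayRef];
        assert_eq!(compute_compressed_bit_width_for_non_neg(&arrays), 3);

        // An all-zero array still reports a width of one bit.
        let arrays = vec![Arc::new(UInt32Array::from(vec![0u32; 4])) as ArrayRef];
        assert_eq!(compute_compressed_bit_width_for_non_neg(&arrays), 1);
    }
}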

/// Encodes a [`FixedWidthDataBlock`] by bitpacking it in chunks of 1024 values with the
/// FastLanes kernels. The final partial chunk (if any) is zero-padded to a full 1024
/// values before packing.
macro_rules! encode_fixed_width {
    ($self:expr, $unpacked:expr, $data_type:ty, $buffer_index:expr) => {{
        let num_chunks = $unpacked.num_values.div_ceil(ELEMS_PER_CHUNK);
        let num_full_chunks = $unpacked.num_values / ELEMS_PER_CHUNK;
        let uncompressed_bit_width = std::mem::size_of::<$data_type>() as u64 * 8;

        // Number of words each 1024-value chunk occupies after packing.
        let packed_chunk_size =
            1024 * $self.compressed_bit_width as usize / uncompressed_bit_width as usize;

        let input_slice = $unpacked.data.borrow_to_typed_slice::<$data_type>();
        let input = input_slice.as_ref();

        let mut output = Vec::with_capacity(num_chunks as usize * packed_chunk_size);

        // Pack all full chunks.
        (0..num_full_chunks).for_each(|i| {
            let start_elem = (i * ELEMS_PER_CHUNK) as usize;

            let output_len = output.len();
            unsafe {
                output.set_len(output_len + packed_chunk_size);
                BitPacking::unchecked_pack(
                    $self.compressed_bit_width,
                    &input[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..packed_chunk_size],
                );
            }
        });

        // Pack the trailing partial chunk, zero-padded to a full 1024 values.
        if num_chunks != num_full_chunks {
            let last_chunk_elem_num = $unpacked.num_values % ELEMS_PER_CHUNK;
            let mut last_chunk = vec![0 as $data_type; ELEMS_PER_CHUNK as usize];
            last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
                &input[$unpacked.num_values as usize - last_chunk_elem_num as usize..],
            );

            let output_len = output.len();
            unsafe {
                output.set_len(output_len + packed_chunk_size);
                BitPacking::unchecked_pack(
                    $self.compressed_bit_width,
                    &last_chunk,
                    &mut output[output_len..][..packed_chunk_size],
                );
            }
        }

        let bitpacked_for_non_neg_buffer_index = *$buffer_index;
        *$buffer_index += 1;

        let encoding = ProtobufUtils::bitpacked_for_non_neg_encoding(
            $self.compressed_bit_width as u64,
            uncompressed_bit_width,
            bitpacked_for_non_neg_buffer_index,
        );
        let packed = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: $self.compressed_bit_width as u64,
            data: LanceBuffer::reinterpret_vec(output),
            num_values: $unpacked.num_values,
            block_info: BlockInfo::new(),
        });

        Result::Ok(EncodedArray {
            data: packed,
            encoding,
        })
    }};
}

#[derive(Debug)]
pub struct BitpackedForNonNegArrayEncoder {
    pub compressed_bit_width: usize,
    pub original_data_type: DataType,
}

impl BitpackedForNonNegArrayEncoder {
    pub fn new(compressed_bit_width: usize, data_type: DataType) -> Self {
        Self {
            compressed_bit_width,
            original_data_type: data_type,
        }
    }
}

impl ArrayEncoder for BitpackedForNonNegArrayEncoder {
    fn encode(
        &self,
        data: DataBlock,
        data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        match data {
            DataBlock::AllNull(_) => {
                let encoding = ProtobufUtils::basic_all_null_encoding();
                Ok(EncodedArray { data, encoding })
            }
            DataBlock::FixedWidth(mut unpacked) => {
                match data_type {
                    DataType::UInt8 | DataType::Int8 => encode_fixed_width!(self, unpacked, u8, buffer_index),
                    DataType::UInt16 | DataType::Int16 => encode_fixed_width!(self, unpacked, u16, buffer_index),
                    DataType::UInt32 | DataType::Int32 => encode_fixed_width!(self, unpacked, u32, buffer_index),
                    DataType::UInt64 | DataType::Int64 => encode_fixed_width!(self, unpacked, u64, buffer_index),
                    _ => unreachable!("BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64"),
                }
            }
            DataBlock::Nullable(nullable) => {
                let validity_buffer_index = *buffer_index;
                *buffer_index += 1;

                let validity_desc = ProtobufUtils::flat_encoding(1, validity_buffer_index, None);
                let encoded_values: EncodedArray;
                match *nullable.data {
                    DataBlock::FixedWidth(mut unpacked) => {
                        match data_type {
                            DataType::UInt8 | DataType::Int8 => encoded_values = encode_fixed_width!(self, unpacked, u8, buffer_index)?,
                            DataType::UInt16 | DataType::Int16 => encoded_values = encode_fixed_width!(self, unpacked, u16, buffer_index)?,
                            DataType::UInt32 | DataType::Int32 => encoded_values = encode_fixed_width!(self, unpacked, u32, buffer_index)?,
                            DataType::UInt64 | DataType::Int64 => encoded_values = encode_fixed_width!(self, unpacked, u64, buffer_index)?,
                            _ => unreachable!("BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64"),
                        }
                    }
                    _ => {
                        return Err(Error::InvalidInput {
                            source: "Bitpacking only supports fixed-width data blocks, nullable data blocks with a fixed-width data block inside, or all-null data blocks".into(),
                            location: location!(),
                        });
                    }
                }
                let encoding =
                    ProtobufUtils::basic_some_null_encoding(validity_desc, encoded_values.encoding);
                let encoded = DataBlock::Nullable(NullableDataBlock {
                    data: Box::new(encoded_values.data),
                    nulls: nullable.nulls,
                    block_info: BlockInfo::new(),
                });
                Ok(EncodedArray {
                    data: encoded,
                    encoding,
                })
            }
            _ => Err(Error::InvalidInput {
                source: "Bitpacking only supports fixed-width data blocks, nullable data blocks with a fixed-width data block inside, or all-null data blocks".into(),
                location: location!(),
            }),
        }
    }
}

#[derive(Debug)]
pub struct BitpackedForNonNegScheduler {
    compressed_bit_width: u64,
    uncompressed_bits_per_value: u64,
    buffer_offset: u64,
}

impl BitpackedForNonNegScheduler {
    pub fn new(
        compressed_bit_width: u64,
        uncompressed_bits_per_value: u64,
        buffer_offset: u64,
    ) -> Self {
        Self {
            compressed_bit_width,
            uncompressed_bits_per_value,
            buffer_offset,
        }
    }

    fn locate_chunk_start(&self, relative_row_num: u64) -> u64 {
        let chunk_size = ELEMS_PER_CHUNK * self.compressed_bit_width / 8;
        self.buffer_offset + (relative_row_num / ELEMS_PER_CHUNK * chunk_size)
    }

    fn locate_chunk_end(&self, relative_row_num: u64) -> u64 {
        let chunk_size = ELEMS_PER_CHUNK * self.compressed_bit_width / 8;
        self.buffer_offset + (relative_row_num / ELEMS_PER_CHUNK * chunk_size) + chunk_size
    }
}
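
// Illustrative sketch added for exposition (not part of the original test suite): chunk
// addressing used by the scheduler. With a compressed bit width of 3, each 1024-value
// chunk occupies 1024 * 3 / 8 = 384 bytes past the buffer offset. The module name is a
// placeholder.
#[cfg(test)]
mod scheduler_chunk_math_sketch {
    use super::BitpackedForNonNegScheduler;

    #[test]
    fn chunk_boundaries() {
        let scheduler = BitpackedForNonNegScheduler::new(3, 32, 100);
        // Rows 0..=1023 live in the first chunk, which spans bytes 100..484.
        assert_eq!(scheduler.locate_chunk_start(0), 100);
        assert_eq!(scheduler.locate_chunk_end(1023), 484);
        // Row 2050 falls in the third chunk, which spans bytes 868..1252.
        assert_eq!(scheduler.locate_chunk_start(2050), 868);
        assert_eq!(scheduler.locate_chunk_end(2050), 1252);
    }
}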

impl PageScheduler for BitpackedForNonNegScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn crate::EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        assert!(!ranges.is_empty());

        let mut byte_ranges = vec![];

        // For each byte range we also track the row ranges it serves.
        let mut bytes_idx_to_range_indices = vec![];
        let first_byte_range = std::ops::Range {
            start: self.locate_chunk_start(ranges[0].start),
            end: self.locate_chunk_end(ranges[0].end - 1),
        };
        byte_ranges.push(first_byte_range);
        bytes_idx_to_range_indices.push(vec![ranges[0].clone()]);

        for (i, range) in ranges.iter().enumerate().skip(1) {
            let this_start = self.locate_chunk_start(range.start);
            let this_end = self.locate_chunk_end(range.end - 1);

            // Coalesce with the previous read when this range starts in the same chunk
            // that the previous range ended in.
            if this_start == self.locate_chunk_start(ranges[i - 1].end - 1) {
                byte_ranges.last_mut().unwrap().end = this_end;
                bytes_idx_to_range_indices
                    .last_mut()
                    .unwrap()
                    .push(range.clone());
            } else {
                byte_ranges.push(this_start..this_end);
                bytes_idx_to_range_indices.push(vec![range.clone()]);
            }
        }

        trace!(
            "Scheduling I/O for {} ranges spread across byte range {}..{}",
            byte_ranges.len(),
            byte_ranges[0].start,
            byte_ranges.last().unwrap().end
        );

        let bytes = scheduler.submit_request(byte_ranges.clone(), top_level_row);

        let compressed_bit_width = self.compressed_bit_width;
        let uncompressed_bits_per_value = self.uncompressed_bits_per_value;
        let num_rows = ranges.iter().map(|range| range.end - range.start).sum();

        async move {
            let bytes = bytes.await?;
            let decompressed_output = bitpacked_for_non_neg_decode(
                compressed_bit_width,
                uncompressed_bits_per_value,
                &bytes,
                &bytes_idx_to_range_indices,
                num_rows,
            );
            Ok(Box::new(BitpackedForNonNegPageDecoder {
                uncompressed_bits_per_value,
                decompressed_buf: decompressed_output,
            }) as Box<dyn PrimitivePageDecoder>)
        }
        .boxed()
    }
}

#[derive(Debug)]
struct BitpackedForNonNegPageDecoder {
    // Number of bits in each uncompressed value (8, 16, 32, or 64).
    uncompressed_bits_per_value: u64,

    decompressed_buf: LanceBuffer,
}

impl PrimitivePageDecoder for BitpackedForNonNegPageDecoder {
    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
        if ![8, 16, 32, 64].contains(&self.uncompressed_bits_per_value) {
            return Err(Error::InvalidInput {
                source: "BitpackedForNonNegPageDecoder should only have an uncompressed_bits_per_value of 8, 16, 32, or 64".into(),
                location: location!(),
            });
        }

        let elem_size_in_bytes = self.uncompressed_bits_per_value / 8;

        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: self.decompressed_buf.slice_with_length(
                (rows_to_skip * elem_size_in_bytes) as usize,
                (num_rows * elem_size_in_bytes) as usize,
            ),
            bits_per_value: self.uncompressed_bits_per_value,
            num_values: num_rows,
            block_info: BlockInfo::new(),
        }))
    }
}
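
// Illustrative sketch added for exposition (not part of the original test suite): the page
// decoder is essentially a view over the already-decompressed buffer, so `decode` only
// slices by element size. The module name is a placeholder.
#[cfg(test)]
mod page_decoder_slice_sketch {
    use crate::buffer::LanceBuffer;
    use crate::data::DataBlock;
    use crate::decoder::PrimitivePageDecoder;

    use super::BitpackedForNonNegPageDecoder;

    #[test]
    fn decode_slices_decompressed_values() {
        let decoder = BitpackedForNonNegPageDecoder {
            uncompressed_bits_per_value: 32,
            decompressed_buf: LanceBuffer::reinterpret_vec(vec![10u32, 11, 12, 13, 14]),
        };
        // Skip two rows and take two: the result should cover the bytes of [12, 13].
        let block = decoder.decode(2, 2).unwrap();
        let DataBlock::FixedWidth(fixed) = block else {
            panic!("expected a fixed-width block");
        };
        assert_eq!(fixed.num_values, 2);
        assert_eq!(fixed.data.len(), 2 * std::mem::size_of::<u32>());
    }
}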

macro_rules! bitpacked_decode {
    ($uncompressed_type:ty, $compressed_bit_width:expr, $data:expr, $bytes_idx_to_range_indices:expr, $num_rows:expr) => {{
        let mut decompressed: Vec<$uncompressed_type> = Vec::with_capacity($num_rows as usize);
        let packed_chunk_size_in_byte: usize = (ELEMS_PER_CHUNK * $compressed_bit_width) as usize / 8;
        let mut decompress_chunk_buf = vec![0 as $uncompressed_type; ELEMS_PER_CHUNK as usize];

        for (i, bytes) in $data.iter().enumerate() {
            let mut ranges_idx = 0;
            let mut curr_range_start = $bytes_idx_to_range_indices[i][0].start;
            let mut chunk_num = 0;

            while chunk_num * packed_chunk_size_in_byte < bytes.len() {
                // Copy this chunk out of the `Bytes` so it can be cast to the target type.
                let chunk_in_u8: Vec<u8> = bytes[chunk_num * packed_chunk_size_in_byte..]
                    [..packed_chunk_size_in_byte]
                    .to_vec();
                chunk_num += 1;
                let chunk = cast_slice(&chunk_in_u8);
                unsafe {
                    BitPacking::unchecked_unpack(
                        $compressed_bit_width as usize,
                        chunk,
                        &mut decompress_chunk_buf,
                    );
                }

                // Copy the requested row ranges out of this decompressed chunk.
                loop {
                    let elems_after_curr_range_start_in_this_chunk =
                        ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK;
                    if curr_range_start + elems_after_curr_range_start_in_this_chunk
                        <= $bytes_idx_to_range_indices[i][ranges_idx].end
                    {
                        // The current range extends past this chunk: take everything from
                        // the range start to the end of the chunk and move to the next chunk.
                        decompressed.extend_from_slice(
                            &decompress_chunk_buf[(curr_range_start % ELEMS_PER_CHUNK) as usize..],
                        );
                        curr_range_start += elems_after_curr_range_start_in_this_chunk;
                        break;
                    } else {
                        // The current range ends inside this chunk: take what it needs and
                        // advance to the next range (if any).
                        let elems_this_range_needed_in_this_chunk =
                            ($bytes_idx_to_range_indices[i][ranges_idx].end - curr_range_start)
                                .min(ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK);
                        decompressed.extend_from_slice(
                            &decompress_chunk_buf[(curr_range_start % ELEMS_PER_CHUNK) as usize..]
                                [..elems_this_range_needed_in_this_chunk as usize],
                        );
                        if curr_range_start + elems_this_range_needed_in_this_chunk
                            == $bytes_idx_to_range_indices[i][ranges_idx].end
                        {
                            ranges_idx += 1;
                            if ranges_idx == $bytes_idx_to_range_indices[i].len() {
                                break;
                            }
                            curr_range_start = $bytes_idx_to_range_indices[i][ranges_idx].start;
                        } else {
                            curr_range_start += elems_this_range_needed_in_this_chunk;
                        }
                    }
                }
            }
        }

        LanceBuffer::reinterpret_vec(decompressed)
    }};
}

fn bitpacked_for_non_neg_decode(
    compressed_bit_width: u64,
    uncompressed_bits_per_value: u64,
    data: &[Bytes],
    bytes_idx_to_range_indices: &[Vec<std::ops::Range<u64>>],
    num_rows: u64,
) -> LanceBuffer {
    match uncompressed_bits_per_value {
        8 => bitpacked_decode!(
            u8,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        16 => bitpacked_decode!(
            u16,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        32 => bitpacked_decode!(
            u32,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        64 => bitpacked_decode!(
            u64,
            compressed_bit_width,
            data,
            bytes_idx_to_range_indices,
            num_rows
        ),
        _ => unreachable!(
            "bitpacked_for_non_neg_decode only supports 8, 16, 32, 64 uncompressed_bits_per_value"
        ),
    }
}

#[derive(Debug, Default)]
pub struct InlineBitpacking {
    uncompressed_bit_width: u64,
}

impl InlineBitpacking {
    pub fn new(uncompressed_bit_width: u64) -> Self {
        Self {
            uncompressed_bit_width,
        }
    }

    pub fn from_description(description: &pb::InlineBitpacking) -> Self {
        Self {
            uncompressed_bit_width: description.uncompressed_bits_per_value,
        }
    }

    pub fn min_size_bytes(bit_width: u64) -> u64 {
        (ELEMS_PER_CHUNK * bit_width).div_ceil(8)
    }

    /// Bitpacks a [`FixedWidthDataBlock`] into 1024-value chunks.
    ///
    /// Each chunk may use a different bit width (taken from the block's precomputed
    /// `BitWidth` statistic) and stores that bit width inline as the first word of the
    /// chunk, followed by the packed values.
    fn bitpack_chunked<T: ArrowNativeType + BitPacking>(
        mut data: FixedWidthDataBlock,
    ) -> MiniBlockCompressed {
        let data_buffer = data.data.borrow_to_typed_slice::<T>();
        let data_buffer = data_buffer.as_ref();

        let bit_widths = data.expect_stat(Stat::BitWidth);
        let bit_widths_array = bit_widths
            .as_any()
            .downcast_ref::<PrimitiveArray<UInt64Type>>()
            .unwrap();

        let (packed_chunk_sizes, total_size) = bit_widths_array
            .values()
            .iter()
            .map(|&bit_width| {
                let chunk_size = ((1024 * bit_width) / data.bits_per_value) as usize;
                (chunk_size, chunk_size + 1)
            })
            .fold(
                (Vec::with_capacity(bit_widths_array.len()), 0),
                |(mut sizes, total), (size, inc)| {
                    sizes.push(size);
                    (sizes, total + inc)
                },
            );

        let mut output: Vec<T> = Vec::with_capacity(total_size);
        let mut chunks = Vec::with_capacity(bit_widths_array.len());

        for (i, packed_chunk_size) in packed_chunk_sizes
            .iter()
            .enumerate()
            .take(bit_widths_array.len() - 1)
        {
            let start_elem = i * ELEMS_PER_CHUNK as usize;
            let bit_width = bit_widths_array.value(i) as usize;
            output.push(T::from_usize(bit_width).unwrap());
            let output_len = output.len();
            unsafe {
                output.set_len(output_len + *packed_chunk_size);
                BitPacking::unchecked_pack(
                    bit_width,
                    &data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..*packed_chunk_size],
                );
            }
            chunks.push(MiniBlockChunk {
                buffer_sizes: vec![((1 + *packed_chunk_size) * std::mem::size_of::<T>()) as u16],
                log_num_values: LOG_ELEMS_PER_CHUNK,
            });
        }

        // The final chunk may hold fewer than 1024 values; zero-pad it to a full chunk.
        let last_chunk_elem_num = if data.num_values % ELEMS_PER_CHUNK == 0 {
            1024
        } else {
            data.num_values % ELEMS_PER_CHUNK
        };
        let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
        last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
            &data_buffer[data.num_values as usize - last_chunk_elem_num as usize..],
        );
        let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as usize;
        output.push(T::from_usize(bit_width).unwrap());
        let output_len = output.len();
        unsafe {
            output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]);
            BitPacking::unchecked_pack(
                bit_width,
                &last_chunk,
                &mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]],
            );
        }
        chunks.push(MiniBlockChunk {
            buffer_sizes: vec![
                ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) * std::mem::size_of::<T>())
                    as u16,
            ],
            log_num_values: 0,
        });

        MiniBlockCompressed {
            data: vec![LanceBuffer::reinterpret_vec(output)],
            chunks,
            num_values: data.num_values,
        }
    }

    fn chunk_data(
        &self,
        data: FixedWidthDataBlock,
    ) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) {
        assert!(data.bits_per_value % 8 == 0);
        assert_eq!(data.bits_per_value, self.uncompressed_bit_width);
        let bits_per_value = data.bits_per_value;
        let compressed = match bits_per_value {
            8 => Self::bitpack_chunked::<u8>(data),
            16 => Self::bitpack_chunked::<u16>(data),
            32 => Self::bitpack_chunked::<u32>(data),
            64 => Self::bitpack_chunked::<u64>(data),
            _ => unreachable!(),
        };
        (compressed, ProtobufUtils::inline_bitpacking(bits_per_value))
    }

    fn unchunk<T: ArrowNativeType + BitPacking + AnyBitPattern>(
        data: LanceBuffer,
        num_values: u64,
    ) -> Result<DataBlock> {
        assert!(data.len() >= 8);
        assert!(num_values <= ELEMS_PER_CHUNK);

        let uncompressed_bit_width = std::mem::size_of::<T>() * 8;
        let mut decompressed = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];

        // The first word of the chunk holds its bit width; the packed values follow.
        let chunk_in_u8: Vec<u8> = data.to_vec();
        let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<T>()];
        let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<T>());
        let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<T>()..]);

        assert!(std::mem::size_of_val(chunk) == (bit_width_value * ELEMS_PER_CHUNK) as usize / 8);

        unsafe {
            BitPacking::unchecked_unpack(bit_width_value as usize, chunk, &mut decompressed);
        }

        decompressed.truncate(num_values as usize);
        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(decompressed),
            bits_per_value: uncompressed_bit_width as u64,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}
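
// Illustrative sketch added for exposition (not part of the original test suite): each
// inline-bitpacked chunk stores its bit width in the first word, followed by the
// FastLanes-packed words. Here a full 1024-value u32 chunk is packed at 4 bits by hand
// and decoded back through `unchunk`. The module name is a placeholder.
#[cfg(test)]
mod inline_bitpacking_unchunk_sketch {
    use crate::buffer::LanceBuffer;
    use crate::compression_algo::fastlanes::BitPacking;
    use crate::data::DataBlock;

    use super::{InlineBitpacking, ELEMS_PER_CHUNK};

    #[test]
    fn unchunk_reads_bit_width_prefix() {
        let values: Vec<u32> = (0..ELEMS_PER_CHUNK as u32).map(|i| i % 16).collect();

        // First word holds the bit width (4); the packed words follow.
        let packed_words = (ELEMS_PER_CHUNK as usize * 4) / 32;
        let mut chunk: Vec<u32> = vec![4];
        chunk.resize(1 + packed_words, 0);
        unsafe { BitPacking::unchecked_pack(4, &values, &mut chunk[1..]) };

        let decoded =
            InlineBitpacking::unchunk::<u32>(LanceBuffer::reinterpret_vec(chunk), ELEMS_PER_CHUNK)
                .unwrap();
        let DataBlock::FixedWidth(mut fixed) = decoded else {
            panic!("expected a fixed-width block");
        };
        assert_eq!(fixed.num_values, ELEMS_PER_CHUNK);
        assert_eq!(fixed.data.borrow_to_typed_slice::<u32>().as_ref(), &values[..]);
    }
}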

impl MiniBlockCompressor for InlineBitpacking {
    fn compress(
        &self,
        chunk: DataBlock,
    ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
        match chunk {
            DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with BitpackMiniBlockEncoder",
                    chunk.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

impl BlockCompressor for InlineBitpacking {
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
        let fixed_width = data.as_fixed_width().unwrap();
        let (chunked, _) = self.chunk_data(fixed_width);
        Ok(chunked.data.into_iter().next().unwrap())
    }
}

impl MiniBlockDecompressor for InlineBitpacking {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        assert_eq!(data.len(), 1);
        let data = data.into_iter().next().unwrap();
        match self.uncompressed_bit_width {
            8 => Self::unchunk::<u8>(data, num_values),
            16 => Self::unchunk::<u16>(data, num_values),
            32 => Self::unchunk::<u32>(data, num_values),
            64 => Self::unchunk::<u64>(data, num_values),
            _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
        }
    }
}

impl BlockDecompressor for InlineBitpacking {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        match self.uncompressed_bit_width {
            8 => Self::unchunk::<u8>(data, num_values),
            16 => Self::unchunk::<u16>(data, num_values),
            32 => Self::unchunk::<u32>(data, num_values),
            64 => Self::unchunk::<u64>(data, num_values),
            _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
        }
    }
}

/// Bitpacks a [`FixedWidthDataBlock`] where the bit width is stored "out of line" in the
/// encoding description instead of inside the data buffer.
///
/// Values are packed in chunks of 1024; the final chunk is zero-padded to a full chunk,
/// so the output always contains a whole number of chunks.
fn bitpack_out_of_line<T: ArrowNativeType + BitPacking>(
    mut data: FixedWidthDataBlock,
    bit_width: usize,
) -> LanceBuffer {
    let data_buffer = data.data.borrow_to_typed_slice::<T>();
    let data_buffer = data_buffer.as_ref();

    let num_chunks = data_buffer.len().div_ceil(ELEMS_PER_CHUNK as usize);
    let last_chunk_is_runt = data_buffer.len() % ELEMS_PER_CHUNK as usize != 0;
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bit_width).div_ceil(data.bits_per_value as usize);
    #[allow(clippy::uninit_vec)]
    let mut output: Vec<T> = Vec::with_capacity(num_chunks * words_per_chunk);
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(num_chunks * words_per_chunk);
    }

    let num_whole_chunks = if last_chunk_is_runt {
        num_chunks - 1
    } else {
        num_chunks
    };

    // Pack all full chunks.
    for i in 0..num_whole_chunks {
        let input_start = i * ELEMS_PER_CHUNK as usize;
        let input_end = input_start + ELEMS_PER_CHUNK as usize;
        let output_start = i * words_per_chunk;
        let output_end = output_start + words_per_chunk;
        unsafe {
            BitPacking::unchecked_pack(
                bit_width,
                &data_buffer[input_start..input_end],
                &mut output[output_start..output_end],
            );
        }
    }

    if !last_chunk_is_runt {
        return LanceBuffer::reinterpret_vec(output);
    }

    // Pack the trailing partial chunk, zero-padded to a full 1024 values.
    let remaining_items = data_buffer.len() % ELEMS_PER_CHUNK as usize;
    let last_chunk_start = num_whole_chunks * ELEMS_PER_CHUNK as usize;

    let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
    last_chunk[..remaining_items].clone_from_slice(&data_buffer[last_chunk_start..]);
    let output_start = num_whole_chunks * words_per_chunk;
    unsafe {
        BitPacking::unchecked_pack(bit_width, &last_chunk, &mut output[output_start..]);
    }

    LanceBuffer::reinterpret_vec(output)
}

/// Unpacks data that was packed by [`bitpack_out_of_line`].
///
/// `num_values` is required because the packed data is padded to a whole number of
/// 1024-value chunks.
fn unpack_out_of_line<T: ArrowNativeType + BitPacking>(
    mut data: FixedWidthDataBlock,
    num_values: usize,
    bits_per_value: usize,
) -> FixedWidthDataBlock {
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bits_per_value).div_ceil(data.bits_per_value as usize);
    let compressed_words = data.data.borrow_to_typed_slice::<T>();

    let num_chunks = data.num_values as usize / words_per_chunk;
    debug_assert_eq!(data.num_values as usize % words_per_chunk, 0);

    #[allow(clippy::uninit_vec)]
    let mut decompressed = Vec::with_capacity(num_chunks * ELEMS_PER_CHUNK as usize);
    #[allow(clippy::uninit_vec)]
    unsafe {
        decompressed.set_len(num_chunks * ELEMS_PER_CHUNK as usize);
    }

    for chunk_idx in 0..num_chunks {
        let input_start = chunk_idx * words_per_chunk;
        let input_end = input_start + words_per_chunk;
        let output_start = chunk_idx * ELEMS_PER_CHUNK as usize;
        let output_end = output_start + ELEMS_PER_CHUNK as usize;
        unsafe {
            BitPacking::unchecked_unpack(
                bits_per_value,
                &compressed_words[input_start..input_end],
                &mut decompressed[output_start..output_end],
            );
        }
    }

    // Drop the padding that was added to fill out the final chunk.
    decompressed.truncate(num_values);

    FixedWidthDataBlock {
        data: LanceBuffer::reinterpret_vec(decompressed),
        bits_per_value: data.bits_per_value,
        num_values: num_values as u64,
        block_info: BlockInfo::new(),
    }
}
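
// Illustrative sketch added for exposition (not part of the original test suite):
// out-of-line bitpacking keeps the bit width in the encoding description rather than in
// the data, so the packer and unpacker must agree on it. This round-trips 1500 u32 values
// at 5 bits. The module name is a placeholder.
#[cfg(test)]
mod out_of_line_bitpacking_sketch {
    use crate::buffer::LanceBuffer;
    use crate::data::{BlockInfo, FixedWidthDataBlock};

    use super::{bitpack_out_of_line, unpack_out_of_line};

    #[test]
    fn round_trip_five_bit_values() {
        let values: Vec<u32> = (0..1500u32).map(|i| i % 32).collect();
        let block = FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(values.clone()),
            bits_per_value: 32,
            num_values: values.len() as u64,
            block_info: BlockInfo::new(),
        };

        let packed = bitpack_out_of_line::<u32>(block, 5);
        let num_packed_words = packed.len() / std::mem::size_of::<u32>();

        // The unpacker only sees the packed words plus the bit width from the encoding.
        let packed_block = FixedWidthDataBlock {
            data: packed,
            bits_per_value: 32,
            num_values: num_packed_words as u64,
            block_info: BlockInfo::new(),
        };
        let mut unpacked = unpack_out_of_line::<u32>(packed_block, values.len(), 5);
        assert_eq!(unpacked.num_values, values.len() as u64);
        assert_eq!(unpacked.data.borrow_to_typed_slice::<u32>().as_ref(), &values[..]);
    }
}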

#[derive(Debug)]
pub struct OutOfLineBitpacking {
    compressed_bit_width: usize,
}

impl PerValueCompressor for OutOfLineBitpacking {
    fn compress(
        &self,
        data: DataBlock,
    ) -> Result<(crate::encoder::PerValueDataBlock, pb::ArrayEncoding)> {
        let fixed_width = data.as_fixed_width().unwrap();
        let num_values = fixed_width.num_values;
        let word_size = fixed_width.bits_per_value;
        let compressed = match word_size {
            8 => bitpack_out_of_line::<u8>(fixed_width, self.compressed_bit_width),
            16 => bitpack_out_of_line::<u16>(fixed_width, self.compressed_bit_width),
            32 => bitpack_out_of_line::<u32>(fixed_width, self.compressed_bit_width),
            64 => bitpack_out_of_line::<u64>(fixed_width, self.compressed_bit_width),
            _ => panic!("Bitpacking word size must be 8,16,32,64"),
        };
        let compressed = FixedWidthDataBlock {
            data: compressed,
            bits_per_value: self.compressed_bit_width as u64,
            num_values,
            block_info: BlockInfo::new(),
        };
        let encoding =
            ProtobufUtils::out_of_line_bitpacking(word_size, self.compressed_bit_width as u64);
        Ok((PerValueDataBlock::Fixed(compressed), encoding))
    }
}

impl FixedPerValueDecompressor for OutOfLineBitpacking {
    fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock> {
        let unpacked = match data.bits_per_value {
            8 => unpack_out_of_line::<u8>(data, num_values as usize, self.compressed_bit_width),
            16 => unpack_out_of_line::<u16>(data, num_values as usize, self.compressed_bit_width),
            32 => unpack_out_of_line::<u32>(data, num_values as usize, self.compressed_bit_width),
            64 => unpack_out_of_line::<u64>(data, num_values as usize, self.compressed_bit_width),
            _ => panic!("Bitpacking word size must be 8,16,32,64"),
        };
        Ok(DataBlock::FixedWidth(unpacked))
    }

    fn bits_per_value(&self) -> u64 {
        self.compressed_bit_width as u64
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{Int64Array, Int8Array};

    use arrow_schema::DataType;

    use arrow_array::Array;

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_miniblock_bitpack() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        let arrays = vec![
            Arc::new(Int8Array::from(vec![100; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![16; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![-1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![5; 1])) as Arc<dyn Array>,
        ];
        check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;

        for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] {
            let int64_arrays = vec![
                Int64Array::from(vec![3; 1024]),
                Int64Array::from(vec![8; 1024]),
                Int64Array::from(vec![16; 1024]),
                Int64Array::from(vec![100; 1024]),
                Int64Array::from(vec![512; 1024]),
                Int64Array::from(vec![1000; 1024]),
                Int64Array::from(vec![2000; 1024]),
                Int64Array::from(vec![-1; 10]),
            ];

            let mut arrays = vec![];
            for int64_array in int64_arrays {
                arrays.push(arrow_cast::cast(&int64_array, &data_type).unwrap());
            }
            check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;
        }
    }
}