1use arrow_array::types::UInt64Type;
19use arrow_array::{Array, PrimitiveArray};
20use arrow_buffer::ArrowNativeType;
21use byteorder::{ByteOrder, LittleEndian};
22use lance_bitpacking::BitPacking;
23
24use lance_core::{Error, Result};
25
26use crate::buffer::LanceBuffer;
27use crate::compression::{BlockCompressor, BlockDecompressor, MiniBlockDecompressor};
28use crate::data::BlockInfo;
29use crate::data::{DataBlock, FixedWidthDataBlock};
30use crate::encodings::logical::primitive::miniblock::{
31 MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
32};
33use crate::format::pb21::CompressiveEncoding;
34use crate::format::{ProtobufUtils21, pb21};
35use crate::statistics::{GetStat, Stat};
36use bytemuck::{AnyBitPattern, cast_slice};
37
/// Values are bit-packed in chunks of 2^10 = 1024 elements.
const LOG_ELEMS_PER_CHUNK: u8 = 10;
const ELEMS_PER_CHUNK: u64 = 1 << LOG_ELEMS_PER_CHUNK;
40
/// Bit-packing codec that stores each chunk's bit width *inline*: the first
/// word of every compressed chunk is the bit width used for that chunk.
#[derive(Debug, Default)]
pub struct InlineBitpacking {
    // Width in bits (8/16/32/64) of the values before compression.
    uncompressed_bit_width: u64,
}
45
impl InlineBitpacking {
    /// Creates a codec for values that are `uncompressed_bit_width` bits wide
    /// (expected to be 8, 16, 32, or 64).
    pub fn new(uncompressed_bit_width: u64) -> Self {
        Self {
            uncompressed_bit_width,
        }
    }

    /// Reconstructs the codec from its protobuf encoding description.
    pub fn from_description(description: &pb21::InlineBitpacking) -> Self {
        Self {
            uncompressed_bit_width: description.uncompressed_bits_per_value,
        }
    }

    /// Number of bytes needed to pack one full chunk (1024 values) at
    /// `compressed_bit_width` bits per value, rounded up to whole bytes.
    pub fn min_size_bytes(compressed_bit_width: u64) -> u64 {
        (ELEMS_PER_CHUNK * compressed_bit_width).div_ceil(8)
    }

    /// Bit-packs `data` into 1024-value chunks.
    ///
    /// Each chunk is emitted as one `T` word holding that chunk's bit width,
    /// immediately followed by the packed values.  The final chunk is
    /// zero-padded up to 1024 values before packing.
    fn bitpack_chunked<T: ArrowNativeType + BitPacking>(
        data: FixedWidthDataBlock,
    ) -> MiniBlockCompressed {
        debug_assert!(data.num_values > 0);
        let data_buffer = data.data.borrow_to_typed_slice::<T>();
        let data_buffer = data_buffer.as_ref();

        // Per-chunk bit widths computed by the statistics pass.
        // NOTE(review): assumes one entry per 1024-value chunk, with the last
        // entry covering the (possibly partial) tail chunk -- TODO confirm
        // against the Stat::BitWidth producer.
        let bit_widths = data.expect_stat(Stat::BitWidth);
        let bit_widths_array = bit_widths
            .as_any()
            .downcast_ref::<PrimitiveArray<UInt64Type>>()
            .unwrap();

        // For each chunk: its packed size in T-words (`chunk_size`), plus one
        // extra word for the inline bit-width header (`chunk_size + 1`).
        // `total_size` is the sum of headers + payloads across all chunks.
        let (packed_chunk_sizes, total_size) = bit_widths_array
            .values()
            .iter()
            .map(|&bit_width| {
                let chunk_size = ((1024 * bit_width) / data.bits_per_value) as usize;
                (chunk_size, chunk_size + 1)
            })
            .fold(
                (Vec::with_capacity(bit_widths_array.len()), 0),
                |(mut sizes, total), (size, inc)| {
                    sizes.push(size);
                    (sizes, total + inc)
                },
            );

        let mut output: Vec<T> = Vec::with_capacity(total_size);
        let mut chunks = Vec::with_capacity(bit_widths_array.len());

        // Every chunk except the last holds a full 1024 values.
        for (i, packed_chunk_size) in packed_chunk_sizes
            .iter()
            .enumerate()
            .take(bit_widths_array.len() - 1)
        {
            let start_elem = i * ELEMS_PER_CHUNK as usize;
            let bit_width = bit_widths_array.value(i) as usize;
            // Inline header: this chunk's bit width, stored as one T word.
            output.push(T::from_usize(bit_width).unwrap());
            let output_len = output.len();
            // SAFETY: `output` was allocated with capacity `total_size`, which
            // covers every chunk's header + packed words, and `unchecked_pack`
            // fully overwrites the newly exposed region.
            unsafe {
                output.set_len(output_len + *packed_chunk_size);
                BitPacking::unchecked_pack(
                    bit_width,
                    &data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..*packed_chunk_size],
                );
            }
            chunks.push(MiniBlockChunk {
                buffer_sizes: vec![((1 + *packed_chunk_size) * std::mem::size_of::<T>()) as u32],
                log_num_values: LOG_ELEMS_PER_CHUNK,
            });
        }

        // The last chunk may be partial; zero-pad it to a full 1024 values so
        // the same packing kernel can be used.
        let last_chunk_elem_num = if data.num_values.is_multiple_of(ELEMS_PER_CHUNK) {
            ELEMS_PER_CHUNK
        } else {
            data.num_values % ELEMS_PER_CHUNK
        };
        let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
        last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
            &data_buffer[data.num_values as usize - last_chunk_elem_num as usize..],
        );
        let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as usize;
        output.push(T::from_usize(bit_width).unwrap());
        let output_len = output.len();
        // SAFETY: same capacity argument as above; the final entry of
        // `packed_chunk_sizes` is the packed size of this (padded) chunk.
        unsafe {
            output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]);
            BitPacking::unchecked_pack(
                bit_width,
                &last_chunk,
                &mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]],
            );
        }
        chunks.push(MiniBlockChunk {
            buffer_sizes: vec![
                ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) * std::mem::size_of::<T>())
                    as u32,
            ],
            // log_num_values == 0 marks the final chunk, which holds whatever
            // values remain rather than a full power-of-two count.
            log_num_values: 0,
        });

        MiniBlockCompressed {
            data: vec![LanceBuffer::reinterpret_vec(output)],
            chunks,
            num_values: data.num_values,
        }
    }

    /// Dispatches [`Self::bitpack_chunked`] on the value width and builds the
    /// protobuf encoding description for the result.
    fn chunk_data(&self, data: FixedWidthDataBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        assert!(data.bits_per_value.is_multiple_of(8));
        assert_eq!(data.bits_per_value, self.uncompressed_bit_width);
        let bits_per_value = data.bits_per_value;
        let compressed = match bits_per_value {
            8 => Self::bitpack_chunked::<u8>(data),
            16 => Self::bitpack_chunked::<u16>(data),
            32 => Self::bitpack_chunked::<u32>(data),
            64 => Self::bitpack_chunked::<u64>(data),
            _ => unreachable!(),
        };
        (
            compressed,
            ProtobufUtils21::inline_bitpacking(
                bits_per_value,
                None,
            ),
        )
    }

    /// Decompresses a single chunk produced by [`Self::bitpack_chunked`].
    ///
    /// The first `T` word of `data` is the chunk's bit width (little-endian);
    /// the remainder is the packed payload for a full padded chunk.  Only the
    /// first `num_values` recovered values are kept.
    fn unchunk<T: ArrowNativeType + BitPacking + AnyBitPattern>(
        data: LanceBuffer,
        num_values: u64,
    ) -> Result<DataBlock> {
        assert!(data.len() >= std::mem::size_of::<T>());
        assert!(num_values <= ELEMS_PER_CHUNK);

        let uncompressed_bit_width = std::mem::size_of::<T>() * 8;
        let mut decompressed = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];

        // Read the inline bit-width header (one little-endian T word).
        let chunk_in_u8: Vec<u8> = data.to_vec();
        let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<T>()];
        let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<T>());
        let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<T>()..]);
        // The payload must be exactly one full (padded) chunk at this width.
        assert!(std::mem::size_of_val(chunk) == (bit_width_value * ELEMS_PER_CHUNK) as usize / 8);
        // SAFETY: `decompressed` holds ELEMS_PER_CHUNK slots and `chunk` was
        // just asserted to be the matching packed size for that many values.
        unsafe {
            BitPacking::unchecked_unpack(bit_width_value as usize, chunk, &mut decompressed);
        }

        decompressed.truncate(num_values as usize);
        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(decompressed),
            bits_per_value: uncompressed_bit_width as u64,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}
216
217impl MiniBlockCompressor for InlineBitpacking {
218 fn compress(&self, chunk: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
219 match chunk {
220 DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)),
221 _ => Err(Error::invalid_input_source(
222 format!(
223 "Cannot compress a data block of type {} with BitpackMiniBlockEncoder",
224 chunk.name()
225 )
226 .into(),
227 )),
228 }
229 }
230}
231
232impl BlockCompressor for InlineBitpacking {
233 fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
234 let fixed_width = data.as_fixed_width().unwrap();
235 let (chunked, _) = self.chunk_data(fixed_width);
236 Ok(chunked.data.into_iter().next().unwrap())
237 }
238}
239
240impl MiniBlockDecompressor for InlineBitpacking {
241 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
242 assert_eq!(data.len(), 1);
243 let data = data.into_iter().next().unwrap();
244 match self.uncompressed_bit_width {
245 8 => Self::unchunk::<u8>(data, num_values),
246 16 => Self::unchunk::<u16>(data, num_values),
247 32 => Self::unchunk::<u32>(data, num_values),
248 64 => Self::unchunk::<u64>(data, num_values),
249 _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
250 }
251 }
252}
253
254impl BlockDecompressor for InlineBitpacking {
255 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
256 match self.uncompressed_bit_width {
257 8 => Self::unchunk::<u8>(data, num_values),
258 16 => Self::unchunk::<u16>(data, num_values),
259 32 => Self::unchunk::<u32>(data, num_values),
260 64 => Self::unchunk::<u64>(data, num_values),
261 _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
262 }
263 }
264}
265
/// Bit-packs `data` at a fixed `compressed_bits_per_value`, storing no
/// bit-width metadata inline (it lives in the encoding description instead).
///
/// Values are packed in 1024-value chunks.  A partial final chunk is either
/// zero-padded and packed (when packing saves more bits than the padding
/// costs) or appended as raw, unpacked words.
fn bitpack_out_of_line<T: ArrowNativeType + BitPacking>(
    data: FixedWidthDataBlock,
    compressed_bits_per_value: usize,
) -> LanceBuffer {
    let data_buffer = data.data.borrow_to_typed_slice::<T>();
    let data_buffer = data_buffer.as_ref();

    let num_chunks = data_buffer.len().div_ceil(ELEMS_PER_CHUNK as usize);
    let last_chunk_is_runt = data_buffer.len() % ELEMS_PER_CHUNK as usize != 0;
    // Packed size of one full chunk, measured in T words.
    let words_per_chunk = (ELEMS_PER_CHUNK as usize * compressed_bits_per_value)
        .div_ceil(data.bits_per_value as usize);
    #[allow(clippy::uninit_vec)]
    let mut output: Vec<T> = Vec::with_capacity(num_chunks * words_per_chunk);
    // SAFETY: each exposed element is overwritten before being read -- whole
    // chunks by `unchecked_pack` below, and the tail region is shrunk away
    // again before the tail is appended.
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(num_chunks * words_per_chunk);
    }

    let num_whole_chunks = if last_chunk_is_runt {
        num_chunks - 1
    } else {
        num_chunks
    };

    // Pack every full 1024-value chunk in place.
    for i in 0..num_whole_chunks {
        let input_start = i * ELEMS_PER_CHUNK as usize;
        let input_end = input_start + ELEMS_PER_CHUNK as usize;
        let output_start = i * words_per_chunk;
        let output_end = output_start + words_per_chunk;
        unsafe {
            BitPacking::unchecked_pack(
                compressed_bits_per_value,
                &data_buffer[input_start..input_end],
                &mut output[output_start..output_end],
            );
        }
    }

    if !last_chunk_is_runt {
        return LanceBuffer::reinterpret_vec(output);
    }

    let last_chunk_start = num_whole_chunks * ELEMS_PER_CHUNK as usize;
    // Shrink back to just the whole chunks; the tail is appended below.
    unsafe {
        output.set_len(num_whole_chunks * words_per_chunk);
    }
    let remaining_items = data_buffer.len() - last_chunk_start;

    // Choose between packing a zero-padded tail chunk and storing the tail
    // raw: padding costs `compressed` bits per missing slot, while packing
    // saves `uncompressed - compressed` bits per real value.
    let uncompressed_bits = data.bits_per_value as usize;
    let tail_bit_savings = uncompressed_bits.saturating_sub(compressed_bits_per_value);
    let padding_cost = compressed_bits_per_value * (ELEMS_PER_CHUNK as usize - remaining_items);
    let tail_pack_savings = tail_bit_savings.saturating_mul(remaining_items);
    debug_assert!(remaining_items > 0, "remaining_items must be non-zero");
    // NOTE(review): assumes the compressed width is strictly narrower than the
    // uncompressed width whenever this encoder is chosen -- TODO confirm at
    // the call sites that select out-of-line bitpacking.
    debug_assert!(tail_bit_savings > 0, "tail_bit_savings must be non-zero");

    if padding_cost < tail_pack_savings {
        // Zero-pad the tail to a full chunk and pack it like the others.
        let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
        last_chunk[..remaining_items].copy_from_slice(&data_buffer[last_chunk_start..]);
        let start = output.len();
        // SAFETY: capacity was reserved for `num_chunks * words_per_chunk`
        // words, and `start + words_per_chunk` equals exactly that bound;
        // `unchecked_pack` fills the exposed region completely.
        unsafe {
            output.set_len(start + words_per_chunk);
            BitPacking::unchecked_pack(
                compressed_bits_per_value,
                &last_chunk,
                &mut output[start..start + words_per_chunk],
            );
        }
    } else {
        // Cheaper to store the tail values unpacked.
        output.extend_from_slice(&data_buffer[last_chunk_start..]);
    }

    LanceBuffer::reinterpret_vec(output)
}
349
/// Reverses [`bitpack_out_of_line`]: unpacks `num_values` values that were
/// packed at `compressed_bits_per_value`.
///
/// The tail may have been stored either as a packed zero-padded chunk or as
/// raw words; which one is detected from the compressed buffer's length.
fn unpack_out_of_line<T: ArrowNativeType + BitPacking>(
    data: FixedWidthDataBlock,
    num_values: usize,
    compressed_bits_per_value: usize,
) -> FixedWidthDataBlock {
    // Packed size of one full chunk, measured in T words (must mirror the
    // computation in bitpack_out_of_line).
    let words_per_chunk = (ELEMS_PER_CHUNK as usize * compressed_bits_per_value)
        .div_ceil(data.bits_per_value as usize);
    let compressed_words = data.data.borrow_to_typed_slice::<T>();

    let num_whole_chunks = num_values / ELEMS_PER_CHUNK as usize;
    let tail_values = num_values % ELEMS_PER_CHUNK as usize;
    let expected_full_words = num_whole_chunks * words_per_chunk;
    // If the tail was stored raw, the buffer is exactly full-chunk words plus
    // one raw word per tail value; otherwise it has a whole packed chunk.
    let expected_new_len = expected_full_words + tail_values;
    let tail_is_raw = tail_values > 0 && compressed_words.len() == expected_new_len;

    // Extra slack so a packed tail can be unpacked as a full 1024-value chunk
    // before being truncated down to `tail_values`.
    let extra_tail_capacity = ELEMS_PER_CHUNK as usize;
    #[allow(clippy::uninit_vec)]
    let mut decompressed: Vec<T> =
        Vec::with_capacity(num_values.saturating_add(extra_tail_capacity));
    let chunk_value_len = num_whole_chunks * ELEMS_PER_CHUNK as usize;
    // SAFETY: every exposed element is overwritten by `unchecked_unpack` in
    // the loop below before being read.
    unsafe {
        decompressed.set_len(chunk_value_len);
    }

    // Unpack each full chunk into its slot of the output.
    for chunk_idx in 0..num_whole_chunks {
        let input_start = chunk_idx * words_per_chunk;
        let input_end = input_start + words_per_chunk;
        let output_start = chunk_idx * ELEMS_PER_CHUNK as usize;
        let output_end = output_start + ELEMS_PER_CHUNK as usize;
        unsafe {
            BitPacking::unchecked_unpack(
                compressed_bits_per_value,
                &compressed_words[input_start..input_end],
                &mut decompressed[output_start..output_end],
            );
        }
    }

    if tail_values > 0 {
        if tail_is_raw {
            // Raw tail: the values were stored unpacked, copy them directly.
            let tail_start = expected_full_words;
            decompressed.extend_from_slice(&compressed_words[tail_start..tail_start + tail_values]);
        } else {
            // Packed tail: unpack the full zero-padded chunk, then drop the
            // padding.
            let tail_start = expected_full_words;
            let output_start = decompressed.len();
            // SAFETY: `extra_tail_capacity` reserved room for one more full
            // chunk; `unchecked_unpack` fills the exposed region completely.
            unsafe {
                decompressed.set_len(output_start + ELEMS_PER_CHUNK as usize);
            }
            unsafe {
                BitPacking::unchecked_unpack(
                    compressed_bits_per_value,
                    &compressed_words[tail_start..tail_start + words_per_chunk],
                    &mut decompressed[output_start..output_start + ELEMS_PER_CHUNK as usize],
                );
            }
            decompressed.truncate(output_start + tail_values);
        }
    }

    debug_assert_eq!(decompressed.len(), num_values);

    FixedWidthDataBlock {
        data: LanceBuffer::reinterpret_vec(decompressed),
        bits_per_value: data.bits_per_value,
        num_values: num_values as u64,
        block_info: BlockInfo::new(),
    }
}
425
/// Bit-packing codec whose bit width is stored *out of line* (in the encoding
/// description), so the compressed buffer contains only packed words.
#[derive(Debug)]
pub struct OutOfLineBitpacking {
    // Bits per value in the packed representation.
    compressed_bit_width: u64,
    // Width in bits (8/16/32/64) of the values before compression.
    uncompressed_bit_width: u64,
}
456
457impl OutOfLineBitpacking {
458 pub fn new(compressed_bit_width: u64, uncompressed_bit_width: u64) -> Self {
459 Self {
460 compressed_bit_width,
461 uncompressed_bit_width,
462 }
463 }
464}
465
466impl BlockCompressor for OutOfLineBitpacking {
467 fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
468 let fixed_width = data.as_fixed_width().unwrap();
469 let compressed = match fixed_width.bits_per_value {
470 8 => bitpack_out_of_line::<u8>(fixed_width, self.compressed_bit_width as usize),
471 16 => bitpack_out_of_line::<u16>(fixed_width, self.compressed_bit_width as usize),
472 32 => bitpack_out_of_line::<u32>(fixed_width, self.compressed_bit_width as usize),
473 64 => bitpack_out_of_line::<u64>(fixed_width, self.compressed_bit_width as usize),
474 _ => panic!("Bitpacking word size must be 8,16,32,64"),
475 };
476 Ok(compressed)
477 }
478}
479
480impl BlockDecompressor for OutOfLineBitpacking {
481 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
482 let word_size = match self.uncompressed_bit_width {
483 8 => std::mem::size_of::<u8>(),
484 16 => std::mem::size_of::<u16>(),
485 32 => std::mem::size_of::<u32>(),
486 64 => std::mem::size_of::<u64>(),
487 _ => panic!("Bitpacking word size must be 8,16,32,64"),
488 };
489 debug_assert_eq!(data.len() % word_size, 0);
490 let total_words = (data.len() / word_size) as u64;
491 let block = FixedWidthDataBlock {
492 data,
493 bits_per_value: self.uncompressed_bit_width,
494 num_values: total_words,
495 block_info: BlockInfo::new(),
496 };
497
498 let unpacked = match self.uncompressed_bit_width {
499 8 => unpack_out_of_line::<u8>(
500 block,
501 num_values as usize,
502 self.compressed_bit_width as usize,
503 ),
504 16 => unpack_out_of_line::<u16>(
505 block,
506 num_values as usize,
507 self.compressed_bit_width as usize,
508 ),
509 32 => unpack_out_of_line::<u32>(
510 block,
511 num_values as usize,
512 self.compressed_bit_width as usize,
513 ),
514 64 => unpack_out_of_line::<u64>(
515 block,
516 num_values as usize,
517 self.compressed_bit_width as usize,
518 ),
519 _ => unreachable!(),
520 };
521 Ok(DataBlock::FixedWidth(unpacked))
522 }
523}
524
#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{Array, Int8Array, Int64Array};
    use arrow_schema::DataType;

    use super::{ELEMS_PER_CHUNK, bitpack_out_of_line, unpack_out_of_line};
    use crate::{
        buffer::LanceBuffer,
        data::{BlockInfo, FixedWidthDataBlock},
        testing::{TestCases, check_round_trip_encoding_of_data},
        version::LanceFileVersion,
    };

    /// Round-trips constant-valued arrays of several bit widths (plus a
    /// single-element tail) through the mini-block bitpacking encoder, for
    /// Int8 and then the wider integer types via casting.
    #[test_log::test(tokio::test)]
    async fn test_miniblock_bitpack() {
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        let arrays = vec![
            Arc::new(Int8Array::from(vec![100; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![16; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![-1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![5; 1])) as Arc<dyn Array>,
        ];
        check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;

        for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] {
            let int64_arrays = vec![
                Int64Array::from(vec![3; 1024]),
                Int64Array::from(vec![8; 1024]),
                Int64Array::from(vec![16; 1024]),
                Int64Array::from(vec![100; 1024]),
                Int64Array::from(vec![512; 1024]),
                Int64Array::from(vec![1000; 1024]),
                Int64Array::from(vec![2000; 1024]),
                Int64Array::from(vec![-1; 10]),
            ];

            let mut arrays = vec![];
            for int64_array in int64_arrays {
                arrays.push(arrow_cast::cast(&int64_array, &data_type).unwrap());
            }

            check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;
        }
    }

    /// Verifies that low-cardinality data actually selects the
    /// "inline_bitpacking" encoding (BSS is disabled so it cannot win).
    #[test_log::test(tokio::test)]
    async fn test_bitpack_encoding_verification() {
        use arrow_array::Int32Array;

        let test_cases = TestCases::default()
            .with_expected_encoding("inline_bitpacking")
            .with_min_file_version(LanceFileVersion::V2_1);

        // Values 0..15 repeated: every chunk needs only 4 bits.
        let mut values = Vec::new();
        for i in 0..2048 {
            values.push(i % 16);
        }

        let arrays = vec![Arc::new(Int32Array::from(values)) as Arc<dyn Array>];

        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());

        check_round_trip_encoding_of_data(arrays, &test_cases, metadata.clone()).await;
    }

    /// First chunk is entirely zeros (bit width 0) followed by a chunk of
    /// small values; exercises zero-width chunk handling.  RLE threshold is
    /// zeroed so run-length encoding does not claim the data first.
    #[test_log::test(tokio::test)]
    async fn test_miniblock_bitpack_zero_chunk_selection() {
        use arrow_array::Int32Array;

        let test_cases = TestCases::default()
            .with_expected_encoding("inline_bitpacking")
            .with_min_file_version(LanceFileVersion::V2_1);

        // 1024 zeros, then 1024 small values -> 2048 values total.
        let mut vals = vec![0i32; 1024];
        for i in 0..1024 {
            vals.push(i % 16);
        }

        let arrays = vec![Arc::new(Int32Array::from(vals)) as Arc<dyn Array>];

        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());
        metadata.insert("lance-encoding:rle-threshold".to_string(), "0".to_string());

        check_round_trip_encoding_of_data(arrays, &test_cases, metadata).await;
    }

    /// 1025 u32 values at 8-bit width: the single extra value is a runt tail
    /// whose padding cost exceeds its packing savings, so it should be stored
    /// raw (one extra word), then round-trip through `unpack_out_of_line`.
    #[test]
    fn test_out_of_line_bitpack_raw_tail_roundtrip() {
        let bit_width = 8usize;
        let word_bits = std::mem::size_of::<u32>() as u64 * 8;
        let values: Vec<u32> = (0..1025).map(|i| (i % 200) as u32).collect();
        let input = FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(values.clone()),
            bits_per_value: word_bits,
            num_values: values.len() as u64,
            block_info: BlockInfo::new(),
        };

        let compressed = bitpack_out_of_line::<u32>(input, bit_width);
        let compressed_words = compressed.borrow_to_typed_slice::<u32>().to_vec();
        let words_per_chunk = (ELEMS_PER_CHUNK as usize * bit_width).div_ceil(word_bits as usize);
        // One packed chunk plus one raw word per tail value.
        assert_eq!(
            compressed_words.len(),
            words_per_chunk + (values.len() - ELEMS_PER_CHUNK as usize),
        );

        let compressed_block = FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(compressed_words.clone()),
            bits_per_value: word_bits,
            num_values: compressed_words.len() as u64,
            block_info: BlockInfo::new(),
        };

        let decoded = unpack_out_of_line::<u32>(compressed_block, values.len(), bit_width);
        let decoded_values = decoded.data.borrow_to_typed_slice::<u32>();
        assert_eq!(decoded_values.as_ref(), values.as_slice());
    }
}