use arrow_array::types::UInt64Type;
use arrow_array::{Array, PrimitiveArray};
use arrow_buffer::ArrowNativeType;
use byteorder::{ByteOrder, LittleEndian};
use lance_bitpacking::BitPacking;
use snafu::location;

use lance_core::{Error, Result};

use crate::buffer::LanceBuffer;
use crate::compression::{BlockCompressor, BlockDecompressor, MiniBlockDecompressor};
use crate::data::BlockInfo;
use crate::data::{DataBlock, FixedWidthDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
};
use crate::format::pb21::CompressiveEncoding;
use crate::format::{pb21, ProtobufUtils21};
use crate::statistics::{GetStat, Stat};
use bytemuck::{cast_slice, AnyBitPattern};

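// Values are bitpacked in chunks of 2^10 = 1024 values.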
const LOG_ELEMS_PER_CHUNK: u8 = 10;
const ELEMS_PER_CHUNK: u64 = 1 << LOG_ELEMS_PER_CHUNK;

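/// Bitpacking where each 1024-value chunk stores its own bit width inline as
/// the chunk's first word, so a chunk can be decoded without any out-of-band
/// metadata.
///
/// A minimal usage sketch (illustrative, not from the original source; assumes
/// `block` is a `FixedWidthDataBlock` whose `BitWidth` statistic is populated):
///
/// ```ignore
/// let compressor = InlineBitpacking::new(32);
/// let (compressed, encoding) =
///     MiniBlockCompressor::compress(&compressor, DataBlock::FixedWidth(block))?;
/// ```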
#[derive(Debug, Default)]
pub struct InlineBitpacking {
    uncompressed_bit_width: u64,
}

impl InlineBitpacking {
    pub fn new(uncompressed_bit_width: u64) -> Self {
        Self {
            uncompressed_bit_width,
        }
    }

    pub fn from_description(description: &pb21::InlineBitpacking) -> Self {
        Self {
            uncompressed_bit_width: description.uncompressed_bits_per_value,
        }
    }

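    /// Minimum number of bytes a full 1024-value chunk occupies when packed at
    /// `compressed_bit_width` bits per value.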
    pub fn min_size_bytes(compressed_bit_width: u64) -> u64 {
        (ELEMS_PER_CHUNK * compressed_bit_width).div_ceil(8)
    }

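    /// Bitpacks a fixed-width block into 1024-value chunks, using the
    /// per-chunk bit widths recorded in the `BitWidth` statistic. Each chunk
    /// is emitted as one word holding its bit width followed by the packed
    /// words.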
    fn bitpack_chunked<T: ArrowNativeType + BitPacking>(
        data: FixedWidthDataBlock,
    ) -> MiniBlockCompressed {
        debug_assert!(data.num_values > 0);
        let data_buffer = data.data.borrow_to_typed_slice::<T>();
        let data_buffer = data_buffer.as_ref();

        let bit_widths = data.expect_stat(Stat::BitWidth);
        let bit_widths_array = bit_widths
            .as_any()
            .downcast_ref::<PrimitiveArray<UInt64Type>>()
            .unwrap();

        let (packed_chunk_sizes, total_size) = bit_widths_array
            .values()
            .iter()
            .map(|&bit_width| {
                let chunk_size = ((ELEMS_PER_CHUNK * bit_width) / data.bits_per_value) as usize;
                (chunk_size, chunk_size + 1)
            })
            .fold(
                (Vec::with_capacity(bit_widths_array.len()), 0),
                |(mut sizes, total), (size, inc)| {
                    sizes.push(size);
                    (sizes, total + inc)
                },
            );

        let mut output: Vec<T> = Vec::with_capacity(total_size);
        let mut chunks = Vec::with_capacity(bit_widths_array.len());

        for (i, packed_chunk_size) in packed_chunk_sizes
            .iter()
            .enumerate()
            .take(bit_widths_array.len() - 1)
        {
            let start_elem = i * ELEMS_PER_CHUNK as usize;
            let bit_width = bit_widths_array.value(i) as usize;
            output.push(T::from_usize(bit_width).unwrap());
            let output_len = output.len();
            unsafe {
                output.set_len(output_len + *packed_chunk_size);
                BitPacking::unchecked_pack(
                    bit_width,
                    &data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..*packed_chunk_size],
                );
            }
            chunks.push(MiniBlockChunk {
                buffer_sizes: vec![((1 + *packed_chunk_size) * std::mem::size_of::<T>()) as u16],
                log_num_values: LOG_ELEMS_PER_CHUNK,
            });
        }

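        // The final chunk may be partial; pad it with zeros to a full 1024
        // values before packing.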
        let last_chunk_elem_num = if data.num_values % ELEMS_PER_CHUNK == 0 {
            ELEMS_PER_CHUNK
        } else {
            data.num_values % ELEMS_PER_CHUNK
        };
        let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
        last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
            &data_buffer[data.num_values as usize - last_chunk_elem_num as usize..],
        );
        let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as usize;
        output.push(T::from_usize(bit_width).unwrap());
        let output_len = output.len();
        unsafe {
            output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]);
            BitPacking::unchecked_pack(
                bit_width,
                &last_chunk,
                &mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]],
            );
        }
        chunks.push(MiniBlockChunk {
            buffer_sizes: vec![
                ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) * std::mem::size_of::<T>())
                    as u16,
            ],
            log_num_values: 0,
        });

        MiniBlockCompressed {
            data: vec![LanceBuffer::reinterpret_vec(output)],
            chunks,
            num_values: data.num_values,
        }
    }

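    /// Compresses the block in 1024-value chunks and builds the matching
    /// protobuf encoding description.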
    fn chunk_data(&self, data: FixedWidthDataBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        assert!(data.bits_per_value % 8 == 0);
        assert_eq!(data.bits_per_value, self.uncompressed_bit_width);
        let bits_per_value = data.bits_per_value;
        let compressed = match bits_per_value {
            8 => Self::bitpack_chunked::<u8>(data),
            16 => Self::bitpack_chunked::<u16>(data),
            32 => Self::bitpack_chunked::<u32>(data),
            64 => Self::bitpack_chunked::<u64>(data),
            _ => unreachable!(),
        };
        (
            compressed,
            ProtobufUtils21::inline_bitpacking(bits_per_value, None),
        )
    }

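    /// Decompresses a single chunk: reads the bit width from the chunk's
    /// first word, unpacks the remaining words, and truncates to `num_values`.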
    fn unchunk<T: ArrowNativeType + BitPacking + AnyBitPattern>(
        data: LanceBuffer,
        num_values: u64,
    ) -> Result<DataBlock> {
        assert!(data.len() >= std::mem::size_of::<T>());
        assert!(num_values <= ELEMS_PER_CHUNK);

        let uncompressed_bit_width = std::mem::size_of::<T>() * 8;
        let mut decompressed = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];

        let chunk_in_u8: Vec<u8> = data.to_vec();
        let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<T>()];
        let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<T>());
        let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<T>()..]);
        assert!(std::mem::size_of_val(chunk) == (bit_width_value * ELEMS_PER_CHUNK) as usize / 8);
        unsafe {
            BitPacking::unchecked_unpack(bit_width_value as usize, chunk, &mut decompressed);
        }

        decompressed.truncate(num_values as usize);
        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(decompressed),
            bits_per_value: uncompressed_bit_width as u64,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}

impl MiniBlockCompressor for InlineBitpacking {
    fn compress(&self, chunk: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match chunk {
            DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with BitpackMiniBlockEncoder",
                    chunk.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

impl BlockCompressor for InlineBitpacking {
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
        let fixed_width = data.as_fixed_width().unwrap();
        let (chunked, _) = self.chunk_data(fixed_width);
        Ok(chunked.data.into_iter().next().unwrap())
    }
}

impl MiniBlockDecompressor for InlineBitpacking {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        assert_eq!(data.len(), 1);
        let data = data.into_iter().next().unwrap();
        match self.uncompressed_bit_width {
            8 => Self::unchunk::<u8>(data, num_values),
            16 => Self::unchunk::<u16>(data, num_values),
            32 => Self::unchunk::<u32>(data, num_values),
            64 => Self::unchunk::<u64>(data, num_values),
            _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
        }
    }
}

impl BlockDecompressor for InlineBitpacking {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        match self.uncompressed_bit_width {
            8 => Self::unchunk::<u8>(data, num_values),
            16 => Self::unchunk::<u16>(data, num_values),
            32 => Self::unchunk::<u32>(data, num_values),
            64 => Self::unchunk::<u64>(data, num_values),
            _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
        }
    }
}

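/// Bitpacks a fixed-width block at a single `compressed_bits_per_value`.
/// The bit width is not written into the buffer; it travels out of line in the
/// encoding description. Values are packed in whole 1024-value chunks, and a
/// trailing partial chunk is either zero-padded and packed or stored raw,
/// whichever is smaller.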
fn bitpack_out_of_line<T: ArrowNativeType + BitPacking>(
    data: FixedWidthDataBlock,
    compressed_bits_per_value: usize,
) -> LanceBuffer {
    let data_buffer = data.data.borrow_to_typed_slice::<T>();
    let data_buffer = data_buffer.as_ref();

    let num_chunks = data_buffer.len().div_ceil(ELEMS_PER_CHUNK as usize);
    let last_chunk_is_runt = data_buffer.len() % ELEMS_PER_CHUNK as usize != 0;
    let words_per_chunk = (ELEMS_PER_CHUNK as usize * compressed_bits_per_value)
        .div_ceil(data.bits_per_value as usize);
    #[allow(clippy::uninit_vec)]
    let mut output: Vec<T> = Vec::with_capacity(num_chunks * words_per_chunk);
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(num_chunks * words_per_chunk);
    }

    let num_whole_chunks = if last_chunk_is_runt {
        num_chunks - 1
    } else {
        num_chunks
    };

    for i in 0..num_whole_chunks {
        let input_start = i * ELEMS_PER_CHUNK as usize;
        let input_end = input_start + ELEMS_PER_CHUNK as usize;
        let output_start = i * words_per_chunk;
        let output_end = output_start + words_per_chunk;
        unsafe {
            BitPacking::unchecked_pack(
                compressed_bits_per_value,
                &data_buffer[input_start..input_end],
                &mut output[output_start..output_end],
            );
        }
    }

    if !last_chunk_is_runt {
        return LanceBuffer::reinterpret_vec(output);
    }

    let last_chunk_start = num_whole_chunks * ELEMS_PER_CHUNK as usize;
    // Shrink the output back to just the whole chunks before appending the tail.
    unsafe {
        output.set_len(num_whole_chunks * words_per_chunk);
    }
    let remaining_items = data_buffer.len() - last_chunk_start;

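    // Decide how to store the runt chunk: padding it to a full chunk costs
    // `compressed_bits_per_value` bits for every missing value, while packing
    // it saves `tail_bit_savings` bits per real value. Pack only when the
    // savings outweigh the padding.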
    let uncompressed_bits = data.bits_per_value as usize;
    let tail_bit_savings = uncompressed_bits.saturating_sub(compressed_bits_per_value);
    let padding_cost = compressed_bits_per_value * (ELEMS_PER_CHUNK as usize - remaining_items);
    let tail_pack_savings = tail_bit_savings.saturating_mul(remaining_items);
    debug_assert!(remaining_items > 0, "remaining_items must be non-zero");
    debug_assert!(tail_bit_savings > 0, "tail_bit_savings must be non-zero");

    if padding_cost < tail_pack_savings {
        // Pad the tail with zeros to a full chunk and pack it.
        let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
        last_chunk[..remaining_items].copy_from_slice(&data_buffer[last_chunk_start..]);
        let start = output.len();
        unsafe {
            output.set_len(start + words_per_chunk);
            BitPacking::unchecked_pack(
                compressed_bits_per_value,
                &last_chunk,
                &mut output[start..start + words_per_chunk],
            );
        }
    } else {
        // Padding would cost more than packing saves; store the tail raw.
        output.extend_from_slice(&data_buffer[last_chunk_start..]);
    }

    LanceBuffer::reinterpret_vec(output)
}

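/// Inverse of `bitpack_out_of_line`: unpacks whole 1024-value chunks and then
/// recovers the tail, which is either raw trailing values or one final padded,
/// packed chunk (distinguished by the compressed buffer's length).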
fn unpack_out_of_line<T: ArrowNativeType + BitPacking>(
    data: FixedWidthDataBlock,
    num_values: usize,
    compressed_bits_per_value: usize,
) -> FixedWidthDataBlock {
    let words_per_chunk = (ELEMS_PER_CHUNK as usize * compressed_bits_per_value)
        .div_ceil(data.bits_per_value as usize);
    let compressed_words = data.data.borrow_to_typed_slice::<T>();

    let num_whole_chunks = num_values / ELEMS_PER_CHUNK as usize;
    let tail_values = num_values % ELEMS_PER_CHUNK as usize;
    let expected_full_words = num_whole_chunks * words_per_chunk;
    let expected_new_len = expected_full_words + tail_values;
    let tail_is_raw = tail_values > 0 && compressed_words.len() == expected_new_len;

    // Unpacking a packed tail writes a full chunk before truncating, so
    // reserve room for it.
    let extra_tail_capacity = ELEMS_PER_CHUNK as usize;
    #[allow(clippy::uninit_vec)]
    let mut decompressed: Vec<T> =
        Vec::with_capacity(num_values.saturating_add(extra_tail_capacity));
    let chunk_value_len = num_whole_chunks * ELEMS_PER_CHUNK as usize;
    unsafe {
        decompressed.set_len(chunk_value_len);
    }

    for chunk_idx in 0..num_whole_chunks {
        let input_start = chunk_idx * words_per_chunk;
        let input_end = input_start + words_per_chunk;
        let output_start = chunk_idx * ELEMS_PER_CHUNK as usize;
        let output_end = output_start + ELEMS_PER_CHUNK as usize;
        unsafe {
            BitPacking::unchecked_unpack(
                compressed_bits_per_value,
                &compressed_words[input_start..input_end],
                &mut decompressed[output_start..output_end],
            );
        }
    }

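    // The tail is raw when the compressed buffer is exactly one word per tail
    // value longer than the whole chunks; otherwise it was padded and packed.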
    if tail_values > 0 {
        if tail_is_raw {
            let tail_start = expected_full_words;
            decompressed.extend_from_slice(&compressed_words[tail_start..tail_start + tail_values]);
        } else {
            let tail_start = expected_full_words;
            let output_start = decompressed.len();
            unsafe {
                decompressed.set_len(output_start + ELEMS_PER_CHUNK as usize);
            }
            unsafe {
                BitPacking::unchecked_unpack(
                    compressed_bits_per_value,
                    &compressed_words[tail_start..tail_start + words_per_chunk],
                    &mut decompressed[output_start..output_start + ELEMS_PER_CHUNK as usize],
                );
            }
            decompressed.truncate(output_start + tail_values);
        }
    }

    debug_assert_eq!(decompressed.len(), num_values);

    FixedWidthDataBlock {
        data: LanceBuffer::reinterpret_vec(decompressed),
        bits_per_value: data.bits_per_value,
        num_values: num_values as u64,
        block_info: BlockInfo::new(),
    }
}

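/// Bitpacking where the bit width is fixed for the whole buffer and carried
/// out of line in the encoding description instead of inline with each chunk.
///
/// A minimal usage sketch (illustrative, not from the original source; assumes
/// 32-bit values that all fit in 8 bits):
///
/// ```ignore
/// let codec = OutOfLineBitpacking::new(8, 32);
/// let buffer = BlockCompressor::compress(&codec, DataBlock::FixedWidth(block))?;
/// ```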
#[derive(Debug)]
pub struct OutOfLineBitpacking {
    compressed_bit_width: u64,
    uncompressed_bit_width: u64,
}

impl OutOfLineBitpacking {
    pub fn new(compressed_bit_width: u64, uncompressed_bit_width: u64) -> Self {
        Self {
            compressed_bit_width,
            uncompressed_bit_width,
        }
    }
}

impl BlockCompressor for OutOfLineBitpacking {
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
        let fixed_width = data.as_fixed_width().unwrap();
        let compressed = match fixed_width.bits_per_value {
            8 => bitpack_out_of_line::<u8>(fixed_width, self.compressed_bit_width as usize),
            16 => bitpack_out_of_line::<u16>(fixed_width, self.compressed_bit_width as usize),
            32 => bitpack_out_of_line::<u32>(fixed_width, self.compressed_bit_width as usize),
            64 => bitpack_out_of_line::<u64>(fixed_width, self.compressed_bit_width as usize),
            _ => panic!("Bitpacking word size must be 8, 16, 32, or 64"),
        };
        Ok(compressed)
    }
}

impl BlockDecompressor for OutOfLineBitpacking {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        let word_size = match self.uncompressed_bit_width {
            8 => std::mem::size_of::<u8>(),
            16 => std::mem::size_of::<u16>(),
            32 => std::mem::size_of::<u32>(),
            64 => std::mem::size_of::<u64>(),
            _ => panic!("Bitpacking word size must be 8, 16, 32, or 64"),
        };
        debug_assert_eq!(data.len() % word_size, 0);
        let total_words = (data.len() / word_size) as u64;
        let block = FixedWidthDataBlock {
            data,
            bits_per_value: self.uncompressed_bit_width,
            num_values: total_words,
            block_info: BlockInfo::new(),
        };

        let unpacked = match self.uncompressed_bit_width {
            8 => unpack_out_of_line::<u8>(
                block,
                num_values as usize,
                self.compressed_bit_width as usize,
            ),
            16 => unpack_out_of_line::<u16>(
                block,
                num_values as usize,
                self.compressed_bit_width as usize,
            ),
            32 => unpack_out_of_line::<u32>(
                block,
                num_values as usize,
                self.compressed_bit_width as usize,
            ),
            64 => unpack_out_of_line::<u64>(
                block,
                num_values as usize,
                self.compressed_bit_width as usize,
            ),
            _ => unreachable!(),
        };
        Ok(DataBlock::FixedWidth(unpacked))
    }
}

#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{Array, Int64Array, Int8Array};
    use arrow_schema::DataType;

    use super::{bitpack_out_of_line, unpack_out_of_line, ELEMS_PER_CHUNK};
    use crate::{
        buffer::LanceBuffer,
        data::{BlockInfo, FixedWidthDataBlock},
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    #[test_log::test(tokio::test)]
    async fn test_miniblock_bitpack() {
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        let arrays = vec![
            Arc::new(Int8Array::from(vec![100; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![16; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![-1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![5; 1])) as Arc<dyn Array>,
        ];
        check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;

        for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] {
            let int64_arrays = vec![
                Int64Array::from(vec![3; 1024]),
                Int64Array::from(vec![8; 1024]),
                Int64Array::from(vec![16; 1024]),
                Int64Array::from(vec![100; 1024]),
                Int64Array::from(vec![512; 1024]),
                Int64Array::from(vec![1000; 1024]),
                Int64Array::from(vec![2000; 1024]),
                Int64Array::from(vec![-1; 10]),
            ];

            let mut arrays = vec![];
            for int64_array in int64_arrays {
                arrays.push(arrow_cast::cast(&int64_array, &data_type).unwrap());
            }

            check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_bitpack_encoding_verification() {
        use arrow_array::Int32Array;

        let test_cases = TestCases::default()
            .with_expected_encoding("inline_bitpacking")
            .with_min_file_version(LanceFileVersion::V2_1);

        // Small values that only need a few bits each.
        let mut values = Vec::new();
        for i in 0..2048 {
            values.push(i % 16);
        }

        let arrays = vec![Arc::new(Int32Array::from(values)) as Arc<dyn Array>];

        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());

        check_round_trip_encoding_of_data(arrays, &test_cases, metadata).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_miniblock_bitpack_zero_chunk_selection() {
        use arrow_array::Int32Array;

        let test_cases = TestCases::default()
            .with_expected_encoding("inline_bitpacking")
            .with_min_file_version(LanceFileVersion::V2_1);

        // One chunk of all zeros followed by a chunk of small values.
        let mut vals = vec![0i32; 1024];
        for i in 0..1024 {
            vals.push(i % 16);
        }

        let arrays = vec![Arc::new(Int32Array::from(vals)) as Arc<dyn Array>];

        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());
        metadata.insert("lance-encoding:rle-threshold".to_string(), "0".to_string());

        check_round_trip_encoding_of_data(arrays, &test_cases, metadata).await;
    }

    #[test]
    fn test_out_of_line_bitpack_raw_tail_roundtrip() {
        let bit_width = 8usize;
        let word_bits = std::mem::size_of::<u32>() as u64 * 8;
        let values: Vec<u32> = (0..1025).map(|i| (i % 200) as u32).collect();
        let input = FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(values.clone()),
            bits_per_value: word_bits,
            num_values: values.len() as u64,
            block_info: BlockInfo::new(),
        };

        let compressed = bitpack_out_of_line::<u32>(input, bit_width);
        let compressed_words = compressed.borrow_to_typed_slice::<u32>().to_vec();
        let words_per_chunk = (ELEMS_PER_CHUNK as usize * bit_width).div_ceil(word_bits as usize);
        // Expect one packed whole chunk plus the raw tail value.
        assert_eq!(
            compressed_words.len(),
            words_per_chunk + (values.len() - ELEMS_PER_CHUNK as usize),
        );

        let compressed_block = FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(compressed_words.clone()),
            bits_per_value: word_bits,
            num_values: compressed_words.len() as u64,
            block_info: BlockInfo::new(),
        };

        let decoded = unpack_out_of_line::<u32>(compressed_block, values.len(), bit_width);
        let decoded_values = decoded.data.borrow_to_typed_slice::<u32>();
        assert_eq!(decoded_values.as_ref(), values.as_slice());
    }
}