1use arrow_array::types::UInt64Type;
19use arrow_array::{Array, PrimitiveArray};
20use arrow_buffer::ArrowNativeType;
21use byteorder::{ByteOrder, LittleEndian};
22use lance_bitpacking::BitPacking;
23use snafu::location;
24
25use lance_core::{Error, Result};
26
27use crate::buffer::LanceBuffer;
28use crate::compression::{
29 BlockCompressor, BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor,
30};
31use crate::data::BlockInfo;
32use crate::data::{DataBlock, FixedWidthDataBlock};
33use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
34use crate::encodings::logical::primitive::miniblock::{
35 MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
36};
37use crate::format::pb21::CompressiveEncoding;
38use crate::format::{pb21, ProtobufUtils21};
39use crate::statistics::{GetStat, Stat};
40use bytemuck::{cast_slice, AnyBitPattern};
41
/// log2 of the number of values packed into each bitpacking chunk.
const LOG_ELEMS_PER_CHUNK: u8 = 10;
/// Number of values per bitpacking chunk (1024).
const ELEMS_PER_CHUNK: u64 = 1 << LOG_ELEMS_PER_CHUNK;
44
/// Bitpacking compressor/decompressor that stores each chunk's bit width
/// "inline": the first word of every packed chunk holds the bit width used
/// for the rest of that chunk (see `bitpack_chunked` / `unchunk`).
#[derive(Debug, Default)]
pub struct InlineBitpacking {
    // Bit width of the original values; `chunk_data` asserts this equals the
    // block's `bits_per_value` and dispatches on 8/16/32/64.
    uncompressed_bit_width: u64,
}
49
impl InlineBitpacking {
    /// Creates a compressor for values of the given uncompressed bit width.
    pub fn new(uncompressed_bit_width: u64) -> Self {
        Self {
            uncompressed_bit_width,
        }
    }

    /// Reconstructs the (de)compressor from its protobuf description.
    pub fn from_description(description: &pb21::InlineBitpacking) -> Self {
        Self {
            uncompressed_bit_width: description.uncompressed_bits_per_value,
        }
    }

    /// Number of bytes one full chunk occupies when packed at `bit_width`
    /// bits per value (excluding the inline bit-width word).
    pub fn min_size_bytes(bit_width: u64) -> u64 {
        (ELEMS_PER_CHUNK * bit_width).div_ceil(8)
    }

    /// Bitpacks `data` into chunks of [`ELEMS_PER_CHUNK`] values.
    ///
    /// Each chunk is laid out as one `T` word holding the chunk's bit width,
    /// followed by the packed values.  Per-chunk bit widths come from the
    /// precomputed `Stat::BitWidth` statistic (one entry per chunk).  The
    /// final chunk is zero-padded up to a full chunk before packing and is
    /// emitted with `log_num_values == 0` to mark it as the trailing chunk.
    fn bitpack_chunked<T: ArrowNativeType + BitPacking>(
        data: FixedWidthDataBlock,
    ) -> MiniBlockCompressed {
        let data_buffer = data.data.borrow_to_typed_slice::<T>();
        let data_buffer = data_buffer.as_ref();

        // One bit-width entry per chunk, produced by the statistics pass.
        let bit_widths = data.expect_stat(Stat::BitWidth);
        let bit_widths_array = bit_widths
            .as_any()
            .downcast_ref::<PrimitiveArray<UInt64Type>>()
            .unwrap();

        // For each chunk: its packed size in `T` words (the literal 1024 is
        // ELEMS_PER_CHUNK), plus a running total that also counts the one
        // bit-width word per chunk.
        let (packed_chunk_sizes, total_size) = bit_widths_array
            .values()
            .iter()
            .map(|&bit_width| {
                let chunk_size = ((1024 * bit_width) / data.bits_per_value) as usize;
                (chunk_size, chunk_size + 1)
            })
            .fold(
                (Vec::with_capacity(bit_widths_array.len()), 0),
                |(mut sizes, total), (size, inc)| {
                    sizes.push(size);
                    (sizes, total + inc)
                },
            );

        let mut output: Vec<T> = Vec::with_capacity(total_size);
        let mut chunks = Vec::with_capacity(bit_widths_array.len());

        // Pack every chunk except the last, which may be partial.
        for (i, packed_chunk_size) in packed_chunk_sizes
            .iter()
            .enumerate()
            .take(bit_widths_array.len() - 1)
        {
            let start_elem = i * ELEMS_PER_CHUNK as usize;
            let bit_width = bit_widths_array.value(i) as usize;
            // Inline bit-width word precedes the packed payload.
            output.push(T::from_usize(bit_width).unwrap());
            let output_len = output.len();
            // SAFETY: `output` was allocated with capacity `total_size`,
            // which accounts for every chunk's packed words plus its
            // bit-width word, and `unchecked_pack` fills the newly exposed
            // `packed_chunk_size` words completely.
            unsafe {
                output.set_len(output_len + *packed_chunk_size);
                BitPacking::unchecked_pack(
                    bit_width,
                    &data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..*packed_chunk_size],
                );
            }
            chunks.push(MiniBlockChunk {
                // +1 for the inline bit-width word.
                buffer_sizes: vec![((1 + *packed_chunk_size) * std::mem::size_of::<T>()) as u16],
                log_num_values: LOG_ELEMS_PER_CHUNK,
            });
        }

        // Number of live values in the last chunk (a full 1024 when the
        // value count divides evenly).
        let last_chunk_elem_num = if data.num_values % ELEMS_PER_CHUNK == 0 {
            1024
        } else {
            data.num_values % ELEMS_PER_CHUNK
        };
        // Copy the tail into a zero-padded scratch buffer so it can be
        // packed as a full ELEMS_PER_CHUNK-sized chunk.
        let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
        last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
            &data_buffer[data.num_values as usize - last_chunk_elem_num as usize..],
        );
        let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as usize;
        output.push(T::from_usize(bit_width).unwrap());
        let output_len = output.len();
        // SAFETY: same reasoning as above — `total_size` covers the final
        // chunk's words and `unchecked_pack` initializes all of them.
        unsafe {
            output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]);
            BitPacking::unchecked_pack(
                bit_width,
                &last_chunk,
                &mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]],
            );
        }
        chunks.push(MiniBlockChunk {
            buffer_sizes: vec![
                ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) * std::mem::size_of::<T>())
                    as u16,
            ],
            // log_num_values == 0 marks the trailing (possibly partial) chunk.
            log_num_values: 0,
        });

        MiniBlockCompressed {
            data: vec![LanceBuffer::reinterpret_vec(output)],
            chunks,
            num_values: data.num_values,
        }
    }

    /// Dispatches [`Self::bitpack_chunked`] on the value width and pairs the
    /// result with its protobuf encoding description.
    fn chunk_data(&self, data: FixedWidthDataBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        assert!(data.bits_per_value % 8 == 0);
        assert_eq!(data.bits_per_value, self.uncompressed_bit_width);
        let bits_per_value = data.bits_per_value;
        let compressed = match bits_per_value {
            8 => Self::bitpack_chunked::<u8>(data),
            16 => Self::bitpack_chunked::<u16>(data),
            32 => Self::bitpack_chunked::<u32>(data),
            64 => Self::bitpack_chunked::<u64>(data),
            _ => unreachable!(),
        };
        (
            compressed,
            ProtobufUtils21::inline_bitpacking(
                bits_per_value,
                None,
            ),
        )
    }

    /// Decompresses a single chunk produced by [`Self::bitpack_chunked`].
    ///
    /// The chunk begins with one `T` word holding the bit width (read as a
    /// little-endian unsigned int), followed by the packed payload.  A full
    /// chunk of [`ELEMS_PER_CHUNK`] values is unpacked, then truncated to
    /// `num_values`.
    fn unchunk<T: ArrowNativeType + BitPacking + AnyBitPattern>(
        data: LanceBuffer,
        num_values: u64,
    ) -> Result<DataBlock> {
        assert!(data.len() >= 8);
        assert!(num_values <= ELEMS_PER_CHUNK);

        let uncompressed_bit_width = std::mem::size_of::<T>() * 8;
        let mut decompressed = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];

        // First size_of::<T>() bytes: the inline bit width; the remainder is
        // the packed payload.
        let chunk_in_u8: Vec<u8> = data.to_vec();
        let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<T>()];
        let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<T>());
        let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<T>()..]);

        // Payload must hold exactly one full chunk at this bit width.
        assert!(std::mem::size_of_val(chunk) == (bit_width_value * ELEMS_PER_CHUNK) as usize / 8);

        // SAFETY: `chunk` and `decompressed` have exactly the sizes checked
        // above for one full chunk at `bit_width_value` bits per value.
        unsafe {
            BitPacking::unchecked_unpack(bit_width_value as usize, chunk, &mut decompressed);
        }

        decompressed.truncate(num_values as usize);
        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(decompressed),
            bits_per_value: uncompressed_bit_width as u64,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}
215
216impl MiniBlockCompressor for InlineBitpacking {
217 fn compress(&self, chunk: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
218 match chunk {
219 DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)),
220 _ => Err(Error::InvalidInput {
221 source: format!(
222 "Cannot compress a data block of type {} with BitpackMiniBlockEncoder",
223 chunk.name()
224 )
225 .into(),
226 location: location!(),
227 }),
228 }
229 }
230}
231
232impl BlockCompressor for InlineBitpacking {
233 fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
234 let fixed_width = data.as_fixed_width().unwrap();
235 let (chunked, _) = self.chunk_data(fixed_width);
236 Ok(chunked.data.into_iter().next().unwrap())
237 }
238}
239
240impl MiniBlockDecompressor for InlineBitpacking {
241 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
242 assert_eq!(data.len(), 1);
243 let data = data.into_iter().next().unwrap();
244 match self.uncompressed_bit_width {
245 8 => Self::unchunk::<u8>(data, num_values),
246 16 => Self::unchunk::<u16>(data, num_values),
247 32 => Self::unchunk::<u32>(data, num_values),
248 64 => Self::unchunk::<u64>(data, num_values),
249 _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
250 }
251 }
252}
253
254impl BlockDecompressor for InlineBitpacking {
255 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
256 match self.uncompressed_bit_width {
257 8 => Self::unchunk::<u8>(data, num_values),
258 16 => Self::unchunk::<u16>(data, num_values),
259 32 => Self::unchunk::<u32>(data, num_values),
260 64 => Self::unchunk::<u64>(data, num_values),
261 _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
262 }
263 }
264}
265
/// Bitpacks `data` at a fixed `bit_width` without storing the bit width in
/// the stream — it lives "out of line" in the encoding description.
///
/// Values are packed in chunks of [`ELEMS_PER_CHUNK`]; every chunk occupies
/// the same `words_per_chunk` output words, and a trailing partial chunk is
/// zero-padded to a full chunk before packing.
fn bitpack_out_of_line<T: ArrowNativeType + BitPacking>(
    data: FixedWidthDataBlock,
    bit_width: usize,
) -> LanceBuffer {
    let data_buffer = data.data.borrow_to_typed_slice::<T>();
    let data_buffer = data_buffer.as_ref();

    let num_chunks = data_buffer.len().div_ceil(ELEMS_PER_CHUNK as usize);
    let last_chunk_is_runt = data_buffer.len() % ELEMS_PER_CHUNK as usize != 0;
    // Packed output words needed for one full chunk at this bit width.
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bit_width).div_ceil(data.bits_per_value as usize);
    // Deliberately uninitialized: every word is written by `unchecked_pack`
    // below (full chunks in the loop, the runt chunk afterwards).
    #[allow(clippy::uninit_vec)]
    let mut output: Vec<T> = Vec::with_capacity(num_chunks * words_per_chunk);
    #[allow(clippy::uninit_vec)]
    unsafe {
        output.set_len(num_chunks * words_per_chunk);
    }

    let num_whole_chunks = if last_chunk_is_runt {
        num_chunks - 1
    } else {
        num_chunks
    };

    // Pack all full chunks straight from the input buffer.
    for i in 0..num_whole_chunks {
        let input_start = i * ELEMS_PER_CHUNK as usize;
        let input_end = input_start + ELEMS_PER_CHUNK as usize;
        let output_start = i * words_per_chunk;
        let output_end = output_start + words_per_chunk;
        // SAFETY: input slice is exactly ELEMS_PER_CHUNK values and output
        // slice is exactly words_per_chunk words, matching `unchecked_pack`'s
        // contract for a full chunk at `bit_width`.
        unsafe {
            BitPacking::unchecked_pack(
                bit_width,
                &data_buffer[input_start..input_end],
                &mut output[output_start..output_end],
            );
        }
    }

    if !last_chunk_is_runt {
        return LanceBuffer::reinterpret_vec(output);
    }

    // Zero-pad the trailing partial chunk to a full chunk, then pack it into
    // the final words_per_chunk region of `output`.
    let remaining_items = data_buffer.len() % ELEMS_PER_CHUNK as usize;
    let last_chunk_start = num_whole_chunks * ELEMS_PER_CHUNK as usize;

    let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
    last_chunk[..remaining_items].clone_from_slice(&data_buffer[last_chunk_start..]);
    let output_start = num_whole_chunks * words_per_chunk;
    // SAFETY: the remaining output region is exactly words_per_chunk words.
    unsafe {
        BitPacking::unchecked_pack(bit_width, &last_chunk, &mut output[output_start..]);
    }

    LanceBuffer::reinterpret_vec(output)
}
330
/// Inverse of [`bitpack_out_of_line`]: unpacks chunks that were packed at
/// `bits_per_value` bits per value, then truncates away the zero padding of
/// the final chunk so exactly `num_values` values remain.
fn unpack_out_of_line<T: ArrowNativeType + BitPacking>(
    data: FixedWidthDataBlock,
    num_values: usize,
    bits_per_value: usize,
) -> FixedWidthDataBlock {
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bits_per_value).div_ceil(data.bits_per_value as usize);
    let compressed_words = data.data.borrow_to_typed_slice::<T>();

    // The compressed stream is always a whole number of chunks.
    let num_chunks = data.num_values as usize / words_per_chunk;
    debug_assert_eq!(data.num_values as usize % words_per_chunk, 0);

    // Deliberately uninitialized: every element is written by
    // `unchecked_unpack` in the loop below.
    #[allow(clippy::uninit_vec)]
    let mut decompressed = Vec::with_capacity(num_chunks * ELEMS_PER_CHUNK as usize);
    #[allow(clippy::uninit_vec)]
    unsafe {
        decompressed.set_len(num_chunks * ELEMS_PER_CHUNK as usize);
    }

    for chunk_idx in 0..num_chunks {
        let input_start = chunk_idx * words_per_chunk;
        let input_end = input_start + words_per_chunk;
        let output_start = chunk_idx * ELEMS_PER_CHUNK as usize;
        let output_end = output_start + ELEMS_PER_CHUNK as usize;
        // SAFETY: slice lengths match `unchecked_unpack`'s contract for one
        // full chunk at `bits_per_value`.
        unsafe {
            BitPacking::unchecked_unpack(
                bits_per_value,
                &compressed_words[input_start..input_end],
                &mut decompressed[output_start..output_end],
            );
        }
    }

    // Drop the zero padding that filled out the last chunk.
    decompressed.truncate(num_values);

    FixedWidthDataBlock {
        data: LanceBuffer::reinterpret_vec(decompressed),
        bits_per_value: data.bits_per_value,
        num_values: num_values as u64,
        block_info: BlockInfo::new(),
    }
}
375
/// Per-value bitpacking where the bit width is stored out of line (in the
/// encoding description) instead of inline with each chunk.
#[derive(Debug)]
pub struct OutOfLineBitpacking {
    // Bit width every value is packed down to.
    compressed_bit_width: usize,
}
405
406impl PerValueCompressor for OutOfLineBitpacking {
407 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
408 let fixed_width = data.as_fixed_width().unwrap();
409 let num_values = fixed_width.num_values;
410 let word_size = fixed_width.bits_per_value;
411 let compressed = match word_size {
412 8 => bitpack_out_of_line::<u8>(fixed_width, self.compressed_bit_width),
413 16 => bitpack_out_of_line::<u16>(fixed_width, self.compressed_bit_width),
414 32 => bitpack_out_of_line::<u32>(fixed_width, self.compressed_bit_width),
415 64 => bitpack_out_of_line::<u64>(fixed_width, self.compressed_bit_width),
416 _ => panic!("Bitpacking word size must be 8,16,32,64"),
417 };
418 let compressed = FixedWidthDataBlock {
419 data: compressed,
420 bits_per_value: self.compressed_bit_width as u64,
421 num_values,
422 block_info: BlockInfo::new(),
423 };
424 let encoding = ProtobufUtils21::out_of_line_bitpacking(
425 word_size,
426 ProtobufUtils21::flat(
429 self.compressed_bit_width as u64,
430 None,
432 ),
433 );
434 Ok((PerValueDataBlock::Fixed(compressed), encoding))
435 }
436}
437
438impl FixedPerValueDecompressor for OutOfLineBitpacking {
439 fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock> {
440 let unpacked = match data.bits_per_value {
441 8 => unpack_out_of_line::<u8>(data, num_values as usize, self.compressed_bit_width),
442 16 => unpack_out_of_line::<u16>(data, num_values as usize, self.compressed_bit_width),
443 32 => unpack_out_of_line::<u32>(data, num_values as usize, self.compressed_bit_width),
444 64 => unpack_out_of_line::<u64>(data, num_values as usize, self.compressed_bit_width),
445 _ => panic!("Bitpacking word size must be 8,16,32,64"),
446 };
447 Ok(DataBlock::FixedWidth(unpacked))
448 }
449
450 fn bits_per_value(&self) -> u64 {
451 self.compressed_bit_width as u64
452 }
453}
454
#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{Int64Array, Int8Array};

    use arrow_schema::DataType;

    use arrow_array::Array;

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Round-trips mini-block bitpacked data across several integer types
    /// and value distributions (constant runs, negatives, a 1-element tail).
    #[test_log::test(tokio::test)]
    async fn test_miniblock_bitpack() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        // Int8 arrays covering different bit widths, including -1 (all bits
        // set) and a single-element array that exercises the runt chunk.
        let arrays = vec![
            Arc::new(Int8Array::from(vec![100; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![16; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![-1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![5; 1])) as Arc<dyn Array>,
        ];
        check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;

        // Repeat similar distributions for the wider integer types by
        // casting Int64 source data.
        for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] {
            let int64_arrays = vec![
                Int64Array::from(vec![3; 1024]),
                Int64Array::from(vec![8; 1024]),
                Int64Array::from(vec![16; 1024]),
                Int64Array::from(vec![100; 1024]),
                Int64Array::from(vec![512; 1024]),
                Int64Array::from(vec![1000; 1024]),
                Int64Array::from(vec![2000; 1024]),
                Int64Array::from(vec![-1; 10]),
            ];

            let mut arrays = vec![];
            for int64_array in int64_arrays {
                arrays.push(arrow_cast::cast(&int64_array, &data_type).unwrap());
            }

            check_round_trip_encoding_of_data(arrays, &test_cases, HashMap::new()).await;
        }
    }

    /// Verifies that low-bit-width data is actually written with the
    /// `inline_bitpacking` encoding rather than some other scheme.
    #[test_log::test(tokio::test)]
    async fn test_bitpack_encoding_verification() {
        use arrow_array::Int32Array;

        let test_cases = TestCases::default()
            .with_expected_encoding("inline_bitpacking")
            .with_file_version(LanceFileVersion::V2_1);

        // Values in [0, 16) need only a few bits per value, so bitpacking
        // should be selected.
        let mut values = Vec::new();
        for i in 0..2048 {
            values.push(i % 16);
        }

        let arrays = vec![Arc::new(Int32Array::from(values)) as Arc<dyn Array>];

        // NOTE(review): assumes "lance-encoding:bss" = "off" disables
        // byte-stream-split so it cannot be chosen instead — confirm against
        // the testing helper's metadata handling.
        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());

        check_round_trip_encoding_of_data(arrays, &test_cases, metadata).await;
    }
}