1use arrow::datatypes::UInt64Type;
19use arrow_array::{Array, PrimitiveArray};
20use arrow_buffer::ArrowNativeType;
21use byteorder::{ByteOrder, LittleEndian};
22use snafu::location;
23
24use lance_core::{Error, Result};
25
26use crate::buffer::LanceBuffer;
27use crate::compression::{
28 BlockCompressor, BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor,
29};
30use crate::compression_algo::fastlanes::BitPacking;
31use crate::data::BlockInfo;
32use crate::data::{DataBlock, FixedWidthDataBlock};
33use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
34use crate::encodings::logical::primitive::miniblock::{
35 MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
36};
37use crate::format::{pb, ProtobufUtils};
38use crate::statistics::{GetStat, Stat};
39use bytemuck::{cast_slice, AnyBitPattern};
40
// log2 of the chunk size: every bitpacked chunk holds 2^10 = 1024 values.
const LOG_ELEMS_PER_CHUNK: u8 = 10;
// Number of values packed together in a single chunk (1024).
const ELEMS_PER_CHUNK: u64 = 1 << LOG_ELEMS_PER_CHUNK;
43
/// Bitpacking codec that stores each chunk's bit width "inline": the first
/// word of every compressed chunk is the bit width used to pack the rest of
/// that chunk.
#[derive(Debug, Default)]
pub struct InlineBitpacking {
    // Width in bits of the original, uncompressed values (8/16/32/64 —
    // enforced by the dispatch in `chunk_data`).
    uncompressed_bit_width: u64,
}
48
impl InlineBitpacking {
    /// Creates a codec for values of the given uncompressed bit width.
    pub fn new(uncompressed_bit_width: u64) -> Self {
        Self {
            uncompressed_bit_width,
        }
    }

    /// Reconstructs the codec configuration from its protobuf description.
    pub fn from_description(description: &pb::InlineBitpacking) -> Self {
        Self {
            uncompressed_bit_width: description.uncompressed_bits_per_value,
        }
    }

    /// Number of bytes one full chunk occupies when packed at `bit_width`
    /// bits per value (rounded up to whole bytes).
    pub fn min_size_bytes(bit_width: u64) -> u64 {
        (ELEMS_PER_CHUNK * bit_width).div_ceil(8)
    }

    /// Bitpacks `data` into chunks of `ELEMS_PER_CHUNK` values.
    ///
    /// Each chunk is packed with its own bit width, taken from the
    /// `Stat::BitWidth` statistic (one entry per chunk), and is laid out as
    /// one word holding that bit width followed by the packed words.  The
    /// final chunk is zero-padded to a full `ELEMS_PER_CHUNK` values before
    /// packing and is marked with `log_num_values: 0`.
    fn bitpack_chunked<T: ArrowNativeType + BitPacking>(
        mut data: FixedWidthDataBlock,
    ) -> MiniBlockCompressed {
        let data_buffer = data.data.borrow_to_typed_slice::<T>();
        let data_buffer = data_buffer.as_ref();

        // Per-chunk bit widths computed by the statistics pass.
        let bit_widths = data.expect_stat(Stat::BitWidth);
        let bit_widths_array = bit_widths
            .as_any()
            .downcast_ref::<PrimitiveArray<UInt64Type>>()
            .unwrap();

        // For each chunk compute the number of packed output words; the total
        // adds one extra word per chunk for the inline bit-width marker.
        let (packed_chunk_sizes, total_size) = bit_widths_array
            .values()
            .iter()
            .map(|&bit_width| {
                let chunk_size = ((1024 * bit_width) / data.bits_per_value) as usize;
                (chunk_size, chunk_size + 1)
            })
            .fold(
                (Vec::with_capacity(bit_widths_array.len()), 0),
                |(mut sizes, total), (size, inc)| {
                    sizes.push(size);
                    (sizes, total + inc)
                },
            );

        let mut output: Vec<T> = Vec::with_capacity(total_size);
        let mut chunks = Vec::with_capacity(bit_widths_array.len());

        // Pack every chunk except the last, which may be partially filled.
        for (i, packed_chunk_size) in packed_chunk_sizes
            .iter()
            .enumerate()
            .take(bit_widths_array.len() - 1)
        {
            let start_elem = i * ELEMS_PER_CHUNK as usize;
            let bit_width = bit_widths_array.value(i) as usize;
            // Inline bit-width marker precedes the packed words.
            output.push(T::from_usize(bit_width).unwrap());
            let output_len = output.len();
            // SAFETY: `output` was allocated with capacity `total_size`, which
            // covers every chunk's marker plus packed words, and the region
            // exposed by `set_len` is written by `unchecked_pack` before it is
            // ever read.
            unsafe {
                output.set_len(output_len + *packed_chunk_size);
                BitPacking::unchecked_pack(
                    bit_width,
                    &data_buffer[start_elem..][..ELEMS_PER_CHUNK as usize],
                    &mut output[output_len..][..*packed_chunk_size],
                );
            }
            chunks.push(MiniBlockChunk {
                buffer_sizes: vec![((1 + *packed_chunk_size) * std::mem::size_of::<T>()) as u16],
                log_num_values: LOG_ELEMS_PER_CHUNK,
            });
        }

        // Number of live values in the final chunk (a full 1024 when the data
        // divides evenly into chunks).
        let last_chunk_elem_num = if data.num_values % ELEMS_PER_CHUNK == 0 {
            1024
        } else {
            data.num_values % ELEMS_PER_CHUNK
        };
        // Copy the trailing values into a zero-padded, full-sized chunk.
        let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
        last_chunk[..last_chunk_elem_num as usize].clone_from_slice(
            &data_buffer[data.num_values as usize - last_chunk_elem_num as usize..],
        );
        let bit_width = bit_widths_array.value(bit_widths_array.len() - 1) as usize;
        output.push(T::from_usize(bit_width).unwrap());
        let output_len = output.len();
        // SAFETY: same invariant as above — the reserved capacity covers the
        // final chunk's marker and packed words, all written by
        // `unchecked_pack`.
        unsafe {
            output.set_len(output_len + packed_chunk_sizes[bit_widths_array.len() - 1]);
            BitPacking::unchecked_pack(
                bit_width,
                &last_chunk,
                &mut output[output_len..][..packed_chunk_sizes[bit_widths_array.len() - 1]],
            );
        }
        // `log_num_values: 0` marks the final (possibly partial) chunk.
        chunks.push(MiniBlockChunk {
            buffer_sizes: vec![
                ((1 + packed_chunk_sizes[bit_widths_array.len() - 1]) * std::mem::size_of::<T>())
                    as u16,
            ],
            log_num_values: 0,
        });

        MiniBlockCompressed {
            data: vec![LanceBuffer::reinterpret_vec(output)],
            chunks,
            num_values: data.num_values,
        }
    }

    /// Dispatches `bitpack_chunked` on the word size and pairs the result
    /// with its protobuf encoding description.
    fn chunk_data(
        &self,
        data: FixedWidthDataBlock,
    ) -> (MiniBlockCompressed, crate::format::pb::ArrayEncoding) {
        assert!(data.bits_per_value % 8 == 0);
        assert_eq!(data.bits_per_value, self.uncompressed_bit_width);
        let bits_per_value = data.bits_per_value;
        let compressed = match bits_per_value {
            8 => Self::bitpack_chunked::<u8>(data),
            16 => Self::bitpack_chunked::<u16>(data),
            32 => Self::bitpack_chunked::<u32>(data),
            64 => Self::bitpack_chunked::<u64>(data),
            _ => unreachable!(),
        };
        (compressed, ProtobufUtils::inline_bitpacking(bits_per_value))
    }

    /// Decompresses a single chunk produced by `bitpack_chunked`.
    ///
    /// The first word of `data` holds the chunk's bit width (little-endian);
    /// the remaining words are the packed payload for one full chunk.  A full
    /// `ELEMS_PER_CHUNK` values are unpacked, then truncated to `num_values`.
    fn unchunk<T: ArrowNativeType + BitPacking + AnyBitPattern>(
        data: LanceBuffer,
        num_values: u64,
    ) -> Result<DataBlock> {
        assert!(data.len() >= 8);
        assert!(num_values <= ELEMS_PER_CHUNK);

        let uncompressed_bit_width = std::mem::size_of::<T>() * 8;
        let mut decompressed = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];

        // Read the inline bit-width marker from the first word.
        let chunk_in_u8: Vec<u8> = data.to_vec();
        let bit_width_bytes = &chunk_in_u8[..std::mem::size_of::<T>()];
        let bit_width_value = LittleEndian::read_uint(bit_width_bytes, std::mem::size_of::<T>());
        let chunk = cast_slice(&chunk_in_u8[std::mem::size_of::<T>()..]);

        // The packed payload must be exactly one full chunk at the stated width.
        assert!(std::mem::size_of_val(chunk) == (bit_width_value * ELEMS_PER_CHUNK) as usize / 8);

        // SAFETY: the assertion above guarantees `chunk` holds exactly
        // `ELEMS_PER_CHUNK` values packed at `bit_width_value` bits, and
        // `decompressed` is a full-sized output buffer.
        unsafe {
            BitPacking::unchecked_unpack(bit_width_value as usize, chunk, &mut decompressed);
        }

        decompressed.truncate(num_values as usize);
        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(decompressed),
            bits_per_value: uncompressed_bit_width as u64,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}
210
211impl MiniBlockCompressor for InlineBitpacking {
212 fn compress(
213 &self,
214 chunk: DataBlock,
215 ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
216 match chunk {
217 DataBlock::FixedWidth(fixed_width) => Ok(self.chunk_data(fixed_width)),
218 _ => Err(Error::InvalidInput {
219 source: format!(
220 "Cannot compress a data block of type {} with BitpackMiniBlockEncoder",
221 chunk.name()
222 )
223 .into(),
224 location: location!(),
225 }),
226 }
227 }
228}
229
230impl BlockCompressor for InlineBitpacking {
231 fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
232 let fixed_width = data.as_fixed_width().unwrap();
233 let (chunked, _) = self.chunk_data(fixed_width);
234 Ok(chunked.data.into_iter().next().unwrap())
235 }
236}
237
238impl MiniBlockDecompressor for InlineBitpacking {
239 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
240 assert_eq!(data.len(), 1);
241 let data = data.into_iter().next().unwrap();
242 match self.uncompressed_bit_width {
243 8 => Self::unchunk::<u8>(data, num_values),
244 16 => Self::unchunk::<u16>(data, num_values),
245 32 => Self::unchunk::<u32>(data, num_values),
246 64 => Self::unchunk::<u64>(data, num_values),
247 _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
248 }
249 }
250}
251
252impl BlockDecompressor for InlineBitpacking {
253 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
254 match self.uncompressed_bit_width {
255 8 => Self::unchunk::<u8>(data, num_values),
256 16 => Self::unchunk::<u16>(data, num_values),
257 32 => Self::unchunk::<u32>(data, num_values),
258 64 => Self::unchunk::<u64>(data, num_values),
259 _ => unimplemented!("Bitpacking word size must be 8, 16, 32, or 64"),
260 }
261 }
262}
263
/// Bitpacks `data` at a single fixed `bit_width` with no inline markers.
///
/// Values are packed in groups of `ELEMS_PER_CHUNK`; a trailing partial
/// ("runt") chunk is zero-padded to full size before packing.  The caller is
/// responsible for recording `bit_width` elsewhere (out of line).
fn bitpack_out_of_line<T: ArrowNativeType + BitPacking>(
    mut data: FixedWidthDataBlock,
    bit_width: usize,
) -> LanceBuffer {
    let data_buffer = data.data.borrow_to_typed_slice::<T>();
    let data_buffer = data_buffer.as_ref();

    let num_chunks = data_buffer.len().div_ceil(ELEMS_PER_CHUNK as usize);
    let last_chunk_is_runt = data_buffer.len() % ELEMS_PER_CHUNK as usize != 0;
    // Output words each packed chunk occupies.
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bit_width).div_ceil(data.bits_per_value as usize);
    #[allow(clippy::uninit_vec)]
    let mut output: Vec<T> = Vec::with_capacity(num_chunks * words_per_chunk);
    #[allow(clippy::uninit_vec)]
    // SAFETY: capacity is exactly `num_chunks * words_per_chunk` and every
    // exposed word is overwritten by `unchecked_pack` below before the buffer
    // is handed out.
    unsafe {
        output.set_len(num_chunks * words_per_chunk);
    }

    let num_whole_chunks = if last_chunk_is_runt {
        num_chunks - 1
    } else {
        num_chunks
    };

    // Pack every full chunk directly from the input buffer.
    for i in 0..num_whole_chunks {
        let input_start = i * ELEMS_PER_CHUNK as usize;
        let input_end = input_start + ELEMS_PER_CHUNK as usize;
        let output_start = i * words_per_chunk;
        let output_end = output_start + words_per_chunk;
        // SAFETY: both slices are exactly one chunk long; `unchecked_pack`
        // fills the output words.
        unsafe {
            BitPacking::unchecked_pack(
                bit_width,
                &data_buffer[input_start..input_end],
                &mut output[output_start..output_end],
            );
        }
    }

    if !last_chunk_is_runt {
        return LanceBuffer::reinterpret_vec(output);
    }

    // Zero-pad the trailing partial chunk to a full chunk, then pack it.
    let remaining_items = data_buffer.len() % ELEMS_PER_CHUNK as usize;
    let last_chunk_start = num_whole_chunks * ELEMS_PER_CHUNK as usize;

    let mut last_chunk: Vec<T> = vec![T::from_usize(0).unwrap(); ELEMS_PER_CHUNK as usize];
    last_chunk[..remaining_items].clone_from_slice(&data_buffer[last_chunk_start..]);
    let output_start = num_whole_chunks * words_per_chunk;
    // SAFETY: the output tail is exactly `words_per_chunk` words, all written
    // by `unchecked_pack`.
    unsafe {
        BitPacking::unchecked_pack(bit_width, &last_chunk, &mut output[output_start..]);
    }

    LanceBuffer::reinterpret_vec(output)
}
328
/// Unpacks data produced by `bitpack_out_of_line`.
///
/// `bits_per_value` is the packed width (supplied out of line by the caller);
/// `data.bits_per_value` is the uncompressed word width.  A whole number of
/// chunks is unpacked, then the result is truncated down to `num_values`.
fn unpack_out_of_line<T: ArrowNativeType + BitPacking>(
    mut data: FixedWidthDataBlock,
    num_values: usize,
    bits_per_value: usize,
) -> FixedWidthDataBlock {
    // Input words per packed chunk (mirrors the layout used when packing).
    let words_per_chunk =
        (ELEMS_PER_CHUNK as usize * bits_per_value).div_ceil(data.bits_per_value as usize);
    let compressed_words = data.data.borrow_to_typed_slice::<T>();

    // Packed data always covers a whole number of chunks (runts were padded).
    let num_chunks = data.num_values as usize / words_per_chunk;
    debug_assert_eq!(data.num_values as usize % words_per_chunk, 0);

    #[allow(clippy::uninit_vec)]
    let mut decompressed = Vec::with_capacity(num_chunks * ELEMS_PER_CHUNK as usize);
    #[allow(clippy::uninit_vec)]
    // SAFETY: every exposed element is overwritten by `unchecked_unpack` in
    // the loop below before the vector is read.
    unsafe {
        decompressed.set_len(num_chunks * ELEMS_PER_CHUNK as usize);
    }

    for chunk_idx in 0..num_chunks {
        let input_start = chunk_idx * words_per_chunk;
        let input_end = input_start + words_per_chunk;
        let output_start = chunk_idx * ELEMS_PER_CHUNK as usize;
        let output_end = output_start + ELEMS_PER_CHUNK as usize;
        // SAFETY: input slice is one packed chunk; output slice is one full
        // chunk of values, fully written by `unchecked_unpack`.
        unsafe {
            BitPacking::unchecked_unpack(
                bits_per_value,
                &compressed_words[input_start..input_end],
                &mut decompressed[output_start..output_end],
            );
        }
    }

    // Drop the zero padding that was added to the final chunk when packing.
    decompressed.truncate(num_values);

    FixedWidthDataBlock {
        data: LanceBuffer::reinterpret_vec(decompressed),
        bits_per_value: data.bits_per_value,
        num_values: num_values as u64,
        block_info: BlockInfo::new(),
    }
}
373
/// Bitpacking codec that packs every value at one fixed bit width and records
/// that width "out of line" in the encoding description rather than inside
/// the compressed buffer.
#[derive(Debug)]
pub struct OutOfLineBitpacking {
    // Fixed bit width every value is packed to.
    compressed_bit_width: usize,
}
403
404impl PerValueCompressor for OutOfLineBitpacking {
405 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
406 let fixed_width = data.as_fixed_width().unwrap();
407 let num_values = fixed_width.num_values;
408 let word_size = fixed_width.bits_per_value;
409 let compressed = match word_size {
410 8 => bitpack_out_of_line::<u8>(fixed_width, self.compressed_bit_width),
411 16 => bitpack_out_of_line::<u16>(fixed_width, self.compressed_bit_width),
412 32 => bitpack_out_of_line::<u32>(fixed_width, self.compressed_bit_width),
413 64 => bitpack_out_of_line::<u64>(fixed_width, self.compressed_bit_width),
414 _ => panic!("Bitpacking word size must be 8,16,32,64"),
415 };
416 let compressed = FixedWidthDataBlock {
417 data: compressed,
418 bits_per_value: self.compressed_bit_width as u64,
419 num_values,
420 block_info: BlockInfo::new(),
421 };
422 let encoding =
423 ProtobufUtils::out_of_line_bitpacking(word_size, self.compressed_bit_width as u64);
424 Ok((PerValueDataBlock::Fixed(compressed), encoding))
425 }
426}
427
428impl FixedPerValueDecompressor for OutOfLineBitpacking {
429 fn decompress(&self, data: FixedWidthDataBlock, num_values: u64) -> Result<DataBlock> {
430 let unpacked = match data.bits_per_value {
431 8 => unpack_out_of_line::<u8>(data, num_values as usize, self.compressed_bit_width),
432 16 => unpack_out_of_line::<u16>(data, num_values as usize, self.compressed_bit_width),
433 32 => unpack_out_of_line::<u32>(data, num_values as usize, self.compressed_bit_width),
434 64 => unpack_out_of_line::<u64>(data, num_values as usize, self.compressed_bit_width),
435 _ => panic!("Bitpacking word size must be 8,16,32,64"),
436 };
437 Ok(DataBlock::FixedWidth(unpacked))
438 }
439
440 fn bits_per_value(&self) -> u64 {
441 self.compressed_bit_width as u64
442 }
443}
444
#[cfg(test)]
mod test {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{Array, Int64Array, Int8Array};
    use arrow_schema::DataType;

    use crate::{
        testing::{check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Field metadata that selects the bitpacking compression scheme.
    fn bitpacking_metadata() -> HashMap<String, String> {
        let mut metadata = HashMap::new();
        metadata.insert(
            "lance-encoding:compression".to_string(),
            "bitpacking".to_string(),
        );
        metadata
    }

    #[test_log::test(tokio::test)]
    async fn test_miniblock_bitpack() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);

        // Int8 round trip: constant runs, a negative run, and a 1-element
        // trailing array that exercises the runt-chunk path.
        let arrays = vec![
            Arc::new(Int8Array::from(vec![100; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![16; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![-1; 1024])) as Arc<dyn Array>,
            Arc::new(Int8Array::from(vec![5; 1])) as Arc<dyn Array>,
        ];
        check_round_trip_encoding_of_data(arrays, &test_cases, bitpacking_metadata()).await;

        // Same value patterns, cast to each wider integer type.
        for data_type in [DataType::Int16, DataType::Int32, DataType::Int64] {
            let int64_arrays = vec![
                Int64Array::from(vec![3; 1024]),
                Int64Array::from(vec![8; 1024]),
                Int64Array::from(vec![16; 1024]),
                Int64Array::from(vec![100; 1024]),
                Int64Array::from(vec![512; 1024]),
                Int64Array::from(vec![1000; 1024]),
                Int64Array::from(vec![2000; 1024]),
                Int64Array::from(vec![-1; 10]),
            ];

            let arrays: Vec<_> = int64_arrays
                .into_iter()
                .map(|int64_array| arrow_cast::cast(&int64_array, &data_type).unwrap())
                .collect();

            check_round_trip_encoding_of_data(arrays, &test_cases, bitpacking_metadata()).await;
        }
    }
}