use arrow_buffer::{bit_util, BooleanBufferBuilder};
use snafu::location;

use crate::buffer::LanceBuffer;
use crate::compression::{
    BlockCompressor, BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor,
};
use crate::data::{
    BlockInfo, DataBlock, FixedSizeListBlock, FixedWidthDataBlock, NullableDataBlock,
};
use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
    MAX_MINIBLOCK_VALUES,
};
use crate::format::pb21::compressive_encoding::Compression;
use crate::format::pb21::{self, CompressiveEncoding};
use crate::format::ProtobufUtils21;

use lance_core::{Error, Result};

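/// A compression strategy that stores values uncompressed, writing their bytes as-is.
///
/// Handles fixed-width data as well as (possibly nested, possibly nullable) fixed-size
/// lists of fixed-width data.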
#[derive(Debug, Default)]
pub struct ValueEncoder {}

impl ValueEncoder {
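    /// Picks how many values to place in each miniblock chunk.
    ///
    /// A "word" is the smallest unit we can split on: one value for byte-aligned types, or
    /// a group of 8 values for bit-packed widths.  Starting from a minimal chunk, the value
    /// count is doubled while the estimated chunk size stays within `MAX_MINIBLOCK_BYTES`
    /// and `MAX_MINIBLOCK_VALUES`.  Returns `(log2(values_per_chunk), values_per_chunk)`.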
    fn find_log_vals_per_chunk(bytes_per_word: u64, values_per_word: u64) -> (u64, u64) {
        let mut size_bytes = 2 * bytes_per_word;
        let (mut log_num_vals, mut num_vals) = match values_per_word {
            1 => (1, 2),
            8 => (3, 8),
            _ => unreachable!(),
        };

        assert!(size_bytes < MAX_MINIBLOCK_BYTES);

        while 2 * size_bytes < MAX_MINIBLOCK_BYTES && 2 * num_vals <= MAX_MINIBLOCK_VALUES {
            log_num_vals += 1;
            size_bytes *= 2;
            num_vals *= 2;
        }

        (log_num_vals, num_vals)
    }

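    /// Splits a fixed-width block into miniblock chunks without transforming the bytes.
    ///
    /// Every full chunk holds `vals_per_chunk` values; the trailing chunk (marked with
    /// `log_num_values == 0`) holds whatever remains.  The value buffer itself is passed
    /// through untouched as a single buffer.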
    fn chunk_data(data: FixedWidthDataBlock) -> MiniBlockCompressed {
        let (bytes_per_word, values_per_word) = if data.bits_per_value % 8 == 0 {
            (data.bits_per_value / 8, 1)
        } else {
            (data.bits_per_value, 8)
        };

        let (log_vals_per_chunk, vals_per_chunk) =
            Self::find_log_vals_per_chunk(bytes_per_word, values_per_word);
        let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize);
        debug_assert_eq!(vals_per_chunk % values_per_word, 0);
        let bytes_per_chunk = bytes_per_word * (vals_per_chunk / values_per_word);
        let bytes_per_chunk = u16::try_from(bytes_per_chunk).unwrap();
        debug_assert!(bytes_per_chunk > 0);

        let data_buffer = data.data;

        let mut row_offset = 0;
        let mut chunks = Vec::with_capacity(num_chunks);

        let mut bytes_counter = 0;
        loop {
            if row_offset + vals_per_chunk <= data.num_values {
                chunks.push(MiniBlockChunk {
                    log_num_values: log_vals_per_chunk as u8,
                    buffer_sizes: vec![bytes_per_chunk],
                });
                row_offset += vals_per_chunk;
                bytes_counter += bytes_per_chunk as u64;
            } else if row_offset < data.num_values {
                let num_bytes = data_buffer.len() as u64 - bytes_counter;
                let num_bytes = u16::try_from(num_bytes).unwrap();
                chunks.push(MiniBlockChunk {
                    log_num_values: 0,
                    buffer_sizes: vec![num_bytes],
                });
                break;
            } else {
                break;
            }
        }

        debug_assert_eq!(chunks.len(), num_chunks);

        MiniBlockCompressed {
            chunks,
            data: vec![data_buffer],
            num_values: data.num_values,
        }
    }
}

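/// One level of fixed-size-list nesting: the optional validity bitmap attached to this
/// layer's child, plus the layer's dimension (items per list).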
#[derive(Debug)]
struct MiniblockFslLayer {
    validity: Option<LanceBuffer>,
    dimension: u64,
}

impl ValueEncoder {
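    /// Builds the encoding description for FSL data: a flat encoding for the innermost
    /// values wrapped in one FSL descriptor per layer, outermost layer outermost.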
    fn make_fsl_encoding(layers: &[MiniblockFslLayer], bits_per_value: u64) -> CompressiveEncoding {
        let mut encoding = ProtobufUtils21::flat(bits_per_value, None);
        for layer in layers.iter().rev() {
            let has_validity = layer.validity.is_some();
            let dimension = layer.dimension;
            encoding = ProtobufUtils21::fsl(dimension, has_validity, encoding);
        }
        encoding
    }

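    /// Gathers the validity bits for one chunk of rows.
    ///
    /// Each nullable layer's slice of validity is appended to that layer's accumulation
    /// buffer in `validity_buffers`.  Returns the chunk's buffer sizes: one entry per
    /// validity buffer followed by the size of the chunk's value bytes (the values
    /// themselves are not copied here).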
    fn extract_fsl_chunk(
        data: &FixedWidthDataBlock,
        layers: &[MiniblockFslLayer],
        row_offset: usize,
        num_rows: usize,
        validity_buffers: &mut [Vec<u8>],
    ) -> Vec<u16> {
        let mut row_offset = row_offset;
        let mut num_values = num_rows;
        let mut buffer_counter = 0;
        let mut buffer_sizes = Vec::with_capacity(validity_buffers.len() + 1);
        for layer in layers {
            row_offset *= layer.dimension as usize;
            num_values *= layer.dimension as usize;
            if let Some(validity) = &layer.validity {
                let validity_slice = validity
                    .clone()
                    .bit_slice_le_with_length(row_offset, num_values);
                validity_buffers[buffer_counter].extend_from_slice(&validity_slice);
                buffer_sizes.push(validity_slice.len() as u16);
                buffer_counter += 1;
            }
        }

        let bits_in_chunk = data.bits_per_value * num_values as u64;
        let bytes_in_chunk = bits_in_chunk.div_ceil(8);
        let bytes_in_chunk = u16::try_from(bytes_in_chunk).unwrap();
        debug_assert!(bytes_in_chunk > 0);
        buffer_sizes.push(bytes_in_chunk);

        buffer_sizes
    }

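    /// Splits (possibly nested) FSL data into miniblock chunks.
    ///
    /// Rows per chunk is chosen from an estimate of the per-row size: value bytes plus one
    /// validity bit per item for every nullable layer.  The resulting buffers are the
    /// per-layer validity buffers followed by the untouched value buffer.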
    fn chunk_fsl(
        data: FixedWidthDataBlock,
        layers: Vec<MiniblockFslLayer>,
        num_rows: u64,
    ) -> (MiniBlockCompressed, CompressiveEncoding) {
        let mut ceil_bytes_validity = 0;
        let mut cum_dim = 1;
        let mut num_validity_buffers = 0;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if layer.validity.is_some() {
                ceil_bytes_validity += cum_dim.div_ceil(8);
                num_validity_buffers += 1;
            }
        }
        let cum_bits_per_value = data.bits_per_value * cum_dim;
        let (cum_bytes_per_word, vals_per_word) = if cum_bits_per_value % 8 == 0 {
            (cum_bits_per_value / 8, 1)
        } else {
            (cum_bits_per_value, 8)
        };
        let est_bytes_per_word = (ceil_bytes_validity * vals_per_word) + cum_bytes_per_word;
        let (log_rows_per_chunk, rows_per_chunk) =
            Self::find_log_vals_per_chunk(est_bytes_per_word, vals_per_word);

        let num_chunks = num_rows.div_ceil(rows_per_chunk) as usize;

        let mut chunks = Vec::with_capacity(num_chunks);
        let mut validity_buffers: Vec<Vec<u8>> = Vec::with_capacity(num_validity_buffers);
        cum_dim = 1;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if let Some(validity) = &layer.validity {
                let layer_bytes_validity = cum_dim.div_ceil(8);
                let validity_with_padding =
                    layer_bytes_validity as usize * num_chunks * rows_per_chunk as usize;
                debug_assert!(validity_with_padding >= validity.len());
                validity_buffers.push(Vec::with_capacity(
                    layer_bytes_validity as usize * num_chunks,
                ));
            }
        }

        let mut row_offset = 0;
        while row_offset + rows_per_chunk <= num_rows {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_per_chunk as usize,
                &mut validity_buffers,
            );
            row_offset += rows_per_chunk;
            chunks.push(MiniBlockChunk {
                log_num_values: log_rows_per_chunk as u8,
                buffer_sizes,
            })
        }
        let rows_in_chunk = num_rows - row_offset;
        if rows_in_chunk > 0 {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_in_chunk as usize,
                &mut validity_buffers,
            );
            chunks.push(MiniBlockChunk {
                log_num_values: 0,
                buffer_sizes,
            });
        }

        let encoding = Self::make_fsl_encoding(&layers, data.bits_per_value);
        let buffers = validity_buffers
            .into_iter()
            .map(LanceBuffer::from)
            .chain(std::iter::once(data.data))
            .collect::<Vec<_>>();

        (
            MiniBlockCompressed {
                chunks,
                data: buffers,
                num_values: num_rows,
            },
            encoding,
        )
    }

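    /// Miniblock compression entry point for fixed-size-list data.
    ///
    /// Peels `Nullable` / `FixedSizeList` wrappers into `MiniblockFslLayer`s (outermost
    /// first) until the fixed-width leaf is reached, then delegates to `chunk_fsl`.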
    fn miniblock_fsl(data: DataBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        let num_rows = data.num_values();
        let fsl = data.as_fixed_size_list().unwrap();
        let mut layers = Vec::new();
        let mut child = *fsl.child;
        let mut cur_layer = MiniblockFslLayer {
            validity: None,
            dimension: fsl.dimension,
        };
        loop {
            if let DataBlock::Nullable(nullable) = child {
                cur_layer.validity = Some(nullable.nulls);
                child = *nullable.data;
            }
            match child {
                DataBlock::FixedSizeList(inner) => {
                    layers.push(cur_layer);
                    cur_layer = MiniblockFslLayer {
                        validity: None,
                        dimension: inner.dimension,
                    };
                    child = *inner.child;
                }
                DataBlock::FixedWidth(inner) => {
                    layers.push(cur_layer);
                    return Self::chunk_fsl(inner, layers, num_rows);
                }
                _ => unreachable!("Unexpected data block type in value encoder's miniblock_fsl"),
            }
        }
    }
}

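/// Cursor over one nullable layer's validity bitmap, used when zipping validity bits with
/// value bytes row by row in the per-value (full-zip) layout.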
struct PerValueFslValidityIter {
    buffer: LanceBuffer,
    bits_per_row: usize,
    offset: usize,
}

impl ValueEncoder {
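    /// Builds the encoding description for a (possibly nested) FSL block in the per-value
    /// layout: each layer records whether its child carries validity, an all-null child
    /// maps to a constant encoding, and the fixed-width leaf becomes a flat encoding.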
    fn fsl_to_encoding(fsl: &FixedSizeListBlock) -> CompressiveEncoding {
        let mut inner = fsl.child.as_ref();
        let mut has_validity = false;
        inner = match inner {
            DataBlock::Nullable(nullable) => {
                has_validity = true;
                nullable.data.as_ref()
            }
            DataBlock::AllNull(_) => {
                return ProtobufUtils21::constant(None);
            }
            _ => inner,
        };
        let inner_encoding = match inner {
            DataBlock::FixedWidth(fixed_width) => {
                ProtobufUtils21::flat(fixed_width.bits_per_value, None)
            }
            DataBlock::FixedSizeList(inner) => Self::fsl_to_encoding(inner),
            _ => unreachable!(
                "Unexpected data block type in value encoder's fsl_to_encoding: {}",
                inner.name()
            ),
        };
        ProtobufUtils21::fsl(fsl.dimension, has_validity, inner_encoding)
    }

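    /// Per-value compression for FSL data whose child carries no validity: the values are
    /// exposed directly as one wide fixed-width value per row (the innermost width times
    /// the cumulative dimension).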
    fn simple_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
        let encoding = Self::fsl_to_encoding(&fsl);
        let num_values = fsl.num_values();
        let mut cum_dim = fsl.dimension;
        let mut child = *fsl.child;
        loop {
            match child {
                DataBlock::Nullable(nullable) => {
                    child = *nullable.data;
                }
                DataBlock::FixedSizeList(inner) => {
                    cum_dim *= inner.dimension;
                    child = *inner.child;
                }
                DataBlock::FixedWidth(inner) => {
                    let data = FixedWidthDataBlock {
                        bits_per_value: inner.bits_per_value * cum_dim,
                        num_values,
                        data: inner.data,
                        block_info: BlockInfo::new(),
                    };
                    return (PerValueDataBlock::Fixed(data), encoding);
                }
                _ => unreachable!(
                    "Unexpected data block type in value encoder's simple_per_value_fsl"
                ),
            }
        }
    }

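    /// Per-value compression for FSL data that carries validity: each row's validity
    /// bitmaps (one run per nullable layer, outermost first) are zipped together with the
    /// row's value bytes into a single fixed-width value.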
    fn nullable_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
        let encoding = Self::fsl_to_encoding(&fsl);
        let num_values = fsl.num_values();
        let mut bytes_per_row = 0;
        let mut cum_dim = 1;
        let mut current = fsl;
        let mut validity_iters: Vec<PerValueFslValidityIter> = Vec::new();
        let data_bytes_per_row: usize;
        let data_buffer: LanceBuffer;
        loop {
            cum_dim *= current.dimension;
            let mut child = *current.child;
            if let DataBlock::Nullable(nullable) = child {
                bytes_per_row += cum_dim.div_ceil(8) as usize;
                validity_iters.push(PerValueFslValidityIter {
                    buffer: nullable.nulls,
                    bits_per_row: cum_dim as usize,
                    offset: 0,
                });
                child = *nullable.data;
            };
            match child {
                DataBlock::FixedSizeList(inner) => {
                    current = inner;
                }
                DataBlock::FixedWidth(fixed_width) => {
                    data_bytes_per_row =
                        (fixed_width.bits_per_value.div_ceil(8) * cum_dim) as usize;
                    bytes_per_row += data_bytes_per_row;
                    data_buffer = fixed_width.data;
                    break;
                }
                DataBlock::AllNull(_) => {
                    data_bytes_per_row = 0;
                    data_buffer = LanceBuffer::empty();
                    break;
                }
                _ => unreachable!(
                    "Unexpected data block type in value encoder's nullable_per_value_fsl: {:?}",
                    child
                ),
            }
        }

        let bytes_needed = bytes_per_row * num_values as usize;
        let mut zipped = Vec::with_capacity(bytes_needed);
        let data_slice = &data_buffer;
        for i in 0..num_values as usize {
            for validity in validity_iters.iter_mut() {
                let validity_slice = validity
                    .buffer
                    .bit_slice_le_with_length(validity.offset, validity.bits_per_row);
                zipped.extend_from_slice(&validity_slice);
                validity.offset += validity.bits_per_row;
            }
            let start = i * data_bytes_per_row;
            let end = start + data_bytes_per_row;
            zipped.extend_from_slice(&data_slice[start..end]);
        }

        let zipped = LanceBuffer::from(zipped);
        let data = PerValueDataBlock::Fixed(FixedWidthDataBlock {
            bits_per_value: bytes_per_row as u64 * 8,
            num_values,
            data: zipped,
            block_info: BlockInfo::new(),
        });
        (data, encoding)
    }

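    /// Per-value (full-zip) compression for FSL data: dispatches to the simple pass-through
    /// path when the child has no validity, and to the zipping path otherwise.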
    fn per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
        if !fsl.child.is_nullable() {
            Self::simple_per_value_fsl(fsl)
        } else {
            Self::nullable_per_value_fsl(fsl)
        }
    }
}

impl BlockCompressor for ValueEncoder {
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
        let data = match data {
            DataBlock::FixedWidth(fixed_width) => fixed_width.data,
            _ => unimplemented!(
                "Cannot compress block of type {} with ValueEncoder",
                data.name()
            ),
        };
        Ok(data)
    }
}

impl MiniBlockCompressor for ValueEncoder {
    fn compress(&self, chunk: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match chunk {
            DataBlock::FixedWidth(fixed_width) => {
                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
                Ok((Self::chunk_data(fixed_width), encoding))
            }
            DataBlock::FixedSizeList(_) => Ok(Self::miniblock_fsl(chunk)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with ValueEncoder",
                    chunk.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

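/// One FSL layer recovered from the encoding description: its dimension and whether a
/// validity bitmap was stored for it.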
#[derive(Debug)]
struct ValueFslDesc {
    dimension: u64,
    has_validity: bool,
}

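/// Decompressor for data written by [`ValueEncoder`].
///
/// Tracks the bit width of the innermost items, the bit width of one top-level value
/// (including any zipped validity), the number of items per top-level value, and the FSL
/// layers, outermost first.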
#[derive(Debug)]
pub struct ValueDecompressor {
    bits_per_item: u64,
    bits_per_value: u64,
    items_per_value: u64,
    layers: Vec<ValueFslDesc>,
}

impl ValueDecompressor {
    pub fn from_flat(description: &pb21::Flat) -> Self {
        Self {
            bits_per_item: description.bits_per_value,
            bits_per_value: description.bits_per_value,
            items_per_value: 1,
            layers: Vec::default(),
        }
    }

    pub fn from_fsl(mut description: &pb21::FixedSizeList) -> Self {
        let mut layers = Vec::new();
        let mut cum_dim = 1;
        let mut bytes_per_value = 0;
        loop {
            layers.push(ValueFslDesc {
                has_validity: description.has_validity,
                dimension: description.items_per_value,
            });
            cum_dim *= description.items_per_value;
            if description.has_validity {
                bytes_per_value += cum_dim.div_ceil(8);
            }
            match description
                .values
                .as_ref()
                .unwrap()
                .compression
                .as_ref()
                .unwrap()
            {
                Compression::FixedSizeList(inner) => {
                    description = inner;
                }
                Compression::Flat(flat) => {
                    let mut bits_per_value = bytes_per_value * 8;
                    bits_per_value += flat.bits_per_value * cum_dim;
                    return Self {
                        bits_per_item: flat.bits_per_value,
                        bits_per_value,
                        items_per_value: cum_dim,
                        layers,
                    };
                }
                _ => unreachable!(),
            }
        }
    }

    fn buffer_to_block(&self, data: LanceBuffer, num_values: u64) -> DataBlock {
        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values,
            data,
            block_info: BlockInfo::new(),
        })
    }
}

impl BlockDecompressor for ValueDecompressor {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        let block = self.buffer_to_block(data, num_values);
        assert_eq!(block.num_values(), num_values);
        Ok(block)
    }
}

impl MiniBlockDecompressor for ValueDecompressor {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        let num_items = num_values * self.items_per_value;
        let mut buffer_iter = data.into_iter().rev();

        let data_buf = buffer_iter.next().unwrap();
        let items = self.buffer_to_block(data_buf, num_items);
        let mut lists = items;

        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                let validity_buf = buffer_iter.next().unwrap();
                lists = DataBlock::Nullable(NullableDataBlock {
                    data: Box::new(lists),
                    nulls: validity_buf,
                    block_info: BlockInfo::default(),
                });
            }
            lists = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(lists),
                dimension: layer.dimension,
            })
        }

        assert_eq!(lists.num_values(), num_values);
        Ok(lists)
    }
}

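/// Accumulates one nullable layer's validity bits while unzipping per-value FSL data.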
struct FslDecompressorValidityBuilder {
    buffer: BooleanBufferBuilder,
    bits_per_row: usize,
    bytes_per_row: usize,
}

impl ValueDecompressor {
    fn has_validity(&self) -> bool {
        self.layers.iter().any(|layer| layer.has_validity)
    }

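    /// Decompresses per-value FSL data that contains no validity: the wide fixed-width
    /// values are reinterpreted as the innermost items and re-wrapped in the FSL layers.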
    fn simple_decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> DataBlock {
        let mut cum_dim = 1;
        for layer in &self.layers {
            cum_dim *= layer.dimension;
        }
        debug_assert_eq!(self.bits_per_item, data.bits_per_value / cum_dim);
        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values: num_rows * cum_dim,
            data: data.data,
            block_info: BlockInfo::new(),
        });
        for layer in self.layers.iter().rev() {
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }
        debug_assert_eq!(num_rows, block.num_values());
        block
    }

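    /// Decompresses per-value FSL data whose rows contain zipped validity: each row is
    /// split back into its validity bitmaps (one builder per nullable layer) and its value
    /// bytes, then the nested Nullable / FixedSizeList block is rebuilt.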
    fn unzip_decompress(&self, data: FixedWidthDataBlock, num_rows: usize) -> DataBlock {
        assert_eq!(self.bits_per_item % 8, 0);
        let bytes_per_item = self.bits_per_item / 8;
        let mut buffer_builders = Vec::with_capacity(self.layers.len());
        let mut cum_dim = 1;
        let mut total_size_bytes = 0;
        for layer in &self.layers {
            cum_dim *= layer.dimension as usize;
            if layer.has_validity {
                let validity_size_bits = cum_dim;
                let validity_size_bytes = validity_size_bits.div_ceil(8);
                total_size_bytes += num_rows * validity_size_bytes;
                buffer_builders.push(FslDecompressorValidityBuilder {
                    buffer: BooleanBufferBuilder::new(validity_size_bits * num_rows),
                    bits_per_row: cum_dim,
                    bytes_per_row: validity_size_bytes,
                })
            }
        }
        let num_items = num_rows * cum_dim;
        let data_size = num_items * bytes_per_item as usize;
        total_size_bytes += data_size;
        let mut data_buffer = Vec::with_capacity(data_size);

        assert_eq!(data.data.len(), total_size_bytes);

        let bytes_per_value = bytes_per_item as usize;
        let data_bytes_per_row = bytes_per_value * cum_dim;

        let mut data_offset = 0;
        while data_offset < total_size_bytes {
            for builder in buffer_builders.iter_mut() {
                let start = data_offset * 8;
                let end = start + builder.bits_per_row;
                builder.buffer.append_packed_range(start..end, &data.data);
                data_offset += builder.bytes_per_row;
            }
            let end = data_offset + data_bytes_per_row;
            data_buffer.extend_from_slice(&data.data[data_offset..end]);
            data_offset += data_bytes_per_row;
        }

        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values: num_items as u64,
            data: LanceBuffer::from(data_buffer),
            block_info: BlockInfo::new(),
        });

        let mut validity_bufs = buffer_builders
            .into_iter()
            .rev()
            .map(|mut b| LanceBuffer::from(b.buffer.finish().into_inner()));
        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                let nullable = NullableDataBlock {
                    data: Box::new(block),
                    nulls: validity_bufs.next().unwrap(),
                    block_info: BlockInfo::new(),
                };
                block = DataBlock::Nullable(nullable);
            }
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }

        assert_eq!(num_rows, block.num_values() as usize);

        block
    }
}

impl FixedPerValueDecompressor for ValueDecompressor {
    fn decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> Result<DataBlock> {
        if self.has_validity() {
            Ok(self.unzip_decompress(data, num_rows as usize))
        } else {
            Ok(self.simple_decompress(data, num_rows))
        }
    }

    fn bits_per_value(&self) -> u64 {
        self.bits_per_value
    }
}

impl PerValueCompressor for ValueEncoder {
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
        let (data, encoding) = match data {
            DataBlock::FixedWidth(fixed_width) => {
                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
                (PerValueDataBlock::Fixed(fixed_width), encoding)
            }
            DataBlock::FixedSizeList(fixed_size_list) => Self::per_value_fsl(fixed_size_list),
            _ => unimplemented!(
                "Cannot compress block of type {} with ValueEncoder",
                data.name()
            ),
        };
        Ok((data, encoding))
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::{
        collections::HashMap,
        sync::{Arc, LazyLock},
    };

    use arrow_array::{
        make_array, new_null_array, types::UInt32Type, Array, ArrayRef, Decimal128Array,
        FixedSizeListArray, Int32Array, ListArray, UInt8Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, TimeUnit};
    use lance_datagen::{array, gen_batch, ArrayGeneratorExt, Dimension, RowCount};

    use crate::{
        compression::{FixedPerValueDecompressor, MiniBlockDecompressor},
        data::DataBlock,
        encodings::{
            logical::primitive::{
                fullzip::{PerValueCompressor, PerValueDataBlock},
                miniblock::MiniBlockCompressor,
            },
            physical::value::ValueDecompressor,
        },
        format::pb21::compressive_encoding::Compression,
        testing::{
            check_basic_random, check_round_trip_encoding_generated,
            check_round_trip_encoding_of_data, FnArrayGeneratorProvider, TestCases,
        },
        version::LanceFileVersion,
    };

    use super::ValueEncoder;

    const PRIMITIVE_TYPES: &[DataType] = &[
        DataType::Null,
        DataType::FixedSizeBinary(2),
        DataType::Date32,
        DataType::Date64,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal128(10, 10),
        DataType::Decimal256(10, 10),
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Duration(TimeUnit::Second),
    ];

    #[test_log::test(tokio::test)]
    async fn test_simple_value() {
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            None,
            Some(2),
            Some(3),
            Some(4),
            Some(5),
        ]));

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_range() {
        let items = Arc::new(Int32Array::from_iter((0..5000).map(|i| {
            if i % 2 == 0 {
                Some(i)
            } else {
                None
            }
        })));

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_value_primitive() {
        for data_type in PRIMITIVE_TYPES {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_basic_random(field).await;
        }
    }

    static LARGE_TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| {
        vec![DataType::FixedSizeList(
            Arc::new(Field::new("", DataType::Int32, false)),
            128,
        )]
    });

    #[test_log::test(tokio::test)]
    async fn test_large_primitive() {
        for data_type in LARGE_TYPES.iter() {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_basic_random(field).await;
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_decimal128_dictionary_encoding() {
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        let decimals: Vec<i32> = (0..100).collect();
        let repeated_strings: Vec<_> = decimals
            .iter()
            .cycle()
            .take(decimals.len() * 10000)
            .map(|&v| Some(v as i128))
            .collect();
        let decimal_array = Arc::new(Decimal128Array::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![decimal_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_miniblock_stress() {
        let data1 = (0..100)
            .map(|_| Arc::new(Int32Array::from_iter_values(0..100)) as Arc<dyn Array>)
            .collect::<Vec<_>>();

        let data2 = (0..100)
            .map(|_| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if i % 2 == 0 {
                        Some(i)
                    } else {
                        None
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        let _data3 = (0..100)
            .map(|chunk_idx| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if chunk_idx < 50 {
                        None
                    } else {
                        Some(i)
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        for data in [data1, data2] {
            for batch_size in [10, 100, 1500, 15000] {
                let test_cases = TestCases::default()
                    .with_page_sizes(vec![1000, 2000, 3000, 60000])
                    .with_batch_size(batch_size)
                    .with_min_file_version(LanceFileVersion::V2_1);

                check_round_trip_encoding_of_data(data.clone(), &test_cases, HashMap::new()).await;
            }
        }
    }

    fn create_simple_fsl() -> FixedSizeListArray {
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(3),
            None,
            None,
            None,
            None,
            Some(8),
            Some(9),
            None,
            Some(11),
        ]));
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        let inner_list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
        let inner_list = Arc::new(FixedSizeListArray::new(
            items_field.clone(),
            2,
            items,
            Some(NullBuffer::new(inner_list_nulls)),
        ));
        let inner_list_field = Arc::new(Field::new(
            "item",
            DataType::FixedSizeList(items_field, 2),
            true,
        ));
        FixedSizeListArray::new(inner_list_field, 2, inner_list, None)
    }

    #[test]
    fn test_fsl_value_compression_miniblock() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 3);
        assert_eq!(data.chunks.len(), 1);
        assert_eq!(data.chunks[0].buffer_sizes, vec![1, 2, 48]);
        assert_eq!(data.chunks[0].log_num_values, 0);

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

    #[test]
    fn test_fsl_value_compression_per_value() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        assert_eq!(data.bits_per_value, 144);
        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 18 * 3);

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

    #[test_log::test(tokio::test)]
    async fn test_fsl_all_null() {
        let items = new_null_array(&DataType::Int32, 12);
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        let list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
        let list_array =
            FixedSizeListArray::new(items_field, 2, items, Some(NullBuffer::new(list_nulls)));

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
            .await;
    }

    #[test_log::test(tokio::test)]
    async fn regress_list_fsl() {
        let offsets = ScalarBuffer::<i32>::from(vec![0, 393, 755, 1156, 1536]);
        let data = UInt8Array::from(vec![0; 1536 * 16]);
        let fsl_field = Arc::new(Field::new("item", DataType::UInt8, true));
        let fsl = FixedSizeListArray::new(fsl_field, 16, Arc::new(data), None);
        let list_field = Arc::new(Field::new("item", fsl.data_type().clone(), false));
        let list_arr = ListArray::new(list_field, OffsetBuffer::new(offsets), Arc::new(fsl), None);

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_batch_size(1);

        check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, HashMap::new())
            .await;
    }

    fn create_random_fsl() -> Arc<dyn Array> {
        let inner = array::rand_type(&DataType::Int32).with_random_nulls(0.1);
        let list_one = array::cycle_vec(inner, Dimension::from(4)).with_random_nulls(0.1);
        let list_two = array::cycle_vec(list_one, Dimension::from(4)).with_random_nulls(0.1);
        let list_three = array::cycle_vec(list_two, Dimension::from(2));

        let batch = gen_batch()
            .anon_col(list_three)
            .into_batch_rows(RowCount::from(8 * 1024))
            .unwrap();
        batch.column(0).clone()
    }

    #[test]
    fn fsl_value_miniblock_stress() {
        let sample_array = create_random_fsl();

        let starting_data = DataBlock::from_arrays(
            std::slice::from_ref(&sample_array),
            sample_array.len() as u64,
        );

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }

    #[test]
    fn fsl_value_per_value_stress() {
        let sample_array = create_random_fsl();

        let starting_data = DataBlock::from_arrays(
            std::slice::from_ref(&sample_array),
            sample_array.len() as u64,
        );

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }

    #[test_log::test(tokio::test)]
    async fn test_fsl_nullable_items() {
        let datagen = Box::new(FnArrayGeneratorProvider::new(move || {
            lance_datagen::array::rand_vec_nullable::<UInt32Type>(Dimension::from(128), 0.5)
        }));

        let field = Field::new(
            "",
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::UInt32, true)), 128),
            false,
        );
        check_round_trip_encoding_generated(field, datagen, TestCases::default()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_value_encoding_verification() {
        use std::collections::HashMap;

        let test_cases = TestCases::default()
            .with_expected_encoding("flat")
            .with_min_file_version(LanceFileVersion::V2_1);

        let mut metadata_explicit = HashMap::new();
        metadata_explicit.insert("lance-encoding:compression".to_string(), "none".to_string());
        metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());

        let arr_explicit =
            Arc::new(Int32Array::from((0..1000).collect::<Vec<i32>>())) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr_explicit], &test_cases, metadata_explicit).await;

        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());

        let arr_fallback = Arc::new(Int32Array::from(
            (0..100).map(|i| i * 73 + 19).collect::<Vec<i32>>(),
        )) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr_fallback], &test_cases, metadata).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_mixed_page_validity() {
        let no_nulls = Arc::new(Int32Array::from_iter_values([1, 2]));
        let has_nulls = Arc::new(Int32Array::from_iter([Some(3), None, Some(5)]));

        let test_cases = TestCases::default().with_page_sizes(vec![1]);
        check_round_trip_encoding_of_data(vec![no_nulls, has_nulls], &test_cases, HashMap::new())
            .await;
    }
}