1use arrow_buffer::{BooleanBufferBuilder, bit_util};
5
6use crate::buffer::LanceBuffer;
7use crate::compression::{
8 BlockCompressor, BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor,
9};
10use crate::data::{
11 BlockInfo, DataBlock, FixedSizeListBlock, FixedWidthDataBlock, NullableDataBlock,
12};
13use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
14use crate::encodings::logical::primitive::miniblock::{
15 MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES, MiniBlockChunk, MiniBlockCompressed,
16 MiniBlockCompressor,
17};
18use crate::format::ProtobufUtils21;
19use crate::format::pb21::compressive_encoding::Compression;
20use crate::format::pb21::{self, CompressiveEncoding};
21
22use lance_core::{Error, Result};
23
/// Compressor that stores values as-is ("flat" / value encoding) with no
/// transformation beyond chunking and validity zipping for nested types.
#[derive(Debug, Default)]
pub struct ValueEncoder {}
27
impl ValueEncoder {
    /// Picks a miniblock chunk size, returning `(log2(values), values)` per chunk.
    ///
    /// `bytes_per_word` / `values_per_word` describe the smallest addressable
    /// unit: one value of a byte-aligned type, or 8 values of a bit-packed type.
    /// The chunk is doubled while it stays under both `MAX_MINIBLOCK_BYTES` and
    /// `MAX_MINIBLOCK_VALUES`.
    fn find_log_vals_per_chunk(bytes_per_word: u64, values_per_word: u64) -> (u64, u64) {
        // Start the byte budget at two words
        let mut size_bytes = 2 * bytes_per_word;
        // NOTE(review): for the bit-packed case (values_per_word == 8) num_vals
        // starts at one word (8 values) while size_bytes starts at two words,
        // so the byte estimate overstates by 2x and chunks come out smaller
        // than strictly necessary — confirm whether (4, 16) was intended.
        let (mut log_num_vals, mut num_vals) = match values_per_word {
            1 => (1, 2),
            8 => (3, 8),
            _ => unreachable!(),
        };

        // A single word must already fit in a miniblock
        assert!(size_bytes < MAX_MINIBLOCK_BYTES);

        while 2 * size_bytes < MAX_MINIBLOCK_BYTES && 2 * num_vals <= MAX_MINIBLOCK_VALUES {
            log_num_vals += 1;
            size_bytes *= 2;
            num_vals *= 2;
        }

        (log_num_vals, num_vals)
    }

    /// Splits a fixed-width block into miniblock chunks.  The data buffer is
    /// not copied; chunks simply record how many bytes each one covers.
    fn chunk_data(data: FixedWidthDataBlock) -> MiniBlockCompressed {
        // Byte-aligned types chunk per value; bit-packed types chunk in groups
        // of 8 values (bits_per_value bytes per group)
        let (bytes_per_word, values_per_word) = if data.bits_per_value.is_multiple_of(8) {
            (data.bits_per_value / 8, 1)
        } else {
            (data.bits_per_value, 8)
        };

        let (log_vals_per_chunk, vals_per_chunk) =
            Self::find_log_vals_per_chunk(bytes_per_word, values_per_word);
        let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize);
        debug_assert_eq!(vals_per_chunk % values_per_word, 0);
        let bytes_per_chunk = bytes_per_word * (vals_per_chunk / values_per_word);
        let bytes_per_chunk = u32::try_from(bytes_per_chunk).unwrap();
        debug_assert!(bytes_per_chunk > 0);

        let data_buffer = data.data;

        let mut row_offset = 0;
        let mut chunks = Vec::with_capacity(num_chunks);

        let mut bytes_counter = 0;
        loop {
            if row_offset + vals_per_chunk <= data.num_values {
                // Full chunk
                chunks.push(MiniBlockChunk {
                    log_num_values: log_vals_per_chunk as u8,
                    buffer_sizes: vec![bytes_per_chunk],
                });
                row_offset += vals_per_chunk;
                bytes_counter += bytes_per_chunk as u64;
            } else if row_offset < data.num_values {
                // Trailing partial chunk: takes all remaining bytes and is
                // marked with log_num_values == 0
                let num_bytes = data_buffer.len() as u64 - bytes_counter;
                let num_bytes = u32::try_from(num_bytes).unwrap();
                chunks.push(MiniBlockChunk {
                    log_num_values: 0,
                    buffer_sizes: vec![num_bytes],
                });
                break;
            } else {
                break;
            }
        }

        debug_assert_eq!(chunks.len(), num_chunks);

        MiniBlockCompressed {
            chunks,
            data: vec![data_buffer],
            num_values: data.num_values,
        }
    }
}
109
/// One level of fixed-size-list nesting, captured while walking an FSL block
/// for miniblock compression.
#[derive(Debug)]
struct MiniblockFslLayer {
    // Validity bitmap for this layer's lists, if the layer is nullable
    validity: Option<LanceBuffer>,
    // Number of child items per list at this layer
    dimension: u64,
}
115
impl ValueEncoder {
    /// Builds the nested encoding description: a flat encoding wrapped by one
    /// FSL level per layer (outermost layer is applied last, so it ends up on
    /// the outside).
    fn make_fsl_encoding(layers: &[MiniblockFslLayer], bits_per_value: u64) -> CompressiveEncoding {
        let mut encoding = ProtobufUtils21::flat(bits_per_value, None);
        for layer in layers.iter().rev() {
            let has_validity = layer.validity.is_some();
            let dimension = layer.dimension;
            encoding = ProtobufUtils21::fsl(dimension, has_validity, encoding);
        }
        encoding
    }

    /// Copies one chunk's validity bits (for each nullable layer) into
    /// `validity_buffers` and returns that chunk's per-buffer sizes, with the
    /// data-buffer size appended last.
    fn extract_fsl_chunk(
        data: &FixedWidthDataBlock,
        layers: &[MiniblockFslLayer],
        row_offset: usize,
        num_rows: usize,
        validity_buffers: &mut [Vec<u8>],
    ) -> Vec<u32> {
        let mut row_offset = row_offset;
        let mut num_values = num_rows;
        let mut buffer_counter = 0;
        let mut buffer_sizes = Vec::with_capacity(validity_buffers.len() + 1);
        for layer in layers {
            // Scale the row window into this layer's item space
            row_offset *= layer.dimension as usize;
            num_values *= layer.dimension as usize;
            if let Some(validity) = &layer.validity {
                let validity_slice = validity
                    .clone()
                    .bit_slice_le_with_length(row_offset, num_values);
                validity_buffers[buffer_counter].extend_from_slice(&validity_slice);
                buffer_sizes.push(validity_slice.len() as u32);
                buffer_counter += 1;
            }
        }

        // num_values is now in units of innermost items
        let bits_in_chunk = data.bits_per_value * num_values as u64;
        let bytes_in_chunk = bits_in_chunk.div_ceil(8);
        let bytes_in_chunk = u32::try_from(bytes_in_chunk).unwrap();
        debug_assert!(bytes_in_chunk > 0);
        buffer_sizes.push(bytes_in_chunk);

        buffer_sizes
    }

    /// Splits FSL data into miniblock chunks.  Each nullable layer gets its own
    /// validity buffer; the flattened item data goes into the last buffer.
    fn chunk_fsl(
        data: FixedWidthDataBlock,
        layers: Vec<MiniblockFslLayer>,
        num_rows: u64,
    ) -> (MiniBlockCompressed, CompressiveEncoding) {
        // Estimate validity bytes per row and count nullable layers
        let mut ceil_bytes_validity = 0;
        let mut cum_dim = 1;
        let mut num_validity_buffers = 0;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if layer.validity.is_some() {
                ceil_bytes_validity += cum_dim.div_ceil(8);
                num_validity_buffers += 1;
            }
        }
        // Treat one whole row (all dimensions flattened) as the chunking word
        let cum_bits_per_value = data.bits_per_value * cum_dim;
        let (cum_bytes_per_word, vals_per_word) = if cum_bits_per_value.is_multiple_of(8) {
            (cum_bits_per_value / 8, 1)
        } else {
            (cum_bits_per_value, 8)
        };
        let est_bytes_per_word = (ceil_bytes_validity * vals_per_word) + cum_bytes_per_word;
        let (log_rows_per_chunk, rows_per_chunk) =
            Self::find_log_vals_per_chunk(est_bytes_per_word, vals_per_word);

        let num_chunks = num_rows.div_ceil(rows_per_chunk) as usize;

        let mut chunks = Vec::with_capacity(num_chunks);
        let mut validity_buffers: Vec<Vec<u8>> = Vec::with_capacity(num_validity_buffers);
        cum_dim = 1;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if let Some(validity) = &layer.validity {
                let layer_bytes_validity = cum_dim.div_ceil(8);
                let validity_with_padding =
                    layer_bytes_validity as usize * num_chunks * rows_per_chunk as usize;
                debug_assert!(validity_with_padding >= validity.len());
                // NOTE(review): this capacity omits the rows-per-chunk factor
                // used in the debug_assert above, so these buffers may still
                // reallocate while filling — confirm intended capacity.
                validity_buffers.push(Vec::with_capacity(
                    layer_bytes_validity as usize * num_chunks,
                ));
            }
        }

        // Emit all full chunks...
        let mut row_offset = 0;
        while row_offset + rows_per_chunk <= num_rows {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_per_chunk as usize,
                &mut validity_buffers,
            );
            row_offset += rows_per_chunk;
            chunks.push(MiniBlockChunk {
                log_num_values: log_rows_per_chunk as u8,
                buffer_sizes,
            })
        }
        // ...then one trailing partial chunk (log_num_values == 0 marks it)
        let rows_in_chunk = num_rows - row_offset;
        if rows_in_chunk > 0 {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_in_chunk as usize,
                &mut validity_buffers,
            );
            chunks.push(MiniBlockChunk {
                log_num_values: 0,
                buffer_sizes,
            });
        }

        let encoding = Self::make_fsl_encoding(&layers, data.bits_per_value);
        // Buffer order must mirror extract_fsl_chunk: validity buffers in layer
        // order (outermost nullable layer first), data buffer last
        let buffers = validity_buffers
            .into_iter()
            .map(LanceBuffer::from)
            .chain(std::iter::once(data.data))
            .collect::<Vec<_>>();

        (
            MiniBlockCompressed {
                chunks,
                data: buffers,
                num_values: num_rows,
            },
            encoding,
        )
    }

    /// Strips the FSL / Nullable wrappers off `data`, collecting one
    /// [`MiniblockFslLayer`] per FSL level, then delegates to [`Self::chunk_fsl`]
    /// once the fixed-width leaf is reached.
    fn miniblock_fsl(data: DataBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        let num_rows = data.num_values();
        let fsl = data.as_fixed_size_list().unwrap();
        let mut layers = Vec::new();
        let mut child = *fsl.child;
        let mut cur_layer = MiniblockFslLayer {
            validity: None,
            dimension: fsl.dimension,
        };
        loop {
            // A Nullable wrapper contributes validity to the current layer
            if let DataBlock::Nullable(nullable) = child {
                cur_layer.validity = Some(nullable.nulls);
                child = *nullable.data;
            }
            match child {
                DataBlock::FixedSizeList(inner) => {
                    layers.push(cur_layer);
                    cur_layer = MiniblockFslLayer {
                        validity: None,
                        dimension: inner.dimension,
                    };
                    child = *inner.child;
                }
                DataBlock::FixedWidth(inner) => {
                    layers.push(cur_layer);
                    return Self::chunk_fsl(inner, layers, num_rows);
                }
                _ => unreachable!("Unexpected data block type in value encoder's miniblock_fsl"),
            }
        }
    }
}
303
/// Cursor over one layer's validity bitmap while zipping per-value FSL data.
struct PerValueFslValidityIter {
    // The layer's validity bitmap
    buffer: LanceBuffer,
    // Validity bits consumed per top-level row at this layer
    bits_per_row: usize,
    // Current read position, in bits
    offset: usize,
}
309
310impl ValueEncoder {
315 fn fsl_to_encoding(fsl: &FixedSizeListBlock) -> CompressiveEncoding {
316 let mut inner = fsl.child.as_ref();
317 let mut has_validity = false;
318 inner = match inner {
319 DataBlock::Nullable(nullable) => {
320 has_validity = true;
321 nullable.data.as_ref()
322 }
323 DataBlock::AllNull(_) => {
324 return ProtobufUtils21::constant(None);
325 }
326 _ => inner,
327 };
328 let inner_encoding = match inner {
329 DataBlock::FixedWidth(fixed_width) => {
330 ProtobufUtils21::flat(fixed_width.bits_per_value, None)
331 }
332 DataBlock::FixedSizeList(inner) => Self::fsl_to_encoding(inner),
333 _ => unreachable!(
334 "Unexpected data block type in value encoder's fsl_to_encoding: {}",
335 inner.name()
336 ),
337 };
338 ProtobufUtils21::fsl(fsl.dimension, has_validity, inner_encoding)
339 }
340
341 fn simple_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
342 let encoding = Self::fsl_to_encoding(&fsl);
344 let num_values = fsl.num_values();
345 let mut child = *fsl.child;
346 let mut cum_dim = 1;
347 loop {
348 cum_dim *= fsl.dimension;
349 match child {
350 DataBlock::Nullable(nullable) => {
351 child = *nullable.data;
352 }
353 DataBlock::FixedSizeList(inner) => {
354 child = *inner.child;
355 }
356 DataBlock::FixedWidth(inner) => {
357 let data = FixedWidthDataBlock {
358 bits_per_value: inner.bits_per_value * cum_dim,
359 num_values,
360 data: inner.data,
361 block_info: BlockInfo::new(),
362 };
363 return (PerValueDataBlock::Fixed(data), encoding);
364 }
365 _ => unreachable!(
366 "Unexpected data block type in value encoder's simple_per_value_fsl"
367 ),
368 }
369 }
370 }
371
372 fn nullable_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
373 let encoding = Self::fsl_to_encoding(&fsl);
375 let num_values = fsl.num_values();
376 let mut bytes_per_row = 0;
377 let mut cum_dim = 1;
378 let mut current = fsl;
379 let mut validity_iters: Vec<PerValueFslValidityIter> = Vec::new();
380 let data_bytes_per_row: usize;
381 let data_buffer: LanceBuffer;
382 loop {
383 cum_dim *= current.dimension;
384 let mut child = *current.child;
385 if let DataBlock::Nullable(nullable) = child {
386 bytes_per_row += cum_dim.div_ceil(8) as usize;
388 validity_iters.push(PerValueFslValidityIter {
389 buffer: nullable.nulls,
390 bits_per_row: cum_dim as usize,
391 offset: 0,
392 });
393 child = *nullable.data;
394 };
395 match child {
396 DataBlock::FixedSizeList(inner) => {
397 current = inner;
398 }
399 DataBlock::FixedWidth(fixed_width) => {
400 data_bytes_per_row =
401 (fixed_width.bits_per_value.div_ceil(8) * cum_dim) as usize;
402 bytes_per_row += data_bytes_per_row;
403 data_buffer = fixed_width.data;
404 break;
405 }
406 DataBlock::AllNull(_) => {
407 data_bytes_per_row = 0;
408 data_buffer = LanceBuffer::empty();
409 break;
410 }
411 _ => unreachable!(
412 "Unexpected data block type in value encoder's nullable_per_value_fsl: {:?}",
413 child
414 ),
415 }
416 }
417
418 let bytes_needed = bytes_per_row * num_values as usize;
419 let mut zipped = Vec::with_capacity(bytes_needed);
420 let data_slice = &data_buffer;
421 for i in 0..num_values as usize {
423 for validity in validity_iters.iter_mut() {
424 let validity_slice = validity
425 .buffer
426 .bit_slice_le_with_length(validity.offset, validity.bits_per_row);
427 zipped.extend_from_slice(&validity_slice);
428 validity.offset += validity.bits_per_row;
429 }
430 let start = i * data_bytes_per_row;
431 let end = start + data_bytes_per_row;
432 zipped.extend_from_slice(&data_slice[start..end]);
433 }
434
435 let zipped = LanceBuffer::from(zipped);
436 let data = PerValueDataBlock::Fixed(FixedWidthDataBlock {
437 bits_per_value: bytes_per_row as u64 * 8,
438 num_values,
439 data: zipped,
440 block_info: BlockInfo::new(),
441 });
442 (data, encoding)
443 }
444
445 fn per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
446 if !fsl.child.is_nullable() {
447 Self::simple_per_value_fsl(fsl)
448 } else {
449 Self::nullable_per_value_fsl(fsl)
450 }
451 }
452}
453
454impl BlockCompressor for ValueEncoder {
455 fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
456 let data = match data {
457 DataBlock::FixedWidth(fixed_width) => fixed_width.data,
458 _ => unimplemented!(
459 "Cannot compress block of type {} with ValueEncoder",
460 data.name()
461 ),
462 };
463 Ok(data)
464 }
465}
466
467impl MiniBlockCompressor for ValueEncoder {
468 fn compress(&self, chunk: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
469 match chunk {
470 DataBlock::FixedWidth(fixed_width) => {
471 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
472 Ok((Self::chunk_data(fixed_width), encoding))
473 }
474 DataBlock::FixedSizeList(_) => Ok(Self::miniblock_fsl(chunk)),
475 _ => Err(Error::invalid_input_source(
476 format!(
477 "Cannot compress a data block of type {} with ValueEncoder",
478 chunk.name()
479 )
480 .into(),
481 )),
482 }
483 }
484}
485
/// Decompression-side description of one FSL nesting level.
#[derive(Debug)]
struct ValueFslDesc {
    // Number of child items per list at this layer
    dimension: u64,
    // Whether this layer carries a validity bitmap
    has_validity: bool,
}
491
/// Decompressor for value ("flat") encoded data, including nested FSL layouts.
#[derive(Debug)]
pub struct ValueDecompressor {
    // Bits per innermost item (the leaf fixed-width value)
    bits_per_item: u64,
    // Bits per top-level value, including any zipped validity bytes
    bits_per_value: u64,
    // Product of all FSL dimensions (innermost items per top-level value)
    items_per_value: u64,
    // FSL nesting, outermost layer first; empty for plain flat data
    layers: Vec<ValueFslDesc>,
}
507
impl ValueDecompressor {
    /// Creates a decompressor for plain flat data (no FSL nesting).
    pub fn from_flat(description: &pb21::Flat) -> Self {
        Self {
            bits_per_item: description.bits_per_value,
            bits_per_value: description.bits_per_value,
            items_per_value: 1,
            layers: Vec::default(),
        }
    }

    /// Creates a decompressor from a (possibly nested) FSL encoding
    /// description, walking inward until the flat leaf encoding is found.
    pub fn from_fsl(mut description: &pb21::FixedSizeList) -> Self {
        let mut layers = Vec::new();
        let mut cum_dim = 1;
        // Accumulates zipped validity bytes per top-level value
        let mut bytes_per_value = 0;
        loop {
            layers.push(ValueFslDesc {
                has_validity: description.has_validity,
                dimension: description.items_per_value,
            });
            cum_dim *= description.items_per_value;
            if description.has_validity {
                bytes_per_value += cum_dim.div_ceil(8);
            }
            match description
                .values
                .as_ref()
                .unwrap()
                .compression
                .as_ref()
                .unwrap()
            {
                Compression::FixedSizeList(inner) => {
                    // Descend one nesting level
                    description = inner;
                }
                Compression::Flat(flat) => {
                    // Leaf reached: total width = validity bytes + item bits
                    let mut bits_per_value = bytes_per_value * 8;
                    bits_per_value += flat.bits_per_value * cum_dim;
                    return Self {
                        bits_per_item: flat.bits_per_value,
                        bits_per_value,
                        items_per_value: cum_dim,
                        layers,
                    };
                }
                _ => unreachable!(),
            }
        }
    }

    /// Wraps a raw buffer as a fixed-width block of innermost items.
    fn buffer_to_block(&self, data: LanceBuffer, num_values: u64) -> DataBlock {
        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values,
            data,
            block_info: BlockInfo::new(),
        })
    }
}
566
567impl BlockDecompressor for ValueDecompressor {
568 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
569 let block = self.buffer_to_block(data, num_values);
570 assert_eq!(block.num_values(), num_values);
571 Ok(block)
572 }
573}
574
impl MiniBlockDecompressor for ValueDecompressor {
    /// Rebuilds a (possibly nested FSL) block from a miniblock chunk's buffers.
    ///
    /// Buffer order (from the compressor): one validity buffer per nullable
    /// layer, outermost first, then the data buffer last.  We iterate the
    /// buffers reversed so the data buffer comes first, then validity buffers
    /// innermost-first, matching `layers.iter().rev()`.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        let num_items = num_values * self.items_per_value;
        let mut buffer_iter = data.into_iter().rev();

        // Last buffer is the flattened item data
        let data_buf = buffer_iter.next().unwrap();
        let items = self.buffer_to_block(data_buf, num_items);
        let mut lists = items;

        // Re-wrap from the innermost layer outward
        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                let validity_buf = buffer_iter.next().unwrap();
                lists = DataBlock::Nullable(NullableDataBlock {
                    data: Box::new(lists),
                    nulls: validity_buf,
                    block_info: BlockInfo::default(),
                });
            }
            lists = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(lists),
                dimension: layer.dimension,
            })
        }

        assert_eq!(lists.num_values(), num_values);
        Ok(lists)
    }
}
604
/// Accumulates one layer's validity bits while unzipping per-value FSL rows.
struct FslDecompressorValidityBuilder {
    // Collected validity bits for this layer
    buffer: BooleanBufferBuilder,
    // Validity bits this layer contributes per row
    bits_per_row: usize,
    // Zipped bytes this layer occupies per row (bits rounded up to bytes)
    bytes_per_row: usize,
}
610
impl ValueDecompressor {
    /// True if any FSL layer carries a validity bitmap (i.e. rows were zipped).
    fn has_validity(&self) -> bool {
        self.layers.iter().any(|layer| layer.has_validity)
    }

    /// Decompression path for FSL data with no validity: the buffer is already
    /// the flattened items, so just re-wrap the nesting.
    fn simple_decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> DataBlock {
        let mut cum_dim = 1;
        for layer in &self.layers {
            cum_dim *= layer.dimension;
        }
        debug_assert_eq!(self.bits_per_item, data.bits_per_value / cum_dim);
        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values: num_rows * cum_dim,
            data: data.data,
            block_info: BlockInfo::new(),
        });
        // Re-wrap from the innermost layer outward
        for layer in self.layers.iter().rev() {
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }
        debug_assert_eq!(num_rows, block.num_values());
        block
    }

    /// Decompression path for zipped rows: splits each fixed-width row back
    /// into per-layer validity bits and item bytes, then re-wraps the nesting.
    fn unzip_decompress(&self, data: FixedWidthDataBlock, num_rows: usize) -> DataBlock {
        // Zipping works on whole bytes, so items must be byte-aligned
        assert_eq!(self.bits_per_item % 8, 0);
        let bytes_per_item = self.bits_per_item / 8;
        let mut buffer_builders = Vec::with_capacity(self.layers.len());
        let mut cum_dim = 1;
        let mut total_size_bytes = 0;
        // One builder per nullable layer, sized from the layer's dimension
        for layer in &self.layers {
            cum_dim *= layer.dimension as usize;
            if layer.has_validity {
                let validity_size_bits = cum_dim;
                let validity_size_bytes = validity_size_bits.div_ceil(8);
                total_size_bytes += num_rows * validity_size_bytes;
                buffer_builders.push(FslDecompressorValidityBuilder {
                    buffer: BooleanBufferBuilder::new(validity_size_bits * num_rows),
                    bits_per_row: cum_dim,
                    bytes_per_row: validity_size_bytes,
                })
            }
        }
        let num_items = num_rows * cum_dim;
        let data_size = num_items * bytes_per_item as usize;
        total_size_bytes += data_size;
        let mut data_buffer = Vec::with_capacity(data_size);

        // The zipped input must be exactly validity + data bytes
        assert_eq!(data.data.len(), total_size_bytes);

        let bytes_per_value = bytes_per_item as usize;
        let data_bytes_per_row = bytes_per_value * cum_dim;

        // Walk row by row: peel each layer's validity bytes, then copy the
        // row's item bytes
        let mut data_offset = 0;
        while data_offset < total_size_bytes {
            for builder in buffer_builders.iter_mut() {
                // append_packed_range takes bit offsets into the zipped buffer
                let start = data_offset * 8;
                let end = start + builder.bits_per_row;
                builder.buffer.append_packed_range(start..end, &data.data);
                data_offset += builder.bytes_per_row;
            }
            let end = data_offset + data_bytes_per_row;
            data_buffer.extend_from_slice(&data.data[data_offset..end]);
            data_offset += data_bytes_per_row;
        }

        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values: num_items as u64,
            data: LanceBuffer::from(data_buffer),
            block_info: BlockInfo::new(),
        });

        // Builders were pushed outermost-first; reversed here to pair with
        // layers.iter().rev() (innermost first)
        let mut validity_bufs = buffer_builders
            .into_iter()
            .rev()
            .map(|mut b| LanceBuffer::from(b.buffer.finish().into_inner()));
        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                let nullable = NullableDataBlock {
                    data: Box::new(block),
                    nulls: validity_bufs.next().unwrap(),
                    block_info: BlockInfo::new(),
                };
                block = DataBlock::Nullable(nullable);
            }
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }

        assert_eq!(num_rows, block.num_values() as usize);

        block
    }
}
718
719impl FixedPerValueDecompressor for ValueDecompressor {
720 fn decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> Result<DataBlock> {
721 if self.has_validity() {
722 Ok(self.unzip_decompress(data, num_rows as usize))
723 } else {
724 Ok(self.simple_decompress(data, num_rows))
725 }
726 }
727
728 fn bits_per_value(&self) -> u64 {
729 self.bits_per_value
730 }
731}
732
733impl PerValueCompressor for ValueEncoder {
734 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
735 let (data, encoding) = match data {
736 DataBlock::FixedWidth(fixed_width) => {
737 let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
738 (PerValueDataBlock::Fixed(fixed_width), encoding)
739 }
740 DataBlock::FixedSizeList(fixed_size_list) => Self::per_value_fsl(fixed_size_list),
741 _ => unimplemented!(
742 "Cannot compress block of type {} with ValueEncoder",
743 data.name()
744 ),
745 };
746 Ok((data, encoding))
747 }
748}
749
#[cfg(test)]
pub(crate) mod tests {
    use std::{
        collections::HashMap,
        sync::{Arc, LazyLock},
    };

    use arrow_array::{
        Array, ArrayRef, Decimal128Array, FixedSizeListArray, Int32Array, ListArray, UInt8Array,
        make_array, new_null_array, types::UInt32Type,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, TimeUnit};
    use lance_datagen::{ArrayGeneratorExt, Dimension, RowCount, array, gen_batch};

    use crate::{
        compression::{FixedPerValueDecompressor, MiniBlockDecompressor},
        data::DataBlock,
        encodings::{
            logical::primitive::{
                fullzip::{PerValueCompressor, PerValueDataBlock},
                miniblock::MiniBlockCompressor,
            },
            physical::value::ValueDecompressor,
        },
        format::pb21::compressive_encoding::Compression,
        testing::{
            FnArrayGeneratorProvider, TestCases, check_basic_random,
            check_round_trip_encoding_generated, check_round_trip_encoding_of_data,
        },
        version::LanceFileVersion,
    };

    use super::ValueEncoder;

    // Scalar types exercised by the round-trip tests below
    const PRIMITIVE_TYPES: &[DataType] = &[
        DataType::Null,
        DataType::FixedSizeBinary(2),
        DataType::Date32,
        DataType::Date64,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal128(10, 10),
        DataType::Decimal256(10, 10),
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Duration(TimeUnit::Second),
    ];

    // Round-trips a tiny nullable Int32 array over several ranges/indices
    #[test_log::test(tokio::test)]
    async fn test_simple_value() {
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            None,
            Some(2),
            Some(3),
            Some(4),
            Some(5),
        ]));

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
    }

    // Larger nullable array, default range coverage
    #[test_log::test(tokio::test)]
    async fn test_simple_range() {
        let items = Arc::new(Int32Array::from_iter(
            (0..5000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
        ));

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
    }

    // Random round-trips over each primitive type
    #[test_log::test(tokio::test)]
    async fn test_value_primitive() {
        for data_type in PRIMITIVE_TYPES {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_basic_random(field).await;
        }
    }

    static LARGE_TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| {
        vec![DataType::FixedSizeList(
            Arc::new(Field::new("", DataType::Int32, false)),
            128,
        )]
    });

    // Random round-trips over wide fixed-size-list values
    #[test_log::test(tokio::test)]
    async fn test_large_primitive() {
        for data_type in LARGE_TYPES.iter() {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_basic_random(field).await;
        }
    }

    // Highly repetitive decimal data (dictionary-friendly input)
    #[test_log::test(tokio::test)]
    async fn test_decimal128_dictionary_encoding() {
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        let decimals: Vec<i32> = (0..100).collect();
        let repeated_strings: Vec<_> = decimals
            .iter()
            .cycle()
            .take(decimals.len() * 10000)
            .map(|&v| Some(v as i128))
            .collect();
        let decimal_array = Arc::new(Decimal128Array::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![decimal_array], &test_cases, HashMap::new()).await;
    }

    // Many small arrays across several page and batch sizes
    #[test_log::test(tokio::test)]
    async fn test_miniblock_stress() {
        // All-valid chunks
        let data1 = (0..100)
            .map(|_| Arc::new(Int32Array::from_iter_values(0..100)) as Arc<dyn Array>)
            .collect::<Vec<_>>();

        // Alternating-null chunks
        let data2 = (0..100)
            .map(|_| {
                Arc::new(Int32Array::from_iter(
                    (0..100).map(|i| if i % 2 == 0 { Some(i) } else { None }),
                )) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        // NOTE(review): _data3 (half all-null chunks) is built but excluded
        // from the loop below — confirm whether this exclusion is intentional
        let _data3 = (0..100)
            .map(|chunk_idx| {
                Arc::new(Int32Array::from_iter(
                    (0..100).map(|i| if chunk_idx < 50 { None } else { Some(i) }),
                )) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        for data in [data1, data2 ] {
            for batch_size in [10, 100, 1500, 15000] {
                let test_cases = TestCases::default()
                    .with_page_sizes(vec![1000, 2000, 3000, 60000])
                    .with_batch_size(batch_size)
                    .with_min_file_version(LanceFileVersion::V2_1);

                check_round_trip_encoding_of_data(data.clone(), &test_cases, HashMap::new()).await;
            }
        }
    }

    // Builds FSL<2, FSL<2, Int32?>?> with nulls at both the item and inner
    // list levels (3 top-level rows)
    fn create_simple_fsl() -> FixedSizeListArray {
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(3),
            None,
            None,
            None,
            None,
            Some(8),
            Some(9),
            None,
            Some(11),
        ]));
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        let inner_list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
        let inner_list = Arc::new(FixedSizeListArray::new(
            items_field.clone(),
            2,
            items,
            Some(NullBuffer::new(inner_list_nulls)),
        ));
        let inner_list_field = Arc::new(Field::new(
            "item",
            DataType::FixedSizeList(items_field, 2),
            true,
        ));
        FixedSizeListArray::new(inner_list_field, 2, inner_list, None)
    }

    // Miniblock compress + decompress round-trip on the nested FSL sample,
    // pinning the expected buffer layout
    #[test]
    fn test_fsl_value_compression_miniblock() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 3);
        assert_eq!(data.chunks.len(), 1);
        assert_eq!(data.chunks[0].buffer_sizes, vec![1, 2, 48]);
        assert_eq!(data.chunks[0].log_num_values, 0);

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

    // Per-value (zipped) compress + decompress round-trip on the same sample
    #[test]
    fn test_fsl_value_compression_per_value() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        assert_eq!(data.bits_per_value, 144);
        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 18 * 3);

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

    // FSL whose items are entirely null
    #[test_log::test(tokio::test)]
    async fn test_fsl_all_null() {
        let items = new_null_array(&DataType::Int32, 12);
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        let list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
        let list_array =
            FixedSizeListArray::new(items_field, 2, items, Some(NullBuffer::new(list_nulls)));

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
            .await;
    }

    // Regression: variable-length lists of FSL values with batch_size 1
    #[test_log::test(tokio::test)]
    async fn regress_list_fsl() {
        let offsets = ScalarBuffer::<i32>::from(vec![0, 393, 755, 1156, 1536]);
        let data = UInt8Array::from(vec![0; 1536 * 16]);
        let fsl_field = Arc::new(Field::new("item", DataType::UInt8, true));
        let fsl = FixedSizeListArray::new(fsl_field, 16, Arc::new(data), None);
        let list_field = Arc::new(Field::new("item", fsl.data_type().clone(), false));
        let list_arr = ListArray::new(list_field, OffsetBuffer::new(offsets), Arc::new(fsl), None);

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_batch_size(1);

        check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, HashMap::new())
            .await;
    }

    // Random triple-nested FSL (dims 2 x 4 x 4) with nulls at every level
    fn create_random_fsl() -> Arc<dyn Array> {
        let inner = array::rand_type(&DataType::Int32).with_random_nulls(0.1);
        let list_one = array::cycle_vec(inner, Dimension::from(4)).with_random_nulls(0.1);
        let list_two = array::cycle_vec(list_one, Dimension::from(4)).with_random_nulls(0.1);
        let list_three = array::cycle_vec(list_two, Dimension::from(2));

        let batch = gen_batch()
            .anon_col(list_three)
            .into_batch_rows(RowCount::from(8 * 1024))
            .unwrap();
        batch.column(0).clone()
    }

    // Miniblock round-trip on the random nested FSL
    #[test]
    fn fsl_value_miniblock_stress() {
        let sample_array = create_random_fsl();

        let starting_data = DataBlock::from_arrays(
            std::slice::from_ref(&sample_array),
            sample_array.len() as u64,
        );

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }

    // Per-value round-trip on the random nested FSL
    #[test]
    fn fsl_value_per_value_stress() {
        let sample_array = create_random_fsl();

        let starting_data = DataBlock::from_arrays(
            std::slice::from_ref(&sample_array),
            sample_array.len() as u64,
        );

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }

    // FSL with heavily-null items, via the generated-data harness
    #[test_log::test(tokio::test)]
    async fn test_fsl_nullable_items() {
        let datagen = Box::new(FnArrayGeneratorProvider::new(move || {
            lance_datagen::array::rand_vec_nullable::<UInt32Type>(Dimension::from(128), 0.5)
        }));

        let field = Field::new(
            "",
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::UInt32, true)), 128),
            false,
        );
        check_round_trip_encoding_generated(field, datagen, TestCases::default()).await;
    }

    // Verifies "flat" encoding is reported both when requested explicitly and
    // when chosen as the fallback
    #[test_log::test(tokio::test)]
    async fn test_value_encoding_verification() {
        use std::collections::HashMap;

        let test_cases = TestCases::default()
            .with_expected_encoding("flat")
            .with_min_file_version(LanceFileVersion::V2_1);

        // Explicitly disable compression and byte-stream-split
        let mut metadata_explicit = HashMap::new();
        metadata_explicit.insert("lance-encoding:compression".to_string(), "none".to_string());
        metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());

        let arr_explicit =
            Arc::new(Int32Array::from((0..1000).collect::<Vec<i32>>())) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr_explicit], &test_cases, metadata_explicit).await;

        // Fallback case: only byte-stream-split disabled
        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());

        let arr_fallback = Arc::new(Int32Array::from(
            (0..100).map(|i| i * 73 + 19).collect::<Vec<i32>>(),
        )) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr_fallback], &test_cases, metadata).await;
    }

    // Pages with and without nulls in the same column
    #[test_log::test(tokio::test)]
    async fn test_mixed_page_validity() {
        let no_nulls = Arc::new(Int32Array::from_iter_values([1, 2]));
        let has_nulls = Arc::new(Int32Array::from_iter([Some(3), None, Some(5)]));

        let test_cases = TestCases::default().with_page_sizes(vec![1]);
        check_round_trip_encoding_of_data(vec![no_nulls, has_nulls], &test_cases, HashMap::new())
            .await;
    }
}