use arrow_buffer::{bit_util, BooleanBufferBuilder};
use snafu::location;

use crate::buffer::LanceBuffer;
use crate::compression::{
    BlockCompressor, BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor,
};
use crate::data::{
    BlockInfo, DataBlock, FixedSizeListBlock, FixedWidthDataBlock, NullableDataBlock,
};
use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
    MAX_MINIBLOCK_VALUES,
};
use crate::format::pb21::compressive_encoding::Compression;
use crate::format::pb21::{self, CompressiveEncoding};
use crate::format::ProtobufUtils21;

use lance_core::{Error, Result};

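/// A "value" encoding that stores fixed-width values as-is, without compression.
///
/// The encoder only reorganizes data: for miniblock it slices values into chunks, and for
/// the per-value (full-zip) layout it zips fixed-size-list validity bitmaps together with
/// the values so each row becomes a single fixed-width run of bytes.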
#[derive(Debug, Default)]
pub struct ValueEncoder {}

impl ValueEncoder {
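    /// Picks how many values go into each miniblock chunk.
    ///
    /// A "word" is one value when `bits_per_value` is byte aligned, or eight values
    /// otherwise, so chunks always end on a byte boundary. The chunk size is doubled for
    /// as long as it stays below `MAX_MINIBLOCK_BYTES` and `MAX_MINIBLOCK_VALUES`.
    /// Returns `(log2(values per chunk), values per chunk)`.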
    fn find_log_vals_per_chunk(bytes_per_word: u64, values_per_word: u64) -> (u64, u64) {
        let mut size_bytes = 2 * bytes_per_word;
        let (mut log_num_vals, mut num_vals) = match values_per_word {
            1 => (1, 2),
            8 => (3, 8),
            _ => unreachable!(),
        };

        assert!(size_bytes < MAX_MINIBLOCK_BYTES);

        while 2 * size_bytes < MAX_MINIBLOCK_BYTES && 2 * num_vals <= MAX_MINIBLOCK_VALUES {
            log_num_vals += 1;
            size_bytes *= 2;
            num_vals *= 2;
        }

        (log_num_vals, num_vals)
    }

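    /// Splits a fixed-width block into miniblock chunks.
    ///
    /// The value buffer itself is passed through untouched; only the chunk metadata
    /// (values per chunk and byte counts) is produced. Every chunk except the last holds
    /// the same number of values; the last chunk carries the remainder and is flagged
    /// with `log_num_values == 0`.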
    fn chunk_data(data: FixedWidthDataBlock) -> MiniBlockCompressed {
        let (bytes_per_word, values_per_word) = if data.bits_per_value % 8 == 0 {
            (data.bits_per_value / 8, 1)
        } else {
            (data.bits_per_value, 8)
        };

        let (log_vals_per_chunk, vals_per_chunk) =
            Self::find_log_vals_per_chunk(bytes_per_word, values_per_word);
        let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize);
        debug_assert_eq!(vals_per_chunk % values_per_word, 0);
        let bytes_per_chunk = bytes_per_word * (vals_per_chunk / values_per_word);
        let bytes_per_chunk = u16::try_from(bytes_per_chunk).unwrap();

        let data_buffer = data.data;

        let mut row_offset = 0;
        let mut chunks = Vec::with_capacity(num_chunks);

        let mut bytes_counter = 0;
        loop {
            if row_offset + vals_per_chunk <= data.num_values {
                chunks.push(MiniBlockChunk {
                    log_num_values: log_vals_per_chunk as u8,
                    buffer_sizes: vec![bytes_per_chunk],
                });
                row_offset += vals_per_chunk;
                bytes_counter += bytes_per_chunk as u64;
            } else {
                let num_bytes = data_buffer.len() as u64 - bytes_counter;
                let num_bytes = u16::try_from(num_bytes).unwrap();
                chunks.push(MiniBlockChunk {
                    log_num_values: 0,
                    buffer_sizes: vec![num_bytes],
                });
                break;
            }
        }

        MiniBlockCompressed {
            chunks,
            data: vec![data_buffer],
            num_values: data.num_values,
        }
    }
}

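/// One level of fixed-size-list nesting encountered while flattening a block for miniblock.
///
/// `dimension` is the list size at this level and `validity`, when present, is the validity
/// bitmap attached to this level's child.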
#[derive(Debug)]
struct MiniblockFslLayer {
    validity: Option<LanceBuffer>,
    dimension: u64,
}

impl ValueEncoder {
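    /// Builds the `CompressiveEncoding` describing a miniblock-encoded fixed-size list.
    ///
    /// The innermost values are described as `Flat` and then wrapped by one FSL descriptor
    /// per nesting layer, innermost first, so the outermost list ends up as the outermost
    /// wrapper in the protobuf description.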
    fn make_fsl_encoding(layers: &[MiniblockFslLayer], bits_per_value: u64) -> CompressiveEncoding {
        let mut encoding = ProtobufUtils21::flat(bits_per_value, None);
        for layer in layers.iter().rev() {
            let has_validity = layer.validity.is_some();
            let dimension = layer.dimension;
            encoding = ProtobufUtils21::fsl(dimension, has_validity, encoding);
        }
        encoding
    }

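    /// Gathers one chunk's worth of validity for every nullable layer and returns the
    /// chunk's `buffer_sizes`.
    ///
    /// Validity bits are copied into per-layer accumulation buffers so that each chunk's
    /// bitmap starts on a byte boundary. The flattened values are not copied here; only
    /// their byte count is recorded as the final entry of the returned sizes.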
    fn extract_fsl_chunk(
        data: &FixedWidthDataBlock,
        layers: &[MiniblockFslLayer],
        row_offset: usize,
        num_rows: usize,
        validity_buffers: &mut [Vec<u8>],
    ) -> Vec<u16> {
        let mut row_offset = row_offset;
        let mut num_values = num_rows;
        let mut buffer_counter = 0;
        let mut buffer_sizes = Vec::with_capacity(validity_buffers.len() + 1);
        for layer in layers {
            row_offset *= layer.dimension as usize;
            num_values *= layer.dimension as usize;
            if let Some(validity) = &layer.validity {
                let validity_slice = validity
                    .clone()
                    .bit_slice_le_with_length(row_offset, num_values);
                validity_buffers[buffer_counter].extend_from_slice(&validity_slice);
                buffer_sizes.push(validity_slice.len() as u16);
                buffer_counter += 1;
            }
        }

        let bits_in_chunk = data.bits_per_value * num_values as u64;
        let bytes_in_chunk = bits_in_chunk.div_ceil(8);
        let bytes_in_chunk = u16::try_from(bytes_in_chunk).unwrap();
        buffer_sizes.push(bytes_in_chunk);

        buffer_sizes
    }

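    /// Splits a (possibly nested) fixed-size list of fixed-width values into miniblock chunks.
    ///
    /// The rows per chunk are chosen from an estimate of the bytes each row needs (validity
    /// plus values). Each nullable layer's validity is re-packed per chunk while the
    /// flattened value buffer is passed through untouched. Returns the chunked data along
    /// with its encoding description.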
    fn chunk_fsl(
        data: FixedWidthDataBlock,
        layers: Vec<MiniblockFslLayer>,
        num_rows: u64,
    ) -> (MiniBlockCompressed, CompressiveEncoding) {
        let mut ceil_bytes_validity = 0;
        let mut cum_dim = 1;
        let mut num_validity_buffers = 0;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if layer.validity.is_some() {
                ceil_bytes_validity += cum_dim.div_ceil(8);
                num_validity_buffers += 1;
            }
        }
        let cum_bits_per_value = data.bits_per_value * cum_dim;
        let (cum_bytes_per_word, vals_per_word) = if cum_bits_per_value % 8 == 0 {
            (cum_bits_per_value / 8, 1)
        } else {
            (cum_bits_per_value, 8)
        };
        let est_bytes_per_word = (ceil_bytes_validity * vals_per_word) + cum_bytes_per_word;
        let (log_rows_per_chunk, rows_per_chunk) =
            Self::find_log_vals_per_chunk(est_bytes_per_word, vals_per_word);

        let num_chunks = num_rows.div_ceil(rows_per_chunk) as usize;

        let mut chunks = Vec::with_capacity(num_chunks);
        let mut validity_buffers: Vec<Vec<u8>> = Vec::with_capacity(num_validity_buffers);
        cum_dim = 1;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if let Some(validity) = &layer.validity {
                let layer_bytes_validity = cum_dim.div_ceil(8);
                let validity_with_padding =
                    layer_bytes_validity as usize * num_chunks * rows_per_chunk as usize;
                debug_assert!(validity_with_padding >= validity.len());
                validity_buffers.push(Vec::with_capacity(
                    layer_bytes_validity as usize * num_chunks,
                ));
            }
        }

        let mut row_offset = 0;
        while row_offset + rows_per_chunk <= num_rows {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_per_chunk as usize,
                &mut validity_buffers,
            );
            row_offset += rows_per_chunk;
            chunks.push(MiniBlockChunk {
                log_num_values: log_rows_per_chunk as u8,
                buffer_sizes,
            })
        }
        let rows_in_chunk = num_rows - row_offset;
        if rows_in_chunk > 0 {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_in_chunk as usize,
                &mut validity_buffers,
            );
            chunks.push(MiniBlockChunk {
                log_num_values: 0,
                buffer_sizes,
            });
        }

        let encoding = Self::make_fsl_encoding(&layers, data.bits_per_value);
        let buffers = validity_buffers
            .into_iter()
            .map(LanceBuffer::from)
            .chain(std::iter::once(data.data))
            .collect::<Vec<_>>();

        (
            MiniBlockCompressed {
                chunks,
                data: buffers,
                num_values: num_rows,
            },
            encoding,
        )
    }

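    /// Miniblock entry point for fixed-size-list data.
    ///
    /// Walks the nested block, collecting one [`MiniblockFslLayer`] per list level (and any
    /// validity attached to that level's child) until it reaches the fixed-width leaf, then
    /// delegates the actual chunking to [`Self::chunk_fsl`].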
    fn miniblock_fsl(data: DataBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        let num_rows = data.num_values();
        let fsl = data.as_fixed_size_list().unwrap();
        let mut layers = Vec::new();
        let mut child = *fsl.child;
        let mut cur_layer = MiniblockFslLayer {
            validity: None,
            dimension: fsl.dimension,
        };
        loop {
            if let DataBlock::Nullable(nullable) = child {
                cur_layer.validity = Some(nullable.nulls);
                child = *nullable.data;
            }
            match child {
                DataBlock::FixedSizeList(inner) => {
                    layers.push(cur_layer);
                    cur_layer = MiniblockFslLayer {
                        validity: None,
                        dimension: inner.dimension,
                    };
                    child = *inner.child;
                }
                DataBlock::FixedWidth(inner) => {
                    layers.push(cur_layer);
                    return Self::chunk_fsl(inner, layers, num_rows);
                }
                _ => unreachable!("Unexpected data block type in value encoder's miniblock_fsl"),
            }
        }
    }
}

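/// Cursor over one FSL layer's validity bitmap, used while zipping validity together with
/// values for the per-value (full-zip) layout: the bitmap, the number of bits each row
/// contributes, and the current bit offset into the bitmap.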
struct PerValueFslValidityIter {
    buffer: LanceBuffer,
    bits_per_row: usize,
    offset: usize,
}

impl ValueEncoder {
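    /// Builds the `CompressiveEncoding` description for per-value fixed-size-list data by
    /// recursing through the nesting levels down to the fixed-width leaf.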
    fn fsl_to_encoding(fsl: &FixedSizeListBlock) -> CompressiveEncoding {
        let mut inner = fsl.child.as_ref();
        let mut has_validity = false;
        inner = match inner {
            DataBlock::Nullable(nullable) => {
                has_validity = true;
                nullable.data.as_ref()
            }
            _ => inner,
        };
        let inner_encoding = match inner {
            DataBlock::FixedWidth(fixed_width) => {
                ProtobufUtils21::flat(fixed_width.bits_per_value, None)
            }
            DataBlock::FixedSizeList(inner) => Self::fsl_to_encoding(inner),
            _ => unreachable!("Unexpected data block type in value encoder's fsl_to_encoding"),
        };
        ProtobufUtils21::fsl(fsl.dimension, has_validity, inner_encoding)
    }

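    /// Per-value layout for a fixed-size list with no validity to zip in.
    ///
    /// The nested list collapses into a single fixed-width block whose width is the item
    /// width multiplied by the total (cumulative) dimension.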
    fn simple_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
        let encoding = Self::fsl_to_encoding(&fsl);
        let num_values = fsl.num_values();
        let mut cum_dim = fsl.dimension;
        let mut child = *fsl.child;
        loop {
            match child {
                DataBlock::Nullable(nullable) => {
                    child = *nullable.data;
                }
                DataBlock::FixedSizeList(inner) => {
                    // Multiply by this level's own dimension, not the outermost one
                    cum_dim *= inner.dimension;
                    child = *inner.child;
                }
                DataBlock::FixedWidth(inner) => {
                    let data = FixedWidthDataBlock {
                        bits_per_value: inner.bits_per_value * cum_dim,
                        num_values,
                        data: inner.data,
                        block_info: BlockInfo::new(),
                    };
                    return (PerValueDataBlock::Fixed(data), encoding);
                }
                _ => unreachable!(
                    "Unexpected data block type in value encoder's simple_per_value_fsl"
                ),
            }
        }
    }

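    /// Per-value layout for a fixed-size list that carries validity.
    ///
    /// Each row is "zipped" into a single fixed-width run of bytes: for every nullable
    /// layer the row's validity bits (rounded up to whole bytes) come first, followed by
    /// the row's flattened values. The resulting block therefore has
    /// `bits_per_value = 8 * (validity bytes per row + value bytes per row)`.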
    fn nullable_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
        let encoding = Self::fsl_to_encoding(&fsl);
        let num_values = fsl.num_values();
        let mut bytes_per_row = 0;
        let mut cum_dim = 1;
        let mut current = fsl;
        let mut validity_iters: Vec<PerValueFslValidityIter> = Vec::new();
        let data_bytes_per_row: usize;
        let data_buffer: LanceBuffer;
        loop {
            cum_dim *= current.dimension;
            let mut child = *current.child;
            if let DataBlock::Nullable(nullable) = child {
                bytes_per_row += cum_dim.div_ceil(8) as usize;
                validity_iters.push(PerValueFslValidityIter {
                    buffer: nullable.nulls,
                    bits_per_row: cum_dim as usize,
                    offset: 0,
                });
                child = *nullable.data;
            };
            match child {
                DataBlock::FixedSizeList(inner) => {
                    current = inner;
                }
                DataBlock::FixedWidth(fixed_width) => {
                    data_bytes_per_row =
                        (fixed_width.bits_per_value.div_ceil(8) * cum_dim) as usize;
                    bytes_per_row += data_bytes_per_row;
                    data_buffer = fixed_width.data;
                    break;
                }
                _ => unreachable!(
                    "Unexpected data block type in value encoder's nullable_per_value_fsl: {:?}",
                    child
                ),
            }
        }

        let bytes_needed = bytes_per_row * num_values as usize;
        let mut zipped = Vec::with_capacity(bytes_needed);
        let data_slice = &data_buffer;
        for i in 0..num_values as usize {
            for validity in validity_iters.iter_mut() {
                let validity_slice = validity
                    .buffer
                    .bit_slice_le_with_length(validity.offset, validity.bits_per_row);
                zipped.extend_from_slice(&validity_slice);
                validity.offset += validity.bits_per_row;
            }
            let start = i * data_bytes_per_row;
            let end = start + data_bytes_per_row;
            zipped.extend_from_slice(&data_slice[start..end]);
        }

        let zipped = LanceBuffer::from(zipped);
        let data = PerValueDataBlock::Fixed(FixedWidthDataBlock {
            bits_per_value: bytes_per_row as u64 * 8,
            num_values,
            data: zipped,
            block_info: BlockInfo::new(),
        });
        (data, encoding)
    }

    fn per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
        if !fsl.child.is_nullable() {
            Self::simple_per_value_fsl(fsl)
        } else {
            Self::nullable_per_value_fsl(fsl)
        }
    }
}

impl BlockCompressor for ValueEncoder {
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
        let data = match data {
            DataBlock::FixedWidth(fixed_width) => fixed_width.data,
            _ => unimplemented!(
                "Cannot compress block of type {} with ValueEncoder",
                data.name()
            ),
        };
        Ok(data)
    }
}

impl MiniBlockCompressor for ValueEncoder {
    fn compress(&self, chunk: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match chunk {
            DataBlock::FixedWidth(fixed_width) => {
                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
                Ok((Self::chunk_data(fixed_width), encoding))
            }
            DataBlock::FixedSizeList(_) => Ok(Self::miniblock_fsl(chunk)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with ValueEncoder",
                    chunk.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

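/// Description of one fixed-size-list layer recovered from the protobuf encoding:
/// the list dimension and whether a validity bitmap was stored for that layer.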
#[derive(Debug)]
struct ValueFslDesc {
    dimension: u64,
    has_validity: bool,
}

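/// Decompressor for value-encoded (uncompressed) data.
///
/// `bits_per_item` is the width of a single innermost item, `bits_per_value` is the width
/// of one full (possibly zipped) row, `items_per_value` is the cumulative fixed-size-list
/// dimension, and `layers` records the nesting structure, outermost first.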
#[derive(Debug)]
pub struct ValueDecompressor {
    bits_per_item: u64,
    bits_per_value: u64,
    items_per_value: u64,
    layers: Vec<ValueFslDesc>,
}

impl ValueDecompressor {
    pub fn from_flat(description: &pb21::Flat) -> Self {
        Self {
            bits_per_item: description.bits_per_value,
            bits_per_value: description.bits_per_value,
            items_per_value: 1,
            layers: Vec::default(),
        }
    }

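    /// Builds a decompressor from a (possibly nested) `FixedSizeList` description.
    ///
    /// Walks the description down to the innermost `Flat` encoding, accumulating the
    /// cumulative dimension, the per-row validity bytes, and one [`ValueFslDesc`] per layer.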
    pub fn from_fsl(mut description: &pb21::FixedSizeList) -> Self {
        let mut layers = Vec::new();
        let mut cum_dim = 1;
        let mut bytes_per_value = 0;
        loop {
            layers.push(ValueFslDesc {
                has_validity: description.has_validity,
                dimension: description.items_per_value,
            });
            cum_dim *= description.items_per_value;
            if description.has_validity {
                bytes_per_value += cum_dim.div_ceil(8);
            }
            match description
                .values
                .as_ref()
                .unwrap()
                .compression
                .as_ref()
                .unwrap()
            {
                Compression::FixedSizeList(inner) => {
                    description = inner;
                }
                Compression::Flat(flat) => {
                    let mut bits_per_value = bytes_per_value * 8;
                    bits_per_value += flat.bits_per_value * cum_dim;
                    return Self {
                        bits_per_item: flat.bits_per_value,
                        bits_per_value,
                        items_per_value: cum_dim,
                        layers,
                    };
                }
                _ => unreachable!(),
            }
        }
    }

    fn buffer_to_block(&self, data: LanceBuffer, num_values: u64) -> DataBlock {
        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values,
            data,
            block_info: BlockInfo::new(),
        })
    }
}

impl BlockDecompressor for ValueDecompressor {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        let block = self.buffer_to_block(data, num_values);
        assert_eq!(block.num_values(), num_values);
        Ok(block)
    }
}

impl MiniBlockDecompressor for ValueDecompressor {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        let num_items = num_values * self.items_per_value;
        let mut buffer_iter = data.into_iter().rev();

        let data_buf = buffer_iter.next().unwrap();
        let items = self.buffer_to_block(data_buf, num_items);
        let mut lists = items;

        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                let validity_buf = buffer_iter.next().unwrap();
                lists = DataBlock::Nullable(NullableDataBlock {
                    data: Box::new(lists),
                    nulls: validity_buf,
                    block_info: BlockInfo::default(),
                });
            }
            lists = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(lists),
                dimension: layer.dimension,
            })
        }

        assert_eq!(lists.num_values(), num_values);
        Ok(lists)
    }
}

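/// Accumulates one layer's validity bits while un-zipping per-value FSL rows: the bitmap
/// under construction plus how many validity bits and bytes each row contributes.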
struct FslDecompressorValidityBuilder {
    buffer: BooleanBufferBuilder,
    bits_per_row: usize,
    bytes_per_row: usize,
}

impl ValueDecompressor {
    fn has_validity(&self) -> bool {
        self.layers.iter().any(|layer| layer.has_validity)
    }

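    /// Rebuilds the nested fixed-size-list block when no validity was zipped into the rows:
    /// the flat buffer is reinterpreted at the item width and wrapped by one
    /// `FixedSizeListBlock` per layer.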
    fn simple_decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> DataBlock {
        let mut cum_dim = 1;
        for layer in &self.layers {
            cum_dim *= layer.dimension;
        }
        debug_assert_eq!(self.bits_per_item, data.bits_per_value / cum_dim);
        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values: num_rows * cum_dim,
            data: data.data,
            block_info: BlockInfo::new(),
        });
        for layer in self.layers.iter().rev() {
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }
        debug_assert_eq!(num_rows, block.num_values());
        block
    }

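    /// Rebuilds the nested block from zipped per-value rows.
    ///
    /// Each row stores, per nullable layer, its validity bits padded to whole bytes,
    /// followed by the row's flattened values. The validity bytes are split back out into
    /// per-layer bitmaps and the values are gathered into a contiguous buffer before the
    /// nested `FixedSizeListBlock`/`NullableDataBlock` structure is reassembled.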
    fn unzip_decompress(&self, data: FixedWidthDataBlock, num_rows: usize) -> DataBlock {
        assert_eq!(self.bits_per_item % 8, 0);
        let bytes_per_item = self.bits_per_item / 8;
        let mut buffer_builders = Vec::with_capacity(self.layers.len());
        let mut cum_dim = 1;
        let mut total_size_bytes = 0;
        for layer in &self.layers {
            cum_dim *= layer.dimension as usize;
            if layer.has_validity {
                let validity_size_bits = cum_dim;
                let validity_size_bytes = validity_size_bits.div_ceil(8);
                total_size_bytes += num_rows * validity_size_bytes;
                buffer_builders.push(FslDecompressorValidityBuilder {
                    buffer: BooleanBufferBuilder::new(validity_size_bits * num_rows),
                    bits_per_row: cum_dim,
                    bytes_per_row: validity_size_bytes,
                })
            }
        }
        let num_items = num_rows * cum_dim;
        let data_size = num_items * bytes_per_item as usize;
        total_size_bytes += data_size;
        let mut data_buffer = Vec::with_capacity(data_size);

        assert_eq!(data.data.len(), total_size_bytes);

        let bytes_per_value = bytes_per_item as usize;
        let data_bytes_per_row = bytes_per_value * cum_dim;

        let mut data_offset = 0;
        while data_offset < total_size_bytes {
            for builder in buffer_builders.iter_mut() {
                let start = data_offset * 8;
                let end = start + builder.bits_per_row;
                builder.buffer.append_packed_range(start..end, &data.data);
                data_offset += builder.bytes_per_row;
            }
            let end = data_offset + data_bytes_per_row;
            data_buffer.extend_from_slice(&data.data[data_offset..end]);
            data_offset += data_bytes_per_row;
        }

        // The unzipped buffer now holds only the item data, so it is described by the
        // per-item width rather than the zipped per-row width.
        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values: num_items as u64,
            data: LanceBuffer::from(data_buffer),
            block_info: BlockInfo::new(),
        });

        let mut validity_bufs = buffer_builders
            .into_iter()
            .rev()
            .map(|mut b| LanceBuffer::from(b.buffer.finish().into_inner()));
        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                let nullable = NullableDataBlock {
                    data: Box::new(block),
                    nulls: validity_bufs.next().unwrap(),
                    block_info: BlockInfo::new(),
                };
                block = DataBlock::Nullable(nullable);
            }
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }

        assert_eq!(num_rows, block.num_values() as usize);

        block
    }
}

impl FixedPerValueDecompressor for ValueDecompressor {
    fn decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> Result<DataBlock> {
        if self.has_validity() {
            Ok(self.unzip_decompress(data, num_rows as usize))
        } else {
            Ok(self.simple_decompress(data, num_rows))
        }
    }

    fn bits_per_value(&self) -> u64 {
        self.bits_per_value
    }
}

impl PerValueCompressor for ValueEncoder {
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
        let (data, encoding) = match data {
            DataBlock::FixedWidth(fixed_width) => {
                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
                (PerValueDataBlock::Fixed(fixed_width), encoding)
            }
            DataBlock::FixedSizeList(fixed_size_list) => Self::per_value_fsl(fixed_size_list),
            _ => unimplemented!(
                "Cannot compress block of type {} with ValueEncoder",
                data.name()
            ),
        };
        Ok((data, encoding))
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::{
        collections::HashMap,
        sync::{Arc, LazyLock},
    };

    use arrow_array::{
        make_array, Array, ArrayRef, Decimal128Array, FixedSizeListArray, Int32Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer};
    use arrow_schema::{DataType, Field, TimeUnit};
    use lance_datagen::{array, gen_batch, ArrayGeneratorExt, Dimension, RowCount};
    use rstest::rstest;

    use crate::{
        compression::{FixedPerValueDecompressor, MiniBlockDecompressor},
        data::DataBlock,
        encodings::{
            logical::primitive::{
                fullzip::{PerValueCompressor, PerValueDataBlock},
                miniblock::MiniBlockCompressor,
            },
            physical::value::ValueDecompressor,
        },
        format::pb21::compressive_encoding::Compression,
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    use super::ValueEncoder;

    const PRIMITIVE_TYPES: &[DataType] = &[
        DataType::Null,
        DataType::FixedSizeBinary(2),
        DataType::Date32,
        DataType::Date64,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal128(10, 10),
        DataType::Decimal256(10, 10),
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Duration(TimeUnit::Second),
    ];

    #[test_log::test(tokio::test)]
    async fn test_simple_value() {
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            None,
            Some(2),
            Some(3),
            Some(4),
            Some(5),
        ]));

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_value_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for data_type in PRIMITIVE_TYPES {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    static LARGE_TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| {
        vec![DataType::FixedSizeList(
            Arc::new(Field::new("", DataType::Int32, false)),
            128,
        )]
    });

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for data_type in LARGE_TYPES.iter() {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_decimal128_dictionary_encoding() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
        let decimals: Vec<i32> = (0..100).collect();
        let repeated_decimals: Vec<_> = decimals
            .iter()
            .cycle()
            .take(decimals.len() * 10000)
            .map(|&v| Some(v as i128))
            .collect();
        let decimal_array = Arc::new(Decimal128Array::from(repeated_decimals)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![decimal_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_miniblock_stress() {
        // 100 chunks of 100 values, no nulls
        let data1 = (0..100)
            .map(|_| Arc::new(Int32Array::from_iter_values(0..100)) as Arc<dyn Array>)
            .collect::<Vec<_>>();

        // 100 chunks of 100 values, every other value is null
        let data2 = (0..100)
            .map(|_| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if i % 2 == 0 {
                        Some(i)
                    } else {
                        None
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        // 100 chunks of 100 values, the first 50 chunks entirely null (currently unused)
        let _data3 = (0..100)
            .map(|chunk_idx| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if chunk_idx < 50 {
                        None
                    } else {
                        Some(i)
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        for data in [data1, data2] {
            for batch_size in [10, 100, 1500, 15000] {
                let test_cases = TestCases::default()
                    .with_page_sizes(vec![1000, 2000, 3000, 60000])
                    .with_batch_size(batch_size)
                    .with_file_version(LanceFileVersion::V2_1);

                check_round_trip_encoding_of_data(data.clone(), &test_cases, HashMap::new()).await;
            }
        }
    }

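    /// Builds a small FixedSizeList<2, FixedSizeList<2, Int32?>?> array with three rows:
    /// six inner lists (three of them null) over twelve items (five of them null). When
    /// miniblock-encoded this yields one validity byte for the inner lists, two validity
    /// bytes for the items, and 48 bytes of item data.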
    fn create_simple_fsl() -> FixedSizeListArray {
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(3),
            None,
            None,
            None,
            None,
            Some(8),
            Some(9),
            None,
            Some(11),
        ]));
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        let inner_list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
        let inner_list = Arc::new(FixedSizeListArray::new(
            items_field.clone(),
            2,
            items,
            Some(NullBuffer::new(inner_list_nulls)),
        ));
        let inner_list_field = Arc::new(Field::new(
            "item",
            DataType::FixedSizeList(items_field, 2),
            true,
        ));
        FixedSizeListArray::new(inner_list_field, 2, inner_list, None)
    }

    #[test]
    fn test_fsl_value_compression_miniblock() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 3);
        assert_eq!(data.chunks.len(), 1);
        assert_eq!(data.chunks[0].buffer_sizes, vec![1, 2, 48]);
        assert_eq!(data.chunks[0].log_num_values, 0);

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

    #[test]
    fn test_fsl_value_compression_per_value() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        // 18 bytes per row: 1 byte of inner-list validity, 1 byte of item validity, and
        // 16 bytes of item data (4 items * 4 bytes)
        assert_eq!(data.bits_per_value, 144);
        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 18 * 3);

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

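    /// Generates 8 * 1024 rows of FixedSizeList<2, FixedSizeList<4, FixedSizeList<4, Int32?>?>?>
    /// where each nullable level has roughly 10% nulls.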
    fn create_random_fsl() -> Arc<dyn Array> {
        let inner = array::rand_type(&DataType::Int32).with_random_nulls(0.1);
        let list_one = array::cycle_vec(inner, Dimension::from(4)).with_random_nulls(0.1);
        let list_two = array::cycle_vec(list_one, Dimension::from(4)).with_random_nulls(0.1);
        let list_three = array::cycle_vec(list_two, Dimension::from(2));

        let batch = gen_batch()
            .anon_col(list_three)
            .into_batch_rows(RowCount::from(8 * 1024))
            .unwrap();
        batch.column(0).clone()
    }

    #[test]
    fn fsl_value_miniblock_stress() {
        let sample_array = create_random_fsl();

        let starting_data = DataBlock::from_arrays(
            std::slice::from_ref(&sample_array),
            sample_array.len() as u64,
        );

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }

    #[test]
    fn fsl_value_per_value_stress() {
        let sample_array = create_random_fsl();

        let starting_data = DataBlock::from_arrays(
            std::slice::from_ref(&sample_array),
            sample_array.len() as u64,
        );

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }

    #[test_log::test(tokio::test)]
    async fn test_value_encoding_verification() {
        use std::collections::HashMap;

        let test_cases = TestCases::default()
            .with_expected_encoding("flat")
            .with_file_version(LanceFileVersion::V2_1);

        // Explicitly disable compression and BSS so the values are written with the flat encoding
        let mut metadata_explicit = HashMap::new();
        metadata_explicit.insert("lance-encoding:compression".to_string(), "none".to_string());
        metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());

        let arr_explicit =
            Arc::new(Int32Array::from((0..1000).collect::<Vec<i32>>())) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr_explicit], &test_cases, metadata_explicit).await;

        // With only BSS disabled, the encoder is still expected to choose the flat encoding here
        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());

        let arr_fallback = Arc::new(Int32Array::from(
            (0..100).map(|i| i * 73 + 19).collect::<Vec<i32>>(),
        )) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr_fallback], &test_cases, metadata).await;
    }
}