1use arrow_buffer::{bit_util, BooleanBufferBuilder};
5use snafu::location;
6
7use crate::buffer::LanceBuffer;
8use crate::compression::{
9 BlockCompressor, BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor,
10};
11use crate::data::{
12 BlockInfo, DataBlock, FixedSizeListBlock, FixedWidthDataBlock, NullableDataBlock,
13};
14use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
15use crate::encodings::logical::primitive::miniblock::{
16 MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
17 MAX_MINIBLOCK_VALUES,
18};
19use crate::format::pb::{self, ArrayEncoding};
20use crate::format::ProtobufUtils;
21
22use lance_core::{Error, Result};
23
/// Encoder for fixed-width values and fixed-size lists of fixed-width values.
///
/// The type carries no state.  It implements `BlockCompressor`,
/// `MiniBlockCompressor` and `PerValueCompressor` further down in this file.
#[derive(Debug, Default)]
pub struct ValueEncoder {}
27
28impl ValueEncoder {
29 fn find_log_vals_per_chunk(bytes_per_word: u64, values_per_word: u64) -> (u64, u64) {
31 let mut size_bytes = 2 * bytes_per_word;
32 let (mut log_num_vals, mut num_vals) = match values_per_word {
33 1 => (1, 2),
34 8 => (3, 8),
35 _ => unreachable!(),
36 };
37
38 assert!(size_bytes < MAX_MINIBLOCK_BYTES);
40
41 while 2 * size_bytes < MAX_MINIBLOCK_BYTES && 2 * num_vals <= MAX_MINIBLOCK_VALUES {
42 log_num_vals += 1;
43 size_bytes *= 2;
44 num_vals *= 2;
45 }
46
47 (log_num_vals, num_vals)
48 }
49
50 fn chunk_data(data: FixedWidthDataBlock) -> MiniBlockCompressed {
51 let (bytes_per_word, values_per_word) = if data.bits_per_value % 8 == 0 {
56 (data.bits_per_value / 8, 1)
57 } else {
58 (data.bits_per_value, 8)
59 };
60
61 let (log_vals_per_chunk, vals_per_chunk) =
63 Self::find_log_vals_per_chunk(bytes_per_word, values_per_word);
64 let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize);
65 debug_assert_eq!(vals_per_chunk % values_per_word, 0);
66 let bytes_per_chunk = bytes_per_word * (vals_per_chunk / values_per_word);
67 let bytes_per_chunk = u16::try_from(bytes_per_chunk).unwrap();
68
69 let data_buffer = data.data;
70
71 let mut row_offset = 0;
72 let mut chunks = Vec::with_capacity(num_chunks);
73
74 let mut bytes_counter = 0;
75 loop {
76 if row_offset + vals_per_chunk <= data.num_values {
77 chunks.push(MiniBlockChunk {
78 log_num_values: log_vals_per_chunk as u8,
79 buffer_sizes: vec![bytes_per_chunk],
80 });
81 row_offset += vals_per_chunk;
82 bytes_counter += bytes_per_chunk as u64;
83 } else {
84 let num_bytes = data_buffer.len() as u64 - bytes_counter;
86 let num_bytes = u16::try_from(num_bytes).unwrap();
87 chunks.push(MiniBlockChunk {
88 log_num_values: 0,
89 buffer_sizes: vec![num_bytes],
90 });
91 break;
92 }
93 }
94
95 MiniBlockCompressed {
96 chunks,
97 data: vec![data_buffer],
98 num_values: data.num_values,
99 }
100 }
101}
102
/// One nesting level of a fixed-size list, as seen by the mini-block encoder.
#[derive(Debug)]
struct MiniblockFslLayer {
    /// Validity bitmap taken from the nullable block directly under this
    /// layer's list, or `None` if that child was not nullable.
    validity: Option<LanceBuffer>,
    /// Number of child items per list value at this level.
    dimension: u64,
}
108
109impl ValueEncoder {
126 fn make_fsl_encoding(layers: &[MiniblockFslLayer], bits_per_value: u64) -> ArrayEncoding {
127 let mut encoding = ProtobufUtils::flat_encoding(bits_per_value, 0, None);
128 for layer in layers.iter().rev() {
129 let has_validity = layer.validity.is_some();
130 let dimension = layer.dimension;
131 encoding = ProtobufUtils::fsl_encoding(dimension, encoding, has_validity);
132 }
133 encoding
134 }
135
136 fn extract_fsl_chunk(
137 data: &FixedWidthDataBlock,
138 layers: &[MiniblockFslLayer],
139 row_offset: usize,
140 num_rows: usize,
141 validity_buffers: &mut [Vec<u8>],
142 ) -> Vec<u16> {
143 let mut row_offset = row_offset;
144 let mut num_values = num_rows;
145 let mut buffer_counter = 0;
146 let mut buffer_sizes = Vec::with_capacity(validity_buffers.len() + 1);
147 for layer in layers {
148 row_offset *= layer.dimension as usize;
149 num_values *= layer.dimension as usize;
150 if let Some(validity) = &layer.validity {
151 let validity_slice = validity
152 .try_clone()
153 .unwrap()
154 .bit_slice_le_with_length(row_offset, num_values);
155 validity_buffers[buffer_counter].extend_from_slice(&validity_slice);
156 buffer_sizes.push(validity_slice.len() as u16);
157 buffer_counter += 1;
158 }
159 }
160
161 let bits_in_chunk = data.bits_per_value * num_values as u64;
162 let bytes_in_chunk = bits_in_chunk.div_ceil(8);
163 let bytes_in_chunk = u16::try_from(bytes_in_chunk).unwrap();
164 buffer_sizes.push(bytes_in_chunk);
165
166 buffer_sizes
167 }
168
169 fn chunk_fsl(
170 data: FixedWidthDataBlock,
171 layers: Vec<MiniblockFslLayer>,
172 num_rows: u64,
173 ) -> (MiniBlockCompressed, ArrayEncoding) {
174 let mut ceil_bytes_validity = 0;
176 let mut cum_dim = 1;
177 let mut num_validity_buffers = 0;
178 for layer in &layers {
179 cum_dim *= layer.dimension;
180 if layer.validity.is_some() {
181 ceil_bytes_validity += cum_dim.div_ceil(8);
182 num_validity_buffers += 1;
183 }
184 }
185 let cum_bits_per_value = data.bits_per_value * cum_dim;
187 let (cum_bytes_per_word, vals_per_word) = if cum_bits_per_value % 8 == 0 {
188 (cum_bits_per_value / 8, 1)
189 } else {
190 (cum_bits_per_value, 8)
191 };
192 let est_bytes_per_word = (ceil_bytes_validity * vals_per_word) + cum_bytes_per_word;
193 let (log_rows_per_chunk, rows_per_chunk) =
194 Self::find_log_vals_per_chunk(est_bytes_per_word, vals_per_word);
195
196 let num_chunks = num_rows.div_ceil(rows_per_chunk) as usize;
197
198 let mut chunks = Vec::with_capacity(num_chunks);
200 let mut validity_buffers: Vec<Vec<u8>> = Vec::with_capacity(num_validity_buffers);
201 cum_dim = 1;
202 for layer in &layers {
203 cum_dim *= layer.dimension;
204 if let Some(validity) = &layer.validity {
205 let layer_bytes_validity = cum_dim.div_ceil(8);
206 let validity_with_padding =
207 layer_bytes_validity as usize * num_chunks * rows_per_chunk as usize;
208 debug_assert!(validity_with_padding >= validity.len());
209 validity_buffers.push(Vec::with_capacity(
210 layer_bytes_validity as usize * num_chunks,
211 ));
212 }
213 }
214
215 let mut row_offset = 0;
217 while row_offset + rows_per_chunk <= num_rows {
218 let buffer_sizes = Self::extract_fsl_chunk(
219 &data,
220 &layers,
221 row_offset as usize,
222 rows_per_chunk as usize,
223 &mut validity_buffers,
224 );
225 row_offset += rows_per_chunk;
226 chunks.push(MiniBlockChunk {
227 log_num_values: log_rows_per_chunk as u8,
228 buffer_sizes,
229 })
230 }
231 let rows_in_chunk = num_rows - row_offset;
232 if rows_in_chunk > 0 {
233 let buffer_sizes = Self::extract_fsl_chunk(
234 &data,
235 &layers,
236 row_offset as usize,
237 rows_in_chunk as usize,
238 &mut validity_buffers,
239 );
240 chunks.push(MiniBlockChunk {
241 log_num_values: 0,
242 buffer_sizes,
243 });
244 }
245
246 let encoding = Self::make_fsl_encoding(&layers, data.bits_per_value);
247 let buffers = validity_buffers
249 .into_iter()
250 .map(LanceBuffer::Owned)
251 .chain(std::iter::once(data.data))
252 .collect::<Vec<_>>();
253
254 (
255 MiniBlockCompressed {
256 chunks,
257 data: buffers,
258 num_values: num_rows,
259 },
260 encoding,
261 )
262 }
263
264 fn miniblock_fsl(data: DataBlock) -> (MiniBlockCompressed, ArrayEncoding) {
265 let num_rows = data.num_values();
266 let fsl = data.as_fixed_size_list().unwrap();
267 let mut layers = Vec::new();
268 let mut child = *fsl.child;
269 let mut cur_layer = MiniblockFslLayer {
270 validity: None,
271 dimension: fsl.dimension,
272 };
273 loop {
274 if let DataBlock::Nullable(nullable) = child {
275 cur_layer.validity = Some(nullable.nulls);
276 child = *nullable.data;
277 }
278 match child {
279 DataBlock::FixedSizeList(inner) => {
280 layers.push(cur_layer);
281 cur_layer = MiniblockFslLayer {
282 validity: None,
283 dimension: inner.dimension,
284 };
285 child = *inner.child;
286 }
287 DataBlock::FixedWidth(inner) => {
288 layers.push(cur_layer);
289 return Self::chunk_fsl(inner, layers, num_rows);
290 }
291 _ => unreachable!("Unexpected data block type in value encoder's miniblock_fsl"),
292 }
293 }
294 }
295}
296
/// Cursor over one layer's validity bitmap, used while zipping validity with
/// values in `nullable_per_value_fsl`.
struct PerValueFslValidityIter {
    /// The layer's validity bitmap.
    buffer: LanceBuffer,
    /// Number of validity bits that belong to one top-level row.
    bits_per_row: usize,
    /// Bit offset of the next row to read; advanced by `bits_per_row` per row.
    offset: usize,
}
302
303impl ValueEncoder {
308 fn fsl_to_encoding(fsl: &FixedSizeListBlock) -> ArrayEncoding {
309 let mut inner = fsl.child.as_ref();
310 let mut has_validity = false;
311 inner = match inner {
312 DataBlock::Nullable(nullable) => {
313 has_validity = true;
314 nullable.data.as_ref()
315 }
316 _ => inner,
317 };
318 let inner_encoding = match inner {
319 DataBlock::FixedWidth(fixed_width) => {
320 ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None)
321 }
322 DataBlock::FixedSizeList(inner) => Self::fsl_to_encoding(inner),
323 _ => unreachable!("Unexpected data block type in value encoder's fsl_to_encoding"),
324 };
325 ProtobufUtils::fsl_encoding(fsl.dimension, inner_encoding, has_validity)
326 }
327
328 fn simple_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
329 let encoding = Self::fsl_to_encoding(&fsl);
331 let num_values = fsl.num_values();
332 let mut child = *fsl.child;
333 let mut cum_dim = 1;
334 loop {
335 cum_dim *= fsl.dimension;
336 match child {
337 DataBlock::Nullable(nullable) => {
338 child = *nullable.data;
339 }
340 DataBlock::FixedSizeList(inner) => {
341 child = *inner.child;
342 }
343 DataBlock::FixedWidth(inner) => {
344 let data = FixedWidthDataBlock {
345 bits_per_value: inner.bits_per_value * cum_dim,
346 num_values,
347 data: inner.data,
348 block_info: BlockInfo::new(),
349 };
350 return (PerValueDataBlock::Fixed(data), encoding);
351 }
352 _ => unreachable!(
353 "Unexpected data block type in value encoder's simple_per_value_fsl"
354 ),
355 }
356 }
357 }
358
359 fn nullable_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
360 let encoding = Self::fsl_to_encoding(&fsl);
362 let num_values = fsl.num_values();
363 let mut bytes_per_row = 0;
364 let mut cum_dim = 1;
365 let mut current = fsl;
366 let mut validity_iters: Vec<PerValueFslValidityIter> = Vec::new();
367 let data_bytes_per_row: usize;
368 let data_buffer: LanceBuffer;
369 loop {
370 cum_dim *= current.dimension;
371 let mut child = *current.child;
372 if let DataBlock::Nullable(nullable) = child {
373 bytes_per_row += cum_dim.div_ceil(8) as usize;
375 validity_iters.push(PerValueFslValidityIter {
376 buffer: nullable.nulls,
377 bits_per_row: cum_dim as usize,
378 offset: 0,
379 });
380 child = *nullable.data;
381 };
382 match child {
383 DataBlock::FixedSizeList(inner) => {
384 current = inner;
385 }
386 DataBlock::FixedWidth(fixed_width) => {
387 data_bytes_per_row =
388 (fixed_width.bits_per_value.div_ceil(8) * cum_dim) as usize;
389 bytes_per_row += data_bytes_per_row;
390 data_buffer = fixed_width.data;
391 break;
392 }
393 _ => unreachable!(
394 "Unexpected data block type in value encoder's nullable_per_value_fsl: {:?}",
395 child
396 ),
397 }
398 }
399
400 let bytes_needed = bytes_per_row * num_values as usize;
401 let mut zipped = Vec::with_capacity(bytes_needed);
402 let data_slice = &data_buffer;
403 for i in 0..num_values as usize {
405 for validity in validity_iters.iter_mut() {
406 let validity_slice = validity
407 .buffer
408 .bit_slice_le_with_length(validity.offset, validity.bits_per_row);
409 zipped.extend_from_slice(&validity_slice);
410 validity.offset += validity.bits_per_row;
411 }
412 let start = i * data_bytes_per_row;
413 let end = start + data_bytes_per_row;
414 zipped.extend_from_slice(&data_slice[start..end]);
415 }
416
417 let zipped = LanceBuffer::Owned(zipped);
418 let data = PerValueDataBlock::Fixed(FixedWidthDataBlock {
419 bits_per_value: bytes_per_row as u64 * 8,
420 num_values,
421 data: zipped,
422 block_info: BlockInfo::new(),
423 });
424 (data, encoding)
425 }
426
427 fn per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
428 if !fsl.child.is_nullable() {
429 Self::simple_per_value_fsl(fsl)
430 } else {
431 Self::nullable_per_value_fsl(fsl)
432 }
433 }
434}
435
436impl BlockCompressor for ValueEncoder {
437 fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
438 let data = match data {
439 DataBlock::FixedWidth(fixed_width) => fixed_width.data,
440 _ => unimplemented!(
441 "Cannot compress block of type {} with ValueEncoder",
442 data.name()
443 ),
444 };
445 Ok(data)
446 }
447}
448
449impl MiniBlockCompressor for ValueEncoder {
450 fn compress(&self, chunk: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
451 match chunk {
452 DataBlock::FixedWidth(fixed_width) => {
453 let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
454 Ok((Self::chunk_data(fixed_width), encoding))
455 }
456 DataBlock::FixedSizeList(_) => Ok(Self::miniblock_fsl(chunk)),
457 _ => Err(Error::InvalidInput {
458 source: format!(
459 "Cannot compress a data block of type {} with ValueEncoder",
460 chunk.name()
461 )
462 .into(),
463 location: location!(),
464 }),
465 }
466 }
467}
468
/// Decoder-side description of one fixed-size-list nesting level.
#[derive(Debug)]
struct ValueFslDesc {
    /// Number of child items per list value at this level.
    dimension: u64,
    /// Whether the encoded data carries a validity bitmap for this level.
    has_validity: bool,
}
474
/// Decoder for data written by `ValueEncoder` (flat values or fixed-size lists).
#[derive(Debug)]
pub struct ValueDecompressor {
    /// Width in bits of one innermost item.
    bits_per_item: u64,
    /// Width in bits of one top-level value.  For fixed-size lists this is the
    /// item width times `items_per_value`, plus 8 bits for every validity byte
    /// that accompanies a value (see `from_fsl`).
    bits_per_value: u64,
    /// Number of innermost items in one top-level value (the product of all
    /// list dimensions; 1 for flat data).
    items_per_value: u64,
    /// List nesting levels, outermost first.  Empty for flat data.
    layers: Vec<ValueFslDesc>,
}
490
491impl ValueDecompressor {
492 pub fn from_flat(description: &pb::Flat) -> Self {
493 Self {
494 bits_per_item: description.bits_per_value,
495 bits_per_value: description.bits_per_value,
496 items_per_value: 1,
497 layers: Vec::default(),
498 }
499 }
500
501 pub fn from_fsl(mut description: &pb::FixedSizeList) -> Self {
502 let mut layers = Vec::new();
503 let mut cum_dim = 1;
504 let mut bytes_per_value = 0;
505 loop {
506 layers.push(ValueFslDesc {
507 has_validity: description.has_validity,
508 dimension: description.dimension as u64,
509 });
510 cum_dim *= description.dimension as u64;
511 if description.has_validity {
512 bytes_per_value += cum_dim.div_ceil(8);
513 }
514 match description
515 .items
516 .as_ref()
517 .unwrap()
518 .array_encoding
519 .as_ref()
520 .unwrap()
521 {
522 pb::array_encoding::ArrayEncoding::FixedSizeList(inner) => {
523 description = inner;
524 }
525 pb::array_encoding::ArrayEncoding::Flat(flat) => {
526 let mut bits_per_value = bytes_per_value * 8;
527 bits_per_value += flat.bits_per_value * cum_dim;
528 return Self {
529 bits_per_item: flat.bits_per_value,
530 bits_per_value,
531 items_per_value: cum_dim,
532 layers,
533 };
534 }
535 _ => unreachable!(),
536 }
537 }
538 }
539
540 fn buffer_to_block(&self, data: LanceBuffer, num_values: u64) -> DataBlock {
541 DataBlock::FixedWidth(FixedWidthDataBlock {
542 bits_per_value: self.bits_per_item,
543 num_values,
544 data,
545 block_info: BlockInfo::new(),
546 })
547 }
548}
549
550impl BlockDecompressor for ValueDecompressor {
551 fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
552 let block = self.buffer_to_block(data, num_values);
553 assert_eq!(block.num_values(), num_values);
554 Ok(block)
555 }
556}
557
558impl MiniBlockDecompressor for ValueDecompressor {
559 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
560 let num_items = num_values * self.items_per_value;
561 let mut buffer_iter = data.into_iter().rev();
562
563 let data_buf = buffer_iter.next().unwrap();
565 let items = self.buffer_to_block(data_buf, num_items);
566 let mut lists = items;
567
568 for layer in self.layers.iter().rev() {
569 if layer.has_validity {
570 let validity_buf = buffer_iter.next().unwrap();
571 lists = DataBlock::Nullable(NullableDataBlock {
572 data: Box::new(lists),
573 nulls: validity_buf,
574 block_info: BlockInfo::default(),
575 });
576 }
577 lists = DataBlock::FixedSizeList(FixedSizeListBlock {
578 child: Box::new(lists),
579 dimension: layer.dimension,
580 })
581 }
582
583 assert_eq!(lists.num_values(), num_values);
584 Ok(lists)
585 }
586}
587
/// Accumulates one layer's validity bits while `unzip_decompress` splits
/// zipped rows apart.
struct FslDecompressorValidityBuilder {
    /// Destination bitmap for this layer.
    buffer: BooleanBufferBuilder,
    /// Number of validity bits this layer stores per row.
    bits_per_row: usize,
    /// Number of bytes those bits occupy in each zipped row (bits rounded up).
    bytes_per_row: usize,
}
593
594impl ValueDecompressor {
596 fn has_validity(&self) -> bool {
597 self.layers.iter().any(|layer| layer.has_validity)
598 }
599
600 fn simple_decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> DataBlock {
602 let mut cum_dim = 1;
603 for layer in &self.layers {
604 cum_dim *= layer.dimension;
605 }
606 debug_assert_eq!(self.bits_per_item, data.bits_per_value / cum_dim);
607 let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
608 bits_per_value: self.bits_per_item,
609 num_values: num_rows * cum_dim,
610 data: data.data,
611 block_info: BlockInfo::new(),
612 });
613 for layer in self.layers.iter().rev() {
614 block = DataBlock::FixedSizeList(FixedSizeListBlock {
615 child: Box::new(block),
616 dimension: layer.dimension,
617 });
618 }
619 debug_assert_eq!(num_rows, block.num_values());
620 block
621 }
622
623 fn unzip_decompress(&self, data: FixedWidthDataBlock, num_rows: usize) -> DataBlock {
625 assert_eq!(self.bits_per_item % 8, 0);
627 let bytes_per_item = self.bits_per_item / 8;
628 let mut buffer_builders = Vec::with_capacity(self.layers.len());
629 let mut cum_dim = 1;
630 let mut total_size_bytes = 0;
631 for layer in &self.layers {
633 cum_dim *= layer.dimension as usize;
634 if layer.has_validity {
635 let validity_size_bits = cum_dim;
636 let validity_size_bytes = validity_size_bits.div_ceil(8);
637 total_size_bytes += num_rows * validity_size_bytes;
638 buffer_builders.push(FslDecompressorValidityBuilder {
639 buffer: BooleanBufferBuilder::new(validity_size_bits * num_rows),
640 bits_per_row: cum_dim,
641 bytes_per_row: validity_size_bytes,
642 })
643 }
644 }
645 let num_items = num_rows * cum_dim;
646 let data_size = num_items * bytes_per_item as usize;
647 total_size_bytes += data_size;
648 let mut data_buffer = Vec::with_capacity(data_size);
649
650 assert_eq!(data.data.len(), total_size_bytes);
651
652 let bytes_per_value = bytes_per_item as usize;
653 let data_bytes_per_row = bytes_per_value * cum_dim;
654
655 let mut data_offset = 0;
657 while data_offset < total_size_bytes {
658 for builder in buffer_builders.iter_mut() {
659 let start = data_offset * 8;
660 let end = start + builder.bits_per_row;
661 builder.buffer.append_packed_range(start..end, &data.data);
662 data_offset += builder.bytes_per_row;
663 }
664 let end = data_offset + data_bytes_per_row;
665 data_buffer.extend_from_slice(&data.data[data_offset..end]);
666 data_offset += data_bytes_per_row;
667 }
668
669 let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
671 bits_per_value: self.bits_per_value,
672 num_values: num_items as u64,
673 data: LanceBuffer::Owned(data_buffer),
674 block_info: BlockInfo::new(),
675 });
676
677 let mut validity_bufs = buffer_builders
678 .into_iter()
679 .rev()
680 .map(|mut b| LanceBuffer::Borrowed(b.buffer.finish().into_inner()));
681 for layer in self.layers.iter().rev() {
682 if layer.has_validity {
683 let nullable = NullableDataBlock {
684 data: Box::new(block),
685 nulls: validity_bufs.next().unwrap(),
686 block_info: BlockInfo::new(),
687 };
688 block = DataBlock::Nullable(nullable);
689 }
690 block = DataBlock::FixedSizeList(FixedSizeListBlock {
691 child: Box::new(block),
692 dimension: layer.dimension,
693 });
694 }
695
696 assert_eq!(num_rows, block.num_values() as usize);
697
698 block
699 }
700}
701
702impl FixedPerValueDecompressor for ValueDecompressor {
703 fn decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> Result<DataBlock> {
704 if self.has_validity() {
705 Ok(self.unzip_decompress(data, num_rows as usize))
706 } else {
707 Ok(self.simple_decompress(data, num_rows))
708 }
709 }
710
711 fn bits_per_value(&self) -> u64 {
712 self.bits_per_value
713 }
714}
715
716impl PerValueCompressor for ValueEncoder {
717 fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, ArrayEncoding)> {
718 let (data, encoding) = match data {
719 DataBlock::FixedWidth(fixed_width) => {
720 let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
721 (PerValueDataBlock::Fixed(fixed_width), encoding)
722 }
723 DataBlock::FixedSizeList(fixed_size_list) => Self::per_value_fsl(fixed_size_list),
724 _ => unimplemented!(
725 "Cannot compress block of type {} with ValueEncoder",
726 data.name()
727 ),
728 };
729 Ok((data, encoding))
730 }
731}
732
733#[cfg(test)]
735pub(crate) mod tests {
736 use std::{
737 collections::HashMap,
738 sync::{Arc, LazyLock},
739 };
740
741 use arrow_array::{
742 make_array, Array, ArrayRef, Decimal128Array, FixedSizeListArray, Int32Array,
743 };
744 use arrow_buffer::{BooleanBuffer, NullBuffer};
745 use arrow_schema::{DataType, Field, TimeUnit};
746 use lance_datagen::{array, gen, ArrayGeneratorExt, Dimension, RowCount};
747 use rstest::rstest;
748
749 use crate::{
750 compression::{FixedPerValueDecompressor, MiniBlockDecompressor},
751 data::DataBlock,
752 encodings::{
753 logical::primitive::{
754 fullzip::{PerValueCompressor, PerValueDataBlock},
755 miniblock::MiniBlockCompressor,
756 },
757 physical::value::ValueDecompressor,
758 },
759 format::pb,
760 testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
761 version::LanceFileVersion,
762 };
763
764 use super::ValueEncoder;
765
    /// Data types that `test_value_primitive` round-trips, one field at a time.
    const PRIMITIVE_TYPES: &[DataType] = &[
        DataType::Null,
        DataType::FixedSizeBinary(2),
        DataType::Date32,
        DataType::Date64,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal128(10, 10),
        DataType::Decimal256(10, 10),
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Duration(TimeUnit::Second),
    ];
792
793 #[test_log::test(tokio::test)]
794 async fn test_simple_value() {
795 let items = Arc::new(Int32Array::from(vec![
796 Some(0),
797 None,
798 Some(2),
799 Some(3),
800 Some(4),
801 Some(5),
802 ]));
803
804 let test_cases = TestCases::default()
805 .with_range(0..3)
806 .with_range(0..2)
807 .with_range(1..3)
808 .with_indices(vec![0, 1, 2])
809 .with_indices(vec![1])
810 .with_indices(vec![2])
811 .with_file_version(LanceFileVersion::V2_1);
812
813 check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
814 }
815
816 #[rstest]
817 #[test_log::test(tokio::test)]
818 async fn test_value_primitive(
819 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
820 ) {
821 for data_type in PRIMITIVE_TYPES {
822 log::info!("Testing encoding for {:?}", data_type);
823 let field = Field::new("", data_type.clone(), false);
824 check_round_trip_encoding_random(field, version).await;
825 }
826 }
827
    /// Wide data types exercised by `test_large_primitive`: currently a single
    /// fixed-size list of 128 non-nullable Int32 items.
    static LARGE_TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| {
        vec![DataType::FixedSizeList(
            Arc::new(Field::new("", DataType::Int32, false)),
            128,
        )]
    });
834
835 #[rstest]
836 #[test_log::test(tokio::test)]
837 async fn test_large_primitive(
838 #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
839 ) {
840 for data_type in LARGE_TYPES.iter() {
841 log::info!("Testing encoding for {:?}", data_type);
842 let field = Field::new("", data_type.clone(), false);
843 check_round_trip_encoding_random(field, version).await;
844 }
845 }
846
847 #[test_log::test(tokio::test)]
848 async fn test_decimal128_dictionary_encoding() {
849 let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
850 let decimals: Vec<i32> = (0..100).collect();
851 let repeated_strings: Vec<_> = decimals
852 .iter()
853 .cycle()
854 .take(decimals.len() * 10000)
855 .map(|&v| Some(v as i128))
856 .collect();
857 let decimal_array = Arc::new(Decimal128Array::from(repeated_strings)) as ArrayRef;
858 check_round_trip_encoding_of_data(vec![decimal_array], &test_cases, HashMap::new()).await;
859 }
860
861 #[test_log::test(tokio::test)]
862 async fn test_miniblock_stress() {
863 let data1 = (0..100)
867 .map(|_| Arc::new(Int32Array::from_iter_values(0..100)) as Arc<dyn Array>)
868 .collect::<Vec<_>>();
869
870 let data2 = (0..100)
872 .map(|_| {
873 Arc::new(Int32Array::from_iter((0..100).map(|i| {
874 if i % 2 == 0 {
875 Some(i)
876 } else {
877 None
878 }
879 }))) as Arc<dyn Array>
880 })
881 .collect::<Vec<_>>();
882
883 let _data3 = (0..100)
886 .map(|chunk_idx| {
887 Arc::new(Int32Array::from_iter((0..100).map(|i| {
888 if chunk_idx < 50 {
889 None
890 } else {
891 Some(i)
892 }
893 }))) as Arc<dyn Array>
894 })
895 .collect::<Vec<_>>();
896
897 for data in [data1, data2 ] {
898 for batch_size in [10, 100, 1500, 15000] {
899 let test_cases = TestCases::default()
901 .with_page_sizes(vec![1000, 2000, 3000, 60000])
902 .with_batch_size(batch_size)
903 .with_file_version(LanceFileVersion::V2_1);
904
905 check_round_trip_encoding_of_data(data.clone(), &test_cases, HashMap::new()).await;
906 }
907 }
908 }
909
910 fn create_simple_fsl() -> FixedSizeListArray {
911 let items = Arc::new(Int32Array::from(vec![
913 Some(0),
914 Some(1),
915 Some(2),
916 Some(3),
917 None,
918 None,
919 None,
920 None,
921 Some(8),
922 Some(9),
923 None,
924 Some(11),
925 ]));
926 let items_field = Arc::new(Field::new("item", DataType::Int32, true));
927 let inner_list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
928 let inner_list = Arc::new(FixedSizeListArray::new(
929 items_field.clone(),
930 2,
931 items,
932 Some(NullBuffer::new(inner_list_nulls)),
933 ));
934 let inner_list_field = Arc::new(Field::new(
935 "item",
936 DataType::FixedSizeList(items_field, 2),
937 true,
938 ));
939 FixedSizeListArray::new(inner_list_field, 2, inner_list, None)
940 }
941
942 #[test]
943 fn test_fsl_value_compression_miniblock() {
944 let sample_list = create_simple_fsl();
945
946 let starting_data = DataBlock::from_array(sample_list.clone());
947
948 let encoder = ValueEncoder::default();
949 let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();
950
951 assert_eq!(data.num_values, 3);
952 assert_eq!(data.data.len(), 3);
953 assert_eq!(data.chunks.len(), 1);
954 assert_eq!(data.chunks[0].buffer_sizes, vec![1, 2, 48]);
955 assert_eq!(data.chunks[0].log_num_values, 0);
956
957 let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
958 compression.array_encoding.unwrap()
959 else {
960 panic!()
961 };
962
963 let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());
964
965 let decompressed =
966 MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();
967
968 let decompressed = make_array(
969 decompressed
970 .into_arrow(sample_list.data_type().clone(), true)
971 .unwrap(),
972 );
973
974 assert_eq!(decompressed.as_ref(), &sample_list);
975 }
976
977 #[test]
978 fn test_fsl_value_compression_per_value() {
979 let sample_list = create_simple_fsl();
980
981 let starting_data = DataBlock::from_array(sample_list.clone());
982
983 let encoder = ValueEncoder::default();
984 let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();
985
986 let PerValueDataBlock::Fixed(data) = data else {
987 panic!()
988 };
989
990 assert_eq!(data.bits_per_value, 144);
991 assert_eq!(data.num_values, 3);
992 assert_eq!(data.data.len(), 18 * 3);
993
994 let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
995 compression.array_encoding.unwrap()
996 else {
997 panic!()
998 };
999
1000 let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());
1001
1002 let num_values = data.num_values;
1003 let decompressed =
1004 FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();
1005
1006 let decompressed = make_array(
1007 decompressed
1008 .into_arrow(sample_list.data_type().clone(), true)
1009 .unwrap(),
1010 );
1011
1012 assert_eq!(decompressed.as_ref(), &sample_list);
1013 }
1014
1015 fn create_random_fsl() -> Arc<dyn Array> {
1016 let inner = array::rand_type(&DataType::Int32).with_random_nulls(0.1);
1018 let list_one = array::cycle_vec(inner, Dimension::from(4)).with_random_nulls(0.1);
1019 let list_two = array::cycle_vec(list_one, Dimension::from(4)).with_random_nulls(0.1);
1020 let list_three = array::cycle_vec(list_two, Dimension::from(2));
1021
1022 let batch = gen()
1024 .anon_col(list_three)
1025 .into_batch_rows(RowCount::from(8 * 1024))
1026 .unwrap();
1027 batch.column(0).clone()
1028 }
1029
1030 #[test]
1031 fn fsl_value_miniblock_stress() {
1032 let sample_array = create_random_fsl();
1033
1034 let starting_data =
1035 DataBlock::from_arrays(&[sample_array.clone()], sample_array.len() as u64);
1036
1037 let encoder = ValueEncoder::default();
1038 let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();
1039
1040 let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
1041 compression.array_encoding.unwrap()
1042 else {
1043 panic!()
1044 };
1045
1046 let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());
1047
1048 let decompressed =
1049 MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();
1050
1051 let decompressed = make_array(
1052 decompressed
1053 .into_arrow(sample_array.data_type().clone(), true)
1054 .unwrap(),
1055 );
1056
1057 assert_eq!(decompressed.as_ref(), sample_array.as_ref());
1058 }
1059
1060 #[test]
1061 fn fsl_value_per_value_stress() {
1062 let sample_array = create_random_fsl();
1063
1064 let starting_data =
1065 DataBlock::from_arrays(&[sample_array.clone()], sample_array.len() as u64);
1066
1067 let encoder = ValueEncoder::default();
1068 let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();
1069
1070 let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
1071 compression.array_encoding.unwrap()
1072 else {
1073 panic!()
1074 };
1075
1076 let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());
1077
1078 let PerValueDataBlock::Fixed(data) = data else {
1079 panic!()
1080 };
1081
1082 let num_values = data.num_values;
1083 let decompressed =
1084 FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();
1085
1086 let decompressed = make_array(
1087 decompressed
1088 .into_arrow(sample_array.data_type().clone(), true)
1089 .unwrap(),
1090 );
1091
1092 assert_eq!(decompressed.as_ref(), sample_array.as_ref());
1093 }
1094}