use arrow_buffer::{bit_util, BooleanBufferBuilder};
use arrow_schema::DataType;
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use log::trace;
use snafu::location;
use std::ops::Range;
use std::sync::{Arc, Mutex};

use crate::buffer::LanceBuffer;
use crate::data::{
    BlockInfo, ConstantDataBlock, DataBlock, FixedSizeListBlock, FixedWidthDataBlock,
    NullableDataBlock,
};
use crate::decoder::{BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor};
use crate::encoder::{
    BlockCompressor, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
    PerValueDataBlock, MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES,
};
use crate::format::pb::{self, ArrayEncoding};
use crate::format::ProtobufUtils;
use crate::{
    decoder::{PageScheduler, PrimitivePageDecoder},
    encoder::{ArrayEncoder, EncodedArray},
    EncodingsIo,
};

use lance_core::{Error, Result};

use super::block_compress::{CompressionConfig, CompressionScheme, GeneralBufferCompressor};

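/// A page scheduler for a simple encoding where fixed-width values are stored contiguously
/// on disk, optionally behind a block compression scheme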
#[derive(Debug, Clone, Copy)]
pub struct ValuePageScheduler {
    bytes_per_value: u64,
    buffer_offset: u64,
    buffer_size: u64,
    compression_config: CompressionConfig,
}

impl ValuePageScheduler {
    pub fn new(
        bytes_per_value: u64,
        buffer_offset: u64,
        buffer_size: u64,
        compression_config: CompressionConfig,
    ) -> Self {
        Self {
            bytes_per_value,
            buffer_offset,
            buffer_size,
            compression_config,
        }
    }
}

impl PageScheduler for ValuePageScheduler {
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        let (mut min, mut max) = (u64::MAX, 0);
        let byte_ranges = if self.compression_config.scheme == CompressionScheme::None {
            ranges
                .iter()
                .map(|range| {
                    let start = self.buffer_offset + (range.start * self.bytes_per_value);
                    let end = self.buffer_offset + (range.end * self.bytes_per_value);
                    min = min.min(start);
                    max = max.max(end);
                    start..end
                })
                .collect::<Vec<_>>()
        } else {
            min = self.buffer_offset;
            max = self.buffer_offset + self.buffer_size;
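            // The page is compressed as a single unit, so we must read the entire compressed
            // buffer and slice out the requested ranges after decompression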
            vec![Range {
                start: min,
                end: max,
            }]
        };

        trace!(
            "Scheduling I/O for {} ranges spread across byte range {}..{}",
            byte_ranges.len(),
            min,
            max
        );
        let bytes = scheduler.submit_request(byte_ranges, top_level_row);
        let bytes_per_value = self.bytes_per_value;

        let range_offsets = if self.compression_config.scheme != CompressionScheme::None {
            ranges
                .iter()
                .map(|range| {
                    let start = (range.start * bytes_per_value) as usize;
                    let end = (range.end * bytes_per_value) as usize;
                    start..end
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let compression_config = self.compression_config;
        async move {
            let bytes = bytes.await?;

            Ok(Box::new(ValuePageDecoder {
                bytes_per_value,
                data: bytes,
                uncompressed_data: Arc::new(Mutex::new(None)),
                uncompressed_range_offsets: range_offsets,
                compression_config,
            }) as Box<dyn PrimitivePageDecoder>)
        }
        .boxed()
    }
}

struct ValuePageDecoder {
    bytes_per_value: u64,
    data: Vec<Bytes>,
    uncompressed_data: Arc<Mutex<Option<Vec<Bytes>>>>,
    uncompressed_range_offsets: Vec<std::ops::Range<usize>>,
    compression_config: CompressionConfig,
}

impl ValuePageDecoder {
    fn decompress(&self) -> Result<Vec<Bytes>> {
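        // A compressed page is scheduled as a single contiguous read, so the compressed
        // bytes are in the first (and only) buffer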
        let bytes_u8: Vec<u8> = self.data[0].to_vec();
        let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config);
        let mut uncompressed_bytes: Vec<u8> = Vec::new();
        buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?;

        let mut bytes_in_ranges: Vec<Bytes> =
            Vec::with_capacity(self.uncompressed_range_offsets.len());
        for range in &self.uncompressed_range_offsets {
            let start = range.start;
            let end = range.end;
            bytes_in_ranges.push(Bytes::from(uncompressed_bytes[start..end].to_vec()));
        }
        Ok(bytes_in_ranges)
    }

    fn get_uncompressed_bytes(&self) -> Result<Arc<Mutex<Option<Vec<Bytes>>>>> {
        let mut uncompressed_bytes = self.uncompressed_data.lock().unwrap();
        if uncompressed_bytes.is_none() {
            *uncompressed_bytes = Some(self.decompress()?);
        }
        Ok(Arc::clone(&self.uncompressed_data))
    }

    fn is_compressed(&self) -> bool {
        !self.uncompressed_range_offsets.is_empty()
    }

    fn decode_buffers<'a>(
        &'a self,
        buffers: impl IntoIterator<Item = &'a Bytes>,
        mut bytes_to_skip: u64,
        mut bytes_to_take: u64,
    ) -> LanceBuffer {
        let mut dest: Option<Vec<u8>> = None;

        for buf in buffers.into_iter() {
            let buf_len = buf.len() as u64;
            if bytes_to_skip > buf_len {
                bytes_to_skip -= buf_len;
            } else {
                let bytes_to_take_here = (buf_len - bytes_to_skip).min(bytes_to_take);
                bytes_to_take -= bytes_to_take_here;
                let start = bytes_to_skip as usize;
                let end = start + bytes_to_take_here as usize;
                let slice = buf.slice(start..end);
                match (&mut dest, bytes_to_take) {
                    (None, 0) => {
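                        // The entire request is contained in this one buffer, so we may be
                        // able to return a zero-copy slice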
188 return LanceBuffer::from_bytes(slice, self.bytes_per_value);
191 }
192 (None, _) => {
193 dest.replace(Vec::with_capacity(bytes_to_take as usize));
194 }
195 _ => {}
196 }
197 dest.as_mut().unwrap().extend_from_slice(&slice);
198 bytes_to_skip = 0;
199 }
200 }
201 LanceBuffer::from(dest.unwrap_or_default())
202 }
203}
204
205impl PrimitivePageDecoder for ValuePageDecoder {
206 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
207 let bytes_to_skip = rows_to_skip * self.bytes_per_value;
208 let bytes_to_take = num_rows * self.bytes_per_value;
209
210 let data_buffer = if self.is_compressed() {
211 let decoding_data = self.get_uncompressed_bytes()?;
212 let buffers = decoding_data.lock().unwrap();
213 self.decode_buffers(buffers.as_ref().unwrap(), bytes_to_skip, bytes_to_take)
214 } else {
215 self.decode_buffers(&self.data, bytes_to_skip, bytes_to_take)
216 };
217 Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
218 bits_per_value: self.bytes_per_value * 8,
219 data: data_buffer,
220 num_values: num_rows,
221 block_info: BlockInfo::new(),
222 }))
223 }
224}
225
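/// An encoder that writes fixed-width values out as-is, without any transformation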
#[derive(Debug, Default)]
pub struct ValueEncoder {}

impl ValueEncoder {
    fn find_log_vals_per_chunk(bytes_per_word: u64, values_per_word: u64) -> (u64, u64) {
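        // Start small and keep doubling the chunk size until doubling again would exceed
        // either the miniblock byte limit or the miniblock value limit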
        let mut size_bytes = 2 * bytes_per_word;
        let (mut log_num_vals, mut num_vals) = match values_per_word {
            1 => (1, 2),
            8 => (3, 8),
            _ => unreachable!(),
        };

        assert!(size_bytes < MAX_MINIBLOCK_BYTES);

        while 2 * size_bytes < MAX_MINIBLOCK_BYTES && 2 * num_vals <= MAX_MINIBLOCK_VALUES {
            log_num_vals += 1;
            size_bytes *= 2;
            num_vals *= 2;
        }

        (log_num_vals, num_vals)
    }

    fn chunk_data(data: FixedWidthDataBlock) -> MiniBlockCompressed {
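        // When values are not byte-aligned (e.g. 1-bit booleans) we work in "words" of 8
        // values (bits_per_value bytes each) so that every chunk stays byte-aligned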
        let (bytes_per_word, values_per_word) = if data.bits_per_value % 8 == 0 {
            (data.bits_per_value / 8, 1)
        } else {
            (data.bits_per_value, 8)
        };

        let (log_vals_per_chunk, vals_per_chunk) =
            Self::find_log_vals_per_chunk(bytes_per_word, values_per_word);
        let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize);
        debug_assert_eq!(vals_per_chunk % values_per_word, 0);
        let bytes_per_chunk = bytes_per_word * (vals_per_chunk / values_per_word);
        let bytes_per_chunk = u16::try_from(bytes_per_chunk).unwrap();

        let data_buffer = data.data;

        let mut row_offset = 0;
        let mut chunks = Vec::with_capacity(num_chunks);

        let mut bytes_counter = 0;
        loop {
            if row_offset + vals_per_chunk <= data.num_values {
                chunks.push(MiniBlockChunk {
                    log_num_values: log_vals_per_chunk as u8,
                    buffer_sizes: vec![bytes_per_chunk],
                });
                row_offset += vals_per_chunk;
                bytes_counter += bytes_per_chunk as u64;
            } else {
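                // The final chunk gets log_num_values of 0 and holds whatever bytes remain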
                let num_bytes = data_buffer.len() as u64 - bytes_counter;
                let num_bytes = u16::try_from(num_bytes).unwrap();
                chunks.push(MiniBlockChunk {
                    log_num_values: 0,
                    buffer_sizes: vec![num_bytes],
                });
                break;
            }
        }

        MiniBlockCompressed {
            chunks,
            data: vec![data_buffer],
            num_values: data.num_values,
        }
    }
}

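/// One layer of fixed-size-list nesting, along with that layer's validity (if any)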
#[derive(Debug)]
struct MiniblockFslLayer {
    validity: Option<LanceBuffer>,
    dimension: u64,
}

impl ValueEncoder {
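    /// Builds the encoding description for a (possibly nested) FSL miniblock: a flat
    /// encoding for the innermost values wrapped in one FSL encoding per layer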
    fn make_fsl_encoding(layers: &[MiniblockFslLayer], bits_per_value: u64) -> ArrayEncoding {
        let mut encoding = ProtobufUtils::flat_encoding(bits_per_value, 0, None);
        for layer in layers.iter().rev() {
            let has_validity = layer.validity.is_some();
            let dimension = layer.dimension;
            encoding = ProtobufUtils::fsl_encoding(dimension, encoding, has_validity);
        }
        encoding
    }

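    /// Copies the validity bits for one chunk of rows into the per-layer validity buffers
    /// and returns the chunk's buffer sizes (one per validity buffer, then the flat data)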
    fn extract_fsl_chunk(
        data: &FixedWidthDataBlock,
        layers: &[MiniblockFslLayer],
        row_offset: usize,
        num_rows: usize,
        validity_buffers: &mut [Vec<u8>],
    ) -> Vec<u16> {
        let mut row_offset = row_offset;
        let mut num_values = num_rows;
        let mut buffer_counter = 0;
        let mut buffer_sizes = Vec::with_capacity(validity_buffers.len() + 1);
        for layer in layers {
            row_offset *= layer.dimension as usize;
            num_values *= layer.dimension as usize;
            if let Some(validity) = &layer.validity {
                let validity_slice = validity
                    .try_clone()
                    .unwrap()
                    .bit_slice_le_with_length(row_offset, num_values);
                validity_buffers[buffer_counter].extend_from_slice(&validity_slice);
                buffer_sizes.push(validity_slice.len() as u16);
                buffer_counter += 1;
            }
        }

        let bits_in_chunk = data.bits_per_value * num_values as u64;
        let bytes_in_chunk = bits_in_chunk.div_ceil(8);
        let bytes_in_chunk = u16::try_from(bytes_in_chunk).unwrap();
        buffer_sizes.push(bytes_in_chunk);

        buffer_sizes
    }

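    /// Compresses nested fixed-size-list data into miniblock chunks, emitting one buffer per
    /// nullable layer (the validity bitmaps) plus a final buffer of flattened values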
    fn chunk_fsl(
        data: FixedWidthDataBlock,
        layers: Vec<MiniblockFslLayer>,
        num_rows: u64,
    ) -> (MiniBlockCompressed, ArrayEncoding) {
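        // First, figure out how many validity bytes each row needs (summed over nullable layers)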
        let mut ceil_bytes_validity = 0;
        let mut cum_dim = 1;
        let mut num_validity_buffers = 0;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if layer.validity.is_some() {
                ceil_bytes_validity += cum_dim.div_ceil(8);
                num_validity_buffers += 1;
            }
        }

        let cum_bits_per_value = data.bits_per_value * cum_dim;
        let (cum_bytes_per_word, vals_per_word) = if cum_bits_per_value % 8 == 0 {
            (cum_bits_per_value / 8, 1)
        } else {
            (cum_bits_per_value, 8)
        };
        let est_bytes_per_word = (ceil_bytes_validity * vals_per_word) + cum_bytes_per_word;
        let (log_rows_per_chunk, rows_per_chunk) =
            Self::find_log_vals_per_chunk(est_bytes_per_word, vals_per_word);

        let num_chunks = num_rows.div_ceil(rows_per_chunk) as usize;

        let mut chunks = Vec::with_capacity(num_chunks);
        let mut validity_buffers: Vec<Vec<u8>> = Vec::with_capacity(num_validity_buffers);
        cum_dim = 1;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if let Some(validity) = &layer.validity {
                let layer_bytes_validity = cum_dim.div_ceil(8);
                let validity_with_padding =
                    layer_bytes_validity as usize * num_chunks * rows_per_chunk as usize;
                debug_assert!(validity_with_padding >= validity.len());
                validity_buffers.push(Vec::with_capacity(
                    layer_bytes_validity as usize * num_chunks,
                ));
            }
        }

        let mut row_offset = 0;
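        // Emit as many full chunks as possible, then a final chunk with whatever rows remain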
        while row_offset + rows_per_chunk <= num_rows {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_per_chunk as usize,
                &mut validity_buffers,
            );
            row_offset += rows_per_chunk;
            chunks.push(MiniBlockChunk {
                log_num_values: log_rows_per_chunk as u8,
                buffer_sizes,
            })
        }
        let rows_in_chunk = num_rows - row_offset;
        if rows_in_chunk > 0 {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_in_chunk as usize,
                &mut validity_buffers,
            );
            chunks.push(MiniBlockChunk {
                log_num_values: 0,
                buffer_sizes,
            });
        }

        let encoding = Self::make_fsl_encoding(&layers, data.bits_per_value);
        let buffers = validity_buffers
            .into_iter()
            .map(LanceBuffer::Owned)
            .chain(std::iter::once(data.data))
            .collect::<Vec<_>>();

        (
            MiniBlockCompressed {
                chunks,
                data: buffers,
                num_values: num_rows,
            },
            encoding,
        )
    }

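    /// Compresses a (possibly nested) fixed-size-list block using the miniblock layout by
    /// peeling off each layer's validity and chunking the flattened fixed-width values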
    fn miniblock_fsl(data: DataBlock) -> (MiniBlockCompressed, ArrayEncoding) {
        let num_rows = data.num_values();
        let fsl = data.as_fixed_size_list().unwrap();
        let mut layers = Vec::new();
        let mut child = *fsl.child;
        let mut cur_layer = MiniblockFslLayer {
            validity: None,
            dimension: fsl.dimension,
        };
        loop {
            if let DataBlock::Nullable(nullable) = child {
                cur_layer.validity = Some(nullable.nulls);
                child = *nullable.data;
            }
            match child {
                DataBlock::FixedSizeList(inner) => {
                    layers.push(cur_layer);
                    cur_layer = MiniblockFslLayer {
                        validity: None,
                        dimension: inner.dimension,
                    };
                    child = *inner.child;
                }
                DataBlock::FixedWidth(inner) => {
                    layers.push(cur_layer);
                    return Self::chunk_fsl(inner, layers, num_rows);
                }
                _ => unreachable!("Unexpected data block type in value encoder's miniblock_fsl"),
            }
        }
    }
}

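/// Tracks the read position into one layer's validity bitmap while zipping rows together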
struct PerValueFslValidityIter {
    buffer: LanceBuffer,
    bits_per_row: usize,
    offset: usize,
}

impl ValueEncoder {
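    /// Builds the encoding description for a per-value encoded FSL: flat values wrapped in
    /// one FSL encoding per layer, each flagged with whether validity was zipped in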
    fn fsl_to_encoding(fsl: &FixedSizeListBlock) -> ArrayEncoding {
        let mut inner = fsl.child.as_ref();
        let mut has_validity = false;
        inner = match inner {
            DataBlock::Nullable(nullable) => {
                has_validity = true;
                nullable.data.as_ref()
            }
            _ => inner,
        };
        let inner_encoding = match inner {
            DataBlock::FixedWidth(fixed_width) => {
                ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None)
            }
            DataBlock::FixedSizeList(inner) => Self::fsl_to_encoding(inner),
            _ => unreachable!("Unexpected data block type in value encoder's fsl_to_encoding"),
        };
        ProtobufUtils::fsl_encoding(fsl.dimension, inner_encoding, has_validity)
    }

    fn simple_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
        let encoding = Self::fsl_to_encoding(&fsl);
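        // No validity anywhere in the list, so the flattened values are already one
        // contiguous fixed-width buffer; only bits_per_value changes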
        let num_values = fsl.num_values();
        let mut child = *fsl.child;
        let mut cum_dim = fsl.dimension;
        loop {
            match child {
                DataBlock::Nullable(nullable) => {
                    child = *nullable.data;
                }
                DataBlock::FixedSizeList(inner) => {
                    // Accumulate each nested list's own dimension
                    cum_dim *= inner.dimension;
                    child = *inner.child;
                }
                DataBlock::FixedWidth(inner) => {
                    let data = FixedWidthDataBlock {
                        bits_per_value: inner.bits_per_value * cum_dim,
                        num_values,
                        data: inner.data,
                        block_info: BlockInfo::new(),
                    };
                    return (PerValueDataBlock::Fixed(data), encoding);
                }
                _ => unreachable!(
                    "Unexpected data block type in value encoder's simple_per_value_fsl"
                ),
            }
        }
    }

    fn nullable_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
        let encoding = Self::fsl_to_encoding(&fsl);
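        // Validity is zipped together with the values so that each row becomes a single
        // fixed-width value: the validity bytes for every nullable layer, then the row's items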
        let num_values = fsl.num_values();
        let mut bytes_per_row = 0;
        let mut cum_dim = 1;
        let mut current = fsl;
        let mut validity_iters: Vec<PerValueFslValidityIter> = Vec::new();
        let data_bytes_per_row: usize;
        let data_buffer: LanceBuffer;
        loop {
            cum_dim *= current.dimension;
            let mut child = *current.child;
            if let DataBlock::Nullable(nullable) = child {
                bytes_per_row += cum_dim.div_ceil(8) as usize;
                validity_iters.push(PerValueFslValidityIter {
                    buffer: nullable.nulls,
                    bits_per_row: cum_dim as usize,
                    offset: 0,
                });
                child = *nullable.data;
            };
            match child {
                DataBlock::FixedSizeList(inner) => {
                    current = inner;
                }
                DataBlock::FixedWidth(fixed_width) => {
                    data_bytes_per_row =
                        (fixed_width.bits_per_value.div_ceil(8) * cum_dim) as usize;
                    bytes_per_row += data_bytes_per_row;
                    data_buffer = fixed_width.data;
                    break;
                }
                _ => unreachable!(
                    "Unexpected data block type in value encoder's nullable_per_value_fsl: {:?}",
                    child
                ),
            }
        }

        let bytes_needed = bytes_per_row * num_values as usize;
        let mut zipped = Vec::with_capacity(bytes_needed);
        let data_slice = &data_buffer;
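        // Row by row: first the validity bits for each nullable layer, then the row's values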
        for i in 0..num_values as usize {
            for validity in validity_iters.iter_mut() {
                let validity_slice = validity
                    .buffer
                    .bit_slice_le_with_length(validity.offset, validity.bits_per_row);
                zipped.extend_from_slice(&validity_slice);
                validity.offset += validity.bits_per_row;
            }
            let start = i * data_bytes_per_row;
            let end = start + data_bytes_per_row;
            zipped.extend_from_slice(&data_slice[start..end]);
        }

        let zipped = LanceBuffer::Owned(zipped);
        let data = PerValueDataBlock::Fixed(FixedWidthDataBlock {
            bits_per_value: bytes_per_row as u64 * 8,
            num_values,
            data: zipped,
            block_info: BlockInfo::new(),
        });
        (data, encoding)
    }

    fn per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
        if !fsl.child.is_nullable() {
            Self::simple_per_value_fsl(fsl)
        } else {
            Self::nullable_per_value_fsl(fsl)
        }
    }
}

impl BlockCompressor for ValueEncoder {
    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
        let data = match data {
            DataBlock::FixedWidth(fixed_width) => fixed_width.data,
            _ => unimplemented!(
                "Cannot compress block of type {} with ValueEncoder",
                data.name()
            ),
        };
        Ok(data)
    }
}

impl ArrayEncoder for ValueEncoder {
    fn encode(
        &self,
        data: DataBlock,
        _data_type: &DataType,
        buffer_index: &mut u32,
    ) -> Result<EncodedArray> {
        let index = *buffer_index;
        *buffer_index += 1;

        let encoding = match &data {
            DataBlock::FixedWidth(fixed_width) => Ok(ProtobufUtils::flat_encoding(
                fixed_width.bits_per_value,
                index,
                None,
            )),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot encode a data block of type {} with ValueEncoder",
                    data.name()
                )
                .into(),
                location: location!(),
            }),
        }?;
        Ok(EncodedArray { data, encoding })
    }
}

impl MiniBlockCompressor for ValueEncoder {
    fn compress(
        &self,
        chunk: DataBlock,
    ) -> Result<(
        crate::encoder::MiniBlockCompressed,
        crate::format::pb::ArrayEncoding,
    )> {
        match chunk {
            DataBlock::FixedWidth(fixed_width) => {
                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
                Ok((Self::chunk_data(fixed_width), encoding))
            }
            DataBlock::FixedSizeList(_) => Ok(Self::miniblock_fsl(chunk)),
            _ => Err(Error::InvalidInput {
                source: format!(
                    "Cannot compress a data block of type {} with ValueEncoder",
                    chunk.name()
                )
                .into(),
                location: location!(),
            }),
        }
    }
}

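/// A "decompressor" for constant-encoded data; it simply repeats the stored scalar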
#[derive(Debug)]
pub struct ConstantDecompressor {
    scalar: LanceBuffer,
}

impl ConstantDecompressor {
    pub fn new(scalar: LanceBuffer) -> Self {
        Self {
            scalar: scalar.into_borrowed(),
        }
    }
}

impl BlockDecompressor for ConstantDecompressor {
    fn decompress(&self, _data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        Ok(DataBlock::Constant(ConstantDataBlock {
            data: self.scalar.try_clone().unwrap(),
            num_values,
        }))
    }
}

#[derive(Debug)]
struct ValueFslDesc {
    dimension: u64,
    has_validity: bool,
}

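/// A decompressor for fixed-width values, optionally nested inside fixed-size-list layers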
#[derive(Debug)]
pub struct ValueDecompressor {
    bits_per_item: u64,
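    /// Total bits per top-level value, including any zipped-in validity (per-value path)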
    bits_per_value: u64,
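    /// Number of innermost items per top-level value (the product of all FSL dimensions)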
    items_per_value: u64,
    layers: Vec<ValueFslDesc>,
}

impl ValueDecompressor {
    pub fn from_flat(description: &pb::Flat) -> Self {
        Self {
            bits_per_item: description.bits_per_value,
            bits_per_value: description.bits_per_value,
            items_per_value: 1,
            layers: Vec::default(),
        }
    }

    pub fn from_fsl(mut description: &pb::FixedSizeList) -> Self {
        let mut layers = Vec::new();
        let mut cum_dim = 1;
        let mut bytes_per_value = 0;
        loop {
            layers.push(ValueFslDesc {
                has_validity: description.has_validity,
                dimension: description.dimension as u64,
            });
            cum_dim *= description.dimension as u64;
            if description.has_validity {
                bytes_per_value += cum_dim.div_ceil(8);
            }
            match description
                .items
                .as_ref()
                .unwrap()
                .array_encoding
                .as_ref()
                .unwrap()
            {
                pb::array_encoding::ArrayEncoding::FixedSizeList(inner) => {
                    description = inner;
                }
                pb::array_encoding::ArrayEncoding::Flat(flat) => {
                    let mut bits_per_value = bytes_per_value * 8;
                    bits_per_value += flat.bits_per_value * cum_dim;
                    return Self {
                        bits_per_item: flat.bits_per_value,
                        bits_per_value,
                        items_per_value: cum_dim,
                        layers,
                    };
                }
                _ => unreachable!(),
            }
        }
    }

    fn buffer_to_block(&self, data: LanceBuffer, num_values: u64) -> DataBlock {
        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values,
            data,
            block_info: BlockInfo::new(),
        })
    }
}

impl BlockDecompressor for ValueDecompressor {
    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
        let block = self.buffer_to_block(data, num_values);
        assert_eq!(block.num_values(), num_values);
        Ok(block)
    }
}

impl MiniBlockDecompressor for ValueDecompressor {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        let num_items = num_values * self.items_per_value;
        let mut buffer_iter = data.into_iter().rev();

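        // The flat value buffer is always the last buffer; any validity buffers precede it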
        let data_buf = buffer_iter.next().unwrap();
        let items = self.buffer_to_block(data_buf, num_items);
        let mut lists = items;

        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                let validity_buf = buffer_iter.next().unwrap();
                lists = DataBlock::Nullable(NullableDataBlock {
                    data: Box::new(lists),
                    nulls: validity_buf,
                    block_info: BlockInfo::default(),
                });
            }
            lists = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(lists),
                dimension: layer.dimension,
            })
        }

        assert_eq!(lists.num_values(), num_values);
        Ok(lists)
    }
}

struct FslDecompressorValidityBuilder {
    buffer: BooleanBufferBuilder,
    bits_per_row: usize,
    bytes_per_row: usize,
}

impl ValueDecompressor {
    fn has_validity(&self) -> bool {
        self.layers.iter().any(|layer| layer.has_validity)
    }

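    /// No validity was zipped in, so the buffer already holds the flattened values; we only
    /// need to rebuild the fixed-size-list structure around it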
    fn simple_decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> DataBlock {
        let mut cum_dim = 1;
        for layer in &self.layers {
            cum_dim *= layer.dimension;
        }
        debug_assert_eq!(self.bits_per_item, data.bits_per_value / cum_dim);
        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values: num_rows * cum_dim,
            data: data.data,
            block_info: BlockInfo::new(),
        });
        for layer in self.layers.iter().rev() {
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }
        debug_assert_eq!(num_rows, block.num_values());
        block
    }

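    /// The per-value path zipped validity bytes in front of each row's values, so
    /// decompression must unzip them back into separate validity bitmaps and a value buffer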
    fn unzip_decompress(&self, data: FixedWidthDataBlock, num_rows: usize) -> DataBlock {
        assert_eq!(self.bits_per_item % 8, 0);
        let bytes_per_item = self.bits_per_item / 8;
        let mut buffer_builders = Vec::with_capacity(self.layers.len());
        let mut cum_dim = 1;
        let mut total_size_bytes = 0;
        for layer in &self.layers {
            cum_dim *= layer.dimension as usize;
            if layer.has_validity {
                let validity_size_bits = cum_dim;
                let validity_size_bytes = validity_size_bits.div_ceil(8);
                total_size_bytes += num_rows * validity_size_bytes;
                buffer_builders.push(FslDecompressorValidityBuilder {
                    buffer: BooleanBufferBuilder::new(validity_size_bits * num_rows),
                    bits_per_row: cum_dim,
                    bytes_per_row: validity_size_bytes,
                })
            }
        }
        let num_items = num_rows * cum_dim;
        let data_size = num_items * bytes_per_item as usize;
        total_size_bytes += data_size;
        let mut data_buffer = Vec::with_capacity(data_size);

        assert_eq!(data.data.len(), total_size_bytes);

        let bytes_per_value = bytes_per_item as usize;
        let data_bytes_per_row = bytes_per_value * cum_dim;

        let mut data_offset = 0;
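        // Walk the zipped buffer one row at a time, peeling off each layer's validity bytes
        // and then copying the row's values into the flat data buffer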
        while data_offset < total_size_bytes {
            for builder in buffer_builders.iter_mut() {
                let start = data_offset * 8;
                let end = start + builder.bits_per_row;
                builder.buffer.append_packed_range(start..end, &data.data);
                data_offset += builder.bytes_per_row;
            }
            let end = data_offset + data_bytes_per_row;
            data_buffer.extend_from_slice(&data.data[data_offset..end]);
            data_offset += data_bytes_per_row;
        }

        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            // The unzipped buffer holds only the flattened items, so the width here is the
            // per-item width (self.bits_per_value would also count the zipped validity bytes)
            bits_per_value: self.bits_per_item,
            num_values: num_items as u64,
            data: LanceBuffer::Owned(data_buffer),
            block_info: BlockInfo::new(),
        });

        let mut validity_bufs = buffer_builders
            .into_iter()
            .rev()
            .map(|mut b| LanceBuffer::Borrowed(b.buffer.finish().into_inner()));
        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                let nullable = NullableDataBlock {
                    data: Box::new(block),
                    nulls: validity_bufs.next().unwrap(),
                    block_info: BlockInfo::new(),
                };
                block = DataBlock::Nullable(nullable);
            }
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }

        assert_eq!(num_rows, block.num_values() as usize);

        block
    }
}

impl FixedPerValueDecompressor for ValueDecompressor {
    fn decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> Result<DataBlock> {
        if self.has_validity() {
            Ok(self.unzip_decompress(data, num_rows as usize))
        } else {
            Ok(self.simple_decompress(data, num_rows))
        }
    }

    fn bits_per_value(&self) -> u64 {
        self.bits_per_value
    }
}

impl PerValueCompressor for ValueEncoder {
    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, ArrayEncoding)> {
        let (data, encoding) = match data {
            DataBlock::FixedWidth(fixed_width) => {
                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
                (PerValueDataBlock::Fixed(fixed_width), encoding)
            }
            DataBlock::FixedSizeList(fixed_size_list) => Self::per_value_fsl(fixed_size_list),
            _ => unimplemented!(
                "Cannot compress block of type {} with ValueEncoder",
                data.name()
            ),
        };
        Ok((data, encoding))
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        make_array, Array, ArrayRef, Decimal128Array, FixedSizeListArray, Int32Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer};
    use arrow_schema::{DataType, Field, TimeUnit};
    use lance_datagen::{array, gen, ArrayGeneratorExt, Dimension, RowCount};
    use rstest::rstest;

    use crate::{
        data::DataBlock,
        decoder::{FixedPerValueDecompressor, MiniBlockDecompressor},
        encoder::{MiniBlockCompressor, PerValueCompressor, PerValueDataBlock},
        encodings::physical::value::ValueDecompressor,
        format::pb,
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    use super::ValueEncoder;

    const PRIMITIVE_TYPES: &[DataType] = &[
        DataType::Null,
        DataType::FixedSizeBinary(2),
        DataType::Date32,
        DataType::Date64,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal128(10, 10),
        DataType::Decimal256(10, 10),
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Duration(TimeUnit::Second),
    ];

    #[test_log::test(tokio::test)]
    async fn test_simple_value() {
        let items: Arc<dyn Array> = Arc::new(Int32Array::from(vec![
            Some(0),
            None,
            Some(2),
            Some(3),
            Some(4),
            Some(5),
        ]));

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_value_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for data_type in PRIMITIVE_TYPES {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    lazy_static::lazy_static! {
        static ref LARGE_TYPES: Vec<DataType> = vec![DataType::FixedSizeList(
            Arc::new(Field::new("", DataType::Int32, false)),
            128,
        )];
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for data_type in LARGE_TYPES.iter() {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    #[test_log::test(tokio::test)]
    async fn test_decimal128_dictionary_encoding() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
        let decimals: Vec<i32> = (0..100).collect();
        let repeated_strings: Vec<_> = decimals
            .iter()
            .cycle()
            .take(decimals.len() * 10000)
            .map(|&v| Some(v as i128))
            .collect();
        let decimal_array = Arc::new(Decimal128Array::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![decimal_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_miniblock_stress() {
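        // 100 chunks of values with no nulls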
        let data1 = (0..100)
            .map(|_| Arc::new(Int32Array::from_iter_values(0..100)) as Arc<dyn Array>)
            .collect::<Vec<_>>();

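        // 100 chunks of alternating null / non-null values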
        let data2 = (0..100)
            .map(|_| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if i % 2 == 0 {
                        Some(i)
                    } else {
                        None
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

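        // 100 chunks where the first half are entirely null and the second half are entirely
        // valid (currently not included in the loop below)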
        let _data3 = (0..100)
            .map(|chunk_idx| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if chunk_idx < 50 {
                        None
                    } else {
                        Some(i)
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        for data in [data1, data2] {
            for batch_size in [10, 100, 1500, 15000] {
                let test_cases = TestCases::default()
                    .with_page_sizes(vec![1000, 2000, 3000, 60000])
                    .with_batch_size(batch_size)
                    .with_file_version(LanceFileVersion::V2_1);

                check_round_trip_encoding_of_data(data.clone(), &test_cases, HashMap::new()).await;
            }
        }
    }

    fn create_simple_fsl() -> FixedSizeListArray {
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(3),
            None,
            None,
            None,
            None,
            Some(8),
            Some(9),
            None,
            Some(11),
        ]));
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        let inner_list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
        let inner_list = Arc::new(FixedSizeListArray::new(
            items_field.clone(),
            2,
            items,
            Some(NullBuffer::new(inner_list_nulls)),
        ));
        let inner_list_field = Arc::new(Field::new(
            "item",
            DataType::FixedSizeList(items_field, 2),
            true,
        ));
        FixedSizeListArray::new(inner_list_field, 2, inner_list, None)
    }

    #[test]
    fn test_fsl_value_compression_miniblock() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 3);
        assert_eq!(data.chunks.len(), 1);
        assert_eq!(data.chunks[0].buffer_sizes, vec![1, 2, 48]);
        assert_eq!(data.chunks[0].log_num_values, 0);

        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
            compression.array_encoding.unwrap()
        else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

    #[test]
    fn test_fsl_value_compression_per_value() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        assert_eq!(data.bits_per_value, 144);
        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 18 * 3);

        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
            compression.array_encoding.unwrap()
        else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

    fn create_random_fsl() -> Arc<dyn Array> {
        let inner = array::rand_type(&DataType::Int32).with_random_nulls(0.1);
        let list_one = array::cycle_vec(inner, Dimension::from(4)).with_random_nulls(0.1);
        let list_two = array::cycle_vec(list_one, Dimension::from(4)).with_random_nulls(0.1);
        let list_three = array::cycle_vec(list_two, Dimension::from(2));

        let batch = gen()
            .anon_col(list_three)
            .into_batch_rows(RowCount::from(8 * 1024))
            .unwrap();
        batch.column(0).clone()
    }

    #[test]
    fn fsl_value_miniblock_stress() {
        let sample_array = create_random_fsl();

        let starting_data =
            DataBlock::from_arrays(&[sample_array.clone()], sample_array.len() as u64);

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
            compression.array_encoding.unwrap()
        else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }

    #[test]
    fn fsl_value_per_value_stress() {
        let sample_array = create_random_fsl();

        let starting_data =
            DataBlock::from_arrays(&[sample_array.clone()], sample_array.len() as u64);

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
            compression.array_encoding.unwrap()
        else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }
}