lance_encoding/encodings/physical/
value.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_buffer::{bit_util, BooleanBufferBuilder};
5use snafu::location;
6
7use crate::buffer::LanceBuffer;
8use crate::compression::{
9    BlockCompressor, BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor,
10};
11use crate::data::{
12    BlockInfo, DataBlock, FixedSizeListBlock, FixedWidthDataBlock, NullableDataBlock,
13};
14use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
15use crate::encodings::logical::primitive::miniblock::{
16    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
17    MAX_MINIBLOCK_VALUES,
18};
19use crate::format::pb::{self, ArrayEncoding};
20use crate::format::ProtobufUtils;
21
22use lance_core::{Error, Result};
23
/// A compression strategy that writes fixed-width data as-is (no compression)
///
/// Stateless unit struct; it exists so the block, mini-block, and per-value
/// compressor traits can be implemented on it.
#[derive(Debug, Default)]
pub struct ValueEncoder {}
27
impl ValueEncoder {
    /// Use the largest chunk we can smaller than 4KiB
    ///
    /// Returns `(log2(values_per_chunk), values_per_chunk)`.  Sizing is done in
    /// "words" (1 value per word for byte-aligned types, 8 values per word for
    /// bit-packed types) so a chunk never splits a value mid-byte.
    fn find_log_vals_per_chunk(bytes_per_word: u64, values_per_word: u64) -> (u64, u64) {
        // Start from two words; doubling below keeps num_vals a power of two
        let mut size_bytes = 2 * bytes_per_word;
        let (mut log_num_vals, mut num_vals) = match values_per_word {
            1 => (1, 2),
            8 => (3, 8),
            _ => unreachable!(),
        };

        // If the type is so wide that we can't even fit 2 values we shouldn't be here
        assert!(size_bytes < MAX_MINIBLOCK_BYTES);

        // Double until the next doubling would exceed either the byte or value cap
        while 2 * size_bytes < MAX_MINIBLOCK_BYTES && 2 * num_vals <= MAX_MINIBLOCK_VALUES {
            log_num_vals += 1;
            size_bytes *= 2;
            num_vals *= 2;
        }

        (log_num_vals, num_vals)
    }

    /// Split a fixed-width block into mini-block chunks.
    ///
    /// The bytes are passed through untransformed; only chunk metadata is
    /// computed.  Every chunk except the last holds exactly
    /// `2^log_num_values` values; the last chunk has `log_num_values == 0`
    /// and holds whatever remains.
    fn chunk_data(data: FixedWidthDataBlock) -> MiniBlockCompressed {
        // Usually there are X bytes per value.  However, when working with boolean
        // or FSL<boolean> we might have some number of bits per value that isn't
        // divisible by 8.  In this case, to avoid chunking in the middle of a byte
        // we calculate how many 8-value words we can fit in a chunk.
        let (bytes_per_word, values_per_word) = if data.bits_per_value % 8 == 0 {
            (data.bits_per_value / 8, 1)
        } else {
            (data.bits_per_value, 8)
        };

        // Aim for 4KiB chunks
        let (log_vals_per_chunk, vals_per_chunk) =
            Self::find_log_vals_per_chunk(bytes_per_word, values_per_word);
        let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize);
        debug_assert_eq!(vals_per_chunk % values_per_word, 0);
        let bytes_per_chunk = bytes_per_word * (vals_per_chunk / values_per_word);
        let bytes_per_chunk = u16::try_from(bytes_per_chunk).unwrap();

        let data_buffer = data.data;

        let mut row_offset = 0;
        let mut chunks = Vec::with_capacity(num_chunks);

        // Running count of bytes covered by the full chunks emitted so far
        let mut bytes_counter = 0;
        loop {
            if row_offset + vals_per_chunk <= data.num_values {
                chunks.push(MiniBlockChunk {
                    log_num_values: log_vals_per_chunk as u8,
                    buffer_sizes: vec![bytes_per_chunk],
                });
                row_offset += vals_per_chunk;
                bytes_counter += bytes_per_chunk as u64;
            } else {
                // Final chunk, special values
                // NOTE(review): when num_values is an exact multiple of vals_per_chunk
                // this emits a zero-byte trailing chunk; presumably the reader requires
                // the last chunk to have log_num_values == 0 — confirm against the
                // mini-block reader (chunk_fsl below skips an empty trailing chunk)
                let num_bytes = data_buffer.len() as u64 - bytes_counter;
                let num_bytes = u16::try_from(num_bytes).unwrap();
                chunks.push(MiniBlockChunk {
                    log_num_values: 0,
                    buffer_sizes: vec![num_bytes],
                });
                break;
            }
        }

        MiniBlockCompressed {
            chunks,
            data: vec![data_buffer],
            num_values: data.num_values,
        }
    }
}
102
/// One FSL layer's metadata captured while flattening nested FSL data for mini-block
#[derive(Debug)]
struct MiniblockFslLayer {
    // Inner validity bitmap for this layer, if the layer's child is nullable
    validity: Option<LanceBuffer>,
    // Number of child items per list at this layer
    dimension: u64,
}
108
/// This impl deals with encoding FSL<FSL<...<FSL<FixedWidth>>>> data as a mini-block compressor.
/// The tricky part of FSL data is that we want to include inner validity buffers (we don't want these
/// to be part of the rep-def because that usually ends up being more expensive).
///
/// The resulting mini-block will, instead of having a single buffer, have X + 1 buffers where X is
/// the number of FSL layers that contain validity.
///
/// In the simple case where there is no validity inside the FSL layers, all we are doing here is flattening
/// the FSL layers into a single buffer.
///
/// Also: We don't allow a row to be broken across chunks.  This typically isn't too big of a deal since we
/// are usually dealing with relatively small vectors if we are using mini-block.
///
/// Note: when we do have validity we have to make copies of the validity buffers because they are bit buffers
/// and we need to bit slice them which requires copies or offsets.  Paying the price at write time to make
/// the copies is better than paying the price at read time to do the bit slicing.
impl ValueEncoder {
    /// Build the nested protobuf encoding description for the FSL layers (outermost first)
    fn make_fsl_encoding(layers: &[MiniblockFslLayer], bits_per_value: u64) -> ArrayEncoding {
        // Start from the innermost flat encoding and wrap outwards
        let mut encoding = ProtobufUtils::flat_encoding(bits_per_value, 0, None);
        for layer in layers.iter().rev() {
            let has_validity = layer.validity.is_some();
            let dimension = layer.dimension;
            encoding = ProtobufUtils::fsl_encoding(dimension, encoding, has_validity);
        }
        encoding
    }

    /// Copy one chunk's worth of validity bits into `validity_buffers` and return the
    /// chunk's buffer sizes (one entry per validity buffer, outermost first, then the
    /// data buffer size last)
    fn extract_fsl_chunk(
        data: &FixedWidthDataBlock,
        layers: &[MiniblockFslLayer],
        row_offset: usize,
        num_rows: usize,
        validity_buffers: &mut [Vec<u8>],
    ) -> Vec<u16> {
        let mut row_offset = row_offset;
        let mut num_values = num_rows;
        let mut buffer_counter = 0;
        let mut buffer_sizes = Vec::with_capacity(validity_buffers.len() + 1);
        for layer in layers {
            // Scale the row range into this layer's item space
            row_offset *= layer.dimension as usize;
            num_values *= layer.dimension as usize;
            if let Some(validity) = &layer.validity {
                // Bit-slicing forces a copy (see the impl-level note above)
                let validity_slice = validity
                    .try_clone()
                    .unwrap()
                    .bit_slice_le_with_length(row_offset, num_values);
                validity_buffers[buffer_counter].extend_from_slice(&validity_slice);
                buffer_sizes.push(validity_slice.len() as u16);
                buffer_counter += 1;
            }
        }

        // `num_values` is now the number of innermost items in the chunk
        let bits_in_chunk = data.bits_per_value * num_values as u64;
        let bytes_in_chunk = bits_in_chunk.div_ceil(8);
        let bytes_in_chunk = u16::try_from(bytes_in_chunk).unwrap();
        buffer_sizes.push(bytes_in_chunk);

        buffer_sizes
    }

    /// Chunk flattened FSL data (item values plus any inner validity) into mini-blocks
    fn chunk_fsl(
        data: FixedWidthDataBlock,
        layers: Vec<MiniblockFslLayer>,
        num_rows: u64,
    ) -> (MiniBlockCompressed, ArrayEncoding) {
        // Count size to calculate rows per chunk
        let mut ceil_bytes_validity = 0;
        let mut cum_dim = 1;
        let mut num_validity_buffers = 0;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if layer.validity.is_some() {
                ceil_bytes_validity += cum_dim.div_ceil(8);
                num_validity_buffers += 1;
            }
        }
        // It's an estimate because validity buffers may have some padding bits
        let cum_bits_per_value = data.bits_per_value * cum_dim;
        // Mirrors chunk_data: if rows aren't byte aligned, size chunks in 8-row words
        let (cum_bytes_per_word, vals_per_word) = if cum_bits_per_value % 8 == 0 {
            (cum_bits_per_value / 8, 1)
        } else {
            (cum_bits_per_value, 8)
        };
        let est_bytes_per_word = (ceil_bytes_validity * vals_per_word) + cum_bytes_per_word;
        let (log_rows_per_chunk, rows_per_chunk) =
            Self::find_log_vals_per_chunk(est_bytes_per_word, vals_per_word);

        let num_chunks = num_rows.div_ceil(rows_per_chunk) as usize;

        // Allocate buffers for validity, these will be slightly bigger than the input validity buffers
        let mut chunks = Vec::with_capacity(num_chunks);
        let mut validity_buffers: Vec<Vec<u8>> = Vec::with_capacity(num_validity_buffers);
        cum_dim = 1;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if let Some(validity) = &layer.validity {
                let layer_bytes_validity = cum_dim.div_ceil(8);
                let validity_with_padding =
                    layer_bytes_validity as usize * num_chunks * rows_per_chunk as usize;
                debug_assert!(validity_with_padding >= validity.len());
                validity_buffers.push(Vec::with_capacity(
                    layer_bytes_validity as usize * num_chunks,
                ));
            }
        }

        // Now go through and extract validity buffers
        let mut row_offset = 0;
        while row_offset + rows_per_chunk <= num_rows {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_per_chunk as usize,
                &mut validity_buffers,
            );
            row_offset += rows_per_chunk;
            chunks.push(MiniBlockChunk {
                log_num_values: log_rows_per_chunk as u8,
                buffer_sizes,
            })
        }
        // Trailing partial chunk; log_num_values == 0 marks the final chunk
        let rows_in_chunk = num_rows - row_offset;
        if rows_in_chunk > 0 {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_in_chunk as usize,
                &mut validity_buffers,
            );
            chunks.push(MiniBlockChunk {
                log_num_values: 0,
                buffer_sizes,
            });
        }

        let encoding = Self::make_fsl_encoding(&layers, data.bits_per_value);
        // Finally, add the data buffer
        let buffers = validity_buffers
            .into_iter()
            .map(LanceBuffer::Owned)
            .chain(std::iter::once(data.data))
            .collect::<Vec<_>>();

        (
            MiniBlockCompressed {
                chunks,
                data: buffers,
                num_values: num_rows,
            },
            encoding,
        )
    }

    /// Strip the FSL (and Nullable) wrappers off a block, collecting one
    /// MiniblockFslLayer per FSL level, then delegate to chunk_fsl
    fn miniblock_fsl(data: DataBlock) -> (MiniBlockCompressed, ArrayEncoding) {
        let num_rows = data.num_values();
        let fsl = data.as_fixed_size_list().unwrap();
        let mut layers = Vec::new();
        let mut child = *fsl.child;
        let mut cur_layer = MiniblockFslLayer {
            validity: None,
            dimension: fsl.dimension,
        };
        loop {
            // A Nullable wrapper belongs to the current FSL layer
            if let DataBlock::Nullable(nullable) = child {
                cur_layer.validity = Some(nullable.nulls);
                child = *nullable.data;
            }
            match child {
                DataBlock::FixedSizeList(inner) => {
                    layers.push(cur_layer);
                    cur_layer = MiniblockFslLayer {
                        validity: None,
                        dimension: inner.dimension,
                    };
                    child = *inner.child;
                }
                DataBlock::FixedWidth(inner) => {
                    layers.push(cur_layer);
                    return Self::chunk_fsl(inner, layers, num_rows);
                }
                _ => unreachable!("Unexpected data block type in value encoder's miniblock_fsl"),
            }
        }
    }
}
296
/// Cursor over one inner-validity bitmap while zipping per-value FSL data
struct PerValueFslValidityIter {
    // The validity bitmap for this layer
    buffer: LanceBuffer,
    // Validity bits each top-level row consumes at this layer
    bits_per_row: usize,
    // Current bit offset into `buffer`
    offset: usize,
}
302
303/// In this section we deal with per-value encoding of FSL<FSL<...<FSL<FixedWidth>>>> data.
304///
305/// It's easier than mini-block.  All we need to do is flatten the FSL layers into a single buffer.
306/// This includes any validity buffers we encounter on the way.
307impl ValueEncoder {
308    fn fsl_to_encoding(fsl: &FixedSizeListBlock) -> ArrayEncoding {
309        let mut inner = fsl.child.as_ref();
310        let mut has_validity = false;
311        inner = match inner {
312            DataBlock::Nullable(nullable) => {
313                has_validity = true;
314                nullable.data.as_ref()
315            }
316            _ => inner,
317        };
318        let inner_encoding = match inner {
319            DataBlock::FixedWidth(fixed_width) => {
320                ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None)
321            }
322            DataBlock::FixedSizeList(inner) => Self::fsl_to_encoding(inner),
323            _ => unreachable!("Unexpected data block type in value encoder's fsl_to_encoding"),
324        };
325        ProtobufUtils::fsl_encoding(fsl.dimension, inner_encoding, has_validity)
326    }
327
328    fn simple_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
329        // The simple case is zero-copy, we just return the flattened inner buffer
330        let encoding = Self::fsl_to_encoding(&fsl);
331        let num_values = fsl.num_values();
332        let mut child = *fsl.child;
333        let mut cum_dim = 1;
334        loop {
335            cum_dim *= fsl.dimension;
336            match child {
337                DataBlock::Nullable(nullable) => {
338                    child = *nullable.data;
339                }
340                DataBlock::FixedSizeList(inner) => {
341                    child = *inner.child;
342                }
343                DataBlock::FixedWidth(inner) => {
344                    let data = FixedWidthDataBlock {
345                        bits_per_value: inner.bits_per_value * cum_dim,
346                        num_values,
347                        data: inner.data,
348                        block_info: BlockInfo::new(),
349                    };
350                    return (PerValueDataBlock::Fixed(data), encoding);
351                }
352                _ => unreachable!(
353                    "Unexpected data block type in value encoder's simple_per_value_fsl"
354                ),
355            }
356        }
357    }
358
359    fn nullable_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
360        // If there are nullable inner values then we need to zip the validity with the values
361        let encoding = Self::fsl_to_encoding(&fsl);
362        let num_values = fsl.num_values();
363        let mut bytes_per_row = 0;
364        let mut cum_dim = 1;
365        let mut current = fsl;
366        let mut validity_iters: Vec<PerValueFslValidityIter> = Vec::new();
367        let data_bytes_per_row: usize;
368        let data_buffer: LanceBuffer;
369        loop {
370            cum_dim *= current.dimension;
371            let mut child = *current.child;
372            if let DataBlock::Nullable(nullable) = child {
373                // Each item will need this many bytes of validity prepended to it
374                bytes_per_row += cum_dim.div_ceil(8) as usize;
375                validity_iters.push(PerValueFslValidityIter {
376                    buffer: nullable.nulls,
377                    bits_per_row: cum_dim as usize,
378                    offset: 0,
379                });
380                child = *nullable.data;
381            };
382            match child {
383                DataBlock::FixedSizeList(inner) => {
384                    current = inner;
385                }
386                DataBlock::FixedWidth(fixed_width) => {
387                    data_bytes_per_row =
388                        (fixed_width.bits_per_value.div_ceil(8) * cum_dim) as usize;
389                    bytes_per_row += data_bytes_per_row;
390                    data_buffer = fixed_width.data;
391                    break;
392                }
393                _ => unreachable!(
394                    "Unexpected data block type in value encoder's nullable_per_value_fsl: {:?}",
395                    child
396                ),
397            }
398        }
399
400        let bytes_needed = bytes_per_row * num_values as usize;
401        let mut zipped = Vec::with_capacity(bytes_needed);
402        let data_slice = &data_buffer;
403        // Hopefully values are pretty large so we don't iterate this loop _too_ many times
404        for i in 0..num_values as usize {
405            for validity in validity_iters.iter_mut() {
406                let validity_slice = validity
407                    .buffer
408                    .bit_slice_le_with_length(validity.offset, validity.bits_per_row);
409                zipped.extend_from_slice(&validity_slice);
410                validity.offset += validity.bits_per_row;
411            }
412            let start = i * data_bytes_per_row;
413            let end = start + data_bytes_per_row;
414            zipped.extend_from_slice(&data_slice[start..end]);
415        }
416
417        let zipped = LanceBuffer::Owned(zipped);
418        let data = PerValueDataBlock::Fixed(FixedWidthDataBlock {
419            bits_per_value: bytes_per_row as u64 * 8,
420            num_values,
421            data: zipped,
422            block_info: BlockInfo::new(),
423        });
424        (data, encoding)
425    }
426
427    fn per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
428        if !fsl.child.is_nullable() {
429            Self::simple_per_value_fsl(fsl)
430        } else {
431            Self::nullable_per_value_fsl(fsl)
432        }
433    }
434}
435
436impl BlockCompressor for ValueEncoder {
437    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
438        let data = match data {
439            DataBlock::FixedWidth(fixed_width) => fixed_width.data,
440            _ => unimplemented!(
441                "Cannot compress block of type {} with ValueEncoder",
442                data.name()
443            ),
444        };
445        Ok(data)
446    }
447}
448
449impl MiniBlockCompressor for ValueEncoder {
450    fn compress(&self, chunk: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
451        match chunk {
452            DataBlock::FixedWidth(fixed_width) => {
453                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
454                Ok((Self::chunk_data(fixed_width), encoding))
455            }
456            DataBlock::FixedSizeList(_) => Ok(Self::miniblock_fsl(chunk)),
457            _ => Err(Error::InvalidInput {
458                source: format!(
459                    "Cannot compress a data block of type {} with ValueEncoder",
460                    chunk.name()
461                )
462                .into(),
463                location: location!(),
464            }),
465        }
466    }
467}
468
/// Shape of one FSL layer as recorded in the encoding description
#[derive(Debug)]
struct ValueFslDesc {
    // Number of child items per list at this layer
    dimension: u64,
    // Whether this layer has an inner validity bitmap
    has_validity: bool,
}
474
/// A decompressor for fixed-width data that has
/// been written, as-is, to disk in single contiguous array
#[derive(Debug)]
pub struct ValueDecompressor {
    /// How many bits are in each inner-most item (e.g. FSL<Int32, 100> would be 32)
    bits_per_item: u64,
    /// How many bits are in each value (e.g. FSL<Int32, 100> would be 3200)
    ///
    /// This number is a little trickier to compute because we also have to include bytes
    /// of any inner validity
    bits_per_value: u64,
    /// How many items are in each value (e.g. FSL<Int32, 100> would be 100)
    items_per_value: u64,
    /// The FSL layers, outermost first; empty for plain flat data
    layers: Vec<ValueFslDesc>,
}
490
impl ValueDecompressor {
    /// Create a decompressor for plain (non-FSL) fixed-width data
    pub fn from_flat(description: &pb::Flat) -> Self {
        Self {
            bits_per_item: description.bits_per_value,
            bits_per_value: description.bits_per_value,
            items_per_value: 1,
            layers: Vec::default(),
        }
    }

    /// Create a decompressor from an FSL encoding description, walking any
    /// nested FSL layers down to the innermost flat encoding
    pub fn from_fsl(mut description: &pb::FixedSizeList) -> Self {
        let mut layers = Vec::new();
        let mut cum_dim = 1;
        // Bytes of zipped-in validity per value, accumulated across layers
        let mut bytes_per_value = 0;
        loop {
            layers.push(ValueFslDesc {
                has_validity: description.has_validity,
                dimension: description.dimension as u64,
            });
            cum_dim *= description.dimension as u64;
            if description.has_validity {
                // One validity bit per item at this layer, rounded up to whole bytes
                bytes_per_value += cum_dim.div_ceil(8);
            }
            match description
                .items
                .as_ref()
                .unwrap()
                .array_encoding
                .as_ref()
                .unwrap()
            {
                pb::array_encoding::ArrayEncoding::FixedSizeList(inner) => {
                    description = inner;
                }
                pb::array_encoding::ArrayEncoding::Flat(flat) => {
                    // Total width of one value = validity bytes + item bits * item count
                    let mut bits_per_value = bytes_per_value * 8;
                    bits_per_value += flat.bits_per_value * cum_dim;
                    return Self {
                        bits_per_item: flat.bits_per_value,
                        bits_per_value,
                        items_per_value: cum_dim,
                        layers,
                    };
                }
                _ => unreachable!(),
            }
        }
    }

    /// Wrap a raw buffer as a fixed-width block of innermost items
    fn buffer_to_block(&self, data: LanceBuffer, num_values: u64) -> DataBlock {
        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values,
            data,
            block_info: BlockInfo::new(),
        })
    }
}
549
550impl BlockDecompressor for ValueDecompressor {
551    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
552        let block = self.buffer_to_block(data, num_values);
553        assert_eq!(block.num_values(), num_values);
554        Ok(block)
555    }
556}
557
558impl MiniBlockDecompressor for ValueDecompressor {
559    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
560        let num_items = num_values * self.items_per_value;
561        let mut buffer_iter = data.into_iter().rev();
562
563        // Always at least 1 buffer
564        let data_buf = buffer_iter.next().unwrap();
565        let items = self.buffer_to_block(data_buf, num_items);
566        let mut lists = items;
567
568        for layer in self.layers.iter().rev() {
569            if layer.has_validity {
570                let validity_buf = buffer_iter.next().unwrap();
571                lists = DataBlock::Nullable(NullableDataBlock {
572                    data: Box::new(lists),
573                    nulls: validity_buf,
574                    block_info: BlockInfo::default(),
575                });
576            }
577            lists = DataBlock::FixedSizeList(FixedSizeListBlock {
578                child: Box::new(lists),
579                dimension: layer.dimension,
580            })
581        }
582
583        assert_eq!(lists.num_values(), num_values);
584        Ok(lists)
585    }
586}
587
/// Accumulates one layer's unzipped validity bits during per-value decompression
struct FslDecompressorValidityBuilder {
    // Collects this layer's validity bits across all rows
    buffer: BooleanBufferBuilder,
    // Validity bits contributed by each top-level row at this layer
    bits_per_row: usize,
    // Bytes each row's (bit-padded) validity occupies in the zipped input
    bytes_per_row: usize,
}
593
594// Helper methods for per-value decompression
595impl ValueDecompressor {
596    fn has_validity(&self) -> bool {
597        self.layers.iter().any(|layer| layer.has_validity)
598    }
599
600    // If there is no validity then decompression is zero-copy, we just need to restore any FSL layers
601    fn simple_decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> DataBlock {
602        let mut cum_dim = 1;
603        for layer in &self.layers {
604            cum_dim *= layer.dimension;
605        }
606        debug_assert_eq!(self.bits_per_item, data.bits_per_value / cum_dim);
607        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
608            bits_per_value: self.bits_per_item,
609            num_values: num_rows * cum_dim,
610            data: data.data,
611            block_info: BlockInfo::new(),
612        });
613        for layer in self.layers.iter().rev() {
614            block = DataBlock::FixedSizeList(FixedSizeListBlock {
615                child: Box::new(block),
616                dimension: layer.dimension,
617            });
618        }
619        debug_assert_eq!(num_rows, block.num_values());
620        block
621    }
622
623    // If there is validity then it has been zipped in with the values and we must unzip it
624    fn unzip_decompress(&self, data: FixedWidthDataBlock, num_rows: usize) -> DataBlock {
625        // No support for full-zip on per-value encodings
626        assert_eq!(self.bits_per_item % 8, 0);
627        let bytes_per_item = self.bits_per_item / 8;
628        let mut buffer_builders = Vec::with_capacity(self.layers.len());
629        let mut cum_dim = 1;
630        let mut total_size_bytes = 0;
631        // First, go through the layers, setup our builders, allocate space
632        for layer in &self.layers {
633            cum_dim *= layer.dimension as usize;
634            if layer.has_validity {
635                let validity_size_bits = cum_dim;
636                let validity_size_bytes = validity_size_bits.div_ceil(8);
637                total_size_bytes += num_rows * validity_size_bytes;
638                buffer_builders.push(FslDecompressorValidityBuilder {
639                    buffer: BooleanBufferBuilder::new(validity_size_bits * num_rows),
640                    bits_per_row: cum_dim,
641                    bytes_per_row: validity_size_bytes,
642                })
643            }
644        }
645        let num_items = num_rows * cum_dim;
646        let data_size = num_items * bytes_per_item as usize;
647        total_size_bytes += data_size;
648        let mut data_buffer = Vec::with_capacity(data_size);
649
650        assert_eq!(data.data.len(), total_size_bytes);
651
652        let bytes_per_value = bytes_per_item as usize;
653        let data_bytes_per_row = bytes_per_value * cum_dim;
654
655        // Next, unzip
656        let mut data_offset = 0;
657        while data_offset < total_size_bytes {
658            for builder in buffer_builders.iter_mut() {
659                let start = data_offset * 8;
660                let end = start + builder.bits_per_row;
661                builder.buffer.append_packed_range(start..end, &data.data);
662                data_offset += builder.bytes_per_row;
663            }
664            let end = data_offset + data_bytes_per_row;
665            data_buffer.extend_from_slice(&data.data[data_offset..end]);
666            data_offset += data_bytes_per_row;
667        }
668
669        // Finally, restore the structure
670        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
671            bits_per_value: self.bits_per_value,
672            num_values: num_items as u64,
673            data: LanceBuffer::Owned(data_buffer),
674            block_info: BlockInfo::new(),
675        });
676
677        let mut validity_bufs = buffer_builders
678            .into_iter()
679            .rev()
680            .map(|mut b| LanceBuffer::Borrowed(b.buffer.finish().into_inner()));
681        for layer in self.layers.iter().rev() {
682            if layer.has_validity {
683                let nullable = NullableDataBlock {
684                    data: Box::new(block),
685                    nulls: validity_bufs.next().unwrap(),
686                    block_info: BlockInfo::new(),
687                };
688                block = DataBlock::Nullable(nullable);
689            }
690            block = DataBlock::FixedSizeList(FixedSizeListBlock {
691                child: Box::new(block),
692                dimension: layer.dimension,
693            });
694        }
695
696        assert_eq!(num_rows, block.num_values() as usize);
697
698        block
699    }
700}
701
702impl FixedPerValueDecompressor for ValueDecompressor {
703    fn decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> Result<DataBlock> {
704        if self.has_validity() {
705            Ok(self.unzip_decompress(data, num_rows as usize))
706        } else {
707            Ok(self.simple_decompress(data, num_rows))
708        }
709    }
710
711    fn bits_per_value(&self) -> u64 {
712        self.bits_per_value
713    }
714}
715
716impl PerValueCompressor for ValueEncoder {
717    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, ArrayEncoding)> {
718        let (data, encoding) = match data {
719            DataBlock::FixedWidth(fixed_width) => {
720                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
721                (PerValueDataBlock::Fixed(fixed_width), encoding)
722            }
723            DataBlock::FixedSizeList(fixed_size_list) => Self::per_value_fsl(fixed_size_list),
724            _ => unimplemented!(
725                "Cannot compress block of type {} with ValueEncoder",
726                data.name()
727            ),
728        };
729        Ok((data, encoding))
730    }
731}
732
733// public tests module because we share the PRIMITIVE_TYPES constant with fixed_size_list
734#[cfg(test)]
735pub(crate) mod tests {
736    use std::{
737        collections::HashMap,
738        sync::{Arc, LazyLock},
739    };
740
741    use arrow_array::{
742        make_array, Array, ArrayRef, Decimal128Array, FixedSizeListArray, Int32Array,
743    };
744    use arrow_buffer::{BooleanBuffer, NullBuffer};
745    use arrow_schema::{DataType, Field, TimeUnit};
746    use lance_datagen::{array, gen, ArrayGeneratorExt, Dimension, RowCount};
747    use rstest::rstest;
748
749    use crate::{
750        compression::{FixedPerValueDecompressor, MiniBlockDecompressor},
751        data::DataBlock,
752        encodings::{
753            logical::primitive::{
754                fullzip::{PerValueCompressor, PerValueDataBlock},
755                miniblock::MiniBlockCompressor,
756            },
757            physical::value::ValueDecompressor,
758        },
759        format::pb,
760        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
761        version::LanceFileVersion,
762    };
763
764    use super::ValueEncoder;
765
    /// Fixed-width primitive types that are round-tripped through the value
    /// encoding (see `test_value_primitive`).  Shared with the fixed_size_list
    /// tests, which is why this module is `pub(crate)`.
    const PRIMITIVE_TYPES: &[DataType] = &[
        DataType::Null,
        DataType::FixedSizeBinary(2),
        DataType::Date32,
        DataType::Date64,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal128(10, 10),
        DataType::Decimal256(10, 10),
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Duration(TimeUnit::Second),
        // The Interval type is supported by the reader but the writer works with Lance schema
        // at the moment and Lance schema can't parse interval
        // DataType::Interval(IntervalUnit::DayTime),
    ];
792
793    #[test_log::test(tokio::test)]
794    async fn test_simple_value() {
795        let items = Arc::new(Int32Array::from(vec![
796            Some(0),
797            None,
798            Some(2),
799            Some(3),
800            Some(4),
801            Some(5),
802        ]));
803
804        let test_cases = TestCases::default()
805            .with_range(0..3)
806            .with_range(0..2)
807            .with_range(1..3)
808            .with_indices(vec![0, 1, 2])
809            .with_indices(vec![1])
810            .with_indices(vec![2])
811            .with_file_version(LanceFileVersion::V2_1);
812
813        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
814    }
815
816    #[rstest]
817    #[test_log::test(tokio::test)]
818    async fn test_value_primitive(
819        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
820    ) {
821        for data_type in PRIMITIVE_TYPES {
822            log::info!("Testing encoding for {:?}", data_type);
823            let field = Field::new("", data_type.clone(), false);
824            check_round_trip_encoding_random(field, version).await;
825        }
826    }
827
    // Wide fixed-width types (a 512-byte FSL<Int32, 128> here) used to exercise
    // encoding of values that are large relative to a miniblock chunk.
    static LARGE_TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| {
        vec![DataType::FixedSizeList(
            Arc::new(Field::new("", DataType::Int32, false)),
            128,
        )]
    });
834
835    #[rstest]
836    #[test_log::test(tokio::test)]
837    async fn test_large_primitive(
838        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
839    ) {
840        for data_type in LARGE_TYPES.iter() {
841            log::info!("Testing encoding for {:?}", data_type);
842            let field = Field::new("", data_type.clone(), false);
843            check_round_trip_encoding_random(field, version).await;
844        }
845    }
846
847    #[test_log::test(tokio::test)]
848    async fn test_decimal128_dictionary_encoding() {
849        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
850        let decimals: Vec<i32> = (0..100).collect();
851        let repeated_strings: Vec<_> = decimals
852            .iter()
853            .cycle()
854            .take(decimals.len() * 10000)
855            .map(|&v| Some(v as i128))
856            .collect();
857        let decimal_array = Arc::new(Decimal128Array::from(repeated_strings)) as ArrayRef;
858        check_round_trip_encoding_of_data(vec![decimal_array], &test_cases, HashMap::new()).await;
859    }
860
861    #[test_log::test(tokio::test)]
862    async fn test_miniblock_stress() {
863        // Tests for strange page sizes and batch sizes and validity scenarios for miniblock
864
865        // 10K integers, 100 per array, all valid
866        let data1 = (0..100)
867            .map(|_| Arc::new(Int32Array::from_iter_values(0..100)) as Arc<dyn Array>)
868            .collect::<Vec<_>>();
869
870        // Same as above but with mixed validity
871        let data2 = (0..100)
872            .map(|_| {
873                Arc::new(Int32Array::from_iter((0..100).map(|i| {
874                    if i % 2 == 0 {
875                        Some(i)
876                    } else {
877                        None
878                    }
879                }))) as Arc<dyn Array>
880            })
881            .collect::<Vec<_>>();
882
883        // Same as above but with all null for first half then all valid
884        // TODO: Re-enable once the all-null path is complete
885        let _data3 = (0..100)
886            .map(|chunk_idx| {
887                Arc::new(Int32Array::from_iter((0..100).map(|i| {
888                    if chunk_idx < 50 {
889                        None
890                    } else {
891                        Some(i)
892                    }
893                }))) as Arc<dyn Array>
894            })
895            .collect::<Vec<_>>();
896
897        for data in [data1, data2 /*data3*/] {
898            for batch_size in [10, 100, 1500, 15000] {
899                // 40000 bytes of data
900                let test_cases = TestCases::default()
901                    .with_page_sizes(vec![1000, 2000, 3000, 60000])
902                    .with_batch_size(batch_size)
903                    .with_file_version(LanceFileVersion::V2_1);
904
905                check_round_trip_encoding_of_data(data.clone(), &test_cases, HashMap::new()).await;
906            }
907        }
908    }
909
    /// Builds a 3-row FSL<FSL<Int32, 2>, 2>:
    /// [[0, 1], NULL], [NULL, NULL], [[8, 9], [NULL, 11]]
    ///
    /// The 12 leaf items below are grouped in pairs into 6 inner lists; the
    /// inner-list validity buffer then nulls out lists 1, 2, and 3 (so the
    /// Some(2)/Some(3) pair is masked by a null inner list).
    fn create_simple_fsl() -> FixedSizeListArray {
        // [[0, 1], NULL], [NULL, NULL], [[8, 9], [NULL, 11]]
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(3),
            None,
            None,
            None,
            None,
            Some(8),
            Some(9),
            None,
            Some(11),
        ]));
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        // true = valid inner list, false = null inner list
        let inner_list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
        let inner_list = Arc::new(FixedSizeListArray::new(
            items_field.clone(),
            2,
            items,
            Some(NullBuffer::new(inner_list_nulls)),
        ));
        let inner_list_field = Arc::new(Field::new(
            "item",
            DataType::FixedSizeList(items_field, 2),
            true,
        ));
        // Outer list (dimension 2) has no validity buffer: all 3 rows are valid
        FixedSizeListArray::new(inner_list_field, 2, inner_list, None)
    }
941
    /// Compresses the simple FSL fixture through the miniblock path, checks the
    /// chunk layout, then decompresses and verifies an exact round trip.
    #[test]
    fn test_fsl_value_compression_miniblock() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        // 3 rows, small enough to land in a single chunk with 3 buffers
        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 3);
        assert_eq!(data.chunks.len(), 1);
        // 48 bytes of values = 3 rows * 4 leaf Int32s * 4 bytes; the 1- and
        // 2-byte buffers presumably carry the nesting/validity info — TODO confirm
        assert_eq!(data.chunks[0].buffer_sizes, vec![1, 2, 48]);
        // NOTE(review): log_num_values of 0 appears to denote a final/partial
        // chunk rather than 2^0 values — confirm against the miniblock format
        assert_eq!(data.chunks[0].log_num_values, 0);

        // An FSL input must produce an FSL array encoding in the metadata
        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
            compression.array_encoding.unwrap()
        else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        // Convert back to an Arrow array and compare against the original input
        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }
976
    /// Compresses the simple FSL fixture through the per-value (full-zip) path,
    /// checks the fixed-width layout, then decompresses and verifies an exact
    /// round trip.
    #[test]
    fn test_fsl_value_compression_per_value() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        // The value encoder produces fixed-width (not variable-width) output
        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        // 144 bits = 18 bytes per row: 16 bytes for the 4 leaf Int32s plus 2
        // extra bytes (presumably zipped validity/nesting info — TODO confirm)
        assert_eq!(data.bits_per_value, 144);
        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 18 * 3);

        // An FSL input must produce an FSL array encoding in the metadata
        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
            compression.array_encoding.unwrap()
        else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        // Convert back to an Arrow array and compare against the original input
        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }
1014
    /// Builds FSL<FSL<FSL<Int32, 4>, 4>, 2> with nulls injected at several
    /// nesting levels so multiple definition levels are exercised across pages.
    fn create_random_fsl() -> Arc<dyn Array> {
        // Several levels of def and multiple pages
        let inner = array::rand_type(&DataType::Int32).with_random_nulls(0.1);
        let list_one = array::cycle_vec(inner, Dimension::from(4)).with_random_nulls(0.1);
        let list_two = array::cycle_vec(list_one, Dimension::from(4)).with_random_nulls(0.1);
        let list_three = array::cycle_vec(list_two, Dimension::from(2));

        // 8Ki rows * 32 leaf ints per row (2*4*4) = 256Ki values ~ 1MiB of data
        // (original comment said "256Ki rows"; it's 256Ki leaf values)
        let batch = gen()
            .anon_col(list_three)
            .into_batch_rows(RowCount::from(8 * 1024))
            .unwrap();
        batch.column(0).clone()
    }
1029
1030    #[test]
1031    fn fsl_value_miniblock_stress() {
1032        let sample_array = create_random_fsl();
1033
1034        let starting_data =
1035            DataBlock::from_arrays(&[sample_array.clone()], sample_array.len() as u64);
1036
1037        let encoder = ValueEncoder::default();
1038        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();
1039
1040        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
1041            compression.array_encoding.unwrap()
1042        else {
1043            panic!()
1044        };
1045
1046        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());
1047
1048        let decompressed =
1049            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();
1050
1051        let decompressed = make_array(
1052            decompressed
1053                .into_arrow(sample_array.data_type().clone(), true)
1054                .unwrap(),
1055        );
1056
1057        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
1058    }
1059
1060    #[test]
1061    fn fsl_value_per_value_stress() {
1062        let sample_array = create_random_fsl();
1063
1064        let starting_data =
1065            DataBlock::from_arrays(&[sample_array.clone()], sample_array.len() as u64);
1066
1067        let encoder = ValueEncoder::default();
1068        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();
1069
1070        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
1071            compression.array_encoding.unwrap()
1072        else {
1073            panic!()
1074        };
1075
1076        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());
1077
1078        let PerValueDataBlock::Fixed(data) = data else {
1079            panic!()
1080        };
1081
1082        let num_values = data.num_values;
1083        let decompressed =
1084            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();
1085
1086        let decompressed = make_array(
1087            decompressed
1088                .into_arrow(sample_array.data_type().clone(), true)
1089                .unwrap(),
1090        );
1091
1092        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
1093    }
1094}