// lance_encoding/encodings/physical/value.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_buffer::{BooleanBufferBuilder, bit_util};
5
6use crate::buffer::LanceBuffer;
7use crate::compression::{
8    BlockCompressor, BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor,
9};
10use crate::data::{
11    BlockInfo, DataBlock, FixedSizeListBlock, FixedWidthDataBlock, NullableDataBlock,
12};
13use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
14use crate::encodings::logical::primitive::miniblock::{
15    MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES, MiniBlockChunk, MiniBlockCompressed,
16    MiniBlockCompressor,
17};
18use crate::format::ProtobufUtils21;
19use crate::format::pb21::compressive_encoding::Compression;
20use crate::format::pb21::{self, CompressiveEncoding};
21
22use lance_core::{Error, Result};
23
/// A compression strategy that writes fixed-width data as-is (no compression)
///
/// Serves three roles in this file: a block compressor, a mini-block compressor,
/// and a per-value compressor.  In every mode the value bytes are stored verbatim;
/// the only work done is chunking and (for FSL data) flattening/zipping validity.
#[derive(Debug, Default)]
pub struct ValueEncoder {}
27
impl ValueEncoder {
    /// Use the largest chunk we can smaller than 4KiB
    ///
    /// Returns `(log2(values_per_chunk), values_per_chunk)`: the largest
    /// power-of-two value count whose byte size stays under `MAX_MINIBLOCK_BYTES`
    /// and whose count stays within `MAX_MINIBLOCK_VALUES`.
    ///
    /// A "word" is the smallest unit we are willing to split on: one value when
    /// values are a whole number of bytes, or 8 values for sub-byte types (so a
    /// word always occupies a whole number of bytes).
    fn find_log_vals_per_chunk(bytes_per_word: u64, values_per_word: u64) -> (u64, u64) {
        // Seed with the smallest chunk considered (two words' worth of bytes).
        // NOTE(review): in the 8-values-per-word case this seeds size_bytes at two
        // words (16 values) while num_vals starts at 8, so the byte estimate is
        // conservative by 2x — confirm this asymmetry is intended.
        let mut size_bytes = 2 * bytes_per_word;
        let (mut log_num_vals, mut num_vals) = match values_per_word {
            1 => (1, 2),
            8 => (3, 8),
            _ => unreachable!(),
        };

        // If the type is so wide that we can't even fit 2 values we shouldn't be here
        assert!(size_bytes < MAX_MINIBLOCK_BYTES);

        // Keep doubling while both the byte budget and the value budget allow it
        while 2 * size_bytes < MAX_MINIBLOCK_BYTES && 2 * num_vals <= MAX_MINIBLOCK_VALUES {
            log_num_vals += 1;
            size_bytes *= 2;
            num_vals *= 2;
        }

        (log_num_vals, num_vals)
    }

    /// Splits uncompressed fixed-width data into mini-block chunks.
    ///
    /// Every chunk except possibly the last holds exactly `2^log_num_values`
    /// values; a trailing partial chunk is marked with `log_num_values == 0` and
    /// simply takes whatever bytes remain.  The data buffer itself is not copied.
    fn chunk_data(data: FixedWidthDataBlock) -> MiniBlockCompressed {
        // Usually there are X bytes per value.  However, when working with boolean
        // or FSL<boolean> we might have some number of bits per value that isn't
        // divisible by 8.  In this case, to avoid chunking in the middle of a byte
        // we calculate how many 8-value words we can fit in a chunk.
        let (bytes_per_word, values_per_word) = if data.bits_per_value.is_multiple_of(8) {
            (data.bits_per_value / 8, 1)
        } else {
            // 8 values of a sub-byte type always span exactly bits_per_value bytes
            (data.bits_per_value, 8)
        };

        // Aim for 4KiB chunks
        let (log_vals_per_chunk, vals_per_chunk) =
            Self::find_log_vals_per_chunk(bytes_per_word, values_per_word);
        let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize);
        // A chunk must contain a whole number of words
        debug_assert_eq!(vals_per_chunk % values_per_word, 0);
        let bytes_per_chunk = bytes_per_word * (vals_per_chunk / values_per_word);
        let bytes_per_chunk = u32::try_from(bytes_per_chunk).unwrap();
        debug_assert!(bytes_per_chunk > 0);

        let data_buffer = data.data;

        let mut row_offset = 0;
        let mut chunks = Vec::with_capacity(num_chunks);

        // Tracks how many bytes the full chunks have consumed so the final
        // partial chunk can claim exactly the remainder of the buffer
        let mut bytes_counter = 0;
        loop {
            if row_offset + vals_per_chunk <= data.num_values {
                // We can make a full chunk
                chunks.push(MiniBlockChunk {
                    log_num_values: log_vals_per_chunk as u8,
                    buffer_sizes: vec![bytes_per_chunk],
                });
                row_offset += vals_per_chunk;
                bytes_counter += bytes_per_chunk as u64;
            } else if row_offset < data.num_values {
                // Final chunk, special values
                let num_bytes = data_buffer.len() as u64 - bytes_counter;
                let num_bytes = u32::try_from(num_bytes).unwrap();
                chunks.push(MiniBlockChunk {
                    log_num_values: 0,
                    buffer_sizes: vec![num_bytes],
                });
                break;
            } else {
                // If we get here then all chunks were full chunks and we have no remainder chunk
                break;
            }
        }

        debug_assert_eq!(chunks.len(), num_chunks);

        MiniBlockCompressed {
            chunks,
            data: vec![data_buffer],
            num_values: data.num_values,
        }
    }
}
109
/// One FSL layer peeled off while flattening FSL<...<FixedWidth>> data for
/// mini-block encoding.
#[derive(Debug)]
struct MiniblockFslLayer {
    // Validity bitmap of this layer's child, if the child was nullable
    validity: Option<LanceBuffer>,
    // Number of child items per list in this layer
    dimension: u64,
}
115
116/// This impl deals with encoding FSL<FSL<...<FSL<FixedWidth>>>> data as a mini-block compressor.
117/// The tricky part of FSL data is that we want to include inner validity buffers (we don't want these
118/// to be part of the rep-def because that usually ends up being more expensive).
119///
120/// The resulting mini-block will, instead of having a single buffer, have X + 1 buffers where X is
121/// the number of FSL layers that contain validity.
122///
123/// In the simple case where there is no validity inside the FSL layers, all we are doing here is flattening
124/// the FSL layers into a single buffer.
125///
126/// Also: We don't allow a row to be broken across chunks.  This typically isn't too big of a deal since we
127/// are usually dealing with relatively small vectors if we are using mini-block.
128///
129/// Note: when we do have validity we have to make copies of the validity buffers because they are bit buffers
130/// and we need to bit slice them which requires copies or offsets.  Paying the price at write time to make
131/// the copies is better than paying the price at read time to do the bit slicing.
impl ValueEncoder {
    /// Builds the protobuf encoding description for the FSL tree.
    ///
    /// Layers are given outer-to-inner, so we wrap the innermost flat encoding
    /// by iterating in reverse.
    fn make_fsl_encoding(layers: &[MiniblockFslLayer], bits_per_value: u64) -> CompressiveEncoding {
        let mut encoding = ProtobufUtils21::flat(bits_per_value, None);
        for layer in layers.iter().rev() {
            let has_validity = layer.validity.is_some();
            let dimension = layer.dimension;
            encoding = ProtobufUtils21::fsl(dimension, has_validity, encoding);
        }
        encoding
    }

    /// Extracts one chunk's worth of validity bits (copied into `validity_buffers`,
    /// one destination Vec per nullable layer, outer-to-inner) and returns the
    /// chunk's buffer sizes: one entry per validity buffer followed by the size of
    /// the value bytes.
    ///
    /// The value bytes themselves are not copied here; only their size is computed.
    fn extract_fsl_chunk(
        data: &FixedWidthDataBlock,
        layers: &[MiniblockFslLayer],
        row_offset: usize,
        num_rows: usize,
        validity_buffers: &mut [Vec<u8>],
    ) -> Vec<u32> {
        let mut row_offset = row_offset;
        let mut num_values = num_rows;
        let mut buffer_counter = 0;
        let mut buffer_sizes = Vec::with_capacity(validity_buffers.len() + 1);
        for layer in layers {
            // Scale row coordinates into this layer's item coordinates
            row_offset *= layer.dimension as usize;
            num_values *= layer.dimension as usize;
            if let Some(validity) = &layer.validity {
                // Bit-slicing copies; we pay this cost at write time so readers
                // don't have to bit-shift (see the impl-level comment above)
                let validity_slice = validity
                    .clone()
                    .bit_slice_le_with_length(row_offset, num_values);
                validity_buffers[buffer_counter].extend_from_slice(&validity_slice);
                buffer_sizes.push(validity_slice.len() as u32);
                buffer_counter += 1;
            }
        }

        let bits_in_chunk = data.bits_per_value * num_values as u64;
        let bytes_in_chunk = bits_in_chunk.div_ceil(8);
        let bytes_in_chunk = u32::try_from(bytes_in_chunk).unwrap();
        debug_assert!(bytes_in_chunk > 0);
        buffer_sizes.push(bytes_in_chunk);

        buffer_sizes
    }

    /// Chunks flattened FSL data into mini-blocks.
    ///
    /// Rows are never split across chunks.  Produces one validity buffer per
    /// nullable layer plus the (uncopied) value buffer, and per-chunk buffer
    /// sizes describing how each chunk maps into those buffers.
    fn chunk_fsl(
        data: FixedWidthDataBlock,
        layers: Vec<MiniblockFslLayer>,
        num_rows: u64,
    ) -> (MiniBlockCompressed, CompressiveEncoding) {
        // Count size to calculate rows per chunk
        let mut ceil_bytes_validity = 0;
        let mut cum_dim = 1;
        let mut num_validity_buffers = 0;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if layer.validity.is_some() {
                // Per-row validity bytes contributed by this layer (byte-padded)
                ceil_bytes_validity += cum_dim.div_ceil(8);
            }
        }
        // It's an estimate because validity buffers may have some padding bits
        let cum_bits_per_value = data.bits_per_value * cum_dim;
        let (cum_bytes_per_word, vals_per_word) = if cum_bits_per_value.is_multiple_of(8) {
            (cum_bits_per_value / 8, 1)
        } else {
            (cum_bits_per_value, 8)
        };
        let est_bytes_per_word = (ceil_bytes_validity * vals_per_word) + cum_bytes_per_word;
        let (log_rows_per_chunk, rows_per_chunk) =
            Self::find_log_vals_per_chunk(est_bytes_per_word, vals_per_word);

        let num_chunks = num_rows.div_ceil(rows_per_chunk) as usize;

        // Allocate buffers for validity, these will be slightly bigger than the input validity buffers
        let mut chunks = Vec::with_capacity(num_chunks);
        let mut validity_buffers: Vec<Vec<u8>> = Vec::with_capacity(num_validity_buffers);
        cum_dim = 1;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if let Some(validity) = &layer.validity {
                let layer_bytes_validity = cum_dim.div_ceil(8);
                let validity_with_padding =
                    layer_bytes_validity as usize * num_chunks * rows_per_chunk as usize;
                debug_assert!(validity_with_padding >= validity.len());
                // NOTE(review): this capacity is per-row bytes x chunk count, which
                // looks smaller than the total copied (validity_with_padding); the
                // Vec will grow as needed, so this only affects reallocation —
                // confirm the intended capacity.
                validity_buffers.push(Vec::with_capacity(
                    layer_bytes_validity as usize * num_chunks,
                ));
            }
        }

        // Now go through and extract validity buffers
        let mut row_offset = 0;
        while row_offset + rows_per_chunk <= num_rows {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_per_chunk as usize,
                &mut validity_buffers,
            );
            row_offset += rows_per_chunk;
            chunks.push(MiniBlockChunk {
                log_num_values: log_rows_per_chunk as u8,
                buffer_sizes,
            })
        }
        // Trailing partial chunk (log_num_values == 0 marks "remainder" chunks)
        let rows_in_chunk = num_rows - row_offset;
        if rows_in_chunk > 0 {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_in_chunk as usize,
                &mut validity_buffers,
            );
            chunks.push(MiniBlockChunk {
                log_num_values: 0,
                buffer_sizes,
            });
        }

        let encoding = Self::make_fsl_encoding(&layers, data.bits_per_value);
        // Finally, add the data buffer
        let buffers = validity_buffers
            .into_iter()
            .map(LanceBuffer::from)
            .chain(std::iter::once(data.data))
            .collect::<Vec<_>>();

        (
            MiniBlockCompressed {
                chunks,
                data: buffers,
                num_values: num_rows,
            },
            encoding,
        )
    }

    /// Entry point: peels the FSL/Nullable structure into `MiniblockFslLayer`s
    /// (outer-to-inner) until the fixed-width leaf is reached, then chunks.
    fn miniblock_fsl(data: DataBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        let num_rows = data.num_values();
        let fsl = data.as_fixed_size_list().unwrap();
        let mut layers = Vec::new();
        let mut child = *fsl.child;
        let mut cur_layer = MiniblockFslLayer {
            validity: None,
            dimension: fsl.dimension,
        };
        loop {
            // A Nullable wrapper supplies the current layer's validity bitmap
            if let DataBlock::Nullable(nullable) = child {
                cur_layer.validity = Some(nullable.nulls);
                child = *nullable.data;
            }
            match child {
                DataBlock::FixedSizeList(inner) => {
                    layers.push(cur_layer);
                    cur_layer = MiniblockFslLayer {
                        validity: None,
                        dimension: inner.dimension,
                    };
                    child = *inner.child;
                }
                DataBlock::FixedWidth(inner) => {
                    layers.push(cur_layer);
                    return Self::chunk_fsl(inner, layers, num_rows);
                }
                _ => unreachable!("Unexpected data block type in value encoder's miniblock_fsl"),
            }
        }
    }
}
303
/// Cursor over one inner validity bitmap while zipping per-value FSL data.
struct PerValueFslValidityIter {
    // The validity bitmap being consumed
    buffer: LanceBuffer,
    // How many validity bits belong to each row at this FSL depth
    bits_per_row: usize,
    // Current bit offset into `buffer`
    offset: usize,
}
309
310/// In this section we deal with per-value encoding of FSL<FSL<...<FSL<FixedWidth>>>> data.
311///
312/// It's easier than mini-block.  All we need to do is flatten the FSL layers into a single buffer.
313/// This includes any validity buffers we encounter on the way.
314impl ValueEncoder {
315    fn fsl_to_encoding(fsl: &FixedSizeListBlock) -> CompressiveEncoding {
316        let mut inner = fsl.child.as_ref();
317        let mut has_validity = false;
318        inner = match inner {
319            DataBlock::Nullable(nullable) => {
320                has_validity = true;
321                nullable.data.as_ref()
322            }
323            DataBlock::AllNull(_) => {
324                return ProtobufUtils21::constant(None);
325            }
326            _ => inner,
327        };
328        let inner_encoding = match inner {
329            DataBlock::FixedWidth(fixed_width) => {
330                ProtobufUtils21::flat(fixed_width.bits_per_value, None)
331            }
332            DataBlock::FixedSizeList(inner) => Self::fsl_to_encoding(inner),
333            _ => unreachable!(
334                "Unexpected data block type in value encoder's fsl_to_encoding: {}",
335                inner.name()
336            ),
337        };
338        ProtobufUtils21::fsl(fsl.dimension, has_validity, inner_encoding)
339    }
340
341    fn simple_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
342        // The simple case is zero-copy, we just return the flattened inner buffer
343        let encoding = Self::fsl_to_encoding(&fsl);
344        let num_values = fsl.num_values();
345        let mut child = *fsl.child;
346        let mut cum_dim = 1;
347        loop {
348            cum_dim *= fsl.dimension;
349            match child {
350                DataBlock::Nullable(nullable) => {
351                    child = *nullable.data;
352                }
353                DataBlock::FixedSizeList(inner) => {
354                    child = *inner.child;
355                }
356                DataBlock::FixedWidth(inner) => {
357                    let data = FixedWidthDataBlock {
358                        bits_per_value: inner.bits_per_value * cum_dim,
359                        num_values,
360                        data: inner.data,
361                        block_info: BlockInfo::new(),
362                    };
363                    return (PerValueDataBlock::Fixed(data), encoding);
364                }
365                _ => unreachable!(
366                    "Unexpected data block type in value encoder's simple_per_value_fsl"
367                ),
368            }
369        }
370    }
371
372    fn nullable_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
373        // If there are nullable inner values then we need to zip the validity with the values
374        let encoding = Self::fsl_to_encoding(&fsl);
375        let num_values = fsl.num_values();
376        let mut bytes_per_row = 0;
377        let mut cum_dim = 1;
378        let mut current = fsl;
379        let mut validity_iters: Vec<PerValueFslValidityIter> = Vec::new();
380        let data_bytes_per_row: usize;
381        let data_buffer: LanceBuffer;
382        loop {
383            cum_dim *= current.dimension;
384            let mut child = *current.child;
385            if let DataBlock::Nullable(nullable) = child {
386                // Each item will need this many bytes of validity prepended to it
387                bytes_per_row += cum_dim.div_ceil(8) as usize;
388                validity_iters.push(PerValueFslValidityIter {
389                    buffer: nullable.nulls,
390                    bits_per_row: cum_dim as usize,
391                    offset: 0,
392                });
393                child = *nullable.data;
394            };
395            match child {
396                DataBlock::FixedSizeList(inner) => {
397                    current = inner;
398                }
399                DataBlock::FixedWidth(fixed_width) => {
400                    data_bytes_per_row =
401                        (fixed_width.bits_per_value.div_ceil(8) * cum_dim) as usize;
402                    bytes_per_row += data_bytes_per_row;
403                    data_buffer = fixed_width.data;
404                    break;
405                }
406                DataBlock::AllNull(_) => {
407                    data_bytes_per_row = 0;
408                    data_buffer = LanceBuffer::empty();
409                    break;
410                }
411                _ => unreachable!(
412                    "Unexpected data block type in value encoder's nullable_per_value_fsl: {:?}",
413                    child
414                ),
415            }
416        }
417
418        let bytes_needed = bytes_per_row * num_values as usize;
419        let mut zipped = Vec::with_capacity(bytes_needed);
420        let data_slice = &data_buffer;
421        // Hopefully values are pretty large so we don't iterate this loop _too_ many times
422        for i in 0..num_values as usize {
423            for validity in validity_iters.iter_mut() {
424                let validity_slice = validity
425                    .buffer
426                    .bit_slice_le_with_length(validity.offset, validity.bits_per_row);
427                zipped.extend_from_slice(&validity_slice);
428                validity.offset += validity.bits_per_row;
429            }
430            let start = i * data_bytes_per_row;
431            let end = start + data_bytes_per_row;
432            zipped.extend_from_slice(&data_slice[start..end]);
433        }
434
435        let zipped = LanceBuffer::from(zipped);
436        let data = PerValueDataBlock::Fixed(FixedWidthDataBlock {
437            bits_per_value: bytes_per_row as u64 * 8,
438            num_values,
439            data: zipped,
440            block_info: BlockInfo::new(),
441        });
442        (data, encoding)
443    }
444
445    fn per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
446        if !fsl.child.is_nullable() {
447            Self::simple_per_value_fsl(fsl)
448        } else {
449            Self::nullable_per_value_fsl(fsl)
450        }
451    }
452}
453
454impl BlockCompressor for ValueEncoder {
455    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
456        let data = match data {
457            DataBlock::FixedWidth(fixed_width) => fixed_width.data,
458            _ => unimplemented!(
459                "Cannot compress block of type {} with ValueEncoder",
460                data.name()
461            ),
462        };
463        Ok(data)
464    }
465}
466
467impl MiniBlockCompressor for ValueEncoder {
468    fn compress(&self, chunk: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
469        match chunk {
470            DataBlock::FixedWidth(fixed_width) => {
471                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
472                Ok((Self::chunk_data(fixed_width), encoding))
473            }
474            DataBlock::FixedSizeList(_) => Ok(Self::miniblock_fsl(chunk)),
475            _ => Err(Error::invalid_input_source(
476                format!(
477                    "Cannot compress a data block of type {} with ValueEncoder",
478                    chunk.name()
479                )
480                .into(),
481            )),
482        }
483    }
484}
485
/// Description of one FSL layer recovered from the protobuf encoding.
#[derive(Debug)]
struct ValueFslDesc {
    // Number of child items per list in this layer
    dimension: u64,
    // Whether this layer stored an inner validity buffer
    has_validity: bool,
}
491
/// A decompressor for fixed-width data that has
/// been written, as-is, to disk in single contiguous array
#[derive(Debug)]
pub struct ValueDecompressor {
    /// How many bits are in each inner-most item (e.g. FSL<Int32, 100> would be 32)
    bits_per_item: u64,
    /// How many bits are in each value (e.g. FSL<Int32, 100> would be 3200)
    ///
    /// This number is a little trickier to compute because we also have to include bytes
    /// of any inner validity
    bits_per_value: u64,
    /// How many items are in each value (e.g. FSL<Int32, 100> would be 100)
    items_per_value: u64,
    /// Descriptions of any FSL layers wrapping the items, outer-to-inner
    /// (empty for plain flat data)
    layers: Vec<ValueFslDesc>,
}
507
impl ValueDecompressor {
    /// Creates a decompressor for plain fixed-width (non-FSL) values.
    pub fn from_flat(description: &pb21::Flat) -> Self {
        Self {
            bits_per_item: description.bits_per_value,
            bits_per_value: description.bits_per_value,
            items_per_value: 1,
            layers: Vec::default(),
        }
    }

    /// Creates a decompressor from a (possibly nested) FSL description.
    ///
    /// Walks the FSL layers outer-to-inner, accumulating the cumulative dimension
    /// and the per-value byte overhead contributed by inner validity buffers,
    /// until the flat leaf encoding is reached.
    pub fn from_fsl(mut description: &pb21::FixedSizeList) -> Self {
        let mut layers = Vec::new();
        let mut cum_dim = 1;
        let mut bytes_per_value = 0;
        loop {
            layers.push(ValueFslDesc {
                has_validity: description.has_validity,
                dimension: description.items_per_value,
            });
            cum_dim *= description.items_per_value;
            if description.has_validity {
                // Each nullable layer adds one byte-padded bitmap per value
                bytes_per_value += cum_dim.div_ceil(8);
            }
            match description
                .values
                .as_ref()
                .unwrap()
                .compression
                .as_ref()
                .unwrap()
            {
                Compression::FixedSizeList(inner) => {
                    description = inner;
                }
                Compression::Flat(flat) => {
                    // Total bits per value = validity overhead + flattened item bits
                    let mut bits_per_value = bytes_per_value * 8;
                    bits_per_value += flat.bits_per_value * cum_dim;
                    return Self {
                        bits_per_item: flat.bits_per_value,
                        bits_per_value,
                        items_per_value: cum_dim,
                        layers,
                    };
                }
                _ => unreachable!(),
            }
        }
    }

    /// Wraps a raw buffer in a `FixedWidthDataBlock` of the inner-most item width.
    fn buffer_to_block(&self, data: LanceBuffer, num_values: u64) -> DataBlock {
        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values,
            data,
            block_info: BlockInfo::new(),
        })
    }
}
566
567impl BlockDecompressor for ValueDecompressor {
568    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
569        let block = self.buffer_to_block(data, num_values);
570        assert_eq!(block.num_values(), num_values);
571        Ok(block)
572    }
573}
574
impl MiniBlockDecompressor for ValueDecompressor {
    /// Decompresses one mini-block chunk.
    ///
    /// Buffers arrive as `[validity_0, ..., validity_k, data]` — one validity
    /// buffer per nullable FSL layer (outer-to-inner) followed by the value
    /// bytes — so we consume them in reverse while rebuilding the FSL structure
    /// from the inside out.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        let num_items = num_values * self.items_per_value;
        let mut buffer_iter = data.into_iter().rev();

        // Always at least 1 buffer
        let data_buf = buffer_iter.next().unwrap();
        let items = self.buffer_to_block(data_buf, num_items);
        let mut lists = items;

        // Re-wrap the flat items with Nullable/FSL layers, inner-most first
        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                let validity_buf = buffer_iter.next().unwrap();
                lists = DataBlock::Nullable(NullableDataBlock {
                    data: Box::new(lists),
                    nulls: validity_buf,
                    block_info: BlockInfo::default(),
                });
            }
            lists = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(lists),
                dimension: layer.dimension,
            })
        }

        assert_eq!(lists.num_values(), num_values);
        Ok(lists)
    }
}
604
/// Accumulates one layer's unzipped validity bits during per-value decompression.
struct FslDecompressorValidityBuilder {
    // Collects the layer's validity bits across all rows
    buffer: BooleanBufferBuilder,
    // Validity bits per row at this FSL depth
    bits_per_row: usize,
    // `bits_per_row` rounded up to whole bytes (this layer's stride in the zipped rows)
    bytes_per_row: usize,
}
610
// Helper methods for per-value decompression
impl ValueDecompressor {
    /// True if any FSL layer carries an inner validity buffer
    /// (i.e. the values were zipped and need `unzip_decompress`).
    fn has_validity(&self) -> bool {
        self.layers.iter().any(|layer| layer.has_validity)
    }

    // If there is no validity then decompression is zero-copy, we just need to restore any FSL layers
    fn simple_decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> DataBlock {
        let mut cum_dim = 1;
        for layer in &self.layers {
            cum_dim *= layer.dimension;
        }
        debug_assert_eq!(self.bits_per_item, data.bits_per_value / cum_dim);
        // Re-interpret the buffer as the flattened inner items
        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values: num_rows * cum_dim,
            data: data.data,
            block_info: BlockInfo::new(),
        });
        // Restore the FSL layers from the inside out
        for layer in self.layers.iter().rev() {
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }
        debug_assert_eq!(num_rows, block.num_values());
        block
    }

    // If there is validity then it has been zipped in with the values and we must unzip it
    //
    // Each zipped row is laid out as
    // [validity bytes (outer layer)] ... [validity bytes (inner layer)] [value bytes]
    fn unzip_decompress(&self, data: FixedWidthDataBlock, num_rows: usize) -> DataBlock {
        // No support for full-zip on per-value encodings
        assert_eq!(self.bits_per_item % 8, 0);
        let bytes_per_item = self.bits_per_item / 8;
        let mut buffer_builders = Vec::with_capacity(self.layers.len());
        let mut cum_dim = 1;
        let mut total_size_bytes = 0;
        // First, go through the layers, setup our builders, allocate space
        for layer in &self.layers {
            cum_dim *= layer.dimension as usize;
            if layer.has_validity {
                let validity_size_bits = cum_dim;
                let validity_size_bytes = validity_size_bits.div_ceil(8);
                total_size_bytes += num_rows * validity_size_bytes;
                buffer_builders.push(FslDecompressorValidityBuilder {
                    buffer: BooleanBufferBuilder::new(validity_size_bits * num_rows),
                    bits_per_row: cum_dim,
                    bytes_per_row: validity_size_bytes,
                })
            }
        }
        let num_items = num_rows * cum_dim;
        let data_size = num_items * bytes_per_item as usize;
        total_size_bytes += data_size;
        let mut data_buffer = Vec::with_capacity(data_size);

        // The zipped input must be exactly validity + values, no slack
        assert_eq!(data.data.len(), total_size_bytes);

        let bytes_per_value = bytes_per_item as usize;
        let data_bytes_per_row = bytes_per_value * cum_dim;

        // Next, unzip
        let mut data_offset = 0;
        while data_offset < total_size_bytes {
            for builder in buffer_builders.iter_mut() {
                // Validity was written byte-padded, so each row's bits start on a
                // byte boundary; convert the byte offset to a bit offset to read
                let start = data_offset * 8;
                let end = start + builder.bits_per_row;
                builder.buffer.append_packed_range(start..end, &data.data);
                data_offset += builder.bytes_per_row;
            }
            let end = data_offset + data_bytes_per_row;
            data_buffer.extend_from_slice(&data.data[data_offset..end]);
            data_offset += data_bytes_per_row;
        }

        // Finally, restore the structure
        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values: num_items as u64,
            data: LanceBuffer::from(data_buffer),
            block_info: BlockInfo::new(),
        });

        // Builders were pushed outer-to-inner; reversed, they pair with the
        // reversed (inner-to-outer) layer walk below
        let mut validity_bufs = buffer_builders
            .into_iter()
            .rev()
            .map(|mut b| LanceBuffer::from(b.buffer.finish().into_inner()));
        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                let nullable = NullableDataBlock {
                    data: Box::new(block),
                    nulls: validity_bufs.next().unwrap(),
                    block_info: BlockInfo::new(),
                };
                block = DataBlock::Nullable(nullable);
            }
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }

        assert_eq!(num_rows, block.num_values() as usize);

        block
    }
}
718
719impl FixedPerValueDecompressor for ValueDecompressor {
720    fn decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> Result<DataBlock> {
721        if self.has_validity() {
722            Ok(self.unzip_decompress(data, num_rows as usize))
723        } else {
724            Ok(self.simple_decompress(data, num_rows))
725        }
726    }
727
728    fn bits_per_value(&self) -> u64 {
729        self.bits_per_value
730    }
731}
732
733impl PerValueCompressor for ValueEncoder {
734    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
735        let (data, encoding) = match data {
736            DataBlock::FixedWidth(fixed_width) => {
737                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
738                (PerValueDataBlock::Fixed(fixed_width), encoding)
739            }
740            DataBlock::FixedSizeList(fixed_size_list) => Self::per_value_fsl(fixed_size_list),
741            _ => unimplemented!(
742                "Cannot compress block of type {} with ValueEncoder",
743                data.name()
744            ),
745        };
746        Ok((data, encoding))
747    }
748}
749
750// public tests module because we share the PRIMITIVE_TYPES constant with fixed_size_list
751#[cfg(test)]
752pub(crate) mod tests {
753    use std::{
754        collections::HashMap,
755        sync::{Arc, LazyLock},
756    };
757
758    use arrow_array::{
759        Array, ArrayRef, Decimal128Array, FixedSizeListArray, Int32Array, ListArray, UInt8Array,
760        make_array, new_null_array, types::UInt32Type,
761    };
762    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
763    use arrow_schema::{DataType, Field, TimeUnit};
764    use lance_datagen::{ArrayGeneratorExt, Dimension, RowCount, array, gen_batch};
765
766    use crate::{
767        compression::{FixedPerValueDecompressor, MiniBlockDecompressor},
768        data::DataBlock,
769        encodings::{
770            logical::primitive::{
771                fullzip::{PerValueCompressor, PerValueDataBlock},
772                miniblock::MiniBlockCompressor,
773            },
774            physical::value::ValueDecompressor,
775        },
776        format::pb21::compressive_encoding::Compression,
777        testing::{
778            FnArrayGeneratorProvider, TestCases, check_basic_random,
779            check_round_trip_encoding_generated, check_round_trip_encoding_of_data,
780        },
781        version::LanceFileVersion,
782    };
783
784    use super::ValueEncoder;
785
    /// Fixed-width Arrow types exercised by the round-trip tests below.
    /// Shared with the fixed_size_list tests (hence the `pub(crate)` module).
    const PRIMITIVE_TYPES: &[DataType] = &[
        DataType::Null,
        DataType::FixedSizeBinary(2),
        DataType::Date32,
        DataType::Date64,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal128(10, 10),
        DataType::Decimal256(10, 10),
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Duration(TimeUnit::Second),
        // The Interval type is supported by the reader but the writer works with Lance schema
        // at the moment and Lance schema can't parse interval
        // DataType::Interval(IntervalUnit::DayTime),
    ];
812
813    #[test_log::test(tokio::test)]
814    async fn test_simple_value() {
815        let items = Arc::new(Int32Array::from(vec![
816            Some(0),
817            None,
818            Some(2),
819            Some(3),
820            Some(4),
821            Some(5),
822        ]));
823
824        let test_cases = TestCases::default()
825            .with_range(0..3)
826            .with_range(0..2)
827            .with_range(1..3)
828            .with_indices(vec![0, 1, 2])
829            .with_indices(vec![1])
830            .with_indices(vec![2])
831            .with_min_file_version(LanceFileVersion::V2_1);
832
833        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
834    }
835
836    #[test_log::test(tokio::test)]
837    async fn test_simple_range() {
838        let items = Arc::new(Int32Array::from_iter(
839            (0..5000).map(|i| if i % 2 == 0 { Some(i) } else { None }),
840        ));
841
842        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
843
844        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
845    }
846
847    #[test_log::test(tokio::test)]
848    async fn test_value_primitive() {
849        for data_type in PRIMITIVE_TYPES {
850            log::info!("Testing encoding for {:?}", data_type);
851            let field = Field::new("", data_type.clone(), false);
852            check_basic_random(field).await;
853        }
854    }
855
    /// Types wide enough that only a handful of values fit per mini-block chunk.
    static LARGE_TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| {
        vec![DataType::FixedSizeList(
            Arc::new(Field::new("", DataType::Int32, false)),
            128,
        )]
    });
862
863    #[test_log::test(tokio::test)]
864    async fn test_large_primitive() {
865        for data_type in LARGE_TYPES.iter() {
866            log::info!("Testing encoding for {:?}", data_type);
867            let field = Field::new("", data_type.clone(), false);
868            check_basic_random(field).await;
869        }
870    }
871
872    #[test_log::test(tokio::test)]
873    async fn test_decimal128_dictionary_encoding() {
874        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
875        let decimals: Vec<i32> = (0..100).collect();
876        let repeated_strings: Vec<_> = decimals
877            .iter()
878            .cycle()
879            .take(decimals.len() * 10000)
880            .map(|&v| Some(v as i128))
881            .collect();
882        let decimal_array = Arc::new(Decimal128Array::from(repeated_strings)) as ArrayRef;
883        check_round_trip_encoding_of_data(vec![decimal_array], &test_cases, HashMap::new()).await;
884    }
885
886    #[test_log::test(tokio::test)]
887    async fn test_miniblock_stress() {
888        // Tests for strange page sizes and batch sizes and validity scenarios for miniblock
889
890        // 10K integers, 100 per array, all valid
891        let data1 = (0..100)
892            .map(|_| Arc::new(Int32Array::from_iter_values(0..100)) as Arc<dyn Array>)
893            .collect::<Vec<_>>();
894
895        // Same as above but with mixed validity
896        let data2 = (0..100)
897            .map(|_| {
898                Arc::new(Int32Array::from_iter(
899                    (0..100).map(|i| if i % 2 == 0 { Some(i) } else { None }),
900                )) as Arc<dyn Array>
901            })
902            .collect::<Vec<_>>();
903
904        // Same as above but with all null for first half then all valid
905        // TODO: Re-enable once the all-null path is complete
906        let _data3 = (0..100)
907            .map(|chunk_idx| {
908                Arc::new(Int32Array::from_iter(
909                    (0..100).map(|i| if chunk_idx < 50 { None } else { Some(i) }),
910                )) as Arc<dyn Array>
911            })
912            .collect::<Vec<_>>();
913
914        for data in [data1, data2 /*data3*/] {
915            for batch_size in [10, 100, 1500, 15000] {
916                // 40000 bytes of data
917                let test_cases = TestCases::default()
918                    .with_page_sizes(vec![1000, 2000, 3000, 60000])
919                    .with_batch_size(batch_size)
920                    .with_min_file_version(LanceFileVersion::V2_1);
921
922                check_round_trip_encoding_of_data(data.clone(), &test_cases, HashMap::new()).await;
923            }
924        }
925    }
926
927    fn create_simple_fsl() -> FixedSizeListArray {
928        // [[0, 1], NULL], [NULL, NULL], [[8, 9], [NULL, 11]]
929        let items = Arc::new(Int32Array::from(vec![
930            Some(0),
931            Some(1),
932            Some(2),
933            Some(3),
934            None,
935            None,
936            None,
937            None,
938            Some(8),
939            Some(9),
940            None,
941            Some(11),
942        ]));
943        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
944        let inner_list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
945        let inner_list = Arc::new(FixedSizeListArray::new(
946            items_field.clone(),
947            2,
948            items,
949            Some(NullBuffer::new(inner_list_nulls)),
950        ));
951        let inner_list_field = Arc::new(Field::new(
952            "item",
953            DataType::FixedSizeList(items_field, 2),
954            true,
955        ));
956        FixedSizeListArray::new(inner_list_field, 2, inner_list, None)
957    }
958
959    #[test]
960    fn test_fsl_value_compression_miniblock() {
961        let sample_list = create_simple_fsl();
962
963        let starting_data = DataBlock::from_array(sample_list.clone());
964
965        let encoder = ValueEncoder::default();
966        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();
967
968        assert_eq!(data.num_values, 3);
969        assert_eq!(data.data.len(), 3);
970        assert_eq!(data.chunks.len(), 1);
971        assert_eq!(data.chunks[0].buffer_sizes, vec![1, 2, 48]);
972        assert_eq!(data.chunks[0].log_num_values, 0);
973
974        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
975            panic!()
976        };
977
978        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());
979
980        let decompressed =
981            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();
982
983        let decompressed = make_array(
984            decompressed
985                .into_arrow(sample_list.data_type().clone(), true)
986                .unwrap(),
987        );
988
989        assert_eq!(decompressed.as_ref(), &sample_list);
990    }
991
992    #[test]
993    fn test_fsl_value_compression_per_value() {
994        let sample_list = create_simple_fsl();
995
996        let starting_data = DataBlock::from_array(sample_list.clone());
997
998        let encoder = ValueEncoder::default();
999        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();
1000
1001        let PerValueDataBlock::Fixed(data) = data else {
1002            panic!()
1003        };
1004
1005        assert_eq!(data.bits_per_value, 144);
1006        assert_eq!(data.num_values, 3);
1007        assert_eq!(data.data.len(), 18 * 3);
1008
1009        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
1010            panic!()
1011        };
1012
1013        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());
1014
1015        let num_values = data.num_values;
1016        let decompressed =
1017            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();
1018
1019        let decompressed = make_array(
1020            decompressed
1021                .into_arrow(sample_list.data_type().clone(), true)
1022                .unwrap(),
1023        );
1024
1025        assert_eq!(decompressed.as_ref(), &sample_list);
1026    }
1027
1028    #[test_log::test(tokio::test)]
1029    async fn test_fsl_all_null() {
1030        let items = new_null_array(&DataType::Int32, 12);
1031        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
1032        let list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
1033        let list_array =
1034            FixedSizeListArray::new(items_field, 2, items, Some(NullBuffer::new(list_nulls)));
1035
1036        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
1037
1038        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
1039            .await;
1040    }
1041
    #[test_log::test(tokio::test)]
    async fn regress_list_fsl() {
        // This regresses a case where rows are large lists that span multiple
        // mini-block chunks which gives us some all-preamble mini-block chunks.
        // Four lists of FSL<u8, 16> rows (list boundaries at 393/755/1156/1536),
        // read back one row at a time.
        let offsets = ScalarBuffer::<i32>::from(vec![0, 393, 755, 1156, 1536]);
        let data = UInt8Array::from(vec![0; 1536 * 16]);
        let fsl_field = Arc::new(Field::new("item", DataType::UInt8, true));
        let fsl = FixedSizeListArray::new(fsl_field, 16, Arc::new(data), None);
        let list_field = Arc::new(Field::new("item", fsl.data_type().clone(), false));
        let list_arr = ListArray::new(list_field, OffsetBuffer::new(offsets), Arc::new(fsl), None);

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            // Batch size of 1 reads a single (multi-chunk) row at a time
            .with_batch_size(1);

        check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, HashMap::new())
            .await;
    }
1060
    /// Builds a random FSL<FSL<FSL<i32, 4>, 4>, 2> array with nulls injected at
    /// three definition levels, large enough to span multiple pages.
    fn create_random_fsl() -> Arc<dyn Array> {
        // Several levels of def and multiple pages
        let inner = array::rand_type(&DataType::Int32).with_random_nulls(0.1);
        let list_one = array::cycle_vec(inner, Dimension::from(4)).with_random_nulls(0.1);
        let list_two = array::cycle_vec(list_one, Dimension::from(4)).with_random_nulls(0.1);
        let list_three = array::cycle_vec(list_two, Dimension::from(2));

        // 8Ki rows x 32 leaf values per row = 256Ki i32 values ~ 1MiB of data
        let batch = gen_batch()
            .anon_col(list_three)
            .into_batch_rows(RowCount::from(8 * 1024))
            .unwrap();
        batch.column(0).clone()
    }
1075
1076    #[test]
1077    fn fsl_value_miniblock_stress() {
1078        let sample_array = create_random_fsl();
1079
1080        let starting_data = DataBlock::from_arrays(
1081            std::slice::from_ref(&sample_array),
1082            sample_array.len() as u64,
1083        );
1084
1085        let encoder = ValueEncoder::default();
1086        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();
1087
1088        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
1089            panic!()
1090        };
1091
1092        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());
1093
1094        let decompressed =
1095            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();
1096
1097        let decompressed = make_array(
1098            decompressed
1099                .into_arrow(sample_array.data_type().clone(), true)
1100                .unwrap(),
1101        );
1102
1103        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
1104    }
1105
1106    #[test]
1107    fn fsl_value_per_value_stress() {
1108        let sample_array = create_random_fsl();
1109
1110        let starting_data = DataBlock::from_arrays(
1111            std::slice::from_ref(&sample_array),
1112            sample_array.len() as u64,
1113        );
1114
1115        let encoder = ValueEncoder::default();
1116        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();
1117
1118        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
1119            panic!()
1120        };
1121
1122        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());
1123
1124        let PerValueDataBlock::Fixed(data) = data else {
1125            panic!()
1126        };
1127
1128        let num_values = data.num_values;
1129        let decompressed =
1130            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();
1131
1132        let decompressed = make_array(
1133            decompressed
1134                .into_arrow(sample_array.data_type().clone(), true)
1135                .unwrap(),
1136        );
1137
1138        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
1139    }
1140
1141    #[test_log::test(tokio::test)]
1142    async fn test_fsl_nullable_items() {
1143        let datagen = Box::new(FnArrayGeneratorProvider::new(move || {
1144            lance_datagen::array::rand_vec_nullable::<UInt32Type>(Dimension::from(128), 0.5)
1145        }));
1146
1147        let field = Field::new(
1148            "",
1149            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::UInt32, true)), 128),
1150            false,
1151        );
1152        check_round_trip_encoding_generated(field, datagen, TestCases::default()).await;
1153    }
1154
1155    #[test_log::test(tokio::test)]
1156    async fn test_value_encoding_verification() {
1157        use std::collections::HashMap;
1158
1159        let test_cases = TestCases::default()
1160            .with_expected_encoding("flat")
1161            .with_min_file_version(LanceFileVersion::V2_1);
1162
1163        // Test both explicit configuration and automatic fallback scenarios
1164        // 1. Test explicit "none" compression to force flat encoding
1165        // Also explicitly disable BSS to ensure value encoding is tested
1166        let mut metadata_explicit = HashMap::new();
1167        metadata_explicit.insert("lance-encoding:compression".to_string(), "none".to_string());
1168        metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());
1169
1170        let arr_explicit =
1171            Arc::new(Int32Array::from((0..1000).collect::<Vec<i32>>())) as Arc<dyn Array>;
1172        check_round_trip_encoding_of_data(vec![arr_explicit], &test_cases, metadata_explicit).await;
1173
1174        // 2. Test automatic fallback to flat encoding when bitpacking conditions aren't met
1175        // Use unique values to avoid RLE encoding
1176        // Explicitly disable BSS to ensure value encoding is tested
1177        let mut metadata = HashMap::new();
1178        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());
1179
1180        let arr_fallback = Arc::new(Int32Array::from(
1181            (0..100).map(|i| i * 73 + 19).collect::<Vec<i32>>(),
1182        )) as Arc<dyn Array>;
1183        check_round_trip_encoding_of_data(vec![arr_fallback], &test_cases, metadata).await;
1184    }
1185
1186    #[test_log::test(tokio::test)]
1187    async fn test_mixed_page_validity() {
1188        let no_nulls = Arc::new(Int32Array::from_iter_values([1, 2]));
1189        let has_nulls = Arc::new(Int32Array::from_iter([Some(3), None, Some(5)]));
1190
1191        let test_cases = TestCases::default().with_page_sizes(vec![1]);
1192        check_round_trip_encoding_of_data(vec![no_nulls, has_nulls], &test_cases, HashMap::new())
1193            .await;
1194    }
1195}