lance_encoding/encodings/physical/
value.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_buffer::{bit_util, BooleanBufferBuilder};
5use snafu::location;
6
7use crate::buffer::LanceBuffer;
8use crate::compression::{
9    BlockCompressor, BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor,
10};
11use crate::data::{
12    BlockInfo, DataBlock, FixedSizeListBlock, FixedWidthDataBlock, NullableDataBlock,
13};
14use crate::encodings::logical::primitive::fullzip::{PerValueCompressor, PerValueDataBlock};
15use crate::encodings::logical::primitive::miniblock::{
16    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, MAX_MINIBLOCK_BYTES,
17    MAX_MINIBLOCK_VALUES,
18};
19use crate::format::pb21::compressive_encoding::Compression;
20use crate::format::pb21::{self, CompressiveEncoding};
21use crate::format::ProtobufUtils21;
22
23use lance_core::{Error, Result};
24
/// A compression strategy that writes fixed-width data as-is (no compression)
///
/// Stateless.  Also handles (possibly nested) fixed-size-list wrappers around
/// fixed-width data by flattening them (see the `impl` blocks below).
#[derive(Debug, Default)]
pub struct ValueEncoder {}
28
impl ValueEncoder {
    /// Use the largest chunk we can smaller than 4KiB
    ///
    /// Returns `(log2(num_vals), num_vals)` where `num_vals` values fit in a chunk
    /// without exceeding `MAX_MINIBLOCK_BYTES` or `MAX_MINIBLOCK_VALUES`.
    ///
    /// `bytes_per_word` / `values_per_word` describe the smallest unit we are willing
    /// to split on: one value for byte-aligned types, or eight values (one whole byte
    /// of bits) for bit-packed types.
    fn find_log_vals_per_chunk(bytes_per_word: u64, values_per_word: u64) -> (u64, u64) {
        let mut size_bytes = 2 * bytes_per_word;
        let (mut log_num_vals, mut num_vals) = match values_per_word {
            1 => (1, 2),
            8 => (3, 8),
            _ => unreachable!(),
        };
        // NOTE(review): in the `8` case `size_bytes` starts at two words (16 values)
        // while `num_vals` starts at one word (8 values), so bit-packed chunks end up
        // roughly half the 4KiB target.  Conservative rather than incorrect — confirm
        // whether this is intentional.

        // If the type is so wide that we can't even fit 2 values we shouldn't be here
        assert!(size_bytes < MAX_MINIBLOCK_BYTES);

        // Keep doubling the chunk while both the byte budget and value budget allow it
        while 2 * size_bytes < MAX_MINIBLOCK_BYTES && 2 * num_vals <= MAX_MINIBLOCK_VALUES {
            log_num_vals += 1;
            size_bytes *= 2;
            num_vals *= 2;
        }

        (log_num_vals, num_vals)
    }

    /// Split a fixed-width block into mini-block chunks (data is stored uncompressed)
    fn chunk_data(data: FixedWidthDataBlock) -> MiniBlockCompressed {
        // Usually there are X bytes per value.  However, when working with boolean
        // or FSL<boolean> we might have some number of bits per value that isn't
        // divisible by 8.  In this case, to avoid chunking in the middle of a byte
        // we calculate how many 8-value words we can fit in a chunk.
        let (bytes_per_word, values_per_word) = if data.bits_per_value % 8 == 0 {
            (data.bits_per_value / 8, 1)
        } else {
            (data.bits_per_value, 8)
        };

        // Aim for 4KiB chunks
        let (log_vals_per_chunk, vals_per_chunk) =
            Self::find_log_vals_per_chunk(bytes_per_word, values_per_word);
        let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize);
        debug_assert_eq!(vals_per_chunk % values_per_word, 0);
        let bytes_per_chunk = bytes_per_word * (vals_per_chunk / values_per_word);
        let bytes_per_chunk = u16::try_from(bytes_per_chunk).unwrap();
        debug_assert!(bytes_per_chunk > 0);

        let data_buffer = data.data;

        let mut row_offset = 0;
        let mut chunks = Vec::with_capacity(num_chunks);

        // Running count of bytes consumed by full chunks, used to size the final chunk
        let mut bytes_counter = 0;
        loop {
            if row_offset + vals_per_chunk <= data.num_values {
                // We can make a full chunk
                chunks.push(MiniBlockChunk {
                    log_num_values: log_vals_per_chunk as u8,
                    buffer_sizes: vec![bytes_per_chunk],
                });
                row_offset += vals_per_chunk;
                bytes_counter += bytes_per_chunk as u64;
            } else if row_offset < data.num_values {
                // Final chunk, special values
                // (log_num_values of 0 is used for the trailing partial chunk; its
                // size is whatever bytes remain after the full chunks)
                let num_bytes = data_buffer.len() as u64 - bytes_counter;
                let num_bytes = u16::try_from(num_bytes).unwrap();
                chunks.push(MiniBlockChunk {
                    log_num_values: 0,
                    buffer_sizes: vec![num_bytes],
                });
                break;
            } else {
                // If we get here then all chunks were full chunks and we have no remainder chunk
                break;
            }
        }

        debug_assert_eq!(chunks.len(), num_chunks);

        MiniBlockCompressed {
            chunks,
            data: vec![data_buffer],
            num_values: data.num_values,
        }
    }
}
110
/// One fixed-size-list layer peeled off a (possibly nested) FSL block
#[derive(Debug)]
struct MiniblockFslLayer {
    /// Validity bitmap for this layer's items, if the layer is nullable
    validity: Option<LanceBuffer>,
    /// Number of items per list at this layer
    dimension: u64,
}
116
117/// This impl deals with encoding FSL<FSL<...<FSL<FixedWidth>>>> data as a mini-block compressor.
118/// The tricky part of FSL data is that we want to include inner validity buffers (we don't want these
119/// to be part of the rep-def because that usually ends up being more expensive).
120///
121/// The resulting mini-block will, instead of having a single buffer, have X + 1 buffers where X is
122/// the number of FSL layers that contain validity.
123///
124/// In the simple case where there is no validity inside the FSL layers, all we are doing here is flattening
125/// the FSL layers into a single buffer.
126///
127/// Also: We don't allow a row to be broken across chunks.  This typically isn't too big of a deal since we
128/// are usually dealing with relatively small vectors if we are using mini-block.
129///
130/// Note: when we do have validity we have to make copies of the validity buffers because they are bit buffers
131/// and we need to bit slice them which requires copies or offsets.  Paying the price at write time to make
132/// the copies is better than paying the price at read time to do the bit slicing.
impl ValueEncoder {
    /// Build the protobuf encoding description: a Flat leaf wrapped by one FSL per
    /// layer (the outermost layer ends up outermost in the encoding).
    fn make_fsl_encoding(layers: &[MiniblockFslLayer], bits_per_value: u64) -> CompressiveEncoding {
        let mut encoding = ProtobufUtils21::flat(bits_per_value, None);
        for layer in layers.iter().rev() {
            let has_validity = layer.validity.is_some();
            let dimension = layer.dimension;
            encoding = ProtobufUtils21::fsl(dimension, has_validity, encoding);
        }
        encoding
    }

    /// Copy one chunk's worth of validity bits into `validity_buffers` and return the
    /// chunk's buffer sizes (one entry per nullable layer, outer-first, then the data
    /// buffer size last).
    ///
    /// `row_offset` / `num_rows` are in top-level rows; they are scaled by each
    /// layer's dimension as we walk inward.
    fn extract_fsl_chunk(
        data: &FixedWidthDataBlock,
        layers: &[MiniblockFslLayer],
        row_offset: usize,
        num_rows: usize,
        validity_buffers: &mut [Vec<u8>],
    ) -> Vec<u16> {
        let mut row_offset = row_offset;
        let mut num_values = num_rows;
        let mut buffer_counter = 0;
        let mut buffer_sizes = Vec::with_capacity(validity_buffers.len() + 1);
        for layer in layers {
            // Scale from values at the previous layer to items at this layer
            row_offset *= layer.dimension as usize;
            num_values *= layer.dimension as usize;
            if let Some(validity) = &layer.validity {
                // Bit-slicing copies; we pay this cost at write time so readers don't
                // have to do bit shifting at read time (see module comment)
                let validity_slice = validity
                    .clone()
                    .bit_slice_le_with_length(row_offset, num_values);
                validity_buffers[buffer_counter].extend_from_slice(&validity_slice);
                buffer_sizes.push(validity_slice.len() as u16);
                buffer_counter += 1;
            }
        }

        // After the loop `num_values` is the number of innermost items in this chunk
        let bits_in_chunk = data.bits_per_value * num_values as u64;
        let bytes_in_chunk = bits_in_chunk.div_ceil(8);
        let bytes_in_chunk = u16::try_from(bytes_in_chunk).unwrap();
        debug_assert!(bytes_in_chunk > 0);
        buffer_sizes.push(bytes_in_chunk);

        buffer_sizes
    }

    /// Chunk a (possibly nested) FSL-of-fixed-width block into mini-blocks.
    ///
    /// Rows are never split across chunks.  Output buffers are the extracted validity
    /// buffers (one per nullable layer, outer-first) followed by the flat data buffer.
    fn chunk_fsl(
        data: FixedWidthDataBlock,
        layers: Vec<MiniblockFslLayer>,
        num_rows: u64,
    ) -> (MiniBlockCompressed, CompressiveEncoding) {
        // Count size to calculate rows per chunk
        let mut ceil_bytes_validity = 0;
        let mut cum_dim = 1;
        let mut num_validity_buffers = 0;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if layer.validity.is_some() {
                ceil_bytes_validity += cum_dim.div_ceil(8);
                num_validity_buffers += 1;
            }
        }
        // It's an estimate because validity buffers may have some padding bits
        let cum_bits_per_value = data.bits_per_value * cum_dim;
        let (cum_bytes_per_word, vals_per_word) = if cum_bits_per_value % 8 == 0 {
            (cum_bits_per_value / 8, 1)
        } else {
            (cum_bits_per_value, 8)
        };
        let est_bytes_per_word = (ceil_bytes_validity * vals_per_word) + cum_bytes_per_word;
        let (log_rows_per_chunk, rows_per_chunk) =
            Self::find_log_vals_per_chunk(est_bytes_per_word, vals_per_word);

        let num_chunks = num_rows.div_ceil(rows_per_chunk) as usize;

        // Allocate buffers for validity, these will be slightly bigger than the input validity buffers
        let mut chunks = Vec::with_capacity(num_chunks);
        let mut validity_buffers: Vec<Vec<u8>> = Vec::with_capacity(num_validity_buffers);
        cum_dim = 1;
        for layer in &layers {
            cum_dim *= layer.dimension;
            if let Some(validity) = &layer.validity {
                let layer_bytes_validity = cum_dim.div_ceil(8);
                let validity_with_padding =
                    layer_bytes_validity as usize * num_chunks * rows_per_chunk as usize;
                debug_assert!(validity_with_padding >= validity.len());
                // NOTE(review): this capacity omits the `rows_per_chunk` factor used
                // in `validity_with_padding` above, so it looks like an
                // under-reservation (still correct, the Vec just reallocates) — confirm.
                validity_buffers.push(Vec::with_capacity(
                    layer_bytes_validity as usize * num_chunks,
                ));
            }
        }

        // Now go through and extract validity buffers
        let mut row_offset = 0;
        while row_offset + rows_per_chunk <= num_rows {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_per_chunk as usize,
                &mut validity_buffers,
            );
            row_offset += rows_per_chunk;
            chunks.push(MiniBlockChunk {
                log_num_values: log_rows_per_chunk as u8,
                buffer_sizes,
            })
        }
        // Trailing partial chunk, if any (log_num_values of 0)
        let rows_in_chunk = num_rows - row_offset;
        if rows_in_chunk > 0 {
            let buffer_sizes = Self::extract_fsl_chunk(
                &data,
                &layers,
                row_offset as usize,
                rows_in_chunk as usize,
                &mut validity_buffers,
            );
            chunks.push(MiniBlockChunk {
                log_num_values: 0,
                buffer_sizes,
            });
        }

        let encoding = Self::make_fsl_encoding(&layers, data.bits_per_value);
        // Finally, add the data buffer
        let buffers = validity_buffers
            .into_iter()
            .map(LanceBuffer::from)
            .chain(std::iter::once(data.data))
            .collect::<Vec<_>>();

        (
            MiniBlockCompressed {
                chunks,
                data: buffers,
                num_values: num_rows,
            },
            encoding,
        )
    }

    /// Peel Nullable/FSL wrappers off `data` (which must be an FSL block), building
    /// one `MiniblockFslLayer` per FSL layer, then delegate to [`Self::chunk_fsl`].
    fn miniblock_fsl(data: DataBlock) -> (MiniBlockCompressed, CompressiveEncoding) {
        let num_rows = data.num_values();
        let fsl = data.as_fixed_size_list().unwrap();
        let mut layers = Vec::new();
        let mut child = *fsl.child;
        let mut cur_layer = MiniblockFslLayer {
            validity: None,
            dimension: fsl.dimension,
        };
        loop {
            // A Nullable wrapper supplies the validity for the layer we're building
            if let DataBlock::Nullable(nullable) = child {
                cur_layer.validity = Some(nullable.nulls);
                child = *nullable.data;
            }
            match child {
                DataBlock::FixedSizeList(inner) => {
                    layers.push(cur_layer);
                    cur_layer = MiniblockFslLayer {
                        validity: None,
                        dimension: inner.dimension,
                    };
                    child = *inner.child;
                }
                DataBlock::FixedWidth(inner) => {
                    layers.push(cur_layer);
                    return Self::chunk_fsl(inner, layers, num_rows);
                }
                _ => unreachable!("Unexpected data block type in value encoder's miniblock_fsl"),
            }
        }
    }
}
304
/// Cursor over one nullable layer's validity bitmap while zipping validity with values
struct PerValueFslValidityIter {
    /// The validity bitmap for this FSL layer
    buffer: LanceBuffer,
    /// Number of validity bits contributed by each top-level row at this layer
    bits_per_row: usize,
    /// Current bit offset into `buffer`
    offset: usize,
}
310
311/// In this section we deal with per-value encoding of FSL<FSL<...<FSL<FixedWidth>>>> data.
312///
313/// It's easier than mini-block.  All we need to do is flatten the FSL layers into a single buffer.
314/// This includes any validity buffers we encounter on the way.
315impl ValueEncoder {
316    fn fsl_to_encoding(fsl: &FixedSizeListBlock) -> CompressiveEncoding {
317        let mut inner = fsl.child.as_ref();
318        let mut has_validity = false;
319        inner = match inner {
320            DataBlock::Nullable(nullable) => {
321                has_validity = true;
322                nullable.data.as_ref()
323            }
324            DataBlock::AllNull(_) => {
325                return ProtobufUtils21::constant(None);
326            }
327            _ => inner,
328        };
329        let inner_encoding = match inner {
330            DataBlock::FixedWidth(fixed_width) => {
331                ProtobufUtils21::flat(fixed_width.bits_per_value, None)
332            }
333            DataBlock::FixedSizeList(inner) => Self::fsl_to_encoding(inner),
334            _ => unreachable!(
335                "Unexpected data block type in value encoder's fsl_to_encoding: {}",
336                inner.name()
337            ),
338        };
339        ProtobufUtils21::fsl(fsl.dimension, has_validity, inner_encoding)
340    }
341
342    fn simple_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
343        // The simple case is zero-copy, we just return the flattened inner buffer
344        let encoding = Self::fsl_to_encoding(&fsl);
345        let num_values = fsl.num_values();
346        let mut child = *fsl.child;
347        let mut cum_dim = 1;
348        loop {
349            cum_dim *= fsl.dimension;
350            match child {
351                DataBlock::Nullable(nullable) => {
352                    child = *nullable.data;
353                }
354                DataBlock::FixedSizeList(inner) => {
355                    child = *inner.child;
356                }
357                DataBlock::FixedWidth(inner) => {
358                    let data = FixedWidthDataBlock {
359                        bits_per_value: inner.bits_per_value * cum_dim,
360                        num_values,
361                        data: inner.data,
362                        block_info: BlockInfo::new(),
363                    };
364                    return (PerValueDataBlock::Fixed(data), encoding);
365                }
366                _ => unreachable!(
367                    "Unexpected data block type in value encoder's simple_per_value_fsl"
368                ),
369            }
370        }
371    }
372
373    fn nullable_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
374        // If there are nullable inner values then we need to zip the validity with the values
375        let encoding = Self::fsl_to_encoding(&fsl);
376        let num_values = fsl.num_values();
377        let mut bytes_per_row = 0;
378        let mut cum_dim = 1;
379        let mut current = fsl;
380        let mut validity_iters: Vec<PerValueFslValidityIter> = Vec::new();
381        let data_bytes_per_row: usize;
382        let data_buffer: LanceBuffer;
383        loop {
384            cum_dim *= current.dimension;
385            let mut child = *current.child;
386            if let DataBlock::Nullable(nullable) = child {
387                // Each item will need this many bytes of validity prepended to it
388                bytes_per_row += cum_dim.div_ceil(8) as usize;
389                validity_iters.push(PerValueFslValidityIter {
390                    buffer: nullable.nulls,
391                    bits_per_row: cum_dim as usize,
392                    offset: 0,
393                });
394                child = *nullable.data;
395            };
396            match child {
397                DataBlock::FixedSizeList(inner) => {
398                    current = inner;
399                }
400                DataBlock::FixedWidth(fixed_width) => {
401                    data_bytes_per_row =
402                        (fixed_width.bits_per_value.div_ceil(8) * cum_dim) as usize;
403                    bytes_per_row += data_bytes_per_row;
404                    data_buffer = fixed_width.data;
405                    break;
406                }
407                DataBlock::AllNull(_) => {
408                    data_bytes_per_row = 0;
409                    data_buffer = LanceBuffer::empty();
410                    break;
411                }
412                _ => unreachable!(
413                    "Unexpected data block type in value encoder's nullable_per_value_fsl: {:?}",
414                    child
415                ),
416            }
417        }
418
419        let bytes_needed = bytes_per_row * num_values as usize;
420        let mut zipped = Vec::with_capacity(bytes_needed);
421        let data_slice = &data_buffer;
422        // Hopefully values are pretty large so we don't iterate this loop _too_ many times
423        for i in 0..num_values as usize {
424            for validity in validity_iters.iter_mut() {
425                let validity_slice = validity
426                    .buffer
427                    .bit_slice_le_with_length(validity.offset, validity.bits_per_row);
428                zipped.extend_from_slice(&validity_slice);
429                validity.offset += validity.bits_per_row;
430            }
431            let start = i * data_bytes_per_row;
432            let end = start + data_bytes_per_row;
433            zipped.extend_from_slice(&data_slice[start..end]);
434        }
435
436        let zipped = LanceBuffer::from(zipped);
437        let data = PerValueDataBlock::Fixed(FixedWidthDataBlock {
438            bits_per_value: bytes_per_row as u64 * 8,
439            num_values,
440            data: zipped,
441            block_info: BlockInfo::new(),
442        });
443        (data, encoding)
444    }
445
446    fn per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, CompressiveEncoding) {
447        if !fsl.child.is_nullable() {
448            Self::simple_per_value_fsl(fsl)
449        } else {
450            Self::nullable_per_value_fsl(fsl)
451        }
452    }
453}
454
455impl BlockCompressor for ValueEncoder {
456    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
457        let data = match data {
458            DataBlock::FixedWidth(fixed_width) => fixed_width.data,
459            _ => unimplemented!(
460                "Cannot compress block of type {} with ValueEncoder",
461                data.name()
462            ),
463        };
464        Ok(data)
465    }
466}
467
468impl MiniBlockCompressor for ValueEncoder {
469    fn compress(&self, chunk: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
470        match chunk {
471            DataBlock::FixedWidth(fixed_width) => {
472                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
473                Ok((Self::chunk_data(fixed_width), encoding))
474            }
475            DataBlock::FixedSizeList(_) => Ok(Self::miniblock_fsl(chunk)),
476            _ => Err(Error::InvalidInput {
477                source: format!(
478                    "Cannot compress a data block of type {} with ValueEncoder",
479                    chunk.name()
480                )
481                .into(),
482                location: location!(),
483            }),
484        }
485    }
486}
487
/// Decode-side description of one FSL layer recorded in the encoding
#[derive(Debug)]
struct ValueFslDesc {
    /// Number of items per list at this layer
    dimension: u64,
    /// Whether this layer stores a validity bitmap
    has_validity: bool,
}
493
/// A decompressor for fixed-width data that has
/// been written, as-is, to disk in single contiguous array
#[derive(Debug)]
pub struct ValueDecompressor {
    /// How many bits are in each inner-most item (e.g. FSL<Int32, 100> would be 32)
    bits_per_item: u64,
    /// How many bits are in each value (e.g. FSL<Int32, 100> would be 3200)
    ///
    /// This number is a little trickier to compute because we also have to include bytes
    /// of any inner validity
    bits_per_value: u64,
    /// How many items are in each value (e.g. FSL<Int32, 100> would be 100)
    items_per_value: u64,
    /// FSL layers, outermost first (empty for plain flat data)
    layers: Vec<ValueFslDesc>,
}
509
impl ValueDecompressor {
    /// Build a decompressor for plain flat (non-FSL) fixed-width data
    pub fn from_flat(description: &pb21::Flat) -> Self {
        Self {
            bits_per_item: description.bits_per_value,
            bits_per_value: description.bits_per_value,
            items_per_value: 1,
            layers: Vec::default(),
        }
    }

    /// Build a decompressor from a (possibly nested) FSL description.
    ///
    /// Walks the description inward, recording one `ValueFslDesc` per layer
    /// (outermost first) and accumulating the total dimension.  Each nullable
    /// layer contributes `ceil(cum_dim / 8)` bytes of validity to `bits_per_value`.
    pub fn from_fsl(mut description: &pb21::FixedSizeList) -> Self {
        let mut layers = Vec::new();
        let mut cum_dim = 1;
        let mut bytes_per_value = 0;
        loop {
            layers.push(ValueFslDesc {
                has_validity: description.has_validity,
                dimension: description.items_per_value,
            });
            cum_dim *= description.items_per_value;
            if description.has_validity {
                bytes_per_value += cum_dim.div_ceil(8);
            }
            match description
                .values
                .as_ref()
                .unwrap()
                .compression
                .as_ref()
                .unwrap()
            {
                Compression::FixedSizeList(inner) => {
                    description = inner;
                }
                Compression::Flat(flat) => {
                    // Total row width = validity bytes (expressed as bits) + item bits
                    let mut bits_per_value = bytes_per_value * 8;
                    bits_per_value += flat.bits_per_value * cum_dim;
                    return Self {
                        bits_per_item: flat.bits_per_value,
                        bits_per_value,
                        items_per_value: cum_dim,
                        layers,
                    };
                }
                _ => unreachable!(),
            }
        }
    }

    /// Wrap a raw buffer in a fixed-width block of `bits_per_item` values
    fn buffer_to_block(&self, data: LanceBuffer, num_values: u64) -> DataBlock {
        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values,
            data,
            block_info: BlockInfo::new(),
        })
    }
}
568
569impl BlockDecompressor for ValueDecompressor {
570    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
571        let block = self.buffer_to_block(data, num_values);
572        assert_eq!(block.num_values(), num_values);
573        Ok(block)
574    }
575}
576
impl MiniBlockDecompressor for ValueDecompressor {
    /// Rebuild a data block from a mini-block chunk's buffers.
    ///
    /// Buffers arrive as `[validity per nullable layer (outer-first)..., data]`; we
    /// consume them back-to-front while re-wrapping FSL (and Nullable) layers from
    /// the innermost layer outward.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        let num_items = num_values * self.items_per_value;
        let mut buffer_iter = data.into_iter().rev();

        // Always at least 1 buffer
        let data_buf = buffer_iter.next().unwrap();
        let items = self.buffer_to_block(data_buf, num_items);
        let mut lists = items;

        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                // Reversed iteration means the next buffer is this (innermost
                // remaining) layer's validity
                let validity_buf = buffer_iter.next().unwrap();
                lists = DataBlock::Nullable(NullableDataBlock {
                    data: Box::new(lists),
                    nulls: validity_buf,
                    block_info: BlockInfo::default(),
                });
            }
            lists = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(lists),
                dimension: layer.dimension,
            })
        }

        assert_eq!(lists.num_values(), num_values);
        Ok(lists)
    }
}
606
/// Accumulates one nullable layer's validity bits while unzipping per-value data
struct FslDecompressorValidityBuilder {
    /// Collected validity bits for this layer
    buffer: BooleanBufferBuilder,
    /// Validity bits contributed by each row at this layer
    bits_per_row: usize,
    /// Bytes of validity per row in the zipped input (bits rounded up to bytes)
    bytes_per_row: usize,
}
612
// Helper methods for per-value decompression
impl ValueDecompressor {
    /// True if any FSL layer carries a validity bitmap (i.e. validity was zipped in)
    fn has_validity(&self) -> bool {
        self.layers.iter().any(|layer| layer.has_validity)
    }

    // If there is no validity then decompression is zero-copy, we just need to restore any FSL layers
    fn simple_decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> DataBlock {
        let mut cum_dim = 1;
        for layer in &self.layers {
            cum_dim *= layer.dimension;
        }
        debug_assert_eq!(self.bits_per_item, data.bits_per_value / cum_dim);
        // Re-interpret the flat buffer as the innermost items...
        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values: num_rows * cum_dim,
            data: data.data,
            block_info: BlockInfo::new(),
        });
        // ...then re-wrap the FSL layers from innermost to outermost
        for layer in self.layers.iter().rev() {
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }
        debug_assert_eq!(num_rows, block.num_values());
        block
    }

    // If there is validity then it has been zipped in with the values and we must unzip it
    fn unzip_decompress(&self, data: FixedWidthDataBlock, num_rows: usize) -> DataBlock {
        // No support for full-zip on per-value encodings
        // (items must be byte-aligned; bit-packed items are not handled here)
        assert_eq!(self.bits_per_item % 8, 0);
        let bytes_per_item = self.bits_per_item / 8;
        let mut buffer_builders = Vec::with_capacity(self.layers.len());
        let mut cum_dim = 1;
        let mut total_size_bytes = 0;
        // First, go through the layers, setup our builders, allocate space
        for layer in &self.layers {
            cum_dim *= layer.dimension as usize;
            if layer.has_validity {
                let validity_size_bits = cum_dim;
                let validity_size_bytes = validity_size_bits.div_ceil(8);
                total_size_bytes += num_rows * validity_size_bytes;
                buffer_builders.push(FslDecompressorValidityBuilder {
                    buffer: BooleanBufferBuilder::new(validity_size_bits * num_rows),
                    bits_per_row: cum_dim,
                    bytes_per_row: validity_size_bytes,
                })
            }
        }
        let num_items = num_rows * cum_dim;
        let data_size = num_items * bytes_per_item as usize;
        total_size_bytes += data_size;
        let mut data_buffer = Vec::with_capacity(data_size);

        // The zipped input must be exactly validity + data with no slack
        assert_eq!(data.data.len(), total_size_bytes);

        let bytes_per_value = bytes_per_item as usize;
        let data_bytes_per_row = bytes_per_value * cum_dim;

        // Next, unzip
        // Each row is laid out as: validity bytes per nullable layer (outer-first),
        // then that row's data bytes
        let mut data_offset = 0;
        while data_offset < total_size_bytes {
            for builder in buffer_builders.iter_mut() {
                // Validity is byte-aligned per row; convert the byte offset to bits
                let start = data_offset * 8;
                let end = start + builder.bits_per_row;
                builder.buffer.append_packed_range(start..end, &data.data);
                data_offset += builder.bytes_per_row;
            }
            let end = data_offset + data_bytes_per_row;
            data_buffer.extend_from_slice(&data.data[data_offset..end]);
            data_offset += data_bytes_per_row;
        }

        // Finally, restore the structure
        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: self.bits_per_item,
            num_values: num_items as u64,
            data: LanceBuffer::from(data_buffer),
            block_info: BlockInfo::new(),
        });

        // Builders were pushed outer-first; reversing yields innermost-first, which
        // matches the innermost-to-outermost re-wrapping loop below
        let mut validity_bufs = buffer_builders
            .into_iter()
            .rev()
            .map(|mut b| LanceBuffer::from(b.buffer.finish().into_inner()));
        for layer in self.layers.iter().rev() {
            if layer.has_validity {
                let nullable = NullableDataBlock {
                    data: Box::new(block),
                    nulls: validity_bufs.next().unwrap(),
                    block_info: BlockInfo::new(),
                };
                block = DataBlock::Nullable(nullable);
            }
            block = DataBlock::FixedSizeList(FixedSizeListBlock {
                child: Box::new(block),
                dimension: layer.dimension,
            });
        }

        assert_eq!(num_rows, block.num_values() as usize);

        block
    }
}
720
721impl FixedPerValueDecompressor for ValueDecompressor {
722    fn decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> Result<DataBlock> {
723        if self.has_validity() {
724            Ok(self.unzip_decompress(data, num_rows as usize))
725        } else {
726            Ok(self.simple_decompress(data, num_rows))
727        }
728    }
729
730    fn bits_per_value(&self) -> u64 {
731        self.bits_per_value
732    }
733}
734
735impl PerValueCompressor for ValueEncoder {
736    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
737        let (data, encoding) = match data {
738            DataBlock::FixedWidth(fixed_width) => {
739                let encoding = ProtobufUtils21::flat(fixed_width.bits_per_value, None);
740                (PerValueDataBlock::Fixed(fixed_width), encoding)
741            }
742            DataBlock::FixedSizeList(fixed_size_list) => Self::per_value_fsl(fixed_size_list),
743            _ => unimplemented!(
744                "Cannot compress block of type {} with ValueEncoder",
745                data.name()
746            ),
747        };
748        Ok((data, encoding))
749    }
750}
751
// public tests module because we share the PRIMITIVE_TYPES constant with fixed_size_list
#[cfg(test)]
pub(crate) mod tests {
    use std::{
        collections::HashMap,
        sync::{Arc, LazyLock},
    };

    use arrow_array::{
        make_array, new_null_array, types::UInt32Type, Array, ArrayRef, Decimal128Array,
        FixedSizeListArray, Int32Array, ListArray, UInt8Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, TimeUnit};
    use lance_datagen::{array, gen_batch, ArrayGeneratorExt, Dimension, RowCount};

    use crate::{
        compression::{FixedPerValueDecompressor, MiniBlockDecompressor},
        data::DataBlock,
        encodings::{
            logical::primitive::{
                fullzip::{PerValueCompressor, PerValueDataBlock},
                miniblock::MiniBlockCompressor,
            },
            physical::value::ValueDecompressor,
        },
        format::pb21::compressive_encoding::Compression,
        testing::{
            check_basic_random, check_round_trip_encoding_generated,
            check_round_trip_encoding_of_data, FnArrayGeneratorProvider, TestCases,
        },
        version::LanceFileVersion,
    };

    use super::ValueEncoder;

    // Fixed-width primitive types exercised by the basic round-trip tests below.
    const PRIMITIVE_TYPES: &[DataType] = &[
        DataType::Null,
        DataType::FixedSizeBinary(2),
        DataType::Date32,
        DataType::Date64,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal128(10, 10),
        DataType::Decimal256(10, 10),
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Duration(TimeUnit::Second),
        // The Interval type is supported by the reader but the writer works with Lance schema
        // at the moment and Lance schema can't parse interval
        // DataType::Interval(IntervalUnit::DayTime),
    ];

    // Round-trips a tiny nullable Int32 array, exercising range reads, take
    // (indices) reads, and the null path on the 2.1 format.
    #[test_log::test(tokio::test)]
    async fn test_simple_value() {
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            None,
            Some(2),
            Some(3),
            Some(4),
            Some(5),
        ]));

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
    }

    // Round-trips a larger array (5000 values, alternating null/valid) using
    // the default set of test reads.
    #[test_log::test(tokio::test)]
    async fn test_simple_range() {
        let items = Arc::new(Int32Array::from_iter((0..5000).map(|i| {
            if i % 2 == 0 {
                Some(i)
            } else {
                None
            }
        })));

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
    }

    // Basic random-data round trip for every fixed-width primitive type.
    #[test_log::test(tokio::test)]
    async fn test_value_primitive() {
        for data_type in PRIMITIVE_TYPES {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_basic_random(field).await;
        }
    }

    // Types wide enough (128 x i32 = 512 bytes per value) to exercise the
    // wide-value code paths.
    static LARGE_TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| {
        vec![DataType::FixedSizeList(
            Arc::new(Field::new("", DataType::Int32, false)),
            128,
        )]
    });

    // Basic random-data round trip for the wide types above.
    #[test_log::test(tokio::test)]
    async fn test_large_primitive() {
        for data_type in LARGE_TYPES.iter() {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_basic_random(field).await;
        }
    }

    // Heavily repeated Decimal128 values (100 distinct values cycled 10000
    // times) — low cardinality data that should round-trip regardless of
    // which compression (e.g. dictionary) the strategy picks.
    #[test_log::test(tokio::test)]
    async fn test_decimal128_dictionary_encoding() {
        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
        let decimals: Vec<i32> = (0..100).collect();
        let repeated_strings: Vec<_> = decimals
            .iter()
            .cycle()
            .take(decimals.len() * 10000)
            .map(|&v| Some(v as i128))
            .collect();
        let decimal_array = Arc::new(Decimal128Array::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![decimal_array], &test_cases, HashMap::new()).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_miniblock_stress() {
        // Tests for strange page sizes and batch sizes and validity scenarios for miniblock

        // 10K integers, 100 per array, all valid
        let data1 = (0..100)
            .map(|_| Arc::new(Int32Array::from_iter_values(0..100)) as Arc<dyn Array>)
            .collect::<Vec<_>>();

        // Same as above but with mixed validity
        let data2 = (0..100)
            .map(|_| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if i % 2 == 0 {
                        Some(i)
                    } else {
                        None
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        // Same as above but with all null for first half then all valid
        // TODO: Re-enable once the all-null path is complete
        let _data3 = (0..100)
            .map(|chunk_idx| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if chunk_idx < 50 {
                        None
                    } else {
                        Some(i)
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        for data in [data1, data2 /*data3*/] {
            for batch_size in [10, 100, 1500, 15000] {
                // 40000 bytes of data
                let test_cases = TestCases::default()
                    .with_page_sizes(vec![1000, 2000, 3000, 60000])
                    .with_batch_size(batch_size)
                    .with_min_file_version(LanceFileVersion::V2_1);

                check_round_trip_encoding_of_data(data.clone(), &test_cases, HashMap::new()).await;
            }
        }
    }

    // Builds a 3-row FixedSizeList<FixedSizeList<Int32, 2>, 2> with nulls at
    // both the inner-list level and the item level (outer lists are all valid).
    fn create_simple_fsl() -> FixedSizeListArray {
        // [[0, 1], NULL], [NULL, NULL], [[8, 9], [NULL, 11]]
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(3),
            None,
            None,
            None,
            None,
            Some(8),
            Some(9),
            None,
            Some(11),
        ]));
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        let inner_list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
        let inner_list = Arc::new(FixedSizeListArray::new(
            items_field.clone(),
            2,
            items,
            Some(NullBuffer::new(inner_list_nulls)),
        ));
        let inner_list_field = Arc::new(Field::new(
            "item",
            DataType::FixedSizeList(items_field, 2),
            true,
        ));
        FixedSizeListArray::new(inner_list_field, 2, inner_list, None)
    }

    // Compresses the nested FSL through the mini-block path and verifies a
    // lossless round trip plus the exact chunk layout produced.
    #[test]
    fn test_fsl_value_compression_miniblock() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 3);
        assert_eq!(data.chunks.len(), 1);
        // One buffer per level: inner-list validity (6 bits -> 1 byte),
        // item validity (12 bits -> 2 bytes), 12 i32 values (48 bytes)
        assert_eq!(data.chunks[0].buffer_sizes, vec![1, 2, 48]);
        assert_eq!(data.chunks[0].log_num_values, 0);

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

    // Compresses the same nested FSL through the per-value (full-zip) path and
    // verifies a lossless round trip plus the zipped row width.
    #[test]
    fn test_fsl_value_compression_per_value() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        // 144 bits (18 bytes) per row: 128 bits of item values plus zipped
        // validity for both nesting levels — presumably byte-padded; confirm
        // against the per_value_fsl layout if this changes
        assert_eq!(data.bits_per_value, 144);
        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 18 * 3);

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

    // FSL whose items are entirely null (some lists valid, some null) — hits
    // the all-null items path.
    #[test_log::test(tokio::test)]
    async fn test_fsl_all_null() {
        let items = new_null_array(&DataType::Int32, 12);
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        let list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
        let list_array =
            FixedSizeListArray::new(items_field, 2, items, Some(NullBuffer::new(list_nulls)));

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
            .await;
    }

    #[test_log::test(tokio::test)]
    async fn regress_list_fsl() {
        // This regresses a case where rows are large lists that span multiple
        // mini-block chunks which gives us some all-preamble mini-block chunks.
        let offsets = ScalarBuffer::<i32>::from(vec![0, 393, 755, 1156, 1536]);
        let data = UInt8Array::from(vec![0; 1536 * 16]);
        let fsl_field = Arc::new(Field::new("item", DataType::UInt8, true));
        let fsl = FixedSizeListArray::new(fsl_field, 16, Arc::new(data), None);
        let list_field = Arc::new(Field::new("item", fsl.data_type().clone(), false));
        let list_arr = ListArray::new(list_field, OffsetBuffer::new(offsets), Arc::new(fsl), None);

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_batch_size(1);

        check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, HashMap::new())
            .await;
    }

    // Random triple-nested FSL (2 x 4 x 4 = 32 i32 leaves per row) with nulls
    // at every level — stress input for the two tests below.
    fn create_random_fsl() -> Arc<dyn Array> {
        // Several levels of def and multiple pages
        let inner = array::rand_type(&DataType::Int32).with_random_nulls(0.1);
        let list_one = array::cycle_vec(inner, Dimension::from(4)).with_random_nulls(0.1);
        let list_two = array::cycle_vec(list_one, Dimension::from(4)).with_random_nulls(0.1);
        let list_three = array::cycle_vec(list_two, Dimension::from(2));

        // 8Ki rows x 32 leaf values each = 256Ki i32 values ~ 1MiB of data
        let batch = gen_batch()
            .anon_col(list_three)
            .into_batch_rows(RowCount::from(8 * 1024))
            .unwrap();
        batch.column(0).clone()
    }

    // Mini-block compress/decompress round trip on ~1MiB of random nested FSL.
    #[test]
    fn fsl_value_miniblock_stress() {
        let sample_array = create_random_fsl();

        let starting_data = DataBlock::from_arrays(
            std::slice::from_ref(&sample_array),
            sample_array.len() as u64,
        );

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }

    // Per-value compress/decompress round trip on the same random nested FSL.
    #[test]
    fn fsl_value_per_value_stress() {
        let sample_array = create_random_fsl();

        let starting_data = DataBlock::from_arrays(
            std::slice::from_ref(&sample_array),
            sample_array.len() as u64,
        );

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let Compression::FixedSizeList(fsl) = compression.compression.unwrap() else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }

    // Generated round trip for FSL<UInt32, 128> whose items are 50% null.
    #[test_log::test(tokio::test)]
    async fn test_fsl_nullable_items() {
        let datagen = Box::new(FnArrayGeneratorProvider::new(move || {
            lance_datagen::array::rand_vec_nullable::<UInt32Type>(Dimension::from(128), 0.5)
        }));

        let field = Field::new(
            "",
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::UInt32, true)), 128),
            false,
        );
        check_round_trip_encoding_generated(field, datagen, TestCases::default()).await;
    }

    // Verifies the "flat" encoding is actually chosen, both when compression
    // is explicitly disabled and when other encodings' conditions aren't met.
    #[test_log::test(tokio::test)]
    async fn test_value_encoding_verification() {
        use std::collections::HashMap;

        let test_cases = TestCases::default()
            .with_expected_encoding("flat")
            .with_min_file_version(LanceFileVersion::V2_1);

        // Test both explicit configuration and automatic fallback scenarios
        // 1. Test explicit "none" compression to force flat encoding
        // Also explicitly disable BSS to ensure value encoding is tested
        let mut metadata_explicit = HashMap::new();
        metadata_explicit.insert("lance-encoding:compression".to_string(), "none".to_string());
        metadata_explicit.insert("lance-encoding:bss".to_string(), "off".to_string());

        let arr_explicit =
            Arc::new(Int32Array::from((0..1000).collect::<Vec<i32>>())) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr_explicit], &test_cases, metadata_explicit).await;

        // 2. Test automatic fallback to flat encoding when bitpacking conditions aren't met
        // Use unique values to avoid RLE encoding
        // Explicitly disable BSS to ensure value encoding is tested
        let mut metadata = HashMap::new();
        metadata.insert("lance-encoding:bss".to_string(), "off".to_string());

        let arr_fallback = Arc::new(Int32Array::from(
            (0..100).map(|i| i * 73 + 19).collect::<Vec<i32>>(),
        )) as Arc<dyn Array>;
        check_round_trip_encoding_of_data(vec![arr_fallback], &test_cases, metadata).await;
    }

    // One-row pages where some pages have validity and some don't — regresses
    // mixed per-page validity handling.
    #[test_log::test(tokio::test)]
    async fn test_mixed_page_validity() {
        let no_nulls = Arc::new(Int32Array::from_iter_values([1, 2]));
        let has_nulls = Arc::new(Int32Array::from_iter([Some(3), None, Some(5)]));

        let test_cases = TestCases::default().with_page_sizes(vec![1]);
        check_round_trip_encoding_of_data(vec![no_nulls, has_nulls], &test_cases, HashMap::new())
            .await;
    }
}