lance_encoding/encodings/physical/
value.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_buffer::{bit_util, BooleanBufferBuilder};
5use arrow_schema::DataType;
6use bytes::Bytes;
7use futures::{future::BoxFuture, FutureExt};
8use log::trace;
9use snafu::location;
10use std::ops::Range;
11use std::sync::{Arc, Mutex};
12
13use crate::buffer::LanceBuffer;
14use crate::data::{
15    BlockInfo, ConstantDataBlock, DataBlock, FixedSizeListBlock, FixedWidthDataBlock,
16    NullableDataBlock,
17};
18use crate::decoder::{BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor};
19use crate::encoder::{
20    BlockCompressor, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
21    PerValueDataBlock, MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES,
22};
23use crate::format::pb::{self, ArrayEncoding};
24use crate::format::ProtobufUtils;
25use crate::{
26    decoder::{PageScheduler, PrimitivePageDecoder},
27    encoder::{ArrayEncoder, EncodedArray},
28    EncodingsIo,
29};
30
31use lance_core::{Error, Result};
32
33use super::block_compress::{CompressionConfig, CompressionScheme, GeneralBufferCompressor};
34
35/// Scheduler for a simple encoding where buffers of fixed-size items are stored as-is on disk
36#[derive(Debug, Clone, Copy)]
37pub struct ValuePageScheduler {
38    // TODO: do we really support values greater than 2^32 bytes per value?
39    // I think we want to, in theory, but will need to test this case.
40    bytes_per_value: u64,
41    buffer_offset: u64,
42    buffer_size: u64,
43    compression_config: CompressionConfig,
44}
45
46impl ValuePageScheduler {
47    pub fn new(
48        bytes_per_value: u64,
49        buffer_offset: u64,
50        buffer_size: u64,
51        compression_config: CompressionConfig,
52    ) -> Self {
53        Self {
54            bytes_per_value,
55            buffer_offset,
56            buffer_size,
57            compression_config,
58        }
59    }
60}
61
62impl PageScheduler for ValuePageScheduler {
63    fn schedule_ranges(
64        &self,
65        ranges: &[std::ops::Range<u64>],
66        scheduler: &Arc<dyn EncodingsIo>,
67        top_level_row: u64,
68    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
69        let (mut min, mut max) = (u64::MAX, 0);
70        let byte_ranges = if self.compression_config.scheme == CompressionScheme::None {
71            ranges
72                .iter()
73                .map(|range| {
74                    let start = self.buffer_offset + (range.start * self.bytes_per_value);
75                    let end = self.buffer_offset + (range.end * self.bytes_per_value);
76                    min = min.min(start);
77                    max = max.max(end);
78                    start..end
79                })
80                .collect::<Vec<_>>()
81        } else {
82            min = self.buffer_offset;
83            max = self.buffer_offset + self.buffer_size;
84            // for compressed page, the ranges are always the entire page,
85            // and it is guaranteed that only one range is passed
86            vec![Range {
87                start: min,
88                end: max,
89            }]
90        };
91
92        trace!(
93            "Scheduling I/O for {} ranges spread across byte range {}..{}",
94            byte_ranges.len(),
95            min,
96            max
97        );
98        let bytes = scheduler.submit_request(byte_ranges, top_level_row);
99        let bytes_per_value = self.bytes_per_value;
100
101        let range_offsets = if self.compression_config.scheme != CompressionScheme::None {
102            ranges
103                .iter()
104                .map(|range| {
105                    let start = (range.start * bytes_per_value) as usize;
106                    let end = (range.end * bytes_per_value) as usize;
107                    start..end
108                })
109                .collect::<Vec<_>>()
110        } else {
111            vec![]
112        };
113
114        let compression_config = self.compression_config;
115        async move {
116            let bytes = bytes.await?;
117
118            Ok(Box::new(ValuePageDecoder {
119                bytes_per_value,
120                data: bytes,
121                uncompressed_data: Arc::new(Mutex::new(None)),
122                uncompressed_range_offsets: range_offsets,
123                compression_config,
124            }) as Box<dyn PrimitivePageDecoder>)
125        }
126        .boxed()
127    }
128}
129
130struct ValuePageDecoder {
131    bytes_per_value: u64,
132    data: Vec<Bytes>,
133    uncompressed_data: Arc<Mutex<Option<Vec<Bytes>>>>,
134    uncompressed_range_offsets: Vec<std::ops::Range<usize>>,
135    compression_config: CompressionConfig,
136}
137
138impl ValuePageDecoder {
139    fn decompress(&self) -> Result<Vec<Bytes>> {
140        // for compressed page, it is guaranteed that only one range is passed
141        let bytes_u8: Vec<u8> = self.data[0].to_vec();
142        let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config);
143        let mut uncompressed_bytes: Vec<u8> = Vec::new();
144        buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?;
145
146        let mut bytes_in_ranges: Vec<Bytes> =
147            Vec::with_capacity(self.uncompressed_range_offsets.len());
148        for range in &self.uncompressed_range_offsets {
149            let start = range.start;
150            let end = range.end;
151            bytes_in_ranges.push(Bytes::from(uncompressed_bytes[start..end].to_vec()));
152        }
153        Ok(bytes_in_ranges)
154    }
155
156    fn get_uncompressed_bytes(&self) -> Result<Arc<Mutex<Option<Vec<Bytes>>>>> {
157        let mut uncompressed_bytes = self.uncompressed_data.lock().unwrap();
158        if uncompressed_bytes.is_none() {
159            *uncompressed_bytes = Some(self.decompress()?);
160        }
161        Ok(Arc::clone(&self.uncompressed_data))
162    }
163
164    fn is_compressed(&self) -> bool {
165        !self.uncompressed_range_offsets.is_empty()
166    }
167
168    fn decode_buffers<'a>(
169        &'a self,
170        buffers: impl IntoIterator<Item = &'a Bytes>,
171        mut bytes_to_skip: u64,
172        mut bytes_to_take: u64,
173    ) -> LanceBuffer {
174        let mut dest: Option<Vec<u8>> = None;
175
176        for buf in buffers.into_iter() {
177            let buf_len = buf.len() as u64;
178            if bytes_to_skip > buf_len {
179                bytes_to_skip -= buf_len;
180            } else {
181                let bytes_to_take_here = (buf_len - bytes_to_skip).min(bytes_to_take);
182                bytes_to_take -= bytes_to_take_here;
183                let start = bytes_to_skip as usize;
184                let end = start + bytes_to_take_here as usize;
185                let slice = buf.slice(start..end);
186                match (&mut dest, bytes_to_take) {
187                    (None, 0) => {
188                        // The entire request is contained in one buffer so we can maybe zero-copy
189                        // if the slice is aligned properly
190                        return LanceBuffer::from_bytes(slice, self.bytes_per_value);
191                    }
192                    (None, _) => {
193                        dest.replace(Vec::with_capacity(bytes_to_take as usize));
194                    }
195                    _ => {}
196                }
197                dest.as_mut().unwrap().extend_from_slice(&slice);
198                bytes_to_skip = 0;
199            }
200        }
201        LanceBuffer::from(dest.unwrap_or_default())
202    }
203}
204
205impl PrimitivePageDecoder for ValuePageDecoder {
206    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
207        let bytes_to_skip = rows_to_skip * self.bytes_per_value;
208        let bytes_to_take = num_rows * self.bytes_per_value;
209
210        let data_buffer = if self.is_compressed() {
211            let decoding_data = self.get_uncompressed_bytes()?;
212            let buffers = decoding_data.lock().unwrap();
213            self.decode_buffers(buffers.as_ref().unwrap(), bytes_to_skip, bytes_to_take)
214        } else {
215            self.decode_buffers(&self.data, bytes_to_skip, bytes_to_take)
216        };
217        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
218            bits_per_value: self.bytes_per_value * 8,
219            data: data_buffer,
220            num_values: num_rows,
221            block_info: BlockInfo::new(),
222        }))
223    }
224}
225
/// Stateless encoder that stores fixed-width values verbatim (no compression).
///
/// It also flattens fixed-size-list data (including any inner validity) for the
/// mini-block and per-value layouts.
#[derive(Debug, Default)]
pub struct ValueEncoder {}
229
230impl ValueEncoder {
231    /// Use the largest chunk we can smaller than 4KiB
232    fn find_log_vals_per_chunk(bytes_per_word: u64, values_per_word: u64) -> (u64, u64) {
233        let mut size_bytes = 2 * bytes_per_word;
234        let (mut log_num_vals, mut num_vals) = match values_per_word {
235            1 => (1, 2),
236            8 => (3, 8),
237            _ => unreachable!(),
238        };
239
240        // If the type is so wide that we can't even fit 2 values we shouldn't be here
241        assert!(size_bytes < MAX_MINIBLOCK_BYTES);
242
243        while 2 * size_bytes < MAX_MINIBLOCK_BYTES && 2 * num_vals <= MAX_MINIBLOCK_VALUES {
244            log_num_vals += 1;
245            size_bytes *= 2;
246            num_vals *= 2;
247        }
248
249        (log_num_vals, num_vals)
250    }
251
252    fn chunk_data(data: FixedWidthDataBlock) -> MiniBlockCompressed {
253        // Usually there are X bytes per value.  However, when working with boolean
254        // or FSL<boolean> we might have some number of bits per value that isn't
255        // divisible by 8.  In this case, to avoid chunking in the middle of a byte
256        // we calculate how many 8-value words we can fit in a chunk.
257        let (bytes_per_word, values_per_word) = if data.bits_per_value % 8 == 0 {
258            (data.bits_per_value / 8, 1)
259        } else {
260            (data.bits_per_value, 8)
261        };
262
263        // Aim for 4KiB chunks
264        let (log_vals_per_chunk, vals_per_chunk) =
265            Self::find_log_vals_per_chunk(bytes_per_word, values_per_word);
266        let num_chunks = bit_util::ceil(data.num_values as usize, vals_per_chunk as usize);
267        debug_assert_eq!(vals_per_chunk % values_per_word, 0);
268        let bytes_per_chunk = bytes_per_word * (vals_per_chunk / values_per_word);
269        let bytes_per_chunk = u16::try_from(bytes_per_chunk).unwrap();
270
271        let data_buffer = data.data;
272
273        let mut row_offset = 0;
274        let mut chunks = Vec::with_capacity(num_chunks);
275
276        let mut bytes_counter = 0;
277        loop {
278            if row_offset + vals_per_chunk <= data.num_values {
279                chunks.push(MiniBlockChunk {
280                    log_num_values: log_vals_per_chunk as u8,
281                    buffer_sizes: vec![bytes_per_chunk],
282                });
283                row_offset += vals_per_chunk;
284                bytes_counter += bytes_per_chunk as u64;
285            } else {
286                // Final chunk, special values
287                let num_bytes = data_buffer.len() as u64 - bytes_counter;
288                let num_bytes = u16::try_from(num_bytes).unwrap();
289                chunks.push(MiniBlockChunk {
290                    log_num_values: 0,
291                    buffer_sizes: vec![num_bytes],
292                });
293                break;
294            }
295        }
296
297        MiniBlockCompressed {
298            chunks,
299            data: vec![data_buffer],
300            num_values: data.num_values,
301        }
302    }
303}
304
305#[derive(Debug)]
306struct MiniblockFslLayer {
307    validity: Option<LanceBuffer>,
308    dimension: u64,
309}
310
311/// This impl deals with encoding FSL<FSL<...<FSL<FixedWidth>>>> data as a mini-block compressor.
312/// The tricky part of FSL data is that we want to include inner validity buffers (we don't want these
313/// to be part of the rep-def because that usually ends up being more expensive).
314///
315/// The resulting mini-block will, instead of having a single buffer, have X + 1 buffers where X is
316/// the number of FSL layers that contain validity.
317///
318/// In the simple case where there is no validity inside the FSL layers, all we are doing here is flattening
319/// the FSL layers into a single buffer.
320///
321/// Also: We don't allow a row to be broken across chunks.  This typically isn't too big of a deal since we
322/// are usually dealing with relatively small vectors if we are using mini-block.
323///
324/// Note: when we do have validity we have to make copies of the validity buffers because they are bit buffers
325/// and we need to bit slice them which requires copies or offsets.  Paying the price at write time to make
326/// the copies is better than paying the price at read time to do the bit slicing.
327impl ValueEncoder {
328    fn make_fsl_encoding(layers: &[MiniblockFslLayer], bits_per_value: u64) -> ArrayEncoding {
329        let mut encoding = ProtobufUtils::flat_encoding(bits_per_value, 0, None);
330        for layer in layers.iter().rev() {
331            let has_validity = layer.validity.is_some();
332            let dimension = layer.dimension;
333            encoding = ProtobufUtils::fsl_encoding(dimension, encoding, has_validity);
334        }
335        encoding
336    }
337
338    fn extract_fsl_chunk(
339        data: &FixedWidthDataBlock,
340        layers: &[MiniblockFslLayer],
341        row_offset: usize,
342        num_rows: usize,
343        validity_buffers: &mut [Vec<u8>],
344    ) -> Vec<u16> {
345        let mut row_offset = row_offset;
346        let mut num_values = num_rows;
347        let mut buffer_counter = 0;
348        let mut buffer_sizes = Vec::with_capacity(validity_buffers.len() + 1);
349        for layer in layers {
350            row_offset *= layer.dimension as usize;
351            num_values *= layer.dimension as usize;
352            if let Some(validity) = &layer.validity {
353                let validity_slice = validity
354                    .try_clone()
355                    .unwrap()
356                    .bit_slice_le_with_length(row_offset, num_values);
357                validity_buffers[buffer_counter].extend_from_slice(&validity_slice);
358                buffer_sizes.push(validity_slice.len() as u16);
359                buffer_counter += 1;
360            }
361        }
362
363        let bits_in_chunk = data.bits_per_value * num_values as u64;
364        let bytes_in_chunk = bits_in_chunk.div_ceil(8);
365        let bytes_in_chunk = u16::try_from(bytes_in_chunk).unwrap();
366        buffer_sizes.push(bytes_in_chunk);
367
368        buffer_sizes
369    }
370
371    fn chunk_fsl(
372        data: FixedWidthDataBlock,
373        layers: Vec<MiniblockFslLayer>,
374        num_rows: u64,
375    ) -> (MiniBlockCompressed, ArrayEncoding) {
376        // Count size to calculate rows per chunk
377        let mut ceil_bytes_validity = 0;
378        let mut cum_dim = 1;
379        let mut num_validity_buffers = 0;
380        for layer in &layers {
381            cum_dim *= layer.dimension;
382            if layer.validity.is_some() {
383                ceil_bytes_validity += cum_dim.div_ceil(8);
384                num_validity_buffers += 1;
385            }
386        }
387        // It's an estimate because validity buffers may have some padding bits
388        let cum_bits_per_value = data.bits_per_value * cum_dim;
389        let (cum_bytes_per_word, vals_per_word) = if cum_bits_per_value % 8 == 0 {
390            (cum_bits_per_value / 8, 1)
391        } else {
392            (cum_bits_per_value, 8)
393        };
394        let est_bytes_per_word = (ceil_bytes_validity * vals_per_word) + cum_bytes_per_word;
395        let (log_rows_per_chunk, rows_per_chunk) =
396            Self::find_log_vals_per_chunk(est_bytes_per_word, vals_per_word);
397
398        let num_chunks = num_rows.div_ceil(rows_per_chunk) as usize;
399
400        // Allocate buffers for validity, these will be slightly bigger than the input validity buffers
401        let mut chunks = Vec::with_capacity(num_chunks);
402        let mut validity_buffers: Vec<Vec<u8>> = Vec::with_capacity(num_validity_buffers);
403        cum_dim = 1;
404        for layer in &layers {
405            cum_dim *= layer.dimension;
406            if let Some(validity) = &layer.validity {
407                let layer_bytes_validity = cum_dim.div_ceil(8);
408                let validity_with_padding =
409                    layer_bytes_validity as usize * num_chunks * rows_per_chunk as usize;
410                debug_assert!(validity_with_padding >= validity.len());
411                validity_buffers.push(Vec::with_capacity(
412                    layer_bytes_validity as usize * num_chunks,
413                ));
414            }
415        }
416
417        // Now go through and extract validity buffers
418        let mut row_offset = 0;
419        while row_offset + rows_per_chunk <= num_rows {
420            let buffer_sizes = Self::extract_fsl_chunk(
421                &data,
422                &layers,
423                row_offset as usize,
424                rows_per_chunk as usize,
425                &mut validity_buffers,
426            );
427            row_offset += rows_per_chunk;
428            chunks.push(MiniBlockChunk {
429                log_num_values: log_rows_per_chunk as u8,
430                buffer_sizes,
431            })
432        }
433        let rows_in_chunk = num_rows - row_offset;
434        if rows_in_chunk > 0 {
435            let buffer_sizes = Self::extract_fsl_chunk(
436                &data,
437                &layers,
438                row_offset as usize,
439                rows_in_chunk as usize,
440                &mut validity_buffers,
441            );
442            chunks.push(MiniBlockChunk {
443                log_num_values: 0,
444                buffer_sizes,
445            });
446        }
447
448        let encoding = Self::make_fsl_encoding(&layers, data.bits_per_value);
449        // Finally, add the data buffer
450        let buffers = validity_buffers
451            .into_iter()
452            .map(LanceBuffer::Owned)
453            .chain(std::iter::once(data.data))
454            .collect::<Vec<_>>();
455
456        (
457            MiniBlockCompressed {
458                chunks,
459                data: buffers,
460                num_values: num_rows,
461            },
462            encoding,
463        )
464    }
465
466    fn miniblock_fsl(data: DataBlock) -> (MiniBlockCompressed, ArrayEncoding) {
467        let num_rows = data.num_values();
468        let fsl = data.as_fixed_size_list().unwrap();
469        let mut layers = Vec::new();
470        let mut child = *fsl.child;
471        let mut cur_layer = MiniblockFslLayer {
472            validity: None,
473            dimension: fsl.dimension,
474        };
475        loop {
476            if let DataBlock::Nullable(nullable) = child {
477                cur_layer.validity = Some(nullable.nulls);
478                child = *nullable.data;
479            }
480            match child {
481                DataBlock::FixedSizeList(inner) => {
482                    layers.push(cur_layer);
483                    cur_layer = MiniblockFslLayer {
484                        validity: None,
485                        dimension: inner.dimension,
486                    };
487                    child = *inner.child;
488                }
489                DataBlock::FixedWidth(inner) => {
490                    layers.push(cur_layer);
491                    return Self::chunk_fsl(inner, layers, num_rows);
492                }
493                _ => unreachable!("Unexpected data block type in value encoder's miniblock_fsl"),
494            }
495        }
496    }
497}
498
499struct PerValueFslValidityIter {
500    buffer: LanceBuffer,
501    bits_per_row: usize,
502    offset: usize,
503}
504
505/// In this section we deal with per-value encoding of FSL<FSL<...<FSL<FixedWidth>>>> data.
506///
507/// It's easier than mini-block.  All we need to do is flatten the FSL layers into a single buffer.
508/// This includes any validity buffers we encounter on the way.
509impl ValueEncoder {
510    fn fsl_to_encoding(fsl: &FixedSizeListBlock) -> ArrayEncoding {
511        let mut inner = fsl.child.as_ref();
512        let mut has_validity = false;
513        inner = match inner {
514            DataBlock::Nullable(nullable) => {
515                has_validity = true;
516                nullable.data.as_ref()
517            }
518            _ => inner,
519        };
520        let inner_encoding = match inner {
521            DataBlock::FixedWidth(fixed_width) => {
522                ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None)
523            }
524            DataBlock::FixedSizeList(inner) => Self::fsl_to_encoding(inner),
525            _ => unreachable!("Unexpected data block type in value encoder's fsl_to_encoding"),
526        };
527        ProtobufUtils::fsl_encoding(fsl.dimension, inner_encoding, has_validity)
528    }
529
530    fn simple_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
531        // The simple case is zero-copy, we just return the flattened inner buffer
532        let encoding = Self::fsl_to_encoding(&fsl);
533        let num_values = fsl.num_values();
534        let mut child = *fsl.child;
535        let mut cum_dim = 1;
536        loop {
537            cum_dim *= fsl.dimension;
538            match child {
539                DataBlock::Nullable(nullable) => {
540                    child = *nullable.data;
541                }
542                DataBlock::FixedSizeList(inner) => {
543                    child = *inner.child;
544                }
545                DataBlock::FixedWidth(inner) => {
546                    let data = FixedWidthDataBlock {
547                        bits_per_value: inner.bits_per_value * cum_dim,
548                        num_values,
549                        data: inner.data,
550                        block_info: BlockInfo::new(),
551                    };
552                    return (PerValueDataBlock::Fixed(data), encoding);
553                }
554                _ => unreachable!(
555                    "Unexpected data block type in value encoder's simple_per_value_fsl"
556                ),
557            }
558        }
559    }
560
561    fn nullable_per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
562        // If there are nullable inner values then we need to zip the validity with the values
563        let encoding = Self::fsl_to_encoding(&fsl);
564        let num_values = fsl.num_values();
565        let mut bytes_per_row = 0;
566        let mut cum_dim = 1;
567        let mut current = fsl;
568        let mut validity_iters: Vec<PerValueFslValidityIter> = Vec::new();
569        let data_bytes_per_row: usize;
570        let data_buffer: LanceBuffer;
571        loop {
572            cum_dim *= current.dimension;
573            let mut child = *current.child;
574            if let DataBlock::Nullable(nullable) = child {
575                // Each item will need this many bytes of validity prepended to it
576                bytes_per_row += cum_dim.div_ceil(8) as usize;
577                validity_iters.push(PerValueFslValidityIter {
578                    buffer: nullable.nulls,
579                    bits_per_row: cum_dim as usize,
580                    offset: 0,
581                });
582                child = *nullable.data;
583            };
584            match child {
585                DataBlock::FixedSizeList(inner) => {
586                    current = inner;
587                }
588                DataBlock::FixedWidth(fixed_width) => {
589                    data_bytes_per_row =
590                        (fixed_width.bits_per_value.div_ceil(8) * cum_dim) as usize;
591                    bytes_per_row += data_bytes_per_row;
592                    data_buffer = fixed_width.data;
593                    break;
594                }
595                _ => unreachable!(
596                    "Unexpected data block type in value encoder's nullable_per_value_fsl: {:?}",
597                    child
598                ),
599            }
600        }
601
602        let bytes_needed = bytes_per_row * num_values as usize;
603        let mut zipped = Vec::with_capacity(bytes_needed);
604        let data_slice = &data_buffer;
605        // Hopefully values are pretty large so we don't iterate this loop _too_ many times
606        for i in 0..num_values as usize {
607            for validity in validity_iters.iter_mut() {
608                let validity_slice = validity
609                    .buffer
610                    .bit_slice_le_with_length(validity.offset, validity.bits_per_row);
611                zipped.extend_from_slice(&validity_slice);
612                validity.offset += validity.bits_per_row;
613            }
614            let start = i * data_bytes_per_row;
615            let end = start + data_bytes_per_row;
616            zipped.extend_from_slice(&data_slice[start..end]);
617        }
618
619        let zipped = LanceBuffer::Owned(zipped);
620        let data = PerValueDataBlock::Fixed(FixedWidthDataBlock {
621            bits_per_value: bytes_per_row as u64 * 8,
622            num_values,
623            data: zipped,
624            block_info: BlockInfo::new(),
625        });
626        (data, encoding)
627    }
628
629    fn per_value_fsl(fsl: FixedSizeListBlock) -> (PerValueDataBlock, ArrayEncoding) {
630        if !fsl.child.is_nullable() {
631            Self::simple_per_value_fsl(fsl)
632        } else {
633            Self::nullable_per_value_fsl(fsl)
634        }
635    }
636}
637
638impl BlockCompressor for ValueEncoder {
639    fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
640        let data = match data {
641            DataBlock::FixedWidth(fixed_width) => fixed_width.data,
642            _ => unimplemented!(
643                "Cannot compress block of type {} with ValueEncoder",
644                data.name()
645            ),
646        };
647        Ok(data)
648    }
649}
650
651impl ArrayEncoder for ValueEncoder {
652    fn encode(
653        &self,
654        data: DataBlock,
655        _data_type: &DataType,
656        buffer_index: &mut u32,
657    ) -> Result<EncodedArray> {
658        let index = *buffer_index;
659        *buffer_index += 1;
660
661        let encoding = match &data {
662            DataBlock::FixedWidth(fixed_width) => Ok(ProtobufUtils::flat_encoding(
663                fixed_width.bits_per_value,
664                index,
665                None,
666            )),
667            _ => Err(Error::InvalidInput {
668                source: format!(
669                    "Cannot encode a data block of type {} with ValueEncoder",
670                    data.name()
671                )
672                .into(),
673                location: location!(),
674            }),
675        }?;
676        Ok(EncodedArray { data, encoding })
677    }
678}
679
680impl MiniBlockCompressor for ValueEncoder {
681    fn compress(
682        &self,
683        chunk: DataBlock,
684    ) -> Result<(
685        crate::encoder::MiniBlockCompressed,
686        crate::format::pb::ArrayEncoding,
687    )> {
688        match chunk {
689            DataBlock::FixedWidth(fixed_width) => {
690                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
691                Ok((Self::chunk_data(fixed_width), encoding))
692            }
693            DataBlock::FixedSizeList(_) => Ok(Self::miniblock_fsl(chunk)),
694            _ => Err(Error::InvalidInput {
695                source: format!(
696                    "Cannot compress a data block of type {} with ValueEncoder",
697                    chunk.name()
698                )
699                .into(),
700                location: location!(),
701            }),
702        }
703    }
704}
705
706/// A decompressor for constant-encoded data
707#[derive(Debug)]
708pub struct ConstantDecompressor {
709    scalar: LanceBuffer,
710}
711
712impl ConstantDecompressor {
713    pub fn new(scalar: LanceBuffer) -> Self {
714        Self {
715            scalar: scalar.into_borrowed(),
716        }
717    }
718}
719
720impl BlockDecompressor for ConstantDecompressor {
721    fn decompress(&self, _data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
722        Ok(DataBlock::Constant(ConstantDataBlock {
723            data: self.scalar.try_clone().unwrap(),
724            num_values,
725        }))
726    }
727}
728
/// Description of one FSL layer, read from the protobuf encoding at decode time.
#[derive(Debug)]
struct ValueFslDesc {
    /// Number of child items per list at this layer.
    dimension: u64,
    /// Whether a validity buffer was written for this layer's child.
    has_validity: bool,
}
734
735/// A decompressor for fixed-width data that has
736/// been written, as-is, to disk in single contiguous array
737#[derive(Debug)]
738pub struct ValueDecompressor {
739    /// How many bits are in each inner-most item (e.g. FSL<Int32, 100> would be 32)
740    bits_per_item: u64,
741    /// How many bits are in each value (e.g. FSL<Int32, 100> would be 3200)
742    ///
743    /// This number is a little trickier to compute because we also have to include bytes
744    /// of any inner validity
745    bits_per_value: u64,
746    /// How many items are in each value (e.g. FSL<Int32, 100> would be 100)
747    items_per_value: u64,
748    layers: Vec<ValueFslDesc>,
749}
750
751impl ValueDecompressor {
752    pub fn from_flat(description: &pb::Flat) -> Self {
753        Self {
754            bits_per_item: description.bits_per_value,
755            bits_per_value: description.bits_per_value,
756            items_per_value: 1,
757            layers: Vec::default(),
758        }
759    }
760
761    pub fn from_fsl(mut description: &pb::FixedSizeList) -> Self {
762        let mut layers = Vec::new();
763        let mut cum_dim = 1;
764        let mut bytes_per_value = 0;
765        loop {
766            layers.push(ValueFslDesc {
767                has_validity: description.has_validity,
768                dimension: description.dimension as u64,
769            });
770            cum_dim *= description.dimension as u64;
771            if description.has_validity {
772                bytes_per_value += cum_dim.div_ceil(8);
773            }
774            match description
775                .items
776                .as_ref()
777                .unwrap()
778                .array_encoding
779                .as_ref()
780                .unwrap()
781            {
782                pb::array_encoding::ArrayEncoding::FixedSizeList(inner) => {
783                    description = inner;
784                }
785                pb::array_encoding::ArrayEncoding::Flat(flat) => {
786                    let mut bits_per_value = bytes_per_value * 8;
787                    bits_per_value += flat.bits_per_value * cum_dim;
788                    return Self {
789                        bits_per_item: flat.bits_per_value,
790                        bits_per_value,
791                        items_per_value: cum_dim,
792                        layers,
793                    };
794                }
795                _ => unreachable!(),
796            }
797        }
798    }
799
800    fn buffer_to_block(&self, data: LanceBuffer, num_values: u64) -> DataBlock {
801        DataBlock::FixedWidth(FixedWidthDataBlock {
802            bits_per_value: self.bits_per_item,
803            num_values,
804            data,
805            block_info: BlockInfo::new(),
806        })
807    }
808}
809
810impl BlockDecompressor for ValueDecompressor {
811    fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
812        let block = self.buffer_to_block(data, num_values);
813        assert_eq!(block.num_values(), num_values);
814        Ok(block)
815    }
816}
817
818impl MiniBlockDecompressor for ValueDecompressor {
819    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
820        let num_items = num_values * self.items_per_value;
821        let mut buffer_iter = data.into_iter().rev();
822
823        // Always at least 1 buffer
824        let data_buf = buffer_iter.next().unwrap();
825        let items = self.buffer_to_block(data_buf, num_items);
826        let mut lists = items;
827
828        for layer in self.layers.iter().rev() {
829            if layer.has_validity {
830                let validity_buf = buffer_iter.next().unwrap();
831                lists = DataBlock::Nullable(NullableDataBlock {
832                    data: Box::new(lists),
833                    nulls: validity_buf,
834                    block_info: BlockInfo::default(),
835                });
836            }
837            lists = DataBlock::FixedSizeList(FixedSizeListBlock {
838                child: Box::new(lists),
839                dimension: layer.dimension,
840            })
841        }
842
843        assert_eq!(lists.num_values(), num_values);
844        Ok(lists)
845    }
846}
847
/// Accumulates one layer's validity bitmap while unzipping per-value FSL data in which
/// validity is interleaved with the values.
struct FslDecompressorValidityBuilder {
    /// Destination bitmap for this layer's validity bits.
    buffer: BooleanBufferBuilder,
    /// Number of validity bits this layer contributes to each row.
    bits_per_row: usize,
    /// Bytes this layer's validity occupies in each zipped row (`bits_per_row` rounded up to
    /// whole bytes).
    bytes_per_row: usize,
}
853
854// Helper methods for per-value decompression
855impl ValueDecompressor {
856    fn has_validity(&self) -> bool {
857        self.layers.iter().any(|layer| layer.has_validity)
858    }
859
860    // If there is no validity then decompression is zero-copy, we just need to restore any FSL layers
861    fn simple_decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> DataBlock {
862        let mut cum_dim = 1;
863        for layer in &self.layers {
864            cum_dim *= layer.dimension;
865        }
866        debug_assert_eq!(self.bits_per_item, data.bits_per_value / cum_dim);
867        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
868            bits_per_value: self.bits_per_item,
869            num_values: num_rows * cum_dim,
870            data: data.data,
871            block_info: BlockInfo::new(),
872        });
873        for layer in self.layers.iter().rev() {
874            block = DataBlock::FixedSizeList(FixedSizeListBlock {
875                child: Box::new(block),
876                dimension: layer.dimension,
877            });
878        }
879        debug_assert_eq!(num_rows, block.num_values());
880        block
881    }
882
883    // If there is validity then it has been zipped in with the values and we must unzip it
884    fn unzip_decompress(&self, data: FixedWidthDataBlock, num_rows: usize) -> DataBlock {
885        // No support for full-zip on per-value encodings
886        assert_eq!(self.bits_per_item % 8, 0);
887        let bytes_per_item = self.bits_per_item / 8;
888        let mut buffer_builders = Vec::with_capacity(self.layers.len());
889        let mut cum_dim = 1;
890        let mut total_size_bytes = 0;
891        // First, go through the layers, setup our builders, allocate space
892        for layer in &self.layers {
893            cum_dim *= layer.dimension as usize;
894            if layer.has_validity {
895                let validity_size_bits = cum_dim;
896                let validity_size_bytes = validity_size_bits.div_ceil(8);
897                total_size_bytes += num_rows * validity_size_bytes;
898                buffer_builders.push(FslDecompressorValidityBuilder {
899                    buffer: BooleanBufferBuilder::new(validity_size_bits * num_rows),
900                    bits_per_row: cum_dim,
901                    bytes_per_row: validity_size_bytes,
902                })
903            }
904        }
905        let num_items = num_rows * cum_dim;
906        let data_size = num_items * bytes_per_item as usize;
907        total_size_bytes += data_size;
908        let mut data_buffer = Vec::with_capacity(data_size);
909
910        assert_eq!(data.data.len(), total_size_bytes);
911
912        let bytes_per_value = bytes_per_item as usize;
913        let data_bytes_per_row = bytes_per_value * cum_dim;
914
915        // Next, unzip
916        let mut data_offset = 0;
917        while data_offset < total_size_bytes {
918            for builder in buffer_builders.iter_mut() {
919                let start = data_offset * 8;
920                let end = start + builder.bits_per_row;
921                builder.buffer.append_packed_range(start..end, &data.data);
922                data_offset += builder.bytes_per_row;
923            }
924            let end = data_offset + data_bytes_per_row;
925            data_buffer.extend_from_slice(&data.data[data_offset..end]);
926            data_offset += data_bytes_per_row;
927        }
928
929        // Finally, restore the structure
930        let mut block = DataBlock::FixedWidth(FixedWidthDataBlock {
931            bits_per_value: self.bits_per_value,
932            num_values: num_items as u64,
933            data: LanceBuffer::Owned(data_buffer),
934            block_info: BlockInfo::new(),
935        });
936
937        let mut validity_bufs = buffer_builders
938            .into_iter()
939            .rev()
940            .map(|mut b| LanceBuffer::Borrowed(b.buffer.finish().into_inner()));
941        for layer in self.layers.iter().rev() {
942            if layer.has_validity {
943                let nullable = NullableDataBlock {
944                    data: Box::new(block),
945                    nulls: validity_bufs.next().unwrap(),
946                    block_info: BlockInfo::new(),
947                };
948                block = DataBlock::Nullable(nullable);
949            }
950            block = DataBlock::FixedSizeList(FixedSizeListBlock {
951                child: Box::new(block),
952                dimension: layer.dimension,
953            });
954        }
955
956        assert_eq!(num_rows, block.num_values() as usize);
957
958        block
959    }
960}
961
962impl FixedPerValueDecompressor for ValueDecompressor {
963    fn decompress(&self, data: FixedWidthDataBlock, num_rows: u64) -> Result<DataBlock> {
964        if self.has_validity() {
965            Ok(self.unzip_decompress(data, num_rows as usize))
966        } else {
967            Ok(self.simple_decompress(data, num_rows))
968        }
969    }
970
971    fn bits_per_value(&self) -> u64 {
972        self.bits_per_value
973    }
974}
975
976impl PerValueCompressor for ValueEncoder {
977    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, ArrayEncoding)> {
978        let (data, encoding) = match data {
979            DataBlock::FixedWidth(fixed_width) => {
980                let encoding = ProtobufUtils::flat_encoding(fixed_width.bits_per_value, 0, None);
981                (PerValueDataBlock::Fixed(fixed_width), encoding)
982            }
983            DataBlock::FixedSizeList(fixed_size_list) => Self::per_value_fsl(fixed_size_list),
984            _ => unimplemented!(
985                "Cannot compress block of type {} with ValueEncoder",
986                data.name()
987            ),
988        };
989        Ok((data, encoding))
990    }
991}
992
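// This tests module is `pub(crate)` with the intent of sharing PRIMITIVE_TYPES with the
// fixed_size_list tests. NOTE(review): PRIMITIVE_TYPES itself is not `pub`, so as written it
// is not reachable from other modules; confirm whether the module visibility is still needed.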
#[cfg(test)]
pub(crate) mod tests {
    // Round-trip tests for the value encoding, covering:
    // - primitive and large types through the full file writer/reader;
    // - direct mini-block and per-value compression/decompression of nested,
    //   nullable fixed-size lists.
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        make_array, Array, ArrayRef, Decimal128Array, FixedSizeListArray, Int32Array,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer};
    use arrow_schema::{DataType, Field, TimeUnit};
    use lance_datagen::{array, gen, ArrayGeneratorExt, Dimension, RowCount};
    use rstest::rstest;

    use crate::{
        data::DataBlock,
        decoder::{FixedPerValueDecompressor, MiniBlockDecompressor},
        encoder::{MiniBlockCompressor, PerValueCompressor, PerValueDataBlock},
        encodings::physical::value::ValueDecompressor,
        format::pb,
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    use super::ValueEncoder;

    /// Primitive types exercised by `test_value_primitive`.
    const PRIMITIVE_TYPES: &[DataType] = &[
        DataType::Null,
        DataType::FixedSizeBinary(2),
        DataType::Date32,
        DataType::Date64,
        DataType::Int8,
        DataType::Int16,
        DataType::Int32,
        DataType::Int64,
        DataType::UInt8,
        DataType::UInt16,
        DataType::UInt32,
        DataType::UInt64,
        DataType::Float16,
        DataType::Float32,
        DataType::Float64,
        DataType::Decimal128(10, 10),
        DataType::Decimal256(10, 10),
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        DataType::Time32(TimeUnit::Second),
        DataType::Time64(TimeUnit::Nanosecond),
        DataType::Duration(TimeUnit::Second),
        // The Interval type is supported by the reader but the writer works with Lance schema
        // at the moment and Lance schema can't parse interval
        // DataType::Interval(IntervalUnit::DayTime),
    ];

    /// Round-trips a small nullable Int32 array through a V2_1 file, reading back several
    /// ranges and index sets.
    #[test_log::test(tokio::test)]
    async fn test_simple_value() {
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            None,
            Some(2),
            Some(3),
            Some(4),
            Some(5),
        ]));

        let test_cases = TestCases::default()
            .with_range(0..3)
            .with_range(0..2)
            .with_range(1..3)
            .with_indices(vec![0, 1, 2])
            .with_indices(vec![1])
            .with_indices(vec![2])
            .with_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![items], &test_cases, HashMap::default()).await;
    }

    /// Round-trips random non-nullable data of every type in `PRIMITIVE_TYPES`, for both
    /// file versions.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_value_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for data_type in PRIMITIVE_TYPES {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    // Types with large per-value widths (a 128 x Int32 fixed-size list = 512 bytes per row).
    lazy_static::lazy_static! {
        static ref LARGE_TYPES: Vec<DataType> = vec![DataType::FixedSizeList(
            Arc::new(Field::new("", DataType::Int32, false)),
            128,
        )];
    }

    /// Round-trips random data of every type in `LARGE_TYPES`, for both file versions.
    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_large_primitive(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        for data_type in LARGE_TYPES.iter() {
            log::info!("Testing encoding for {:?}", data_type);
            let field = Field::new("", data_type.clone(), false);
            check_round_trip_encoding_random(field, version).await;
        }
    }

    /// Round-trips a Decimal128 column that cycles through only 100 distinct values
    /// (1M rows total).
    ///
    /// NOTE(review): the name suggests this targets dictionary encoding, but that is not
    /// asserted here; the test only checks the round trip.
    #[test_log::test(tokio::test)]
    async fn test_decimal128_dictionary_encoding() {
        let test_cases = TestCases::default().with_file_version(LanceFileVersion::V2_1);
        let decimals: Vec<i32> = (0..100).collect();
        // Despite the name, these are i128 decimal values, not strings.
        let repeated_strings: Vec<_> = decimals
            .iter()
            .cycle()
            .take(decimals.len() * 10000)
            .map(|&v| Some(v as i128))
            .collect();
        let decimal_array = Arc::new(Decimal128Array::from(repeated_strings)) as ArrayRef;
        check_round_trip_encoding_of_data(vec![decimal_array], &test_cases, HashMap::new()).await;
    }

    /// Round-trips through mini-block encoding with assorted page and batch sizes.
    #[test_log::test(tokio::test)]
    async fn test_miniblock_stress() {
        // Tests for strange page sizes and batch sizes and validity scenarios for miniblock

        // 10K integers, 100 per array, all valid
        let data1 = (0..100)
            .map(|_| Arc::new(Int32Array::from_iter_values(0..100)) as Arc<dyn Array>)
            .collect::<Vec<_>>();

        // Same as above but with mixed validity
        let data2 = (0..100)
            .map(|_| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if i % 2 == 0 {
                        Some(i)
                    } else {
                        None
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        // Same as above but with all null for first half then all valid
        // TODO: Re-enable once the all-null path is complete
        let _data3 = (0..100)
            .map(|chunk_idx| {
                Arc::new(Int32Array::from_iter((0..100).map(|i| {
                    if chunk_idx < 50 {
                        None
                    } else {
                        Some(i)
                    }
                }))) as Arc<dyn Array>
            })
            .collect::<Vec<_>>();

        for data in [data1, data2 /*data3*/] {
            for batch_size in [10, 100, 1500, 15000] {
                // 40000 bytes of data
                let test_cases = TestCases::default()
                    .with_page_sizes(vec![1000, 2000, 3000, 60000])
                    .with_batch_size(batch_size)
                    .with_file_version(LanceFileVersion::V2_1);

                check_round_trip_encoding_of_data(data.clone(), &test_cases, HashMap::new()).await;
            }
        }
    }

    /// Builds a 3-row FSL<FSL<Int32, 2>, 2>.
    ///
    /// The outer list has no validity; both the inner lists and the items contain nulls.
    fn create_simple_fsl() -> FixedSizeListArray {
        // [[0, 1], NULL], [NULL, NULL], [[8, 9], [NULL, 11]]
        let items = Arc::new(Int32Array::from(vec![
            Some(0),
            Some(1),
            Some(2),
            Some(3),
            None,
            None,
            None,
            None,
            Some(8),
            Some(9),
            None,
            Some(11),
        ]));
        let items_field = Arc::new(Field::new("item", DataType::Int32, true));
        let inner_list_nulls = BooleanBuffer::from(vec![true, false, false, false, true, true]);
        let inner_list = Arc::new(FixedSizeListArray::new(
            items_field.clone(),
            2,
            items,
            Some(NullBuffer::new(inner_list_nulls)),
        ));
        let inner_list_field = Arc::new(Field::new(
            "item",
            DataType::FixedSizeList(items_field, 2),
            true,
        ));
        FixedSizeListArray::new(inner_list_field, 2, inner_list, None)
    }

    /// Mini-block round trip of `create_simple_fsl`.
    ///
    /// Expects three buffers:
    /// - 1 byte of inner-list validity;
    /// - 2 bytes of item validity;
    /// - 48 bytes of Int32 data.
    #[test]
    fn test_fsl_value_compression_miniblock() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 3);
        assert_eq!(data.chunks.len(), 1);
        assert_eq!(data.chunks[0].buffer_sizes, vec![1, 2, 48]);
        assert_eq!(data.chunks[0].log_num_values, 0);

        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
            compression.array_encoding.unwrap()
        else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

    /// Per-value (zipped) round trip of `create_simple_fsl`.
    ///
    /// Each row is 18 bytes (144 bits):
    /// - 1 byte of inner-list validity;
    /// - 1 byte of item validity;
    /// - 16 bytes of data.
    #[test]
    fn test_fsl_value_compression_per_value() {
        let sample_list = create_simple_fsl();

        let starting_data = DataBlock::from_array(sample_list.clone());

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        assert_eq!(data.bits_per_value, 144);
        assert_eq!(data.num_values, 3);
        assert_eq!(data.data.len(), 18 * 3);

        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
            compression.array_encoding.unwrap()
        else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_list.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), &sample_list);
    }

    /// Builds a random three-level FSL<FSL<FSL<Int32, 4>, 4>, 2> with ~10% nulls at the item
    /// level and at the two inner list levels.
    fn create_random_fsl() -> Arc<dyn Array> {
        // Several levels of def and multiple pages
        let inner = array::rand_type(&DataType::Int32).with_random_nulls(0.1);
        let list_one = array::cycle_vec(inner, Dimension::from(4)).with_random_nulls(0.1);
        let list_two = array::cycle_vec(list_one, Dimension::from(4)).with_random_nulls(0.1);
        let list_three = array::cycle_vec(list_two, Dimension::from(2));

        // 8Ki rows of 2 x 4 x 4 = 32 Int32 items each: 256Ki items, ~1MiB of data
        let batch = gen()
            .anon_col(list_three)
            .into_batch_rows(RowCount::from(8 * 1024))
            .unwrap();
        batch.column(0).clone()
    }

    /// Mini-block round trip of a large random nested FSL.
    #[test]
    fn fsl_value_miniblock_stress() {
        let sample_array = create_random_fsl();

        let starting_data =
            DataBlock::from_arrays(&[sample_array.clone()], sample_array.len() as u64);

        let encoder = ValueEncoder::default();
        let (data, compression) = MiniBlockCompressor::compress(&encoder, starting_data).unwrap();

        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
            compression.array_encoding.unwrap()
        else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let decompressed =
            MiniBlockDecompressor::decompress(&decompressor, data.data, data.num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }

    /// Per-value (zipped) round trip of a large random nested FSL.
    #[test]
    fn fsl_value_per_value_stress() {
        let sample_array = create_random_fsl();

        let starting_data =
            DataBlock::from_arrays(&[sample_array.clone()], sample_array.len() as u64);

        let encoder = ValueEncoder::default();
        let (data, compression) = PerValueCompressor::compress(&encoder, starting_data).unwrap();

        let pb::array_encoding::ArrayEncoding::FixedSizeList(fsl) =
            compression.array_encoding.unwrap()
        else {
            panic!()
        };

        let decompressor = ValueDecompressor::from_fsl(fsl.as_ref());

        let PerValueDataBlock::Fixed(data) = data else {
            panic!()
        };

        let num_values = data.num_values;
        let decompressed =
            FixedPerValueDecompressor::decompress(&decompressor, data, num_values).unwrap();

        let decompressed = make_array(
            decompressed
                .into_arrow(sample_array.data_type().clone(), true)
                .unwrap(),
        );

        assert_eq!(decompressed.as_ref(), sample_array.as_ref());
    }
}