lance_encoding/encodings/physical/
packed.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Packed encoding
5//!
6//! These encodings take struct data and compress it in a way that all fields are collected
7//! together.
8//!
9//! This encoding can be transparent or opaque.  In order to be transparent we must use transparent
10//! compression on all children.  Then we can zip together the compressed children.
11
12use std::{convert::TryInto, sync::Arc};
13
14use arrow_array::types::UInt64Type;
15
16use lance_core::{datatypes::Field, Error, Result};
17use snafu::location;
18
19use crate::{
20    buffer::LanceBuffer,
21    compression::{
22        DefaultCompressionStrategy, FixedPerValueDecompressor, MiniBlockDecompressor,
23        VariablePerValueDecompressor,
24    },
25    data::{
26        BlockInfo, DataBlock, DataBlockBuilder, FixedWidthDataBlock, StructDataBlock,
27        VariableWidthBlock,
28    },
29    encodings::logical::primitive::{
30        fullzip::{PerValueCompressor, PerValueDataBlock},
31        miniblock::{MiniBlockCompressed, MiniBlockCompressor},
32    },
33    format::{
34        pb21::{compressive_encoding::Compression, CompressiveEncoding, PackedStruct},
35        ProtobufUtils21,
36    },
37    statistics::{GetStat, Stat},
38};
39
40use super::value::{ValueDecompressor, ValueEncoder};
41
42// Transforms a `StructDataBlock` into a row major `FixedWidthDataBlock`.
43// Only fields with fixed-width fields are supported for now, and the
44// assumption that all fields has `bits_per_value % 8 == 0` is made.
45fn struct_data_block_to_fixed_width_data_block(
46    struct_data_block: StructDataBlock,
47    bits_per_values: &[u64],
48) -> DataBlock {
49    let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
50    let mut output = Vec::with_capacity(data_size as usize);
51    let num_values = struct_data_block.children[0].num_values();
52
53    for i in 0..num_values as usize {
54        for (j, child) in struct_data_block.children.iter().enumerate() {
55            let bytes_per_value = (bits_per_values[j] / 8) as usize;
56            let this_data = child
57                .as_fixed_width_ref()
58                .unwrap()
59                .data
60                .slice_with_length(bytes_per_value * i, bytes_per_value);
61            output.extend_from_slice(&this_data);
62        }
63    }
64
65    DataBlock::FixedWidth(FixedWidthDataBlock {
66        bits_per_value: bits_per_values.iter().copied().sum(),
67        data: LanceBuffer::from(output),
68        num_values,
69        block_info: BlockInfo::default(),
70    })
71}
72
73#[derive(Debug, Default)]
74pub struct PackedStructFixedWidthMiniBlockEncoder {}
75
76impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
77    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
78        match data {
79            DataBlock::Struct(struct_data_block) => {
80                let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value).collect::<Vec<_>>();
81
82                // transform struct datablock to fixed-width data block.
83                let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
84
85                // store and transformed fixed-width data block.
86                let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
87                let (value_miniblock_compressed, value_array_encoding) =
88                value_miniblock_compressor.compress(data_block)?;
89
90                Ok((
91                    value_miniblock_compressed,
92                    ProtobufUtils21::packed_struct(value_array_encoding, bits_per_values),
93                ))
94            }
95            _ => Err(Error::InvalidInput {
96                source: format!(
97                    "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
98                    data.name()
99                )
100                .into(),
101                location: location!(),
102            }),
103        }
104    }
105}
106
107#[derive(Debug)]
108pub struct PackedStructFixedWidthMiniBlockDecompressor {
109    bits_per_values: Vec<u64>,
110    array_encoding: Box<dyn MiniBlockDecompressor>,
111}
112
113impl PackedStructFixedWidthMiniBlockDecompressor {
114    pub fn new(description: &PackedStruct) -> Self {
115        let array_encoding: Box<dyn MiniBlockDecompressor> = match description.values.as_ref().unwrap().compression.as_ref().unwrap() {
116            Compression::Flat(flat) => Box::new(ValueDecompressor::from_flat(flat)),
117            _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."),
118        };
119        Self {
120            bits_per_values: description.bits_per_value.clone(),
121            array_encoding,
122        }
123    }
124}
125
126impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
127    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
128        assert_eq!(data.len(), 1);
129        let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
130        let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
131            panic!("ValueDecompressor should output FixedWidth DataBlock")
132        };
133
134        let bytes_per_values = self
135            .bits_per_values
136            .iter()
137            .map(|bits_per_value| *bits_per_value as usize / 8)
138            .collect::<Vec<_>>();
139
140        assert!(encoded_data_block.bits_per_value % 8 == 0);
141        let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
142
143        // use a prefix_sum vector as a helper to reconstruct to `StructDataBlock`.
144        let mut prefix_sum = vec![0; self.bits_per_values.len()];
145        for i in 0..(self.bits_per_values.len() - 1) {
146            prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
147        }
148
149        let mut children_data_block = vec![];
150        for i in 0..self.bits_per_values.len() {
151            let child_buf_size = bytes_per_values[i] * num_values as usize;
152            let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
153
154            for j in 0..num_values as usize {
155                // the start of the data at this row is `j * encoded_bytes_per_row`, and the offset for this field is `prefix_sum[i]`, this field has length `bytes_per_values[i]`.
156                let this_value = encoded_data_block.data.slice_with_length(
157                    prefix_sum[i] + (j * encoded_bytes_per_row),
158                    bytes_per_values[i],
159                );
160
161                child_buf.extend_from_slice(&this_value);
162            }
163
164            let child = DataBlock::FixedWidth(FixedWidthDataBlock {
165                data: LanceBuffer::from(child_buf),
166                bits_per_value: self.bits_per_values[i],
167                num_values,
168                block_info: BlockInfo::default(),
169            });
170            children_data_block.push(child);
171        }
172        Ok(DataBlock::Struct(StructDataBlock {
173            children: children_data_block,
174            block_info: BlockInfo::default(),
175            validity: None,
176        }))
177    }
178}
179
180#[derive(Debug)]
181enum VariablePackedFieldData {
182    Fixed {
183        block: FixedWidthDataBlock,
184    },
185    Variable {
186        block: VariableWidthBlock,
187        bits_per_length: u64,
188    },
189}
190
191impl VariablePackedFieldData {
192    fn append_row_bytes(&self, row_idx: usize, output: &mut Vec<u8>) -> Result<()> {
193        match self {
194            Self::Fixed { block } => {
195                let bits_per_value = block.bits_per_value;
196                if bits_per_value % 8 != 0 {
197                    return Err(Error::invalid_input(
198                        "Packed struct variable encoding requires byte-aligned fixed-width children",
199                        location!(),
200                    ));
201                }
202                let bytes_per_value = (bits_per_value / 8) as usize;
203                let start = row_idx.checked_mul(bytes_per_value).ok_or_else(|| {
204                    Error::invalid_input("Packed struct row size overflow", location!())
205                })?;
206                let end = start + bytes_per_value;
207                let data = block.data.as_ref();
208                if end > data.len() {
209                    return Err(Error::invalid_input(
210                        "Packed struct fixed child out of bounds",
211                        location!(),
212                    ));
213                }
214                output.extend_from_slice(&data[start..end]);
215                Ok(())
216            }
217            Self::Variable {
218                block,
219                bits_per_length,
220            } => {
221                if bits_per_length % 8 != 0 {
222                    return Err(Error::invalid_input(
223                        "Packed struct variable children must have byte-aligned length prefixes",
224                        location!(),
225                    ));
226                }
227                let prefix_bytes = (*bits_per_length / 8) as usize;
228                if !(prefix_bytes == 4 || prefix_bytes == 8) {
229                    return Err(Error::invalid_input(
230                        "Packed struct variable children must use 32 or 64-bit length prefixes",
231                        location!(),
232                    ));
233                }
234                match block.bits_per_offset {
235                    32 => {
236                        let offsets = block.offsets.borrow_to_typed_slice::<u32>();
237                        let start = offsets[row_idx] as usize;
238                        let end = offsets[row_idx + 1] as usize;
239                        if end > block.data.len() {
240                            return Err(Error::invalid_input(
241                                "Packed struct variable child offsets out of bounds",
242                                location!(),
243                            ));
244                        }
245                        let len = (end - start) as u32;
246                        if prefix_bytes != std::mem::size_of::<u32>() {
247                            return Err(Error::invalid_input(
248                                "Packed struct variable child length prefix mismatch",
249                                location!(),
250                            ));
251                        }
252                        output.extend_from_slice(&len.to_le_bytes());
253                        output.extend_from_slice(&block.data[start..end]);
254                        Ok(())
255                    }
256                    64 => {
257                        let offsets = block.offsets.borrow_to_typed_slice::<u64>();
258                        let start = offsets[row_idx] as usize;
259                        let end = offsets[row_idx + 1] as usize;
260                        if end > block.data.len() {
261                            return Err(Error::invalid_input(
262                                "Packed struct variable child offsets out of bounds",
263                                location!(),
264                            ));
265                        }
266                        let len = (end - start) as u64;
267                        if prefix_bytes != std::mem::size_of::<u64>() {
268                            return Err(Error::invalid_input(
269                                "Packed struct variable child length prefix mismatch",
270                                location!(),
271                            ));
272                        }
273                        output.extend_from_slice(&len.to_le_bytes());
274                        output.extend_from_slice(&block.data[start..end]);
275                        Ok(())
276                    }
277                    _ => Err(Error::invalid_input(
278                        "Packed struct variable child must use 32 or 64-bit offsets",
279                        location!(),
280                    )),
281                }
282            }
283        }
284    }
285}
286
287#[derive(Debug)]
288pub struct PackedStructVariablePerValueEncoder {
289    strategy: DefaultCompressionStrategy,
290    fields: Vec<Field>,
291}
292
293impl PackedStructVariablePerValueEncoder {
294    pub fn new(strategy: DefaultCompressionStrategy, fields: Vec<Field>) -> Self {
295        Self { strategy, fields }
296    }
297}
298
299impl PerValueCompressor for PackedStructVariablePerValueEncoder {
300    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
301        let DataBlock::Struct(struct_block) = data else {
302            return Err(Error::invalid_input(
303                "Packed struct encoder requires Struct data block",
304                location!(),
305            ));
306        };
307
308        if struct_block.children.is_empty() {
309            return Err(Error::invalid_input(
310                "Packed struct encoder requires at least one child field",
311                location!(),
312            ));
313        }
314        if struct_block.children.len() != self.fields.len() {
315            return Err(Error::invalid_input(
316                "Struct field metadata does not match number of children",
317                location!(),
318            ));
319        }
320
321        let num_values = struct_block.children[0].num_values();
322        for child in struct_block.children.iter() {
323            if child.num_values() != num_values {
324                return Err(Error::invalid_input(
325                    "Packed struct children must have matching value counts",
326                    location!(),
327                ));
328            }
329        }
330
331        let mut field_data = Vec::with_capacity(self.fields.len());
332        let mut field_metadata = Vec::with_capacity(self.fields.len());
333
334        for (field, child_block) in self.fields.iter().zip(struct_block.children.into_iter()) {
335            let compressor = crate::compression::CompressionStrategy::create_per_value(
336                &self.strategy,
337                field,
338                &child_block,
339            )?;
340            let (compressed, encoding) = compressor.compress(child_block)?;
341            match compressed {
342                PerValueDataBlock::Fixed(block) => {
343                    field_metadata.push(ProtobufUtils21::packed_struct_field_fixed(
344                        encoding,
345                        block.bits_per_value,
346                    ));
347                    field_data.push(VariablePackedFieldData::Fixed { block });
348                }
349                PerValueDataBlock::Variable(block) => {
350                    let bits_per_length = block.bits_per_offset as u64;
351                    field_metadata.push(ProtobufUtils21::packed_struct_field_variable(
352                        encoding,
353                        bits_per_length,
354                    ));
355                    field_data.push(VariablePackedFieldData::Variable {
356                        block,
357                        bits_per_length,
358                    });
359                }
360            }
361        }
362
363        let mut row_data: Vec<u8> = Vec::new();
364        let mut row_offsets: Vec<u64> = Vec::with_capacity(num_values as usize + 1);
365        row_offsets.push(0);
366        let mut total_bytes: usize = 0;
367        let mut max_row_len: usize = 0;
368        for row in 0..num_values as usize {
369            let start = row_data.len();
370            for field in &field_data {
371                field.append_row_bytes(row, &mut row_data)?;
372            }
373            let end = row_data.len();
374            let row_len = end - start;
375            max_row_len = max_row_len.max(row_len);
376            total_bytes = total_bytes.checked_add(row_len).ok_or_else(|| {
377                Error::invalid_input("Packed struct row data size overflow", location!())
378            })?;
379            row_offsets.push(end as u64);
380        }
381        debug_assert_eq!(total_bytes, row_data.len());
382
383        let use_u32_offsets = total_bytes <= u32::MAX as usize && max_row_len <= u32::MAX as usize;
384        let bits_per_offset = if use_u32_offsets { 32 } else { 64 };
385        let offsets_buffer = if use_u32_offsets {
386            let offsets_u32 = row_offsets
387                .iter()
388                .map(|&offset| offset as u32)
389                .collect::<Vec<_>>();
390            LanceBuffer::reinterpret_vec(offsets_u32)
391        } else {
392            LanceBuffer::reinterpret_vec(row_offsets)
393        };
394
395        let data_block = VariableWidthBlock {
396            data: LanceBuffer::from(row_data),
397            bits_per_offset,
398            offsets: offsets_buffer,
399            num_values,
400            block_info: BlockInfo::new(),
401        };
402
403        Ok((
404            PerValueDataBlock::Variable(data_block),
405            ProtobufUtils21::packed_struct_variable(field_metadata),
406        ))
407    }
408}
409
410#[derive(Debug)]
411pub(crate) enum VariablePackedStructFieldKind {
412    Fixed {
413        bits_per_value: u64,
414        decompressor: Arc<dyn FixedPerValueDecompressor>,
415    },
416    Variable {
417        bits_per_length: u64,
418        decompressor: Arc<dyn VariablePerValueDecompressor>,
419    },
420}
421
422#[derive(Debug)]
423pub(crate) struct VariablePackedStructFieldDecoder {
424    pub(crate) kind: VariablePackedStructFieldKind,
425}
426
427#[derive(Debug)]
428pub struct PackedStructVariablePerValueDecompressor {
429    fields: Vec<VariablePackedStructFieldDecoder>,
430}
431
432impl PackedStructVariablePerValueDecompressor {
433    pub(crate) fn new(fields: Vec<VariablePackedStructFieldDecoder>) -> Self {
434        Self { fields }
435    }
436}
437
438enum FieldAccumulator {
439    Fixed {
440        builder: DataBlockBuilder,
441        bits_per_value: u64,
442    },
443    Variable32 {
444        builder: DataBlockBuilder,
445    },
446    Variable64 {
447        builder: DataBlockBuilder,
448    },
449}
450
451impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
452    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
453        let num_values = data.num_values;
454        let offsets_u64 = match data.bits_per_offset {
455            32 => data
456                .offsets
457                .borrow_to_typed_slice::<u32>()
458                .iter()
459                .map(|v| *v as u64)
460                .collect::<Vec<_>>(),
461            64 => data
462                .offsets
463                .borrow_to_typed_slice::<u64>()
464                .as_ref()
465                .to_vec(),
466            _ => {
467                return Err(Error::invalid_input(
468                    "Packed struct row offsets must be 32 or 64 bits",
469                    location!(),
470                ))
471            }
472        };
473
474        if offsets_u64.len() != num_values as usize + 1 {
475            return Err(Error::invalid_input(
476                "Packed struct row offsets length mismatch",
477                location!(),
478            ));
479        }
480
481        let mut accumulators = Vec::with_capacity(self.fields.len());
482        for field in &self.fields {
483            match &field.kind {
484                VariablePackedStructFieldKind::Fixed { bits_per_value, .. } => {
485                    if bits_per_value % 8 != 0 {
486                        return Err(Error::invalid_input(
487                            "Packed struct fixed child must be byte-aligned",
488                            location!(),
489                        ));
490                    }
491                    let bytes_per_value = bits_per_value.checked_div(8).ok_or_else(|| {
492                        Error::invalid_input(
493                            "Invalid bits per value for packed struct field",
494                            location!(),
495                        )
496                    })?;
497                    let estimate = bytes_per_value.checked_mul(num_values).ok_or_else(|| {
498                        Error::invalid_input(
499                            "Packed struct fixed child allocation overflow",
500                            location!(),
501                        )
502                    })?;
503                    accumulators.push(FieldAccumulator::Fixed {
504                        builder: DataBlockBuilder::with_capacity_estimate(estimate),
505                        bits_per_value: *bits_per_value,
506                    });
507                }
508                VariablePackedStructFieldKind::Variable {
509                    bits_per_length, ..
510                } => match bits_per_length {
511                    32 => accumulators.push(FieldAccumulator::Variable32 {
512                        builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
513                    }),
514                    64 => accumulators.push(FieldAccumulator::Variable64 {
515                        builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
516                    }),
517                    _ => {
518                        return Err(Error::invalid_input(
519                            "Packed struct variable child must use 32 or 64-bit length prefixes",
520                            location!(),
521                        ))
522                    }
523                },
524            }
525        }
526
527        for row_idx in 0..num_values as usize {
528            let row_start = offsets_u64[row_idx] as usize;
529            let row_end = offsets_u64[row_idx + 1] as usize;
530            if row_end > data.data.len() || row_start > row_end {
531                return Err(Error::invalid_input(
532                    "Packed struct row bounds exceed buffer",
533                    location!(),
534                ));
535            }
536            let mut cursor = row_start;
537            for (field, accumulator) in self.fields.iter().zip(accumulators.iter_mut()) {
538                match (&field.kind, accumulator) {
539                    (
540                        VariablePackedStructFieldKind::Fixed { bits_per_value, .. },
541                        FieldAccumulator::Fixed {
542                            builder,
543                            bits_per_value: acc_bits,
544                        },
545                    ) => {
546                        debug_assert_eq!(bits_per_value, acc_bits);
547                        let bytes_per_value = (bits_per_value / 8) as usize;
548                        let end = cursor + bytes_per_value;
549                        if end > row_end {
550                            return Err(Error::invalid_input(
551                                "Packed struct fixed child exceeds row bounds",
552                                location!(),
553                            ));
554                        }
555                        let value_block = DataBlock::FixedWidth(FixedWidthDataBlock {
556                            data: LanceBuffer::from(data.data[cursor..end].to_vec()),
557                            bits_per_value: *bits_per_value,
558                            num_values: 1,
559                            block_info: BlockInfo::new(),
560                        });
561                        builder.append(&value_block, 0..1);
562                        cursor = end;
563                    }
564                    (
565                        VariablePackedStructFieldKind::Variable {
566                            bits_per_length, ..
567                        },
568                        FieldAccumulator::Variable32 { builder },
569                    ) => {
570                        if *bits_per_length != 32 {
571                            return Err(Error::invalid_input(
572                                "Packed struct length prefix size mismatch",
573                                location!(),
574                            ));
575                        }
576                        let end = cursor + std::mem::size_of::<u32>();
577                        if end > row_end {
578                            return Err(Error::invalid_input(
579                                "Packed struct variable child length prefix out of bounds",
580                                location!(),
581                            ));
582                        }
583                        let len = u32::from_le_bytes(
584                            data.data[cursor..end]
585                                .try_into()
586                                .expect("slice has exact length"),
587                        ) as usize;
588                        cursor = end;
589                        let value_end = cursor + len;
590                        if value_end > row_end {
591                            return Err(Error::invalid_input(
592                                "Packed struct variable child exceeds row bounds",
593                                location!(),
594                            ));
595                        }
596                        let value_block = DataBlock::VariableWidth(VariableWidthBlock {
597                            data: LanceBuffer::from(data.data[cursor..value_end].to_vec()),
598                            bits_per_offset: 32,
599                            offsets: LanceBuffer::reinterpret_vec(vec![0_u32, len as u32]),
600                            num_values: 1,
601                            block_info: BlockInfo::new(),
602                        });
603                        builder.append(&value_block, 0..1);
604                        cursor = value_end;
605                    }
606                    (
607                        VariablePackedStructFieldKind::Variable {
608                            bits_per_length, ..
609                        },
610                        FieldAccumulator::Variable64 { builder },
611                    ) => {
612                        if *bits_per_length != 64 {
613                            return Err(Error::invalid_input(
614                                "Packed struct length prefix size mismatch",
615                                location!(),
616                            ));
617                        }
618                        let end = cursor + std::mem::size_of::<u64>();
619                        if end > row_end {
620                            return Err(Error::invalid_input(
621                                "Packed struct variable child length prefix out of bounds",
622                                location!(),
623                            ));
624                        }
625                        let len = u64::from_le_bytes(
626                            data.data[cursor..end]
627                                .try_into()
628                                .expect("slice has exact length"),
629                        ) as usize;
630                        cursor = end;
631                        let value_end = cursor + len;
632                        if value_end > row_end {
633                            return Err(Error::invalid_input(
634                                "Packed struct variable child exceeds row bounds",
635                                location!(),
636                            ));
637                        }
638                        let value_block = DataBlock::VariableWidth(VariableWidthBlock {
639                            data: LanceBuffer::from(data.data[cursor..value_end].to_vec()),
640                            bits_per_offset: 64,
641                            offsets: LanceBuffer::reinterpret_vec(vec![0_u64, len as u64]),
642                            num_values: 1,
643                            block_info: BlockInfo::new(),
644                        });
645                        builder.append(&value_block, 0..1);
646                        cursor = value_end;
647                    }
648                    _ => {
649                        return Err(Error::invalid_input(
650                            "Packed struct accumulator kind mismatch",
651                            location!(),
652                        ))
653                    }
654                }
655            }
656            if cursor != row_end {
657                return Err(Error::invalid_input(
658                    "Packed struct row parsing did not consume full row",
659                    location!(),
660                ));
661            }
662        }
663
664        let mut children = Vec::with_capacity(self.fields.len());
665        for (field, accumulator) in self.fields.iter().zip(accumulators.into_iter()) {
666            match (field, accumulator) {
667                (
668                    VariablePackedStructFieldDecoder {
669                        kind: VariablePackedStructFieldKind::Fixed { decompressor, .. },
670                    },
671                    FieldAccumulator::Fixed { builder, .. },
672                ) => {
673                    let DataBlock::FixedWidth(block) = builder.finish() else {
674                        panic!("Expected fixed-width datablock from builder");
675                    };
676                    let decoded = decompressor.decompress(block, num_values)?;
677                    children.push(decoded);
678                }
679                (
680                    VariablePackedStructFieldDecoder {
681                        kind:
682                            VariablePackedStructFieldKind::Variable {
683                                bits_per_length,
684                                decompressor,
685                            },
686                    },
687                    FieldAccumulator::Variable32 { builder },
688                ) => {
689                    let DataBlock::VariableWidth(mut block) = builder.finish() else {
690                        panic!("Expected variable-width datablock from builder");
691                    };
692                    debug_assert_eq!(block.bits_per_offset, 32);
693                    block.bits_per_offset = (*bits_per_length) as u8;
694                    let decoded = decompressor.decompress(block)?;
695                    children.push(decoded);
696                }
697                (
698                    VariablePackedStructFieldDecoder {
699                        kind:
700                            VariablePackedStructFieldKind::Variable {
701                                bits_per_length,
702                                decompressor,
703                            },
704                    },
705                    FieldAccumulator::Variable64 { builder },
706                ) => {
707                    let DataBlock::VariableWidth(mut block) = builder.finish() else {
708                        panic!("Expected variable-width datablock from builder");
709                    };
710                    debug_assert_eq!(block.bits_per_offset, 64);
711                    block.bits_per_offset = (*bits_per_length) as u8;
712                    let decoded = decompressor.decompress(block)?;
713                    children.push(decoded);
714                }
715                _ => {
716                    return Err(Error::invalid_input(
717                        "Packed struct accumulator mismatch during finalize",
718                        location!(),
719                    ))
720                }
721            }
722        }
723
724        Ok(DataBlock::Struct(StructDataBlock {
725            children,
726            block_info: BlockInfo::new(),
727            validity: None,
728        }))
729    }
730}
731
732#[cfg(test)]
733mod tests {
734    use super::*;
735    use crate::{
736        compression::CompressionStrategy,
737        compression::{DefaultCompressionStrategy, DefaultDecompressionStrategy},
738        statistics::ComputeStat,
739        version::LanceFileVersion,
740    };
741    use arrow_array::{
742        Array, ArrayRef, BinaryArray, Int32Array, Int64Array, LargeStringArray, StringArray,
743    };
744    use arrow_schema::{DataType, Field as ArrowField, Fields};
745    use std::sync::Arc;
746
747    fn fixed_block_from_array(array: Int64Array) -> FixedWidthDataBlock {
748        let num_values = array.len() as u64;
749        let block = DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values);
750        match block {
751            DataBlock::FixedWidth(block) => block,
752            _ => panic!("Expected fixed-width data block"),
753        }
754    }
755
756    fn fixed_i32_block_from_array(array: Int32Array) -> FixedWidthDataBlock {
757        let num_values = array.len() as u64;
758        let block = DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values);
759        match block {
760            DataBlock::FixedWidth(block) => block,
761            _ => panic!("Expected fixed-width data block"),
762        }
763    }
764
765    fn variable_block_from_string_array(array: StringArray) -> VariableWidthBlock {
766        let num_values = array.len() as u64;
767        let block = DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values);
768        match block {
769            DataBlock::VariableWidth(block) => block,
770            _ => panic!("Expected variable-width block"),
771        }
772    }
773
774    fn variable_block_from_large_string_array(array: LargeStringArray) -> VariableWidthBlock {
775        let num_values = array.len() as u64;
776        let block = DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values);
777        match block {
778            DataBlock::VariableWidth(block) => block,
779            _ => panic!("Expected variable-width block"),
780        }
781    }
782
783    fn variable_block_from_binary_array(array: BinaryArray) -> VariableWidthBlock {
784        let num_values = array.len() as u64;
785        let block = DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values);
786        match block {
787            DataBlock::VariableWidth(block) => block,
788            _ => panic!("Expected variable-width block"),
789        }
790    }
791
792    #[test]
793    fn variable_packed_struct_round_trip() -> Result<()> {
794        let arrow_fields: Fields = vec![
795            ArrowField::new("id", DataType::UInt32, false),
796            ArrowField::new("name", DataType::Utf8, true),
797        ]
798        .into();
799        let arrow_struct = ArrowField::new("item", DataType::Struct(arrow_fields), false);
800        let struct_field = Field::try_from(&arrow_struct)?;
801
802        let ids = vec![1_u32, 2, 42];
803        let id_bytes = ids
804            .iter()
805            .flat_map(|value| value.to_le_bytes())
806            .collect::<Vec<_>>();
807        let mut id_block = FixedWidthDataBlock {
808            data: LanceBuffer::reinterpret_vec(ids),
809            bits_per_value: 32,
810            num_values: 3,
811            block_info: BlockInfo::new(),
812        };
813        id_block.compute_stat();
814        let id_block = DataBlock::FixedWidth(id_block);
815
816        let name_offsets = vec![0_i32, 1, 4, 4];
817        let name_bytes = b"abcz".to_vec();
818        let mut name_block = VariableWidthBlock {
819            data: LanceBuffer::from(name_bytes.clone()),
820            bits_per_offset: 32,
821            offsets: LanceBuffer::reinterpret_vec(name_offsets.clone()),
822            num_values: 3,
823            block_info: BlockInfo::new(),
824        };
825        name_block.compute_stat();
826        let name_block = DataBlock::VariableWidth(name_block);
827
828        let struct_block = StructDataBlock {
829            children: vec![id_block, name_block],
830            block_info: BlockInfo::new(),
831            validity: None,
832        };
833
834        let data_block = DataBlock::Struct(struct_block);
835
836        let compression_strategy =
837            DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_2);
838        let compressor = crate::compression::CompressionStrategy::create_per_value(
839            &compression_strategy,
840            &struct_field,
841            &data_block,
842        )?;
843        let (compressed, encoding) = compressor.compress(data_block)?;
844
845        let PerValueDataBlock::Variable(zipped) = compressed else {
846            panic!("expected variable-width packed struct output");
847        };
848
849        let decompression_strategy = DefaultDecompressionStrategy::default();
850        let decompressor =
851            crate::compression::DecompressionStrategy::create_variable_per_value_decompressor(
852                &decompression_strategy,
853                &encoding,
854            )?;
855        let decoded = decompressor.decompress(zipped)?;
856
857        let DataBlock::Struct(decoded_struct) = decoded else {
858            panic!("expected struct datablock after decode");
859        };
860
861        let decoded_id = decoded_struct.children[0].as_fixed_width_ref().unwrap();
862        assert_eq!(decoded_id.bits_per_value, 32);
863        assert_eq!(decoded_id.data.as_ref(), id_bytes.as_slice());
864
865        let decoded_name = decoded_struct.children[1].as_variable_width_ref().unwrap();
866        assert_eq!(decoded_name.bits_per_offset, 32);
867        let decoded_offsets = decoded_name.offsets.borrow_to_typed_slice::<i32>();
868        assert_eq!(decoded_offsets.as_ref(), name_offsets.as_slice());
869        assert_eq!(decoded_name.data.as_ref(), name_bytes.as_slice());
870
871        Ok(())
872    }
873
874    #[test]
875    fn variable_packed_struct_large_utf8_round_trip() -> Result<()> {
876        let arrow_fields: Fields = vec![
877            ArrowField::new("value", DataType::Int64, false),
878            ArrowField::new("text", DataType::LargeUtf8, false),
879        ]
880        .into();
881        let arrow_struct = ArrowField::new("item", DataType::Struct(arrow_fields), false);
882        let struct_field = Field::try_from(&arrow_struct)?;
883
884        let id_block = fixed_block_from_array(Int64Array::from(vec![10, 20, 30, 40]));
885        let payload_array = LargeStringArray::from(vec![
886            "alpha",
887            "a considerably longer payload for testing",
888            "mid",
889            "z",
890        ]);
891        let payload_block = variable_block_from_large_string_array(payload_array);
892
893        let struct_block = StructDataBlock {
894            children: vec![
895                DataBlock::FixedWidth(id_block.clone()),
896                DataBlock::VariableWidth(payload_block.clone()),
897            ],
898            block_info: BlockInfo::new(),
899            validity: None,
900        };
901
902        let data_block = DataBlock::Struct(struct_block);
903
904        let compression_strategy =
905            DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_2);
906        let compressor = crate::compression::CompressionStrategy::create_per_value(
907            &compression_strategy,
908            &struct_field,
909            &data_block,
910        )?;
911        let (compressed, encoding) = compressor.compress(data_block)?;
912
913        let PerValueDataBlock::Variable(zipped) = compressed else {
914            panic!("expected variable-width packed struct output");
915        };
916
917        let decompression_strategy = DefaultDecompressionStrategy::default();
918        let decompressor =
919            crate::compression::DecompressionStrategy::create_variable_per_value_decompressor(
920                &decompression_strategy,
921                &encoding,
922            )?;
923        let decoded = decompressor.decompress(zipped)?;
924
925        let DataBlock::Struct(decoded_struct) = decoded else {
926            panic!("expected struct datablock after decode");
927        };
928
929        let decoded_id = decoded_struct.children[0].as_fixed_width_ref().unwrap();
930        assert_eq!(decoded_id.bits_per_value, 64);
931        assert_eq!(decoded_id.data.as_ref(), id_block.data.as_ref());
932
933        let decoded_payload = decoded_struct.children[1].as_variable_width_ref().unwrap();
934        assert_eq!(decoded_payload.bits_per_offset, 64);
935        assert_eq!(
936            decoded_payload
937                .offsets
938                .borrow_to_typed_slice::<i64>()
939                .as_ref(),
940            payload_block
941                .offsets
942                .borrow_to_typed_slice::<i64>()
943                .as_ref()
944        );
945        assert_eq!(decoded_payload.data.as_ref(), payload_block.data.as_ref());
946
947        Ok(())
948    }
949
950    #[test]
951    fn variable_packed_struct_multi_variable_round_trip() -> Result<()> {
952        let arrow_fields: Fields = vec![
953            ArrowField::new("category", DataType::Utf8, false),
954            ArrowField::new("payload", DataType::Binary, false),
955            ArrowField::new("count", DataType::Int32, false),
956        ]
957        .into();
958        let arrow_struct = ArrowField::new("item", DataType::Struct(arrow_fields), false);
959        let struct_field = Field::try_from(&arrow_struct)?;
960
961        let category_array = StringArray::from(vec!["red", "blue", "green", "red"]);
962        let category_block = variable_block_from_string_array(category_array);
963        let payload_values: Vec<Vec<u8>> =
964            vec![vec![0x01, 0x02], vec![], vec![0x05, 0x06, 0x07], vec![0xff]];
965        let payload_array =
966            BinaryArray::from_iter_values(payload_values.iter().map(|v| v.as_slice()));
967        let payload_block = variable_block_from_binary_array(payload_array);
968        let count_block = fixed_i32_block_from_array(Int32Array::from(vec![1, 2, 3, 4]));
969
970        let struct_block = StructDataBlock {
971            children: vec![
972                DataBlock::VariableWidth(category_block.clone()),
973                DataBlock::VariableWidth(payload_block.clone()),
974                DataBlock::FixedWidth(count_block.clone()),
975            ],
976            block_info: BlockInfo::new(),
977            validity: None,
978        };
979
980        let data_block = DataBlock::Struct(struct_block);
981
982        let compression_strategy =
983            DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_2);
984        let compressor = crate::compression::CompressionStrategy::create_per_value(
985            &compression_strategy,
986            &struct_field,
987            &data_block,
988        )?;
989        let (compressed, encoding) = compressor.compress(data_block)?;
990
991        let PerValueDataBlock::Variable(zipped) = compressed else {
992            panic!("expected variable-width packed struct output");
993        };
994
995        let decompression_strategy = DefaultDecompressionStrategy::default();
996        let decompressor =
997            crate::compression::DecompressionStrategy::create_variable_per_value_decompressor(
998                &decompression_strategy,
999                &encoding,
1000            )?;
1001        let decoded = decompressor.decompress(zipped)?;
1002
1003        let DataBlock::Struct(decoded_struct) = decoded else {
1004            panic!("expected struct datablock after decode");
1005        };
1006
1007        let decoded_category = decoded_struct.children[0].as_variable_width_ref().unwrap();
1008        assert_eq!(decoded_category.bits_per_offset, 32);
1009        assert_eq!(
1010            decoded_category
1011                .offsets
1012                .borrow_to_typed_slice::<i32>()
1013                .as_ref(),
1014            category_block
1015                .offsets
1016                .borrow_to_typed_slice::<i32>()
1017                .as_ref()
1018        );
1019        assert_eq!(decoded_category.data.as_ref(), category_block.data.as_ref());
1020
1021        let decoded_payload = decoded_struct.children[1].as_variable_width_ref().unwrap();
1022        assert_eq!(decoded_payload.bits_per_offset, 32);
1023        assert_eq!(
1024            decoded_payload
1025                .offsets
1026                .borrow_to_typed_slice::<i32>()
1027                .as_ref(),
1028            payload_block
1029                .offsets
1030                .borrow_to_typed_slice::<i32>()
1031                .as_ref()
1032        );
1033        assert_eq!(decoded_payload.data.as_ref(), payload_block.data.as_ref());
1034
1035        let decoded_count = decoded_struct.children[2].as_fixed_width_ref().unwrap();
1036        assert_eq!(decoded_count.bits_per_value, 32);
1037        assert_eq!(decoded_count.data.as_ref(), count_block.data.as_ref());
1038
1039        Ok(())
1040    }
1041
1042    #[test]
1043    fn variable_packed_struct_requires_v22() {
1044        let arrow_fields: Fields = vec![
1045            ArrowField::new("value", DataType::Int64, false),
1046            ArrowField::new("text", DataType::Utf8, false),
1047        ]
1048        .into();
1049        let arrow_struct = ArrowField::new("item", DataType::Struct(arrow_fields), false);
1050        let struct_field = Field::try_from(&arrow_struct).unwrap();
1051
1052        let value_block = fixed_block_from_array(Int64Array::from(vec![1, 2, 3]));
1053        let text_block =
1054            variable_block_from_string_array(StringArray::from(vec!["a", "bb", "ccc"]));
1055
1056        let struct_block = StructDataBlock {
1057            children: vec![
1058                DataBlock::FixedWidth(value_block),
1059                DataBlock::VariableWidth(text_block),
1060            ],
1061            block_info: BlockInfo::new(),
1062            validity: None,
1063        };
1064
1065        let compression_strategy =
1066            DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_1);
1067        let result =
1068            compression_strategy.create_per_value(&struct_field, &DataBlock::Struct(struct_block));
1069
1070        assert!(matches!(result, Err(Error::NotSupported { .. })));
1071    }
1072}