Skip to main content

lance_encoding/encodings/physical/
packed.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Packed encoding
5//!
6//! These encodings take struct data and compress it in a way that all fields are collected
7//! together.
8//!
9//! This encoding can be transparent or opaque.  In order to be transparent we must use transparent
10//! compression on all children.  Then we can zip together the compressed children.
11
12use std::{convert::TryInto, sync::Arc};
13
14use arrow_array::types::UInt64Type;
15
16use lance_core::{Error, Result, datatypes::Field};
17
18use crate::{
19    buffer::LanceBuffer,
20    compression::{
21        DefaultCompressionStrategy, FixedPerValueDecompressor, MiniBlockDecompressor,
22        VariablePerValueDecompressor,
23    },
24    data::{
25        BlockInfo, DataBlock, DataBlockBuilder, FixedWidthDataBlock, StructDataBlock,
26        VariableWidthBlock,
27    },
28    encodings::logical::primitive::{
29        fullzip::{PerValueCompressor, PerValueDataBlock},
30        miniblock::{MiniBlockCompressed, MiniBlockCompressor},
31    },
32    format::{
33        ProtobufUtils21,
34        pb21::{CompressiveEncoding, PackedStruct, compressive_encoding::Compression},
35    },
36    statistics::{GetStat, Stat},
37};
38
39use super::value::{ValueDecompressor, ValueEncoder};
40
41// Transforms a `StructDataBlock` into a row major `FixedWidthDataBlock`.
42// Only fields with fixed-width fields are supported for now, and the
43// assumption that all fields has `bits_per_value % 8 == 0` is made.
44fn struct_data_block_to_fixed_width_data_block(
45    struct_data_block: StructDataBlock,
46    bits_per_values: &[u64],
47) -> DataBlock {
48    let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
49    let mut output = Vec::with_capacity(data_size as usize);
50    let num_values = struct_data_block.children[0].num_values();
51
52    for i in 0..num_values as usize {
53        for (j, child) in struct_data_block.children.iter().enumerate() {
54            let bytes_per_value = (bits_per_values[j] / 8) as usize;
55            let this_data = child
56                .as_fixed_width_ref()
57                .unwrap()
58                .data
59                .slice_with_length(bytes_per_value * i, bytes_per_value);
60            output.extend_from_slice(&this_data);
61        }
62    }
63
64    DataBlock::FixedWidth(FixedWidthDataBlock {
65        bits_per_value: bits_per_values.iter().copied().sum(),
66        data: LanceBuffer::from(output),
67        num_values,
68        block_info: BlockInfo::default(),
69    })
70}
71
72#[derive(Debug, Default)]
73pub struct PackedStructFixedWidthMiniBlockEncoder {}
74
75impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
76    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
77        match data {
78            DataBlock::Struct(struct_data_block) => {
79                let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value).collect::<Vec<_>>();
80
81                // transform struct datablock to fixed-width data block.
82                let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
83
84                // store and transformed fixed-width data block.
85                let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
86                let (value_miniblock_compressed, value_array_encoding) =
87                value_miniblock_compressor.compress(data_block)?;
88
89                Ok((
90                    value_miniblock_compressed,
91                    ProtobufUtils21::packed_struct(value_array_encoding, bits_per_values),
92                ))
93            }
94            _ => Err(Error::invalid_input_source(format!(
95                "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
96                data.name()
97            )
98            .into())),
99        }
100    }
101}
102
103#[derive(Debug)]
104pub struct PackedStructFixedWidthMiniBlockDecompressor {
105    bits_per_values: Vec<u64>,
106    array_encoding: Box<dyn MiniBlockDecompressor>,
107}
108
109impl PackedStructFixedWidthMiniBlockDecompressor {
110    pub fn new(description: &PackedStruct) -> Self {
111        let array_encoding: Box<dyn MiniBlockDecompressor> = match description
112            .values
113            .as_ref()
114            .unwrap()
115            .compression
116            .as_ref()
117            .unwrap()
118        {
119            Compression::Flat(flat) => Box::new(ValueDecompressor::from_flat(flat)),
120            _ => panic!(
121                "Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."
122            ),
123        };
124        Self {
125            bits_per_values: description.bits_per_value.clone(),
126            array_encoding,
127        }
128    }
129}
130
131impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
132    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
133        assert_eq!(data.len(), 1);
134        let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
135        let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
136            panic!("ValueDecompressor should output FixedWidth DataBlock")
137        };
138
139        let bytes_per_values = self
140            .bits_per_values
141            .iter()
142            .map(|bits_per_value| *bits_per_value as usize / 8)
143            .collect::<Vec<_>>();
144
145        assert!(encoded_data_block.bits_per_value % 8 == 0);
146        let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
147
148        // use a prefix_sum vector as a helper to reconstruct to `StructDataBlock`.
149        let mut prefix_sum = vec![0; self.bits_per_values.len()];
150        for i in 0..(self.bits_per_values.len() - 1) {
151            prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
152        }
153
154        let mut children_data_block = vec![];
155        for i in 0..self.bits_per_values.len() {
156            let child_buf_size = bytes_per_values[i] * num_values as usize;
157            let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
158
159            for j in 0..num_values as usize {
160                // the start of the data at this row is `j * encoded_bytes_per_row`, and the offset for this field is `prefix_sum[i]`, this field has length `bytes_per_values[i]`.
161                let this_value = encoded_data_block.data.slice_with_length(
162                    prefix_sum[i] + (j * encoded_bytes_per_row),
163                    bytes_per_values[i],
164                );
165
166                child_buf.extend_from_slice(&this_value);
167            }
168
169            let child = DataBlock::FixedWidth(FixedWidthDataBlock {
170                data: LanceBuffer::from(child_buf),
171                bits_per_value: self.bits_per_values[i],
172                num_values,
173                block_info: BlockInfo::default(),
174            });
175            children_data_block.push(child);
176        }
177        Ok(DataBlock::Struct(StructDataBlock {
178            children: children_data_block,
179            block_info: BlockInfo::default(),
180            validity: None,
181        }))
182    }
183}
184
185#[derive(Debug)]
186enum VariablePackedFieldData {
187    Fixed {
188        block: FixedWidthDataBlock,
189    },
190    Variable {
191        block: VariableWidthBlock,
192        bits_per_length: u64,
193    },
194}
195
196impl VariablePackedFieldData {
197    fn append_row_bytes(&self, row_idx: usize, output: &mut Vec<u8>) -> Result<()> {
198        match self {
199            Self::Fixed { block } => {
200                let bits_per_value = block.bits_per_value;
201                if bits_per_value % 8 != 0 {
202                    return Err(Error::invalid_input(
203                        "Packed struct variable encoding requires byte-aligned fixed-width children",
204                    ));
205                }
206                let bytes_per_value = (bits_per_value / 8) as usize;
207                let start = row_idx
208                    .checked_mul(bytes_per_value)
209                    .ok_or_else(|| Error::invalid_input("Packed struct row size overflow"))?;
210                let end = start + bytes_per_value;
211                let data = block.data.as_ref();
212                if end > data.len() {
213                    return Err(Error::invalid_input(
214                        "Packed struct fixed child out of bounds",
215                    ));
216                }
217                output.extend_from_slice(&data[start..end]);
218                Ok(())
219            }
220            Self::Variable {
221                block,
222                bits_per_length,
223            } => {
224                if bits_per_length % 8 != 0 {
225                    return Err(Error::invalid_input(
226                        "Packed struct variable children must have byte-aligned length prefixes",
227                    ));
228                }
229                let prefix_bytes = (*bits_per_length / 8) as usize;
230                if !(prefix_bytes == 4 || prefix_bytes == 8) {
231                    return Err(Error::invalid_input(
232                        "Packed struct variable children must use 32 or 64-bit length prefixes",
233                    ));
234                }
235                match block.bits_per_offset {
236                    32 => {
237                        let offsets = block.offsets.borrow_to_typed_slice::<u32>();
238                        let start = offsets[row_idx] as usize;
239                        let end = offsets[row_idx + 1] as usize;
240                        if end > block.data.len() {
241                            return Err(Error::invalid_input(
242                                "Packed struct variable child offsets out of bounds",
243                            ));
244                        }
245                        let len = (end - start) as u32;
246                        if prefix_bytes != std::mem::size_of::<u32>() {
247                            return Err(Error::invalid_input(
248                                "Packed struct variable child length prefix mismatch",
249                            ));
250                        }
251                        output.extend_from_slice(&len.to_le_bytes());
252                        output.extend_from_slice(&block.data[start..end]);
253                        Ok(())
254                    }
255                    64 => {
256                        let offsets = block.offsets.borrow_to_typed_slice::<u64>();
257                        let start = offsets[row_idx] as usize;
258                        let end = offsets[row_idx + 1] as usize;
259                        if end > block.data.len() {
260                            return Err(Error::invalid_input(
261                                "Packed struct variable child offsets out of bounds",
262                            ));
263                        }
264                        let len = (end - start) as u64;
265                        if prefix_bytes != std::mem::size_of::<u64>() {
266                            return Err(Error::invalid_input(
267                                "Packed struct variable child length prefix mismatch",
268                            ));
269                        }
270                        output.extend_from_slice(&len.to_le_bytes());
271                        output.extend_from_slice(&block.data[start..end]);
272                        Ok(())
273                    }
274                    _ => Err(Error::invalid_input(
275                        "Packed struct variable child must use 32 or 64-bit offsets",
276                    )),
277                }
278            }
279        }
280    }
281}
282
283#[derive(Debug)]
284pub struct PackedStructVariablePerValueEncoder {
285    strategy: DefaultCompressionStrategy,
286    fields: Vec<Field>,
287}
288
289impl PackedStructVariablePerValueEncoder {
290    pub fn new(strategy: DefaultCompressionStrategy, fields: Vec<Field>) -> Self {
291        Self { strategy, fields }
292    }
293}
294
295impl PerValueCompressor for PackedStructVariablePerValueEncoder {
296    fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, CompressiveEncoding)> {
297        let DataBlock::Struct(struct_block) = data else {
298            return Err(Error::invalid_input(
299                "Packed struct encoder requires Struct data block",
300            ));
301        };
302
303        if struct_block.children.is_empty() {
304            return Err(Error::invalid_input(
305                "Packed struct encoder requires at least one child field",
306            ));
307        }
308        if struct_block.children.len() != self.fields.len() {
309            return Err(Error::invalid_input(
310                "Struct field metadata does not match number of children",
311            ));
312        }
313
314        let num_values = struct_block.children[0].num_values();
315        for child in struct_block.children.iter() {
316            if child.num_values() != num_values {
317                return Err(Error::invalid_input(
318                    "Packed struct children must have matching value counts",
319                ));
320            }
321        }
322
323        let mut field_data = Vec::with_capacity(self.fields.len());
324        let mut field_metadata = Vec::with_capacity(self.fields.len());
325
326        for (field, child_block) in self.fields.iter().zip(struct_block.children.into_iter()) {
327            let compressor = crate::compression::CompressionStrategy::create_per_value(
328                &self.strategy,
329                field,
330                &child_block,
331            )?;
332            let (compressed, encoding) = compressor.compress(child_block)?;
333            match compressed {
334                PerValueDataBlock::Fixed(block) => {
335                    field_metadata.push(ProtobufUtils21::packed_struct_field_fixed(
336                        encoding,
337                        block.bits_per_value,
338                    ));
339                    field_data.push(VariablePackedFieldData::Fixed { block });
340                }
341                PerValueDataBlock::Variable(block) => {
342                    let bits_per_length = block.bits_per_offset as u64;
343                    field_metadata.push(ProtobufUtils21::packed_struct_field_variable(
344                        encoding,
345                        bits_per_length,
346                    ));
347                    field_data.push(VariablePackedFieldData::Variable {
348                        block,
349                        bits_per_length,
350                    });
351                }
352            }
353        }
354
355        let mut row_data: Vec<u8> = Vec::new();
356        let mut row_offsets: Vec<u64> = Vec::with_capacity(num_values as usize + 1);
357        row_offsets.push(0);
358        let mut total_bytes: usize = 0;
359        let mut max_row_len: usize = 0;
360        for row in 0..num_values as usize {
361            let start = row_data.len();
362            for field in &field_data {
363                field.append_row_bytes(row, &mut row_data)?;
364            }
365            let end = row_data.len();
366            let row_len = end - start;
367            max_row_len = max_row_len.max(row_len);
368            total_bytes = total_bytes
369                .checked_add(row_len)
370                .ok_or_else(|| Error::invalid_input("Packed struct row data size overflow"))?;
371            row_offsets.push(end as u64);
372        }
373        debug_assert_eq!(total_bytes, row_data.len());
374
375        let use_u32_offsets = total_bytes <= u32::MAX as usize && max_row_len <= u32::MAX as usize;
376        let bits_per_offset = if use_u32_offsets { 32 } else { 64 };
377        let offsets_buffer = if use_u32_offsets {
378            let offsets_u32 = row_offsets
379                .iter()
380                .map(|&offset| offset as u32)
381                .collect::<Vec<_>>();
382            LanceBuffer::reinterpret_vec(offsets_u32)
383        } else {
384            LanceBuffer::reinterpret_vec(row_offsets)
385        };
386
387        let data_block = VariableWidthBlock {
388            data: LanceBuffer::from(row_data),
389            bits_per_offset,
390            offsets: offsets_buffer,
391            num_values,
392            block_info: BlockInfo::new(),
393        };
394
395        Ok((
396            PerValueDataBlock::Variable(data_block),
397            ProtobufUtils21::packed_struct_variable(field_metadata),
398        ))
399    }
400}
401
402#[derive(Debug)]
403pub(crate) enum VariablePackedStructFieldKind {
404    Fixed {
405        bits_per_value: u64,
406        decompressor: Arc<dyn FixedPerValueDecompressor>,
407    },
408    Variable {
409        bits_per_length: u64,
410        decompressor: Arc<dyn VariablePerValueDecompressor>,
411    },
412}
413
414#[derive(Debug)]
415pub(crate) struct VariablePackedStructFieldDecoder {
416    pub(crate) kind: VariablePackedStructFieldKind,
417}
418
419#[derive(Debug)]
420pub struct PackedStructVariablePerValueDecompressor {
421    fields: Vec<VariablePackedStructFieldDecoder>,
422}
423
424impl PackedStructVariablePerValueDecompressor {
425    pub(crate) fn new(fields: Vec<VariablePackedStructFieldDecoder>) -> Self {
426        Self { fields }
427    }
428}
429
430enum FieldAccumulator {
431    Fixed {
432        builder: DataBlockBuilder,
433        bits_per_value: u64,
434        empty_value: DataBlock,
435    },
436    Variable32 {
437        builder: DataBlockBuilder,
438        empty_value: DataBlock,
439    },
440    Variable64 {
441        builder: DataBlockBuilder,
442        empty_value: DataBlock,
443    },
444}
445
446impl FieldAccumulator {
447    // In full-zip variable packed decoding, rep/def may produce a visible row
448    // with an empty payload (e.g. null/invalid item). We still need to append
449    // one placeholder per child so child row counts remain aligned.
450    fn append_empty(&mut self) {
451        match self {
452            Self::Fixed {
453                builder,
454                empty_value,
455                ..
456            } => builder.append(empty_value, 0..1),
457            Self::Variable32 {
458                builder,
459                empty_value,
460            } => builder.append(empty_value, 0..1),
461            Self::Variable64 {
462                builder,
463                empty_value,
464            } => builder.append(empty_value, 0..1),
465        }
466    }
467}
468
469impl VariablePerValueDecompressor for PackedStructVariablePerValueDecompressor {
470    fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
471        let num_values = data.num_values;
472        let offsets_u64 = match data.bits_per_offset {
473            32 => data
474                .offsets
475                .borrow_to_typed_slice::<u32>()
476                .iter()
477                .map(|v| *v as u64)
478                .collect::<Vec<_>>(),
479            64 => data
480                .offsets
481                .borrow_to_typed_slice::<u64>()
482                .as_ref()
483                .to_vec(),
484            _ => {
485                return Err(Error::invalid_input(
486                    "Packed struct row offsets must be 32 or 64 bits",
487                ));
488            }
489        };
490
491        if offsets_u64.len() != num_values as usize + 1 {
492            return Err(Error::invalid_input(
493                "Packed struct row offsets length mismatch",
494            ));
495        }
496
497        let mut accumulators = Vec::with_capacity(self.fields.len());
498        for field in &self.fields {
499            match &field.kind {
500                VariablePackedStructFieldKind::Fixed { bits_per_value, .. } => {
501                    if bits_per_value % 8 != 0 {
502                        return Err(Error::invalid_input(
503                            "Packed struct fixed child must be byte-aligned",
504                        ));
505                    }
506                    let bytes_per_value = bits_per_value.checked_div(8).ok_or_else(|| {
507                        Error::invalid_input("Invalid bits per value for packed struct field")
508                    })?;
509                    let estimate = bytes_per_value.checked_mul(num_values).ok_or_else(|| {
510                        Error::invalid_input("Packed struct fixed child allocation overflow")
511                    })?;
512                    let empty_value = DataBlock::FixedWidth(FixedWidthDataBlock {
513                        data: LanceBuffer::from(vec![0_u8; bytes_per_value as usize]),
514                        bits_per_value: *bits_per_value,
515                        num_values: 1,
516                        block_info: BlockInfo::new(),
517                    });
518                    accumulators.push(FieldAccumulator::Fixed {
519                        builder: DataBlockBuilder::with_capacity_estimate(estimate),
520                        bits_per_value: *bits_per_value,
521                        empty_value,
522                    });
523                }
524                VariablePackedStructFieldKind::Variable {
525                    bits_per_length, ..
526                } => match bits_per_length {
527                    32 => accumulators.push(FieldAccumulator::Variable32 {
528                        builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
529                        empty_value: DataBlock::VariableWidth(VariableWidthBlock {
530                            data: LanceBuffer::empty(),
531                            bits_per_offset: 32,
532                            offsets: LanceBuffer::reinterpret_vec(vec![0_u32, 0_u32]),
533                            num_values: 1,
534                            block_info: BlockInfo::new(),
535                        }),
536                    }),
537                    64 => accumulators.push(FieldAccumulator::Variable64 {
538                        builder: DataBlockBuilder::with_capacity_estimate(data.data.len() as u64),
539                        empty_value: DataBlock::VariableWidth(VariableWidthBlock {
540                            data: LanceBuffer::empty(),
541                            bits_per_offset: 64,
542                            offsets: LanceBuffer::reinterpret_vec(vec![0_u64, 0_u64]),
543                            num_values: 1,
544                            block_info: BlockInfo::new(),
545                        }),
546                    }),
547                    _ => {
548                        return Err(Error::invalid_input(
549                            "Packed struct variable child must use 32 or 64-bit length prefixes",
550                        ));
551                    }
552                },
553            }
554        }
555
556        for row_idx in 0..num_values as usize {
557            let row_start = offsets_u64[row_idx] as usize;
558            let row_end = offsets_u64[row_idx + 1] as usize;
559            if row_end > data.data.len() || row_start > row_end {
560                return Err(Error::invalid_input(
561                    "Packed struct row bounds exceed buffer",
562                ));
563            }
564            if row_start == row_end {
565                for accumulator in accumulators.iter_mut() {
566                    accumulator.append_empty();
567                }
568                continue;
569            }
570            let mut cursor = row_start;
571            for (field, accumulator) in self.fields.iter().zip(accumulators.iter_mut()) {
572                match (&field.kind, accumulator) {
573                    (
574                        VariablePackedStructFieldKind::Fixed { bits_per_value, .. },
575                        FieldAccumulator::Fixed {
576                            builder,
577                            bits_per_value: acc_bits,
578                            ..
579                        },
580                    ) => {
581                        debug_assert_eq!(bits_per_value, acc_bits);
582                        let bytes_per_value = (bits_per_value / 8) as usize;
583                        let end = cursor + bytes_per_value;
584                        if end > row_end {
585                            return Err(Error::invalid_input(
586                                "Packed struct fixed child exceeds row bounds",
587                            ));
588                        }
589                        let value_block = DataBlock::FixedWidth(FixedWidthDataBlock {
590                            data: LanceBuffer::from(data.data[cursor..end].to_vec()),
591                            bits_per_value: *bits_per_value,
592                            num_values: 1,
593                            block_info: BlockInfo::new(),
594                        });
595                        builder.append(&value_block, 0..1);
596                        cursor = end;
597                    }
598                    (
599                        VariablePackedStructFieldKind::Variable {
600                            bits_per_length, ..
601                        },
602                        FieldAccumulator::Variable32 { builder, .. },
603                    ) => {
604                        if *bits_per_length != 32 {
605                            return Err(Error::invalid_input(
606                                "Packed struct length prefix size mismatch",
607                            ));
608                        }
609                        let end = cursor + std::mem::size_of::<u32>();
610                        if end > row_end {
611                            return Err(Error::invalid_input(
612                                "Packed struct variable child length prefix out of bounds",
613                            ));
614                        }
615                        let len = u32::from_le_bytes(
616                            data.data[cursor..end]
617                                .try_into()
618                                .expect("slice has exact length"),
619                        ) as usize;
620                        cursor = end;
621                        let value_end = cursor + len;
622                        if value_end > row_end {
623                            return Err(Error::invalid_input(
624                                "Packed struct variable child exceeds row bounds",
625                            ));
626                        }
627                        let value_block = DataBlock::VariableWidth(VariableWidthBlock {
628                            data: LanceBuffer::from(data.data[cursor..value_end].to_vec()),
629                            bits_per_offset: 32,
630                            offsets: LanceBuffer::reinterpret_vec(vec![0_u32, len as u32]),
631                            num_values: 1,
632                            block_info: BlockInfo::new(),
633                        });
634                        builder.append(&value_block, 0..1);
635                        cursor = value_end;
636                    }
637                    (
638                        VariablePackedStructFieldKind::Variable {
639                            bits_per_length, ..
640                        },
641                        FieldAccumulator::Variable64 { builder, .. },
642                    ) => {
643                        if *bits_per_length != 64 {
644                            return Err(Error::invalid_input(
645                                "Packed struct length prefix size mismatch",
646                            ));
647                        }
648                        let end = cursor + std::mem::size_of::<u64>();
649                        if end > row_end {
650                            return Err(Error::invalid_input(
651                                "Packed struct variable child length prefix out of bounds",
652                            ));
653                        }
654                        let len = u64::from_le_bytes(
655                            data.data[cursor..end]
656                                .try_into()
657                                .expect("slice has exact length"),
658                        ) as usize;
659                        cursor = end;
660                        let value_end = cursor + len;
661                        if value_end > row_end {
662                            return Err(Error::invalid_input(
663                                "Packed struct variable child exceeds row bounds",
664                            ));
665                        }
666                        let value_block = DataBlock::VariableWidth(VariableWidthBlock {
667                            data: LanceBuffer::from(data.data[cursor..value_end].to_vec()),
668                            bits_per_offset: 64,
669                            offsets: LanceBuffer::reinterpret_vec(vec![0_u64, len as u64]),
670                            num_values: 1,
671                            block_info: BlockInfo::new(),
672                        });
673                        builder.append(&value_block, 0..1);
674                        cursor = value_end;
675                    }
676                    _ => {
677                        return Err(Error::invalid_input(
678                            "Packed struct accumulator kind mismatch",
679                        ));
680                    }
681                }
682            }
683            if cursor != row_end {
684                return Err(Error::invalid_input(
685                    "Packed struct row parsing did not consume full row",
686                ));
687            }
688        }
689
690        let mut children = Vec::with_capacity(self.fields.len());
691        for (field, accumulator) in self.fields.iter().zip(accumulators.into_iter()) {
692            match (field, accumulator) {
693                (
694                    VariablePackedStructFieldDecoder {
695                        kind: VariablePackedStructFieldKind::Fixed { decompressor, .. },
696                    },
697                    FieldAccumulator::Fixed { builder, .. },
698                ) => {
699                    let DataBlock::FixedWidth(block) = builder.finish() else {
700                        panic!("Expected fixed-width datablock from builder");
701                    };
702                    let decoded = decompressor.decompress(block, num_values)?;
703                    children.push(decoded);
704                }
705                (
706                    VariablePackedStructFieldDecoder {
707                        kind:
708                            VariablePackedStructFieldKind::Variable {
709                                bits_per_length,
710                                decompressor,
711                            },
712                    },
713                    FieldAccumulator::Variable32 { builder, .. },
714                ) => {
715                    let DataBlock::VariableWidth(mut block) = builder.finish() else {
716                        panic!("Expected variable-width datablock from builder");
717                    };
718                    debug_assert_eq!(block.bits_per_offset, 32);
719                    block.bits_per_offset = (*bits_per_length) as u8;
720                    let decoded = decompressor.decompress(block)?;
721                    children.push(decoded);
722                }
723                (
724                    VariablePackedStructFieldDecoder {
725                        kind:
726                            VariablePackedStructFieldKind::Variable {
727                                bits_per_length,
728                                decompressor,
729                            },
730                    },
731                    FieldAccumulator::Variable64 { builder, .. },
732                ) => {
733                    let DataBlock::VariableWidth(mut block) = builder.finish() else {
734                        panic!("Expected variable-width datablock from builder");
735                    };
736                    debug_assert_eq!(block.bits_per_offset, 64);
737                    block.bits_per_offset = (*bits_per_length) as u8;
738                    let decoded = decompressor.decompress(block)?;
739                    children.push(decoded);
740                }
741                _ => {
742                    return Err(Error::invalid_input(
743                        "Packed struct accumulator mismatch during finalize",
744                    ));
745                }
746            }
747        }
748
749        Ok(DataBlock::Struct(StructDataBlock {
750            children,
751            block_info: BlockInfo::new(),
752            validity: None,
753        }))
754    }
755}
756
#[cfg(test)]
mod tests {
    // Unit tests for the variable-width packed-struct compressor / decompressor. Most tests
    // build a `StructDataBlock` by hand, push it through the default V2.2 per-value
    // compression strategy, decompress it again and compare every child with its input.
    use super::*;
    use crate::{
        compression::{
            CompressionStrategy, DecompressionStrategy, DefaultCompressionStrategy,
            DefaultDecompressionStrategy,
        },
        constants::PACKED_STRUCT_META_KEY,
        statistics::ComputeStat,
        testing::{TestCases, check_round_trip_encoding_of_data},
        version::LanceFileVersion,
    };
    use arrow_array::{
        Array, ArrayRef, BinaryArray, Int32Array, Int64Array, LargeStringArray, StringArray,
        StructArray, UInt32Array,
    };
    use arrow_schema::{DataType, Field as ArrowField, Fields};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Wraps `fields` in a non-nullable Arrow struct field named `item`.
    fn item_struct(fields: Vec<ArrowField>) -> ArrowField {
        ArrowField::new("item", DataType::Struct(Fields::from(fields)), false)
    }

    /// Converts a single Arrow array into a `DataBlock` holding `array.len()` values.
    fn block_from_array(array: impl Array + 'static) -> DataBlock {
        let num_values = array.len() as u64;
        DataBlock::from_arrays(&[Arc::new(array) as ArrayRef], num_values)
    }

    /// Builds a fixed-width block from a primitive Arrow array.
    ///
    /// # Panics
    /// Panics if the array does not convert to `DataBlock::FixedWidth`.
    fn fixed_block(array: impl Array + 'static) -> FixedWidthDataBlock {
        match block_from_array(array) {
            DataBlock::FixedWidth(block) => block,
            _ => panic!("Expected fixed-width data block"),
        }
    }

    /// Builds a variable-width block from a string / binary Arrow array.
    ///
    /// # Panics
    /// Panics if the array does not convert to `DataBlock::VariableWidth`.
    fn variable_block(array: impl Array + 'static) -> VariableWidthBlock {
        match block_from_array(array) {
            DataBlock::VariableWidth(block) => block,
            _ => panic!("Expected variable-width block"),
        }
    }

    /// Wraps `children` in a struct data block with no validity and empty block info.
    fn struct_data_block(children: Vec<DataBlock>) -> DataBlock {
        DataBlock::Struct(StructDataBlock {
            children,
            block_info: BlockInfo::new(),
            validity: None,
        })
    }

    /// Compresses `data_block` with the default V2.2 per-value strategy, then decompresses it
    /// with the default decompression strategy and returns the decoded struct.
    ///
    /// # Panics
    /// Panics if the compressed output is not variable-width or if the decoded block is not a
    /// struct, since the packed-struct tests expect exactly that shape.
    fn round_trip_variable_packed(
        struct_field: &Field,
        data_block: DataBlock,
    ) -> Result<StructDataBlock> {
        let compression_strategy =
            DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_2);
        let compressor = compression_strategy.create_per_value(struct_field, &data_block)?;
        let (compressed, encoding) = compressor.compress(data_block)?;

        let PerValueDataBlock::Variable(zipped) = compressed else {
            panic!("expected variable-width packed struct output");
        };

        let decompression_strategy = DefaultDecompressionStrategy::default();
        let decompressor =
            decompression_strategy.create_variable_per_value_decompressor(&encoding)?;
        let DataBlock::Struct(decoded_struct) = decompressor.decompress(zipped)? else {
            panic!("expected struct datablock after decode");
        };
        Ok(decoded_struct)
    }

    /// Asserts `actual` has the same offset width, offsets and value bytes as `expected`.
    ///
    /// Offsets are compared as typed slices (`i32` or `i64`) so failures print readable values.
    fn assert_variable_block_eq(actual: &VariableWidthBlock, expected: &VariableWidthBlock) {
        assert_eq!(actual.bits_per_offset, expected.bits_per_offset);
        match expected.bits_per_offset {
            32 => assert_eq!(
                actual.offsets.borrow_to_typed_slice::<i32>().as_ref(),
                expected.offsets.borrow_to_typed_slice::<i32>().as_ref()
            ),
            64 => assert_eq!(
                actual.offsets.borrow_to_typed_slice::<i64>().as_ref(),
                expected.offsets.borrow_to_typed_slice::<i64>().as_ref()
            ),
            other => panic!("unexpected offset width: {other} bits"),
        }
        assert_eq!(actual.data.as_ref(), expected.data.as_ref());
    }

    #[test]
    fn variable_packed_struct_round_trip() -> Result<()> {
        let struct_field = Field::try_from(&item_struct(vec![
            ArrowField::new("id", DataType::UInt32, false),
            ArrowField::new("name", DataType::Utf8, true),
        ]))?;

        let ids = vec![1_u32, 2, 42];
        let expected_id_bytes: Vec<u8> = ids.iter().flat_map(|id| id.to_le_bytes()).collect();
        let mut id_block = FixedWidthDataBlock {
            data: LanceBuffer::reinterpret_vec(ids),
            bits_per_value: 32,
            num_values: 3,
            block_info: BlockInfo::new(),
        };
        id_block.compute_stat();

        let name_offsets = vec![0_i32, 1, 4, 4];
        let name_bytes = b"abcz".to_vec();
        let mut name_block = VariableWidthBlock {
            data: LanceBuffer::from(name_bytes.clone()),
            bits_per_offset: 32,
            offsets: LanceBuffer::reinterpret_vec(name_offsets.clone()),
            num_values: 3,
            block_info: BlockInfo::new(),
        };
        name_block.compute_stat();

        let decoded = round_trip_variable_packed(
            &struct_field,
            struct_data_block(vec![
                DataBlock::FixedWidth(id_block),
                DataBlock::VariableWidth(name_block),
            ]),
        )?;

        let decoded_id = decoded.children[0].as_fixed_width_ref().unwrap();
        assert_eq!(decoded_id.bits_per_value, 32);
        assert_eq!(decoded_id.data.as_ref(), expected_id_bytes.as_slice());

        let decoded_name = decoded.children[1].as_variable_width_ref().unwrap();
        assert_eq!(decoded_name.bits_per_offset, 32);
        assert_eq!(
            decoded_name.offsets.borrow_to_typed_slice::<i32>().as_ref(),
            name_offsets.as_slice()
        );
        assert_eq!(decoded_name.data.as_ref(), name_bytes.as_slice());

        Ok(())
    }

    #[test]
    fn variable_packed_struct_large_utf8_round_trip() -> Result<()> {
        let struct_field = Field::try_from(&item_struct(vec![
            ArrowField::new("value", DataType::Int64, false),
            ArrowField::new("text", DataType::LargeUtf8, false),
        ]))?;

        let value_block = fixed_block(Int64Array::from(vec![10, 20, 30, 40]));
        let text_block = variable_block(LargeStringArray::from(vec![
            "alpha",
            "a considerably longer payload for testing",
            "mid",
            "z",
        ]));

        let decoded = round_trip_variable_packed(
            &struct_field,
            struct_data_block(vec![
                DataBlock::FixedWidth(value_block.clone()),
                DataBlock::VariableWidth(text_block.clone()),
            ]),
        )?;

        let decoded_value = decoded.children[0].as_fixed_width_ref().unwrap();
        assert_eq!(decoded_value.bits_per_value, 64);
        assert_eq!(decoded_value.data.as_ref(), value_block.data.as_ref());

        // LargeUtf8 must keep its 64-bit offsets through the packed encoding.
        let decoded_text = decoded.children[1].as_variable_width_ref().unwrap();
        assert_eq!(decoded_text.bits_per_offset, 64);
        assert_variable_block_eq(decoded_text, &text_block);

        Ok(())
    }

    #[tokio::test]
    async fn variable_packed_struct_utf8_round_trip() {
        // schema: Struct<id: UInt32, uri: Utf8, long_text: LargeUtf8>
        let fields = Fields::from(vec![
            Arc::new(ArrowField::new("id", DataType::UInt32, false)),
            Arc::new(ArrowField::new("uri", DataType::Utf8, false)),
            Arc::new(ArrowField::new("long_text", DataType::LargeUtf8, false)),
        ]);

        // Mark the struct as packed so the writer selects the packed-struct encoding.
        let mut meta = HashMap::new();
        meta.insert(PACKED_STRUCT_META_KEY.to_string(), "true".to_string());

        let array = Arc::new(StructArray::from(vec![
            (
                fields[0].clone(),
                Arc::new(UInt32Array::from(vec![1, 2, 3])) as ArrayRef,
            ),
            (
                fields[1].clone(),
                Arc::new(StringArray::from(vec![
                    Some("a"),
                    Some("b"),
                    Some("/tmp/x"),
                ])) as ArrayRef,
            ),
            (
                fields[2].clone(),
                Arc::new(LargeStringArray::from(vec![
                    Some("alpha"),
                    Some("a considerably longer payload for testing"),
                    Some("mid"),
                ])) as ArrayRef,
            ),
        ]));

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_expected_encoding("variable_packed_struct");

        check_round_trip_encoding_of_data(vec![array], &test_cases, meta).await;
    }

    #[test]
    fn variable_packed_struct_multi_variable_round_trip() -> Result<()> {
        let struct_field = Field::try_from(&item_struct(vec![
            ArrowField::new("category", DataType::Utf8, false),
            ArrowField::new("payload", DataType::Binary, false),
            ArrowField::new("count", DataType::Int32, false),
        ]))?;

        let category_block =
            variable_block(StringArray::from(vec!["red", "blue", "green", "red"]));
        // Includes an empty payload to exercise zero-length variable values inside a row.
        let payload_values: Vec<Vec<u8>> =
            vec![vec![0x01, 0x02], vec![], vec![0x05, 0x06, 0x07], vec![0xff]];
        let payload_block = variable_block(BinaryArray::from_iter_values(
            payload_values.iter().map(|v| v.as_slice()),
        ));
        let count_block = fixed_block(Int32Array::from(vec![1, 2, 3, 4]));

        let decoded = round_trip_variable_packed(
            &struct_field,
            struct_data_block(vec![
                DataBlock::VariableWidth(category_block.clone()),
                DataBlock::VariableWidth(payload_block.clone()),
                DataBlock::FixedWidth(count_block.clone()),
            ]),
        )?;

        let decoded_category = decoded.children[0].as_variable_width_ref().unwrap();
        assert_eq!(decoded_category.bits_per_offset, 32);
        assert_variable_block_eq(decoded_category, &category_block);

        let decoded_payload = decoded.children[1].as_variable_width_ref().unwrap();
        assert_eq!(decoded_payload.bits_per_offset, 32);
        assert_variable_block_eq(decoded_payload, &payload_block);

        let decoded_count = decoded.children[2].as_fixed_width_ref().unwrap();
        assert_eq!(decoded_count.bits_per_value, 32);
        assert_eq!(decoded_count.data.as_ref(), count_block.data.as_ref());

        Ok(())
    }

    #[test]
    fn variable_packed_struct_requires_v22() {
        let struct_field = Field::try_from(&item_struct(vec![
            ArrowField::new("value", DataType::Int64, false),
            ArrowField::new("text", DataType::Utf8, false),
        ]))
        .unwrap();

        let data_block = struct_data_block(vec![
            DataBlock::FixedWidth(fixed_block(Int64Array::from(vec![1, 2, 3]))),
            DataBlock::VariableWidth(variable_block(StringArray::from(vec!["a", "bb", "ccc"]))),
        ]);

        // The same struct is accepted under V2.2 (see the round-trip tests); V2.1 must refuse it.
        let compression_strategy =
            DefaultCompressionStrategy::new().with_version(LanceFileVersion::V2_1);
        let result = compression_strategy.create_per_value(&struct_field, &data_block);

        assert!(matches!(result, Err(Error::NotSupported { .. })));
    }

    #[test]
    fn variable_packed_struct_decompress_empty_row() -> Result<()> {
        let strategy = DefaultDecompressionStrategy::default();
        let fixed_decompressor = Arc::from(
            strategy.create_fixed_per_value_decompressor(&ProtobufUtils21::flat(32, None))?,
        );
        let variable_decompressor = Arc::from(strategy.create_variable_per_value_decompressor(
            &ProtobufUtils21::variable(ProtobufUtils21::flat(32, None), None),
        )?);

        let decompressor = PackedStructVariablePerValueDecompressor::new(vec![
            VariablePackedStructFieldDecoder {
                kind: VariablePackedStructFieldKind::Fixed {
                    bits_per_value: 32,
                    decompressor: fixed_decompressor,
                },
            },
            VariablePackedStructFieldDecoder {
                kind: VariablePackedStructFieldKind::Variable {
                    bits_per_length: 32,
                    decompressor: variable_decompressor,
                },
            },
        ]);

        // Each packed row is laid out as [u32 fixed value][u32 variable length][variable bytes]:
        //   row 0 (bytes 0..9):  value 1, length 1, "a"
        //   row 1 (bytes 9..9):  zero-length row with no bytes at all
        //   row 2 (bytes 9..17): value 2, length 0
        let mut row_data = Vec::new();
        row_data.extend_from_slice(&1_u32.to_le_bytes());
        row_data.extend_from_slice(&1_u32.to_le_bytes());
        row_data.extend_from_slice(b"a");
        row_data.extend_from_slice(&2_u32.to_le_bytes());
        row_data.extend_from_slice(&0_u32.to_le_bytes());

        let input = VariableWidthBlock {
            data: LanceBuffer::from(row_data),
            bits_per_offset: 32,
            offsets: LanceBuffer::reinterpret_vec(vec![0_u32, 9_u32, 9_u32, 17_u32]),
            num_values: 3,
            block_info: BlockInfo::new(),
        };

        let decoded = decompressor.decompress(input)?;
        let DataBlock::Struct(decoded_struct) = decoded else {
            panic!("expected struct output");
        };

        // The empty row is expected to decode as a zero fixed value and an empty variable value.
        let fixed = decoded_struct.children[0].as_fixed_width_ref().unwrap();
        assert_eq!(fixed.bits_per_value, 32);
        assert_eq!(
            fixed.data.borrow_to_typed_slice::<u32>().as_ref(),
            &[1, 0, 2]
        );

        let variable = decoded_struct.children[1].as_variable_width_ref().unwrap();
        assert_eq!(variable.bits_per_offset, 32);
        assert_eq!(
            variable.offsets.borrow_to_typed_slice::<u32>().as_ref(),
            &[0_u32, 1_u32, 1_u32, 1_u32]
        );
        assert_eq!(variable.data.as_ref(), b"a");

        Ok(())
    }
}