lance_encoding/encodings/physical/
packed.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Packed encoding
5//!
6//! These encodings take struct data and compress it in a way that all fields are collected
7//! together.
8//!
9//! This encoding can be transparent or opaque.  In order to be transparent we must use transparent
10//! compression on all children.  Then we can zip together the compressed children.
11
12use arrow::datatypes::UInt64Type;
13
14use lance_core::{Error, Result};
15use snafu::location;
16
17use crate::{
18    buffer::LanceBuffer,
19    compression::MiniBlockDecompressor,
20    data::{BlockInfo, DataBlock, FixedWidthDataBlock, StructDataBlock},
21    encodings::logical::primitive::miniblock::{MiniBlockCompressed, MiniBlockCompressor},
22    format::{
23        pb::{self},
24        ProtobufUtils,
25    },
26    statistics::{GetStat, Stat},
27};
28
29use super::value::{ValueDecompressor, ValueEncoder};
30
31// Transforms a `StructDataBlock` into a row major `FixedWidthDataBlock`.
32// Only fields with fixed-width fields are supported for now, and the
33// assumption that all fields has `bits_per_value % 8 == 0` is made.
34fn struct_data_block_to_fixed_width_data_block(
35    struct_data_block: StructDataBlock,
36    bits_per_values: &[u32],
37) -> DataBlock {
38    let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
39    let mut output = Vec::with_capacity(data_size as usize);
40    let num_values = struct_data_block.children[0].num_values();
41
42    for i in 0..num_values as usize {
43        for (j, child) in struct_data_block.children.iter().enumerate() {
44            let bytes_per_value = (bits_per_values[j] / 8) as usize;
45            let this_data = child
46                .as_fixed_width_ref()
47                .unwrap()
48                .data
49                .slice_with_length(bytes_per_value * i, bytes_per_value);
50            output.extend_from_slice(&this_data);
51        }
52    }
53
54    DataBlock::FixedWidth(FixedWidthDataBlock {
55        bits_per_value: bits_per_values
56            .iter()
57            .map(|bits_per_value| *bits_per_value as u64)
58            .sum(),
59        data: LanceBuffer::Owned(output),
60        num_values,
61        block_info: BlockInfo::default(),
62    })
63}
64
65#[derive(Debug, Default)]
66pub struct PackedStructFixedWidthMiniBlockEncoder {}
67
68impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
69    fn compress(
70        &self,
71        data: DataBlock,
72    ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
73        match data {
74            DataBlock::Struct(struct_data_block) => {
75                let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value as u32).collect::<Vec<_>>();
76
77                // transform struct datablock to fixed-width data block.
78                let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
79
80                // store and transformed fixed-width data block.
81                let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
82                let (value_miniblock_compressed, value_array_encoding) =
83                value_miniblock_compressor.compress(data_block)?;
84
85                Ok((
86                    value_miniblock_compressed,
87                    ProtobufUtils::packed_struct_fixed_width_mini_block(value_array_encoding, bits_per_values),
88                ))
89            }
90            _ => Err(Error::InvalidInput {
91                source: format!(
92                    "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
93                    data.name()
94                )
95                .into(),
96                location: location!(),
97            }),
98        }
99    }
100}
101
102#[derive(Debug)]
103pub struct PackedStructFixedWidthMiniBlockDecompressor {
104    bits_per_values: Vec<u32>,
105    array_encoding: Box<dyn MiniBlockDecompressor>,
106}
107
108impl PackedStructFixedWidthMiniBlockDecompressor {
109    pub fn new(description: &pb::PackedStructFixedWidthMiniBlock) -> Self {
110        let array_encoding: Box<dyn MiniBlockDecompressor> = match description
111            .flat
112            .as_ref()
113            .unwrap()
114            .array_encoding
115            .as_ref()
116            .unwrap()
117        {
118            pb::array_encoding::ArrayEncoding::Flat(flat) => Box::new(ValueDecompressor::from_flat(flat)),
119            _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."),
120        };
121        Self {
122            bits_per_values: description.bits_per_values.clone(),
123            array_encoding,
124        }
125    }
126}
127
128impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
129    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
130        assert_eq!(data.len(), 1);
131        let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
132        let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
133            panic!("ValueDecompressor should output FixedWidth DataBlock")
134        };
135
136        let bytes_per_values = self
137            .bits_per_values
138            .iter()
139            .map(|bits_per_value| *bits_per_value as usize / 8)
140            .collect::<Vec<_>>();
141
142        assert!(encoded_data_block.bits_per_value % 8 == 0);
143        let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
144
145        // use a prefix_sum vector as a helper to reconstruct to `StructDataBlock`.
146        let mut prefix_sum = vec![0; self.bits_per_values.len()];
147        for i in 0..(self.bits_per_values.len() - 1) {
148            prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
149        }
150
151        let mut children_data_block = vec![];
152        for i in 0..self.bits_per_values.len() {
153            let child_buf_size = bytes_per_values[i] * num_values as usize;
154            let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
155
156            for j in 0..num_values as usize {
157                // the start of the data at this row is `j * encoded_bytes_per_row`, and the offset for this field is `prefix_sum[i]`, this field has length `bytes_per_values[i]`.
158                let this_value = encoded_data_block.data.slice_with_length(
159                    prefix_sum[i] + (j * encoded_bytes_per_row),
160                    bytes_per_values[i],
161                );
162
163                child_buf.extend_from_slice(&this_value);
164            }
165
166            let child = DataBlock::FixedWidth(FixedWidthDataBlock {
167                data: LanceBuffer::Owned(child_buf),
168                bits_per_value: self.bits_per_values[i] as u64,
169                num_values,
170                block_info: BlockInfo::default(),
171            });
172            children_data_block.push(child);
173        }
174        Ok(DataBlock::Struct(StructDataBlock {
175            children: children_data_block,
176            block_info: BlockInfo::default(),
177        }))
178    }
179}