lance_encoding/encodings/physical/
packed.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Packed encoding
5//!
6//! These encodings take struct data and compress it in a way that all fields are collected
7//! together.
8//!
9//! This encoding can be transparent or opaque.  In order to be transparent we must use transparent
10//! compression on all children.  Then we can zip together the compressed children.
11
12use arrow_array::types::UInt64Type;
13
14use lance_core::{Error, Result};
15use snafu::location;
16
17use crate::{
18    buffer::LanceBuffer,
19    compression::MiniBlockDecompressor,
20    data::{BlockInfo, DataBlock, FixedWidthDataBlock, StructDataBlock},
21    encodings::logical::primitive::miniblock::{MiniBlockCompressed, MiniBlockCompressor},
22    format::{
23        pb21::{compressive_encoding::Compression, CompressiveEncoding, PackedStruct},
24        ProtobufUtils21,
25    },
26    statistics::{GetStat, Stat},
27};
28
29use super::value::{ValueDecompressor, ValueEncoder};
30
31// Transforms a `StructDataBlock` into a row major `FixedWidthDataBlock`.
32// Only fields with fixed-width fields are supported for now, and the
33// assumption that all fields has `bits_per_value % 8 == 0` is made.
34fn struct_data_block_to_fixed_width_data_block(
35    struct_data_block: StructDataBlock,
36    bits_per_values: &[u64],
37) -> DataBlock {
38    let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
39    let mut output = Vec::with_capacity(data_size as usize);
40    let num_values = struct_data_block.children[0].num_values();
41
42    for i in 0..num_values as usize {
43        for (j, child) in struct_data_block.children.iter().enumerate() {
44            let bytes_per_value = (bits_per_values[j] / 8) as usize;
45            let this_data = child
46                .as_fixed_width_ref()
47                .unwrap()
48                .data
49                .slice_with_length(bytes_per_value * i, bytes_per_value);
50            output.extend_from_slice(&this_data);
51        }
52    }
53
54    DataBlock::FixedWidth(FixedWidthDataBlock {
55        bits_per_value: bits_per_values.iter().copied().sum(),
56        data: LanceBuffer::from(output),
57        num_values,
58        block_info: BlockInfo::default(),
59    })
60}
61
62#[derive(Debug, Default)]
63pub struct PackedStructFixedWidthMiniBlockEncoder {}
64
65impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
66    fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
67        match data {
68            DataBlock::Struct(struct_data_block) => {
69                let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value).collect::<Vec<_>>();
70
71                // transform struct datablock to fixed-width data block.
72                let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
73
74                // store and transformed fixed-width data block.
75                let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
76                let (value_miniblock_compressed, value_array_encoding) =
77                value_miniblock_compressor.compress(data_block)?;
78
79                Ok((
80                    value_miniblock_compressed,
81                    ProtobufUtils21::packed_struct(value_array_encoding, bits_per_values),
82                ))
83            }
84            _ => Err(Error::InvalidInput {
85                source: format!(
86                    "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
87                    data.name()
88                )
89                .into(),
90                location: location!(),
91            }),
92        }
93    }
94}
95
96#[derive(Debug)]
97pub struct PackedStructFixedWidthMiniBlockDecompressor {
98    bits_per_values: Vec<u64>,
99    array_encoding: Box<dyn MiniBlockDecompressor>,
100}
101
102impl PackedStructFixedWidthMiniBlockDecompressor {
103    pub fn new(description: &PackedStruct) -> Self {
104        let array_encoding: Box<dyn MiniBlockDecompressor> = match description.values.as_ref().unwrap().compression.as_ref().unwrap() {
105            Compression::Flat(flat) => Box::new(ValueDecompressor::from_flat(flat)),
106            _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."),
107        };
108        Self {
109            bits_per_values: description.bits_per_value.clone(),
110            array_encoding,
111        }
112    }
113}
114
115impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
116    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
117        assert_eq!(data.len(), 1);
118        let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
119        let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
120            panic!("ValueDecompressor should output FixedWidth DataBlock")
121        };
122
123        let bytes_per_values = self
124            .bits_per_values
125            .iter()
126            .map(|bits_per_value| *bits_per_value as usize / 8)
127            .collect::<Vec<_>>();
128
129        assert!(encoded_data_block.bits_per_value % 8 == 0);
130        let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
131
132        // use a prefix_sum vector as a helper to reconstruct to `StructDataBlock`.
133        let mut prefix_sum = vec![0; self.bits_per_values.len()];
134        for i in 0..(self.bits_per_values.len() - 1) {
135            prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
136        }
137
138        let mut children_data_block = vec![];
139        for i in 0..self.bits_per_values.len() {
140            let child_buf_size = bytes_per_values[i] * num_values as usize;
141            let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
142
143            for j in 0..num_values as usize {
144                // the start of the data at this row is `j * encoded_bytes_per_row`, and the offset for this field is `prefix_sum[i]`, this field has length `bytes_per_values[i]`.
145                let this_value = encoded_data_block.data.slice_with_length(
146                    prefix_sum[i] + (j * encoded_bytes_per_row),
147                    bytes_per_values[i],
148                );
149
150                child_buf.extend_from_slice(&this_value);
151            }
152
153            let child = DataBlock::FixedWidth(FixedWidthDataBlock {
154                data: LanceBuffer::from(child_buf),
155                bits_per_value: self.bits_per_values[i],
156                num_values,
157                block_info: BlockInfo::default(),
158            });
159            children_data_block.push(child);
160        }
161        Ok(DataBlock::Struct(StructDataBlock {
162            children: children_data_block,
163            block_info: BlockInfo::default(),
164            validity: None,
165        }))
166    }
167}