lance_encoding/encodings/physical/
packed.rs1use arrow_array::types::UInt64Type;
13
14use lance_core::{Error, Result};
15use snafu::location;
16
17use crate::{
18 buffer::LanceBuffer,
19 compression::MiniBlockDecompressor,
20 data::{BlockInfo, DataBlock, FixedWidthDataBlock, StructDataBlock},
21 encodings::logical::primitive::miniblock::{MiniBlockCompressed, MiniBlockCompressor},
22 format::{
23 pb21::{compressive_encoding::Compression, CompressiveEncoding, PackedStruct},
24 ProtobufUtils21,
25 },
26 statistics::{GetStat, Stat},
27};
28
29use super::value::{ValueDecompressor, ValueEncoder};
30
31fn struct_data_block_to_fixed_width_data_block(
35 struct_data_block: StructDataBlock,
36 bits_per_values: &[u64],
37) -> DataBlock {
38 let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
39 let mut output = Vec::with_capacity(data_size as usize);
40 let num_values = struct_data_block.children[0].num_values();
41
42 for i in 0..num_values as usize {
43 for (j, child) in struct_data_block.children.iter().enumerate() {
44 let bytes_per_value = (bits_per_values[j] / 8) as usize;
45 let this_data = child
46 .as_fixed_width_ref()
47 .unwrap()
48 .data
49 .slice_with_length(bytes_per_value * i, bytes_per_value);
50 output.extend_from_slice(&this_data);
51 }
52 }
53
54 DataBlock::FixedWidth(FixedWidthDataBlock {
55 bits_per_value: bits_per_values.iter().copied().sum(),
56 data: LanceBuffer::from(output),
57 num_values,
58 block_info: BlockInfo::default(),
59 })
60}
61
/// Mini-block compressor for structs whose children are all fixed-width blocks.
///
/// The encoder is stateless; all work happens in its `MiniBlockCompressor::compress`
/// implementation.
#[derive(Default, Debug)]
pub struct PackedStructFixedWidthMiniBlockEncoder {}
64
65impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
66 fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
67 match data {
68 DataBlock::Struct(struct_data_block) => {
69 let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value).collect::<Vec<_>>();
70
71 let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
73
74 let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
76 let (value_miniblock_compressed, value_array_encoding) =
77 value_miniblock_compressor.compress(data_block)?;
78
79 Ok((
80 value_miniblock_compressed,
81 ProtobufUtils21::packed_struct(value_array_encoding, bits_per_values),
82 ))
83 }
84 _ => Err(Error::InvalidInput {
85 source: format!(
86 "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
87 data.name()
88 )
89 .into(),
90 location: location!(),
91 }),
92 }
93 }
94}
95
96#[derive(Debug)]
97pub struct PackedStructFixedWidthMiniBlockDecompressor {
98 bits_per_values: Vec<u64>,
99 array_encoding: Box<dyn MiniBlockDecompressor>,
100}
101
102impl PackedStructFixedWidthMiniBlockDecompressor {
103 pub fn new(description: &PackedStruct) -> Self {
104 let array_encoding: Box<dyn MiniBlockDecompressor> = match description.values.as_ref().unwrap().compression.as_ref().unwrap() {
105 Compression::Flat(flat) => Box::new(ValueDecompressor::from_flat(flat)),
106 _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."),
107 };
108 Self {
109 bits_per_values: description.bits_per_value.clone(),
110 array_encoding,
111 }
112 }
113}
114
115impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
116 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
117 assert_eq!(data.len(), 1);
118 let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
119 let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
120 panic!("ValueDecompressor should output FixedWidth DataBlock")
121 };
122
123 let bytes_per_values = self
124 .bits_per_values
125 .iter()
126 .map(|bits_per_value| *bits_per_value as usize / 8)
127 .collect::<Vec<_>>();
128
129 assert!(encoded_data_block.bits_per_value % 8 == 0);
130 let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
131
132 let mut prefix_sum = vec![0; self.bits_per_values.len()];
134 for i in 0..(self.bits_per_values.len() - 1) {
135 prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
136 }
137
138 let mut children_data_block = vec![];
139 for i in 0..self.bits_per_values.len() {
140 let child_buf_size = bytes_per_values[i] * num_values as usize;
141 let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
142
143 for j in 0..num_values as usize {
144 let this_value = encoded_data_block.data.slice_with_length(
146 prefix_sum[i] + (j * encoded_bytes_per_row),
147 bytes_per_values[i],
148 );
149
150 child_buf.extend_from_slice(&this_value);
151 }
152
153 let child = DataBlock::FixedWidth(FixedWidthDataBlock {
154 data: LanceBuffer::from(child_buf),
155 bits_per_value: self.bits_per_values[i],
156 num_values,
157 block_info: BlockInfo::default(),
158 });
159 children_data_block.push(child);
160 }
161 Ok(DataBlock::Struct(StructDataBlock {
162 children: children_data_block,
163 block_info: BlockInfo::default(),
164 validity: None,
165 }))
166 }
167}