lance_encoding/encodings/physical/
packed.rs1use arrow::datatypes::UInt64Type;
13
14use lance_core::{Error, Result};
15use snafu::location;
16
17use crate::{
18 buffer::LanceBuffer,
19 compression::MiniBlockDecompressor,
20 data::{BlockInfo, DataBlock, FixedWidthDataBlock, StructDataBlock},
21 encodings::logical::primitive::miniblock::{MiniBlockCompressed, MiniBlockCompressor},
22 format::{
23 pb::{self},
24 ProtobufUtils,
25 },
26 statistics::{GetStat, Stat},
27};
28
29use super::value::{ValueDecompressor, ValueEncoder};
30
31fn struct_data_block_to_fixed_width_data_block(
35 struct_data_block: StructDataBlock,
36 bits_per_values: &[u32],
37) -> DataBlock {
38 let data_size = struct_data_block.expect_single_stat::<UInt64Type>(Stat::DataSize);
39 let mut output = Vec::with_capacity(data_size as usize);
40 let num_values = struct_data_block.children[0].num_values();
41
42 for i in 0..num_values as usize {
43 for (j, child) in struct_data_block.children.iter().enumerate() {
44 let bytes_per_value = (bits_per_values[j] / 8) as usize;
45 let this_data = child
46 .as_fixed_width_ref()
47 .unwrap()
48 .data
49 .slice_with_length(bytes_per_value * i, bytes_per_value);
50 output.extend_from_slice(&this_data);
51 }
52 }
53
54 DataBlock::FixedWidth(FixedWidthDataBlock {
55 bits_per_value: bits_per_values
56 .iter()
57 .map(|bits_per_value| *bits_per_value as u64)
58 .sum(),
59 data: LanceBuffer::Owned(output),
60 num_values,
61 block_info: BlockInfo::default(),
62 })
63}
64
/// Mini-block compressor for struct data whose children are all fixed-width.
///
/// The encoder carries no configuration, so it is a zero-sized type that can be
/// created with `Default::default()`.
#[derive(Default, Debug)]
pub struct PackedStructFixedWidthMiniBlockEncoder {}
67
68impl MiniBlockCompressor for PackedStructFixedWidthMiniBlockEncoder {
69 fn compress(
70 &self,
71 data: DataBlock,
72 ) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
73 match data {
74 DataBlock::Struct(struct_data_block) => {
75 let bits_per_values = struct_data_block.children.iter().map(|data_block| data_block.as_fixed_width_ref().unwrap().bits_per_value as u32).collect::<Vec<_>>();
76
77 let data_block = struct_data_block_to_fixed_width_data_block(struct_data_block, &bits_per_values);
79
80 let value_miniblock_compressor = Box::new(ValueEncoder::default()) as Box<dyn MiniBlockCompressor>;
82 let (value_miniblock_compressed, value_array_encoding) =
83 value_miniblock_compressor.compress(data_block)?;
84
85 Ok((
86 value_miniblock_compressed,
87 ProtobufUtils::packed_struct_fixed_width_mini_block(value_array_encoding, bits_per_values),
88 ))
89 }
90 _ => Err(Error::InvalidInput {
91 source: format!(
92 "Cannot compress a data block of type {} with PackedStructFixedWidthBlockEncoder",
93 data.name()
94 )
95 .into(),
96 location: location!(),
97 }),
98 }
99 }
100}
101
102#[derive(Debug)]
103pub struct PackedStructFixedWidthMiniBlockDecompressor {
104 bits_per_values: Vec<u32>,
105 array_encoding: Box<dyn MiniBlockDecompressor>,
106}
107
108impl PackedStructFixedWidthMiniBlockDecompressor {
109 pub fn new(description: &pb::PackedStructFixedWidthMiniBlock) -> Self {
110 let array_encoding: Box<dyn MiniBlockDecompressor> = match description
111 .flat
112 .as_ref()
113 .unwrap()
114 .array_encoding
115 .as_ref()
116 .unwrap()
117 {
118 pb::array_encoding::ArrayEncoding::Flat(flat) => Box::new(ValueDecompressor::from_flat(flat)),
119 _ => panic!("Currently only `ArrayEncoding::Flat` is supported in packed struct encoding in Lance 2.1."),
120 };
121 Self {
122 bits_per_values: description.bits_per_values.clone(),
123 array_encoding,
124 }
125 }
126}
127
128impl MiniBlockDecompressor for PackedStructFixedWidthMiniBlockDecompressor {
129 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
130 assert_eq!(data.len(), 1);
131 let encoded_data_block = self.array_encoding.decompress(data, num_values)?;
132 let DataBlock::FixedWidth(encoded_data_block) = encoded_data_block else {
133 panic!("ValueDecompressor should output FixedWidth DataBlock")
134 };
135
136 let bytes_per_values = self
137 .bits_per_values
138 .iter()
139 .map(|bits_per_value| *bits_per_value as usize / 8)
140 .collect::<Vec<_>>();
141
142 assert!(encoded_data_block.bits_per_value % 8 == 0);
143 let encoded_bytes_per_row = (encoded_data_block.bits_per_value / 8) as usize;
144
145 let mut prefix_sum = vec![0; self.bits_per_values.len()];
147 for i in 0..(self.bits_per_values.len() - 1) {
148 prefix_sum[i + 1] = prefix_sum[i] + bytes_per_values[i];
149 }
150
151 let mut children_data_block = vec![];
152 for i in 0..self.bits_per_values.len() {
153 let child_buf_size = bytes_per_values[i] * num_values as usize;
154 let mut child_buf: Vec<u8> = Vec::with_capacity(child_buf_size);
155
156 for j in 0..num_values as usize {
157 let this_value = encoded_data_block.data.slice_with_length(
159 prefix_sum[i] + (j * encoded_bytes_per_row),
160 bytes_per_values[i],
161 );
162
163 child_buf.extend_from_slice(&this_value);
164 }
165
166 let child = DataBlock::FixedWidth(FixedWidthDataBlock {
167 data: LanceBuffer::Owned(child_buf),
168 bits_per_value: self.bits_per_values[i] as u64,
169 num_values,
170 block_info: BlockInfo::default(),
171 });
172 children_data_block.push(child);
173 }
174 Ok(DataBlock::Struct(StructDataBlock {
175 children: children_data_block,
176 block_info: BlockInfo::default(),
177 }))
178 }
179}