lance_encoding/encodings/physical/
byte_stream_split.rs1use std::fmt::Debug;
58
59use crate::buffer::LanceBuffer;
60use crate::compression::MiniBlockDecompressor;
61use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
62use crate::encodings::logical::primitive::miniblock::{
63 MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
64};
65use crate::format::pb;
66use crate::format::ProtobufUtils;
67use lance_core::Result;
68use snafu::location;
69
/// Mini-block compressor state for the byte-stream-split layout.
///
/// Holds only the value width; instances are created through
/// [`ByteStreamSplitEncoder::new`], which restricts that width to 32 or 64 bits.
#[derive(Debug, Clone)]
pub struct ByteStreamSplitEncoder {
    /// Width of a single value in bits (32 or 64, as enforced by `new`).
    bits_per_value: usize,
}

impl ByteStreamSplitEncoder {
    /// Builds an encoder for values that are `bits_per_value` bits wide.
    ///
    /// # Panics
    ///
    /// Panics when `bits_per_value` is anything other than 32 or 64.
    pub fn new(bits_per_value: usize) -> Self {
        assert!(
            matches!(bits_per_value, 32 | 64),
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of a single value in whole bytes.
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / 8
    }

    /// Upper bound on the number of values stored in one chunk.
    ///
    /// 1024 values for 32-bit data and 512 values for 64-bit data, i.e. 4096 bytes
    /// of payload per full chunk in both cases.
    fn max_chunk_size(&self) -> usize {
        if self.bits_per_value == 32 {
            1024
        } else if self.bits_per_value == 64 {
            512
        } else {
            // `new` rejects every other width, so this arm is never taken.
            unreachable!("ByteStreamSplit only supports 32 or 64 bit values")
        }
    }
}
105
106impl MiniBlockCompressor for ByteStreamSplitEncoder {
107 fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
108 match page {
109 DataBlock::FixedWidth(data) => {
110 let num_values = data.num_values;
111 let bytes_per_value = self.bytes_per_value();
112
113 if num_values == 0 {
114 return Ok((
115 MiniBlockCompressed {
116 data: vec![],
117 chunks: vec![],
118 num_values: 0,
119 },
120 ProtobufUtils::byte_stream_split(self.bits_per_value as u64),
121 ));
122 }
123
124 let total_size = num_values as usize * bytes_per_value;
125 let mut global_buffer = vec![0u8; total_size];
126
127 let mut chunks = Vec::new();
128 let data_slice = data.data.as_ref();
129 let mut processed_values = 0usize;
130 let max_chunk_size = self.max_chunk_size();
131
132 while processed_values < num_values as usize {
133 let chunk_size = (num_values as usize - processed_values).min(max_chunk_size);
134 let chunk_offset = processed_values * bytes_per_value;
135
136 for i in 0..chunk_size {
138 let src_offset = (processed_values + i) * bytes_per_value;
139 for j in 0..bytes_per_value {
140 let dst_offset = chunk_offset + j * chunk_size + i;
142 global_buffer[dst_offset] = data_slice[src_offset + j];
143 }
144 }
145
146 let chunk_bytes = chunk_size * bytes_per_value;
147 let log_num_values = if processed_values + chunk_size == num_values as usize {
148 0 } else {
150 chunk_size.ilog2() as u8
151 };
152
153 chunks.push(MiniBlockChunk {
154 buffer_sizes: vec![chunk_bytes as u16],
155 log_num_values,
156 });
157
158 processed_values += chunk_size;
159 }
160
161 let data_buffers = vec![LanceBuffer::from(global_buffer)];
162
163 let encoding = ProtobufUtils::byte_stream_split(self.bits_per_value as u64);
164
165 Ok((
166 MiniBlockCompressed {
167 data: data_buffers,
168 chunks,
169 num_values,
170 },
171 encoding,
172 ))
173 }
174 _ => Err(lance_core::Error::InvalidInput {
175 source: "ByteStreamSplit encoding only supports FixedWidth data blocks".into(),
176 location: location!(),
177 }),
178 }
179 }
180}
181
/// Mini-block decompressor state for the byte-stream-split layout.
///
/// Holds only the value width; instances are created through
/// [`ByteStreamSplitDecompressor::new`], which restricts that width to 32 or 64 bits.
#[derive(Debug)]
pub struct ByteStreamSplitDecompressor {
    /// Width of a single decoded value in bits (32 or 64, as enforced by `new`).
    bits_per_value: usize,
}

impl ByteStreamSplitDecompressor {
    /// Builds a decompressor for values that are `bits_per_value` bits wide.
    ///
    /// # Panics
    ///
    /// Panics when `bits_per_value` is anything other than 32 or 64.
    pub fn new(bits_per_value: usize) -> Self {
        let supported = matches!(bits_per_value, 32 | 64);
        assert!(
            supported,
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of a single decoded value in whole bytes.
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / (u8::BITS as usize)
    }
}
201
202impl MiniBlockDecompressor for ByteStreamSplitDecompressor {
203 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
204 if num_values == 0 {
205 return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
206 data: LanceBuffer::empty(),
207 bits_per_value: self.bits_per_value as u64,
208 num_values: 0,
209 block_info: BlockInfo::new(),
210 }));
211 }
212
213 let bytes_per_value = self.bytes_per_value();
214 let total_bytes = num_values as usize * bytes_per_value;
215
216 if data.len() != 1 {
217 return Err(lance_core::Error::InvalidInput {
218 source: format!(
219 "ByteStreamSplit decompression expects 1 buffer, but got {}",
220 data.len()
221 )
222 .into(),
223 location: location!(),
224 });
225 }
226
227 let input_buffer = &data[0];
228
229 if input_buffer.len() != total_bytes {
230 return Err(lance_core::Error::InvalidInput {
231 source: format!(
232 "Expected {} bytes for decompression, but got {}",
233 total_bytes,
234 input_buffer.len()
235 )
236 .into(),
237 location: location!(),
238 });
239 }
240
241 let mut output = vec![0u8; total_bytes];
242
243 for i in 0..num_values as usize {
245 for j in 0..bytes_per_value {
246 let src_offset = j * num_values as usize + i;
247 output[i * bytes_per_value + j] = input_buffer[src_offset];
248 }
249 }
250
251 Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
252 data: LanceBuffer::from(output),
253 bits_per_value: self.bits_per_value as u64,
254 num_values,
255 block_info: BlockInfo::new(),
256 }))
257 }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Compresses `bytes` as `num_values` values of `bits_per_value` bits, decompresses
    /// the result as a single run, and returns the decoded bytes.
    ///
    /// Only valid for inputs that fit in one chunk, because the decompressor treats its
    /// input buffer as one transposed run.
    fn round_trip_single_chunk(bits_per_value: usize, bytes: Vec<u8>, num_values: u64) -> Vec<u8> {
        let encoder = ByteStreamSplitEncoder::new(bits_per_value);
        let decompressor = ByteStreamSplitDecompressor::new(bits_per_value);

        let data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(bytes),
            bits_per_value: bits_per_value as u64,
            num_values,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = encoder.compress(data_block).unwrap();

        let decompressed = decompressor
            .decompress(compressed.data, num_values)
            .unwrap();
        let DataBlock::FixedWidth(decompressed_fixed) = &decompressed else {
            panic!("Expected FixedWidth DataBlock")
        };
        assert_eq!(decompressed_fixed.num_values, num_values);

        let decoded: &[u8] = decompressed_fixed.data.as_ref();
        decoded.to_vec()
    }

    #[test]
    fn test_round_trip_f32() {
        let values: Vec<f32> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f32::INFINITY,
            f32::NEG_INFINITY,
            f32::NAN,
        ];
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let decoded = round_trip_single_chunk(32, bytes.clone(), values.len() as u64);

        // Compare raw bytes rather than floats: float equality treats -0.0 == 0.0 and
        // NaN != NaN, so it cannot prove the round trip is bit-exact.
        assert_eq!(bytes, decoded);
    }

    #[test]
    fn test_round_trip_f64() {
        let values: Vec<f64> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f64::INFINITY,
            f64::NEG_INFINITY,
            f64::NAN,
        ];
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let decoded = round_trip_single_chunk(64, bytes.clone(), values.len() as u64);

        // Byte comparison keeps the check bit-exact (see test_round_trip_f32).
        assert_eq!(bytes, decoded);
    }

    #[test]
    fn test_multi_chunk_round_trip() {
        let encoder = ByteStreamSplitEncoder::new(32);
        let decompressor = ByteStreamSplitDecompressor::new(32);

        // 2500 32-bit values exceed the 1024-value chunk limit: 1024 + 1024 + 452.
        let values: Vec<f32> = (0..2500).map(|i| i as f32 * 0.25 - 100.0).collect();
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(bytes.clone()),
            bits_per_value: 32,
            num_values: values.len() as u64,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = encoder.compress(data_block).unwrap();
        assert_eq!(compressed.data.len(), 1);
        assert_eq!(compressed.chunks.len(), 3);

        // Each chunk is transposed independently, so decode chunk by chunk using the
        // byte length recorded for it.
        let encoded: &[u8] = compressed.data[0].as_ref();
        let mut decoded = Vec::with_capacity(bytes.len());
        let mut offset = 0usize;
        for chunk in &compressed.chunks {
            let chunk_bytes = chunk.buffer_sizes[0] as usize;
            let chunk_values = (chunk_bytes / 4) as u64;
            let chunk_buffer = LanceBuffer::from(encoded[offset..offset + chunk_bytes].to_vec());

            let block = decompressor
                .decompress(vec![chunk_buffer], chunk_values)
                .unwrap();
            let DataBlock::FixedWidth(fixed) = &block else {
                panic!("Expected FixedWidth DataBlock")
            };
            let chunk_decoded: &[u8] = fixed.data.as_ref();
            decoded.extend_from_slice(chunk_decoded);

            offset += chunk_bytes;
        }

        assert_eq!(offset, encoded.len());
        assert_eq!(bytes, decoded);
    }

    #[test]
    fn test_empty_data() {
        let encoder = ByteStreamSplitEncoder::new(32);
        let decompressor = ByteStreamSplitDecompressor::new(32);

        let data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::empty(),
            bits_per_value: 32,
            num_values: 0,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = encoder.compress(data_block).unwrap();
        assert!(compressed.chunks.is_empty());

        let decompressed = decompressor.decompress(compressed.data, 0).unwrap();
        let DataBlock::FixedWidth(decompressed_fixed) = &decompressed else {
            panic!("Expected FixedWidth DataBlock")
        };

        assert_eq!(decompressed_fixed.num_values, 0);
        assert_eq!(decompressed_fixed.data.len(), 0);
    }
}