lance_encoding/encodings/physical/
byte_stream_split.rs1use std::fmt::Debug;
59
60use crate::buffer::LanceBuffer;
61use crate::compression::MiniBlockDecompressor;
62use crate::compression_config::BssMode;
63use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
64use crate::encodings::logical::primitive::miniblock::{
65 MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
66};
67use crate::format::pb21::CompressiveEncoding;
68use crate::format::ProtobufUtils21;
69use crate::statistics::{GetStat, Stat};
70use arrow_array::{cast::AsArray, types::UInt64Type};
71use lance_core::Result;
72use snafu::location;
73
/// Mini-block compressor that rearranges fixed-width values into per-byte streams.
///
/// Only 32-bit and 64-bit value widths are accepted; see [`ByteStreamSplitEncoder::new`].
#[derive(Debug, Clone)]
pub struct ByteStreamSplitEncoder {
    /// Width of one value in bits, validated by `new` to be 32 or 64.
    bits_per_value: usize,
}

impl ByteStreamSplitEncoder {
    /// Creates an encoder for values that are `bits_per_value` bits wide.
    ///
    /// # Panics
    ///
    /// Panics when `bits_per_value` is neither 32 nor 64.
    pub fn new(bits_per_value: usize) -> Self {
        assert!(
            matches!(bits_per_value, 32 | 64),
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of one value in whole bytes.
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / 8
    }

    /// Largest number of values placed in a single chunk.
    ///
    /// Both supported widths work out to 4096 bytes per full chunk
    /// (1024 * 4 bytes and 512 * 8 bytes).
    fn max_chunk_size(&self) -> usize {
        if self.bits_per_value == 32 {
            1024
        } else if self.bits_per_value == 64 {
            512
        } else {
            unreachable!("ByteStreamSplit only supports 32 or 64 bit values")
        }
    }
}
109
110impl MiniBlockCompressor for ByteStreamSplitEncoder {
111 fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
112 match page {
113 DataBlock::FixedWidth(data) => {
114 let num_values = data.num_values;
115 let bytes_per_value = self.bytes_per_value();
116
117 if num_values == 0 {
118 return Ok((
119 MiniBlockCompressed {
120 data: vec![],
121 chunks: vec![],
122 num_values: 0,
123 },
124 ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
125 self.bits_per_value as u64,
126 None,
127 )),
128 ));
129 }
130
131 let total_size = num_values as usize * bytes_per_value;
132 let mut global_buffer = vec![0u8; total_size];
133
134 let mut chunks = Vec::new();
135 let data_slice = data.data.as_ref();
136 let mut processed_values = 0usize;
137 let max_chunk_size = self.max_chunk_size();
138
139 while processed_values < num_values as usize {
140 let chunk_size = (num_values as usize - processed_values).min(max_chunk_size);
141 let chunk_offset = processed_values * bytes_per_value;
142
143 for i in 0..chunk_size {
145 let src_offset = (processed_values + i) * bytes_per_value;
146 for j in 0..bytes_per_value {
147 let dst_offset = chunk_offset + j * chunk_size + i;
149 global_buffer[dst_offset] = data_slice[src_offset + j];
150 }
151 }
152
153 let chunk_bytes = chunk_size * bytes_per_value;
154 let log_num_values = if processed_values + chunk_size == num_values as usize {
155 0 } else {
157 chunk_size.ilog2() as u8
158 };
159
160 chunks.push(MiniBlockChunk {
161 buffer_sizes: vec![chunk_bytes as u16],
162 log_num_values,
163 });
164
165 processed_values += chunk_size;
166 }
167
168 let data_buffers = vec![LanceBuffer::from(global_buffer)];
169
170 let encoding = ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
172 self.bits_per_value as u64,
173 None,
174 ));
175
176 Ok((
177 MiniBlockCompressed {
178 data: data_buffers,
179 chunks,
180 num_values,
181 },
182 encoding,
183 ))
184 }
185 _ => Err(lance_core::Error::InvalidInput {
186 source: "ByteStreamSplit encoding only supports FixedWidth data blocks".into(),
187 location: location!(),
188 }),
189 }
190 }
191}
192
/// Mini-block decompressor that turns per-byte streams back into fixed-width values.
///
/// Only 32-bit and 64-bit value widths are accepted; see
/// [`ByteStreamSplitDecompressor::new`].
#[derive(Debug)]
pub struct ByteStreamSplitDecompressor {
    /// Width of one value in bits, validated by `new` to be 32 or 64.
    bits_per_value: usize,
}

impl ByteStreamSplitDecompressor {
    /// Creates a decompressor for values that are `bits_per_value` bits wide.
    ///
    /// # Panics
    ///
    /// Panics when `bits_per_value` is neither 32 nor 64.
    pub fn new(bits_per_value: usize) -> Self {
        assert!(
            [32, 64].contains(&bits_per_value),
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of one value in whole bytes.
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / u8::BITS as usize
    }
}
212
213impl MiniBlockDecompressor for ByteStreamSplitDecompressor {
214 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
215 if num_values == 0 {
216 return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
217 data: LanceBuffer::empty(),
218 bits_per_value: self.bits_per_value as u64,
219 num_values: 0,
220 block_info: BlockInfo::new(),
221 }));
222 }
223
224 let bytes_per_value = self.bytes_per_value();
225 let total_bytes = num_values as usize * bytes_per_value;
226
227 if data.len() != 1 {
228 return Err(lance_core::Error::InvalidInput {
229 source: format!(
230 "ByteStreamSplit decompression expects 1 buffer, but got {}",
231 data.len()
232 )
233 .into(),
234 location: location!(),
235 });
236 }
237
238 let input_buffer = &data[0];
239
240 if input_buffer.len() != total_bytes {
241 return Err(lance_core::Error::InvalidInput {
242 source: format!(
243 "Expected {} bytes for decompression, but got {}",
244 total_bytes,
245 input_buffer.len()
246 )
247 .into(),
248 location: location!(),
249 });
250 }
251
252 let mut output = vec![0u8; total_bytes];
253
254 for i in 0..num_values as usize {
256 for j in 0..bytes_per_value {
257 let src_offset = j * num_values as usize + i;
258 output[i * bytes_per_value + j] = input_buffer[src_offset];
259 }
260 }
261
262 Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
263 data: LanceBuffer::from(output),
264 bits_per_value: self.bits_per_value as u64,
265 num_values,
266 block_info: BlockInfo::new(),
267 }))
268 }
269}
270
271pub fn should_use_bss(data: &FixedWidthDataBlock, mode: BssMode) -> bool {
273 if data.bits_per_value != 32 && data.bits_per_value != 64 {
277 return false;
278 }
279
280 let sensitivity = mode.to_sensitivity();
281
282 if sensitivity <= 0.0 {
284 return false;
285 }
286 if sensitivity >= 1.0 {
287 return true;
288 }
289
290 evaluate_entropy_for_bss(data, sensitivity)
292}
293
294fn evaluate_entropy_for_bss(data: &FixedWidthDataBlock, sensitivity: f32) -> bool {
296 let Some(entropy_stat) = data.get_stat(Stat::BytePositionEntropy) else {
298 return false; };
300
301 let entropies = entropy_stat.as_primitive::<UInt64Type>();
302 if entropies.is_empty() {
303 return false;
304 }
305
306 let sum: u64 = entropies.values().iter().sum();
308 let avg_entropy = sum as f64 / entropies.len() as f64 / 1000.0; let entropy_threshold = sensitivity as f64 * 8.0;
315
316 avg_entropy < entropy_threshold
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `buffer` through the encoder and then the decompressor for the given width,
    /// returning the decoded fixed-width block.
    fn round_trip(bits_per_value: u64, buffer: LanceBuffer, num_values: u64) -> FixedWidthDataBlock {
        let encoder = ByteStreamSplitEncoder::new(bits_per_value as usize);
        let decompressor = ByteStreamSplitDecompressor::new(bits_per_value as usize);

        let page = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: buffer,
            bits_per_value,
            num_values,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = encoder.compress(page).unwrap();

        match decompressor.decompress(compressed.data, num_values).unwrap() {
            DataBlock::FixedWidth(fixed) => fixed,
            _ => panic!("Expected FixedWidth DataBlock"),
        }
    }

    /// f32 values, including signed zeros and infinities, survive encode + decode.
    #[test]
    fn test_round_trip_f32() {
        let values: Vec<f32> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f32::INFINITY,
            f32::NEG_INFINITY,
        ];
        let mut bytes: Vec<u8> = Vec::with_capacity(values.len() * 4);
        for value in &values {
            bytes.extend_from_slice(&value.to_le_bytes());
        }

        let decoded = round_trip(32, LanceBuffer::from(bytes), values.len() as u64);

        let raw: &[u8] = decoded.data.as_ref();
        let result_values: Vec<f32> = raw
            .chunks_exact(4)
            .map(|word| f32::from_le_bytes(word.try_into().unwrap()))
            .collect();

        assert_eq!(values, result_values);
    }

    /// f64 values, including signed zeros and infinities, survive encode + decode.
    #[test]
    fn test_round_trip_f64() {
        let values: Vec<f64> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f64::INFINITY,
            f64::NEG_INFINITY,
        ];
        let mut bytes: Vec<u8> = Vec::with_capacity(values.len() * 8);
        for value in &values {
            bytes.extend_from_slice(&value.to_le_bytes());
        }

        let decoded = round_trip(64, LanceBuffer::from(bytes), values.len() as u64);

        let raw: &[u8] = decoded.data.as_ref();
        let result_values: Vec<f64> = raw
            .chunks_exact(8)
            .map(|word| f64::from_le_bytes(word.try_into().unwrap()))
            .collect();

        assert_eq!(values, result_values);
    }

    /// A zero-value page encodes and decodes to an empty block.
    #[test]
    fn test_empty_data() {
        let decoded = round_trip(32, LanceBuffer::empty(), 0);

        assert_eq!(decoded.num_values, 0);
        assert_eq!(decoded.data.len(), 0);
    }
}