lance_encoding/encodings/physical/
byte_stream_split.rs1use std::fmt::Debug;
59
60use crate::buffer::LanceBuffer;
61use crate::compression::MiniBlockDecompressor;
62use crate::compression_config::BssMode;
63use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
64use crate::encodings::logical::primitive::miniblock::{
65 MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
66};
67use crate::format::ProtobufUtils21;
68use crate::format::pb21::CompressiveEncoding;
69use crate::statistics::{GetStat, Stat};
70use arrow_array::{cast::AsArray, types::UInt64Type};
71use lance_core::Result;
72
/// Mini-block compressor that applies byte stream split (BSS) to 32- or 64-bit values.
///
/// BSS only rearranges bytes (the output is exactly as large as the input): byte `j` of
/// every value in a chunk is gathered into a contiguous stream `j`.
#[derive(Debug, Clone)]
pub struct ByteStreamSplitEncoder {
    /// Width of each value in bits; `new` guarantees this is 32 or 64.
    bits_per_value: usize,
}

impl ByteStreamSplitEncoder {
    /// Builds an encoder for values that are `bits_per_value` bits wide.
    ///
    /// # Panics
    ///
    /// Panics when `bits_per_value` is anything other than 32 or 64.
    pub fn new(bits_per_value: usize) -> Self {
        let supported = matches!(bits_per_value, 32 | 64);
        assert!(
            supported,
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of one value in bytes (4 or 8).
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / 8
    }

    /// Upper bound on the number of values placed in a single mini-block chunk.
    ///
    /// Both widths give 4096-byte full chunks (1024 x 4 and 512 x 8). Both counts are powers
    /// of two, which lets non-final chunks record their size as `log2(count)`.
    fn max_chunk_size(&self) -> usize {
        match self.bits_per_value {
            64 => 512,
            32 => 1024,
            _ => unreachable!("ByteStreamSplit only supports 32 or 64 bit values"),
        }
    }
}
108
109impl MiniBlockCompressor for ByteStreamSplitEncoder {
110 fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
111 match page {
112 DataBlock::FixedWidth(data) => {
113 let num_values = data.num_values;
114 let bytes_per_value = self.bytes_per_value();
115
116 if num_values == 0 {
117 return Ok((
118 MiniBlockCompressed {
119 data: vec![],
120 chunks: vec![],
121 num_values: 0,
122 },
123 ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
124 self.bits_per_value as u64,
125 None,
126 )),
127 ));
128 }
129
130 let total_size = num_values as usize * bytes_per_value;
131 let mut global_buffer = vec![0u8; total_size];
132
133 let mut chunks = Vec::new();
134 let data_slice = data.data.as_ref();
135 let mut processed_values = 0usize;
136 let max_chunk_size = self.max_chunk_size();
137
138 while processed_values < num_values as usize {
139 let chunk_size = (num_values as usize - processed_values).min(max_chunk_size);
140 let chunk_offset = processed_values * bytes_per_value;
141
142 for i in 0..chunk_size {
144 let src_offset = (processed_values + i) * bytes_per_value;
145 for j in 0..bytes_per_value {
146 let dst_offset = chunk_offset + j * chunk_size + i;
148 global_buffer[dst_offset] = data_slice[src_offset + j];
149 }
150 }
151
152 let chunk_bytes = chunk_size * bytes_per_value;
153 let log_num_values = if processed_values + chunk_size == num_values as usize {
154 0 } else {
156 chunk_size.ilog2() as u8
157 };
158
159 debug_assert!(chunk_bytes > 0);
160 chunks.push(MiniBlockChunk {
161 buffer_sizes: vec![chunk_bytes as u32],
162 log_num_values,
163 });
164
165 processed_values += chunk_size;
166 }
167
168 let data_buffers = vec![LanceBuffer::from(global_buffer)];
169
170 let encoding = ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
172 self.bits_per_value as u64,
173 None,
174 ));
175
176 Ok((
177 MiniBlockCompressed {
178 data: data_buffers,
179 chunks,
180 num_values,
181 },
182 encoding,
183 ))
184 }
185 _ => Err(lance_core::Error::invalid_input_source(
186 "ByteStreamSplit encoding only supports FixedWidth data blocks".into(),
187 )),
188 }
189 }
190}
191
/// Mini-block decompressor that reverses the byte-stream-split layout for one chunk.
#[derive(Debug)]
pub struct ByteStreamSplitDecompressor {
    /// Width of each value in bits; `new` guarantees this is 32 or 64.
    bits_per_value: usize,
}

impl ByteStreamSplitDecompressor {
    /// Builds a decompressor for values that are `bits_per_value` bits wide.
    ///
    /// # Panics
    ///
    /// Panics when `bits_per_value` is anything other than 32 or 64.
    pub fn new(bits_per_value: usize) -> Self {
        let supported = matches!(bits_per_value, 32 | 64);
        assert!(
            supported,
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of one value in bytes (4 or 8).
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / 8
    }
}
211
212impl MiniBlockDecompressor for ByteStreamSplitDecompressor {
213 fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
214 if num_values == 0 {
215 return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
216 data: LanceBuffer::empty(),
217 bits_per_value: self.bits_per_value as u64,
218 num_values: 0,
219 block_info: BlockInfo::new(),
220 }));
221 }
222
223 let bytes_per_value = self.bytes_per_value();
224 let total_bytes = num_values as usize * bytes_per_value;
225
226 if data.len() != 1 {
227 return Err(lance_core::Error::invalid_input_source(
228 format!(
229 "ByteStreamSplit decompression expects 1 buffer, but got {}",
230 data.len()
231 )
232 .into(),
233 ));
234 }
235
236 let input_buffer = &data[0];
237
238 if input_buffer.len() != total_bytes {
239 return Err(lance_core::Error::invalid_input_source(
240 format!(
241 "Expected {} bytes for decompression, but got {}",
242 total_bytes,
243 input_buffer.len()
244 )
245 .into(),
246 ));
247 }
248
249 let mut output = vec![0u8; total_bytes];
250
251 for i in 0..num_values as usize {
253 for j in 0..bytes_per_value {
254 let src_offset = j * num_values as usize + i;
255 output[i * bytes_per_value + j] = input_buffer[src_offset];
256 }
257 }
258
259 Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
260 data: LanceBuffer::from(output),
261 bits_per_value: self.bits_per_value as u64,
262 num_values,
263 block_info: BlockInfo::new(),
264 }))
265 }
266}
267
268pub fn should_use_bss(data: &FixedWidthDataBlock, mode: BssMode) -> bool {
270 if data.bits_per_value != 32 && data.bits_per_value != 64 {
274 return false;
275 }
276
277 let sensitivity = mode.to_sensitivity();
278
279 if sensitivity <= 0.0 {
281 return false;
282 }
283 if sensitivity >= 1.0 {
284 return true;
285 }
286
287 evaluate_entropy_for_bss(data, sensitivity)
289}
290
291fn evaluate_entropy_for_bss(data: &FixedWidthDataBlock, sensitivity: f32) -> bool {
293 let Some(entropy_stat) = data.get_stat(Stat::BytePositionEntropy) else {
295 return false; };
297
298 let entropies = entropy_stat.as_primitive::<UInt64Type>();
299 if entropies.is_empty() {
300 return false;
301 }
302
303 let sum: u64 = entropies.values().iter().sum();
305 let avg_entropy = sum as f64 / entropies.len() as f64 / 1000.0; let entropy_threshold = sensitivity as f64 * 8.0;
312
313 avg_entropy < entropy_threshold
316}
317
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps raw little-endian bytes in a fixed-width data block.
    fn fixed_width_block(bytes: Vec<u8>, bits_per_value: u64, num_values: u64) -> DataBlock {
        DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(bytes),
            bits_per_value,
            num_values,
            block_info: BlockInfo::new(),
        })
    }

    /// Compresses `bytes`, then decompresses every chunk independently and concatenates the
    /// results.
    ///
    /// Each chunk is decompressed from its own slice of the shared buffer, with a value count
    /// taken from `log_num_values` (or the remainder for the final chunk), so inputs spanning
    /// several chunks round-trip correctly.
    fn round_trip(bits_per_value: usize, bytes: Vec<u8>, num_values: u64) -> Vec<u8> {
        let encoder = ByteStreamSplitEncoder::new(bits_per_value);
        let decompressor = ByteStreamSplitDecompressor::new(bits_per_value);

        let block = fixed_width_block(bytes, bits_per_value as u64, num_values);
        let (compressed, _encoding) = encoder.compress(block).unwrap();
        assert_eq!(compressed.num_values, num_values);

        if compressed.chunks.is_empty() {
            assert!(compressed.data.is_empty());
            return Vec::new();
        }
        assert_eq!(compressed.data.len(), 1);
        let source: &[u8] = compressed.data[0].as_ref();

        let mut output: Vec<u8> = Vec::new();
        let mut offset = 0usize;
        let mut remaining = num_values;
        for chunk in &compressed.chunks {
            assert_eq!(chunk.buffer_sizes.len(), 1);
            let chunk_bytes = chunk.buffer_sizes[0] as usize;
            let chunk_values = if chunk.log_num_values == 0 {
                remaining
            } else {
                1u64 << chunk.log_num_values
            };

            let piece = LanceBuffer::from(source[offset..offset + chunk_bytes].to_vec());
            let decompressed = decompressor.decompress(vec![piece], chunk_values).unwrap();
            let DataBlock::FixedWidth(fixed) = decompressed else {
                panic!("Expected FixedWidth DataBlock")
            };
            assert_eq!(fixed.num_values, chunk_values);
            assert_eq!(fixed.bits_per_value, bits_per_value as u64);

            let decoded: &[u8] = fixed.data.as_ref();
            output.extend_from_slice(decoded);
            offset += chunk_bytes;
            remaining -= chunk_values;
        }

        assert_eq!(remaining, 0);
        assert_eq!(offset, source.len());
        output
    }

    #[test]
    fn test_round_trip_f32() {
        let values: Vec<f32> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f32::INFINITY,
            f32::NEG_INFINITY,
            f32::NAN,
        ];
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        // Compare raw bytes rather than floats: float equality treats -0.0 == 0.0 and
        // NaN != NaN, which would hide or fake sign and payload mismatches.
        assert_eq!(round_trip(32, bytes.clone(), values.len() as u64), bytes);
    }

    #[test]
    fn test_round_trip_f64() {
        let values: Vec<f64> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f64::INFINITY,
            f64::NEG_INFINITY,
            f64::NAN,
        ];
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        assert_eq!(round_trip(64, bytes.clone(), values.len() as u64), bytes);
    }

    #[test]
    fn test_byte_layout() {
        // Two 32-bit values [0, 1, 2, 3] and [4, 5, 6, 7] become four 2-byte streams.
        let bytes: Vec<u8> = (0u8..8).collect();
        let encoder = ByteStreamSplitEncoder::new(32);
        let (compressed, _encoding) = encoder.compress(fixed_width_block(bytes, 32, 2)).unwrap();

        assert_eq!(compressed.chunks.len(), 1);
        let split: &[u8] = compressed.data[0].as_ref();
        assert_eq!(split, &[0u8, 4, 1, 5, 2, 6, 3, 7][..]);
    }

    #[test]
    fn test_round_trip_multiple_chunks_f32() {
        // 2500 values -> chunks of 1024, 1024 and 452 values.
        let values: Vec<f32> = (0..2500).map(|i| i as f32 * 0.37 - 400.0).collect();
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let encoder = ByteStreamSplitEncoder::new(32);
        let (compressed, _encoding) = encoder
            .compress(fixed_width_block(bytes.clone(), 32, 2500))
            .unwrap();
        let sizes: Vec<u32> = compressed.chunks.iter().map(|c| c.buffer_sizes[0]).collect();
        let logs: Vec<u8> = compressed.chunks.iter().map(|c| c.log_num_values).collect();
        assert_eq!(sizes, vec![4096, 4096, 1808]);
        assert_eq!(logs, vec![10, 10, 0]);

        assert_eq!(round_trip(32, bytes.clone(), 2500), bytes);
    }

    #[test]
    fn test_round_trip_multiple_chunks_f64() {
        // 1100 values -> chunks of 512, 512 and 76 values.
        let values: Vec<f64> = (0..1100).map(|i| i as f64 * -1.25 + 3.5).collect();
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let encoder = ByteStreamSplitEncoder::new(64);
        let (compressed, _encoding) = encoder
            .compress(fixed_width_block(bytes.clone(), 64, 1100))
            .unwrap();
        let sizes: Vec<u32> = compressed.chunks.iter().map(|c| c.buffer_sizes[0]).collect();
        let logs: Vec<u8> = compressed.chunks.iter().map(|c| c.log_num_values).collect();
        assert_eq!(sizes, vec![4096, 4096, 608]);
        assert_eq!(logs, vec![9, 9, 0]);

        assert_eq!(round_trip(64, bytes.clone(), 1100), bytes);
    }

    #[test]
    fn test_empty_data() {
        let encoder = ByteStreamSplitEncoder::new(32);
        let decompressor = ByteStreamSplitDecompressor::new(32);

        let (compressed, _encoding) = encoder
            .compress(fixed_width_block(Vec::new(), 32, 0))
            .unwrap();
        assert!(compressed.chunks.is_empty());
        assert_eq!(compressed.num_values, 0);

        let decompressed = decompressor.decompress(compressed.data, 0).unwrap();
        let DataBlock::FixedWidth(decompressed_fixed) = &decompressed else {
            panic!("Expected FixedWidth DataBlock")
        };

        assert_eq!(decompressed_fixed.num_values, 0);
        assert_eq!(decompressed_fixed.data.len(), 0);
    }

    #[test]
    fn test_decompress_rejects_wrong_length() {
        let decompressor = ByteStreamSplitDecompressor::new(32);
        let result = decompressor.decompress(vec![LanceBuffer::from(vec![0u8; 7])], 2);
        assert!(result.is_err());
    }

    #[test]
    fn test_decompress_rejects_wrong_buffer_count() {
        let decompressor = ByteStreamSplitDecompressor::new(32);
        let buffers = vec![
            LanceBuffer::from(vec![0u8; 4]),
            LanceBuffer::from(vec![0u8; 4]),
        ];
        assert!(decompressor.decompress(buffers, 2).is_err());
    }
}