lance_encoding/encodings/physical/byte_stream_split.rs

use std::fmt::Debug;

use crate::buffer::LanceBuffer;
use crate::compression::MiniBlockDecompressor;
use crate::compression_config::BssMode;
use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
use crate::encodings::logical::primitive::miniblock::{
    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
};
use crate::format::pb21::CompressiveEncoding;
use crate::format::ProtobufUtils21;
use crate::statistics::{GetStat, Stat};
use arrow_array::{cast::AsArray, types::UInt64Type};
use lance_core::Result;
use snafu::location;

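/// Encodes fixed-width floats (f32/f64) with the byte-stream-split transform:
/// within each chunk, byte `k` of every value is written into the chunk's
/// `k`-th contiguous byte stream, so two f32 values `A` and `B` are laid out as
/// `[A0, B0, A1, B1, A2, B2, A3, B3]`. The transform does not shrink the data
/// by itself; it groups the highly repetitive sign/exponent bytes of similar
/// floats together, which typically makes general-purpose compression applied
/// downstream more effective.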
#[derive(Debug, Clone)]
pub struct ByteStreamSplitEncoder {
    bits_per_value: usize,
}

impl ByteStreamSplitEncoder {
    pub fn new(bits_per_value: usize) -> Self {
        assert!(
            bits_per_value == 32 || bits_per_value == 64,
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / 8
    }

    fn max_chunk_size(&self) -> usize {
        // Cap each chunk at 4 KiB of value bytes: 1024 f32 values or 512 f64 values.
        match self.bits_per_value {
            32 => 1024,
            64 => 512,
            _ => unreachable!("ByteStreamSplit only supports 32 or 64 bit values"),
        }
    }
}

impl MiniBlockCompressor for ByteStreamSplitEncoder {
    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match page {
            DataBlock::FixedWidth(data) => {
                let num_values = data.num_values;
                let bytes_per_value = self.bytes_per_value();

                if num_values == 0 {
                    return Ok((
                        MiniBlockCompressed {
                            data: vec![],
                            chunks: vec![],
                            num_values: 0,
                        },
                        ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
                            self.bits_per_value as u64,
                            None,
                        )),
                    ));
                }

                let total_size = num_values as usize * bytes_per_value;
                let mut global_buffer = vec![0u8; total_size];

                let mut chunks = Vec::new();
                let data_slice = data.data.as_ref();
                let mut processed_values = 0usize;
                let max_chunk_size = self.max_chunk_size();

                while processed_values < num_values as usize {
                    let chunk_size = (num_values as usize - processed_values).min(max_chunk_size);
                    let chunk_offset = processed_values * bytes_per_value;

                    // Transpose this chunk: byte `j` of value `i` lands in the
                    // chunk's `j`-th byte stream.
                    for i in 0..chunk_size {
                        let src_offset = (processed_values + i) * bytes_per_value;
                        for j in 0..bytes_per_value {
                            let dst_offset = chunk_offset + j * chunk_size + i;
                            global_buffer[dst_offset] = data_slice[src_offset + j];
                        }
                    }

                    let chunk_bytes = chunk_size * bytes_per_value;
                    // Every chunk except the last holds a power-of-two number of
                    // values (the full chunk cap); the final chunk records
                    // log_num_values == 0 instead.
                    let log_num_values = if processed_values + chunk_size == num_values as usize {
                        0
                    } else {
                        chunk_size.ilog2() as u8
                    };

                    debug_assert!(chunk_bytes > 0);
                    chunks.push(MiniBlockChunk {
                        buffer_sizes: vec![chunk_bytes as u16],
                        log_num_values,
                    });

                    processed_values += chunk_size;
                }

                let data_buffers = vec![LanceBuffer::from(global_buffer)];

                let encoding = ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
                    self.bits_per_value as u64,
                    None,
                ));

                Ok((
                    MiniBlockCompressed {
                        data: data_buffers,
                        chunks,
                        num_values,
                    },
                    encoding,
                ))
            }
            _ => Err(lance_core::Error::InvalidInput {
                source: "ByteStreamSplit encoding only supports FixedWidth data blocks".into(),
                location: location!(),
            }),
        }
    }
}

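/// Reverses [`ByteStreamSplitEncoder`] for a single chunk: byte `k` of value
/// `i` is read back from offset `k * num_values + i` of the chunk's buffer and
/// the values are reassembled in their original order.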
#[derive(Debug)]
pub struct ByteStreamSplitDecompressor {
    bits_per_value: usize,
}

impl ByteStreamSplitDecompressor {
    pub fn new(bits_per_value: usize) -> Self {
        assert!(
            bits_per_value == 32 || bits_per_value == 64,
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / 8
    }
}

impl MiniBlockDecompressor for ByteStreamSplitDecompressor {
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        if num_values == 0 {
            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
                data: LanceBuffer::empty(),
                bits_per_value: self.bits_per_value as u64,
                num_values: 0,
                block_info: BlockInfo::new(),
            }));
        }

        let bytes_per_value = self.bytes_per_value();
        let total_bytes = num_values as usize * bytes_per_value;

        if data.len() != 1 {
            return Err(lance_core::Error::InvalidInput {
                source: format!(
                    "ByteStreamSplit decompression expects 1 buffer, but got {}",
                    data.len()
                )
                .into(),
                location: location!(),
            });
        }

        let input_buffer = &data[0];

        if input_buffer.len() != total_bytes {
            return Err(lance_core::Error::InvalidInput {
                source: format!(
                    "Expected {} bytes for decompression, but got {}",
                    total_bytes,
                    input_buffer.len()
                )
                .into(),
                location: location!(),
            });
        }

        let mut output = vec![0u8; total_bytes];

        // Undo the split: byte `j` of value `i` was stored at `j * num_values + i`.
        for i in 0..num_values as usize {
            for j in 0..bytes_per_value {
                let src_offset = j * num_values as usize + i;
                output[i * bytes_per_value + j] = input_buffer[src_offset];
            }
        }

        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(output),
            bits_per_value: self.bits_per_value as u64,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}

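/// Decides whether byte-stream-split should be applied to `data` under the
/// given [`BssMode`]. Only 32-bit and 64-bit values are eligible; beyond that,
/// the mode's sensitivity either disables BSS (<= 0), forces it on (>= 1), or
/// defers to the block's byte-position entropy statistic.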
pub fn should_use_bss(data: &FixedWidthDataBlock, mode: BssMode) -> bool {
    if data.bits_per_value != 32 && data.bits_per_value != 64 {
        return false;
    }

    let sensitivity = mode.to_sensitivity();

    if sensitivity <= 0.0 {
        return false;
    }
    if sensitivity >= 1.0 {
        return true;
    }

    evaluate_entropy_for_bss(data, sensitivity)
}

fn evaluate_entropy_for_bss(data: &FixedWidthDataBlock, sensitivity: f32) -> bool {
    let Some(entropy_stat) = data.get_stat(Stat::BytePositionEntropy) else {
        return false;
    };

    let entropies = entropy_stat.as_primitive::<UInt64Type>();
    if entropies.is_empty() {
        return false;
    }

    let sum: u64 = entropies.values().iter().sum();
    // The entropy stat is stored as a fixed-point value scaled by 1000, so divide
    // that back out; 8.0 bits is the maximum possible entropy of a byte position.
    let avg_entropy = sum as f64 / entropies.len() as f64 / 1000.0;
    let entropy_threshold = sensitivity as f64 * 8.0;

    // Prefer BSS only when the byte positions are low-entropy enough that
    // splitting them into separate streams is likely to pay off.
    avg_entropy < entropy_threshold
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_round_trip_f32() {
        let encoder = ByteStreamSplitEncoder::new(32);
        let decompressor = ByteStreamSplitDecompressor::new(32);

        let values: Vec<f32> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f32::INFINITY,
            f32::NEG_INFINITY,
        ];
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(bytes),
            bits_per_value: 32,
            num_values: values.len() as u64,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = encoder.compress(data_block).unwrap();

        let decompressed = decompressor
            .decompress(compressed.data, values.len() as u64)
            .unwrap();
        let DataBlock::FixedWidth(decompressed_fixed) = &decompressed else {
            panic!("Expected FixedWidth DataBlock")
        };

        let result_bytes = decompressed_fixed.data.as_ref();
        let result_values: Vec<f32> = result_bytes
            .chunks_exact(4)
            .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
            .collect();

        assert_eq!(values, result_values);
    }
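
    // A minimal sketch that pins down the chunk layout the decompressor expects:
    // all first bytes, then all second bytes, and so on. Little-endian 1.0f32 is
    // [0, 0, 128, 63] and 2.0f32 is [0, 0, 0, 64].
    #[test]
    fn test_decompress_hand_built_layout_f32() {
        let decompressor = ByteStreamSplitDecompressor::new(32);

        // Byte streams for [1.0f32, 2.0f32]: stream k holds byte k of each value.
        let split: Vec<u8> = vec![
            0, 0, // byte 0 of both values
            0, 0, // byte 1 of both values
            128, 0, // byte 2 of both values
            63, 64, // byte 3 of both values
        ];

        let decompressed = decompressor
            .decompress(vec![LanceBuffer::from(split)], 2)
            .unwrap();
        let DataBlock::FixedWidth(decompressed_fixed) = &decompressed else {
            panic!("Expected FixedWidth DataBlock")
        };

        let result_bytes = decompressed_fixed.data.as_ref();
        let result_values: Vec<f32> = result_bytes
            .chunks_exact(4)
            .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
            .collect();

        assert_eq!(result_values, vec![1.0f32, 2.0f32]);
    }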

    #[test]
    fn test_round_trip_f64() {
        let encoder = ByteStreamSplitEncoder::new(64);
        let decompressor = ByteStreamSplitDecompressor::new(64);

        let values: Vec<f64> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f64::INFINITY,
            f64::NEG_INFINITY,
        ];
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(bytes),
            bits_per_value: 64,
            num_values: values.len() as u64,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = encoder.compress(data_block).unwrap();

        let decompressed = decompressor
            .decompress(compressed.data, values.len() as u64)
            .unwrap();
        let DataBlock::FixedWidth(decompressed_fixed) = &decompressed else {
            panic!("Expected FixedWidth DataBlock")
        };

        let result_bytes = decompressed_fixed.data.as_ref();
        let result_values: Vec<f64> = result_bytes
            .chunks_exact(8)
            .map(|chunk| f64::from_le_bytes(chunk.try_into().unwrap()))
            .collect();

        assert_eq!(values, result_values);
    }

    #[test]
    fn test_empty_data() {
        let encoder = ByteStreamSplitEncoder::new(32);
        let decompressor = ByteStreamSplitDecompressor::new(32);

        let data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::empty(),
            bits_per_value: 32,
            num_values: 0,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = encoder.compress(data_block).unwrap();

        let decompressed = decompressor.decompress(compressed.data, 0).unwrap();
        let DataBlock::FixedWidth(decompressed_fixed) = &decompressed else {
            panic!("Expected FixedWidth DataBlock")
        };

        assert_eq!(decompressed_fixed.num_values, 0);
        assert_eq!(decompressed_fixed.data.len(), 0);
    }
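
    // A minimal sketch of the chunking path, assuming the chunk cap in
    // `max_chunk_size` (1024 f32 values): 2000 values should split into one
    // full 1024-value chunk plus a 976-value trailing chunk, and the trailing
    // chunk records log_num_values == 0.
    #[test]
    fn test_multi_chunk_f32() {
        let encoder = ByteStreamSplitEncoder::new(32);

        let values: Vec<f32> = (0..2000).map(|i| i as f32 * 0.5).collect();
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(bytes),
            bits_per_value: 32,
            num_values: values.len() as u64,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = encoder.compress(data_block).unwrap();

        assert_eq!(compressed.num_values, 2000);
        assert_eq!(compressed.data.len(), 1);
        assert_eq!(compressed.data[0].len(), 2000 * 4);
        assert_eq!(compressed.chunks.len(), 2);
        assert_eq!(compressed.chunks[0].log_num_values, 10);
        assert_eq!(compressed.chunks[0].buffer_sizes, vec![4096u16]); // 1024 values * 4 bytes
        assert_eq!(compressed.chunks[1].log_num_values, 0);
        assert_eq!(compressed.chunks[1].buffer_sizes, vec![3904u16]); // 976 values * 4 bytes
    }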
}