// lance_encoding/encodings/physical/byte_stream_split.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! # Byte Stream Split (BSS) Miniblock Format
5//!
6//! Byte Stream Split is a data transformation technique that improves compression
7//! by reorganizing multi-byte values to group bytes from the same position together.
8//! This is particularly effective for data where some byte positions have low entropy.
9//!
10//! ## How It Works
11//!
12//! BSS splits multi-byte values by byte position, creating separate streams
13//! for each byte position across all values. This transformation is most beneficial
14//! when certain byte positions have low entropy (e.g., high-order bytes that are
15//! mostly zeros, sign-extended bytes, or floating-point sign/exponent bytes that
16//! cluster around common values).
17//!
18//! ### Example
19//!
20//! Input data (f32): `[1.0, 2.0, 3.0, 4.0]`
21//!
22//! In little-endian bytes:
23//! - 1.0 = `[00, 00, 80, 3F]`
24//! - 2.0 = `[00, 00, 00, 40]`
25//! - 3.0 = `[00, 00, 40, 40]`
26//! - 4.0 = `[00, 00, 80, 40]`
27//!
28//! After BSS transformation:
29//! - Byte stream 0: `[00, 00, 00, 00]` (all first bytes)
30//! - Byte stream 1: `[00, 00, 00, 00]` (all second bytes)
31//! - Byte stream 2: `[80, 00, 40, 80]` (all third bytes)
32//! - Byte stream 3: `[3F, 40, 40, 40]` (all fourth bytes)
33//!
34//! Output: `[00, 00, 00, 00, 00, 00, 00, 00, 80, 00, 40, 80, 3F, 40, 40, 40]`
35//!
36//! ## Compression Benefits
37//!
38//! BSS itself doesn't compress data - it reorders it. The compression benefit
39//! comes when BSS is combined with general-purpose compression (e.g., LZ4):
40//!
41//! 1. **Timestamps**: Sequential timestamps have similar high-order bytes
42//! 2. **Sensor data**: Readings often vary in a small range, sharing exponent bits
43//! 3. **Financial data**: Prices may cluster around certain values
44//!
45//! ## Supported Types
46//!
47//! - 32-bit floating point (f32)
48//! - 64-bit floating point (f64)
49//!
50//! ## Chunk Handling
51//!
52//! - Maximum chunk size depends on data type:
53//!   - f32: 1024 values (4KB per chunk)
54//!   - f64: 512 values (4KB per chunk)
55//! - All chunks share a single global buffer
56//! - Non-last chunks always contain power-of-2 values
57
58use std::fmt::Debug;
59
60use crate::buffer::LanceBuffer;
61use crate::compression::MiniBlockDecompressor;
62use crate::compression_config::BssMode;
63use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
64use crate::encodings::logical::primitive::miniblock::{
65    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
66};
67use crate::format::pb21::CompressiveEncoding;
68use crate::format::ProtobufUtils21;
69use crate::statistics::{GetStat, Stat};
70use arrow_array::{cast::AsArray, types::UInt64Type};
71use lance_core::Result;
72use snafu::location;
73
/// Byte Stream Split encoder for floating point values
///
/// Splits each value into its constituent bytes and groups bytes by
/// position across values. The reordered output is far more compressible
/// for floating point data whose sign/exponent bytes cluster together.
#[derive(Debug, Clone)]
pub struct ByteStreamSplitEncoder {
    // Width of each value in bits; restricted to 32 or 64 by `new`.
    bits_per_value: usize,
}

impl ByteStreamSplitEncoder {
    /// Creates an encoder for values of the given bit width.
    ///
    /// # Panics
    ///
    /// Panics unless `bits_per_value` is 32 or 64.
    pub fn new(bits_per_value: usize) -> Self {
        assert!(
            matches!(bits_per_value, 32 | 64),
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of each value in bytes (4 for f32, 8 for f64).
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / 8
    }

    /// Largest number of values packed into one chunk.
    ///
    /// Chosen as the largest power of two whose chunk fits in
    /// MAX_MINIBLOCK_BYTES (8186):
    /// - f32: 8186 / 4 = 2046 -> 1024 values (4KB)
    /// - f64: 8186 / 8 = 1023 -> 512 values (4KB)
    fn max_chunk_size(&self) -> usize {
        if self.bits_per_value == 32 {
            1024
        } else if self.bits_per_value == 64 {
            512
        } else {
            unreachable!("ByteStreamSplit only supports 32 or 64 bit values")
        }
    }
}
109
impl MiniBlockCompressor for ByteStreamSplitEncoder {
    /// Compresses a `FixedWidth` data block into BSS miniblock chunks.
    ///
    /// The output is a single global buffer in which each chunk occupies a
    /// contiguous region laid out as chunk-local byte streams: within a
    /// chunk, byte position `j` of all `chunk_size` values is stored
    /// consecutively before byte position `j + 1`. Returns the compressed
    /// chunks plus the protobuf encoding description.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` for any non-`FixedWidth` data block.
    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match page {
            DataBlock::FixedWidth(data) => {
                let num_values = data.num_values;
                let bytes_per_value = self.bytes_per_value();

                // Empty pages produce no buffers and no chunks, but still
                // report the encoding so readers know how to interpret it.
                if num_values == 0 {
                    return Ok((
                        MiniBlockCompressed {
                            data: vec![],
                            chunks: vec![],
                            num_values: 0,
                        },
                        ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
                            self.bits_per_value as u64,
                            None,
                        )),
                    ));
                }

                // BSS is a pure permutation: output size equals input size.
                let total_size = num_values as usize * bytes_per_value;
                let mut global_buffer = vec![0u8; total_size];

                let mut chunks = Vec::new();
                let data_slice = data.data.as_ref();
                let mut processed_values = 0usize;
                let max_chunk_size = self.max_chunk_size();

                while processed_values < num_values as usize {
                    // Every non-final chunk is exactly `max_chunk_size`
                    // values (a power of two); only the last may be smaller.
                    let chunk_size = (num_values as usize - processed_values).min(max_chunk_size);
                    // Byte offset of this chunk's region in the global buffer.
                    let chunk_offset = processed_values * bytes_per_value;

                    // Create chunk-local byte streams
                    for i in 0..chunk_size {
                        let src_offset = (processed_values + i) * bytes_per_value;
                        for j in 0..bytes_per_value {
                            // Store in chunk-local byte stream format:
                            // stream `j` spans `chunk_size` bytes, one per value.
                            let dst_offset = chunk_offset + j * chunk_size + i;
                            global_buffer[dst_offset] = data_slice[src_offset + j];
                        }
                    }

                    let chunk_bytes = chunk_size * bytes_per_value;
                    // Miniblock convention: the last chunk records 0 (its
                    // value count is derived from the page total); earlier
                    // chunks record log2 of their (power-of-two) size.
                    let log_num_values = if processed_values + chunk_size == num_values as usize {
                        0 // Last chunk
                    } else {
                        chunk_size.ilog2() as u8
                    };

                    // chunk_bytes fits in u16: at most 1024 * 4 = 4096 bytes.
                    debug_assert!(chunk_bytes > 0);
                    chunks.push(MiniBlockChunk {
                        buffer_sizes: vec![chunk_bytes as u16],
                        log_num_values,
                    });

                    processed_values += chunk_size;
                }

                // All chunks share this single global buffer.
                let data_buffers = vec![LanceBuffer::from(global_buffer)];

                // TODO: Should support underlying compression
                let encoding = ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
                    self.bits_per_value as u64,
                    None,
                ));

                Ok((
                    MiniBlockCompressed {
                        data: data_buffers,
                        chunks,
                        num_values,
                    },
                    encoding,
                ))
            }
            _ => Err(lance_core::Error::InvalidInput {
                source: "ByteStreamSplit encoding only supports FixedWidth data blocks".into(),
                location: location!(),
            }),
        }
    }
}
193
/// Byte Stream Split decompressor
///
/// Reverses [`ByteStreamSplitEncoder`] by re-interleaving the per-position
/// byte streams back into contiguous little-endian values.
#[derive(Debug)]
pub struct ByteStreamSplitDecompressor {
    // Width of each value in bits; restricted to 32 or 64 by `new`.
    bits_per_value: usize,
}

impl ByteStreamSplitDecompressor {
    /// Creates a decompressor for values of the given bit width.
    ///
    /// # Panics
    ///
    /// Panics unless `bits_per_value` is 32 or 64.
    pub fn new(bits_per_value: usize) -> Self {
        assert!(
            matches!(bits_per_value, 32 | 64),
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of each value in bytes (4 for f32, 8 for f64).
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / 8
    }
}
213
214impl MiniBlockDecompressor for ByteStreamSplitDecompressor {
215    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
216        if num_values == 0 {
217            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
218                data: LanceBuffer::empty(),
219                bits_per_value: self.bits_per_value as u64,
220                num_values: 0,
221                block_info: BlockInfo::new(),
222            }));
223        }
224
225        let bytes_per_value = self.bytes_per_value();
226        let total_bytes = num_values as usize * bytes_per_value;
227
228        if data.len() != 1 {
229            return Err(lance_core::Error::InvalidInput {
230                source: format!(
231                    "ByteStreamSplit decompression expects 1 buffer, but got {}",
232                    data.len()
233                )
234                .into(),
235                location: location!(),
236            });
237        }
238
239        let input_buffer = &data[0];
240
241        if input_buffer.len() != total_bytes {
242            return Err(lance_core::Error::InvalidInput {
243                source: format!(
244                    "Expected {} bytes for decompression, but got {}",
245                    total_bytes,
246                    input_buffer.len()
247                )
248                .into(),
249                location: location!(),
250            });
251        }
252
253        let mut output = vec![0u8; total_bytes];
254
255        // Input buffer contains chunk-local byte streams
256        for i in 0..num_values as usize {
257            for j in 0..bytes_per_value {
258                let src_offset = j * num_values as usize + i;
259                output[i * bytes_per_value + j] = input_buffer[src_offset];
260            }
261        }
262
263        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
264            data: LanceBuffer::from(output),
265            bits_per_value: self.bits_per_value as u64,
266            num_values,
267            block_info: BlockInfo::new(),
268        }))
269    }
270}
271
272/// Determine if BSS should be used based on mode and data characteristics
273pub fn should_use_bss(data: &FixedWidthDataBlock, mode: BssMode) -> bool {
274    // Only support 32-bit and 64-bit values
275    // BSS is most effective for these common types (floats, timestamps, etc.)
276    // 16-bit values have limited benefit with only 2 streams
277    if data.bits_per_value != 32 && data.bits_per_value != 64 {
278        return false;
279    }
280
281    let sensitivity = mode.to_sensitivity();
282
283    // Fast paths
284    if sensitivity <= 0.0 {
285        return false;
286    }
287    if sensitivity >= 1.0 {
288        return true;
289    }
290
291    // Auto mode: check byte position entropy
292    evaluate_entropy_for_bss(data, sensitivity)
293}
294
295/// Evaluate if BSS should be used based on byte position entropy
296fn evaluate_entropy_for_bss(data: &FixedWidthDataBlock, sensitivity: f32) -> bool {
297    // Get the precomputed entropy statistics
298    let Some(entropy_stat) = data.get_stat(Stat::BytePositionEntropy) else {
299        return false; // No entropy data available
300    };
301
302    let entropies = entropy_stat.as_primitive::<UInt64Type>();
303    if entropies.is_empty() {
304        return false;
305    }
306
307    // Calculate average entropy across all byte positions
308    let sum: u64 = entropies.values().iter().sum();
309    let avg_entropy = sum as f64 / entropies.len() as f64 / 1000.0; // Scale back from integer
310
311    // Entropy threshold based on sensitivity
312    // sensitivity = 0.5 (default auto) -> threshold = 4.0 bits
313    // sensitivity = 0.0 (off) -> threshold = 0.0 (never use)
314    // sensitivity = 1.0 (on) -> threshold = 8.0 (always use)
315    let entropy_threshold = sensitivity as f64 * 8.0;
316
317    // Use BSS if average entropy is below threshold
318    // Lower entropy means more repetitive byte patterns
319    avg_entropy < entropy_threshold
320}
321
#[cfg(test)]
mod tests {
    use super::*;

    /// Compresses `bytes` with BSS and decompresses them back, returning the
    /// decoded bytes.
    ///
    /// Centralizes the round-trip plumbing that was previously duplicated
    /// across every test, and verifies the decoded value count on the way.
    fn round_trip(bytes: Vec<u8>, bits_per_value: usize, num_values: u64) -> Vec<u8> {
        let encoder = ByteStreamSplitEncoder::new(bits_per_value);
        let decompressor = ByteStreamSplitDecompressor::new(bits_per_value);

        let data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(bytes),
            bits_per_value: bits_per_value as u64,
            num_values,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = encoder.compress(data_block).unwrap();
        let decompressed = decompressor
            .decompress(compressed.data, num_values)
            .unwrap();
        let DataBlock::FixedWidth(decompressed_fixed) = &decompressed else {
            panic!("Expected FixedWidth DataBlock")
        };

        assert_eq!(decompressed_fixed.num_values, num_values);
        decompressed_fixed.data.as_ref().to_vec()
    }

    #[test]
    fn test_round_trip_f32() {
        // Includes signed zeros and infinities to cover special bit patterns.
        let values: Vec<f32> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f32::INFINITY,
            f32::NEG_INFINITY,
        ];
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let result_bytes = round_trip(bytes, 32, values.len() as u64);
        let result_values: Vec<f32> = result_bytes
            .chunks_exact(4)
            .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
            .collect();

        assert_eq!(values, result_values);
    }

    #[test]
    fn test_round_trip_f64() {
        // Includes signed zeros and infinities to cover special bit patterns.
        let values: Vec<f64> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f64::INFINITY,
            f64::NEG_INFINITY,
        ];
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let result_bytes = round_trip(bytes, 64, values.len() as u64);
        let result_values: Vec<f64> = result_bytes
            .chunks_exact(8)
            .map(|chunk| f64::from_le_bytes(chunk.try_into().unwrap()))
            .collect();

        assert_eq!(values, result_values);
    }

    #[test]
    fn test_empty_data() {
        // An empty page must round-trip to an empty block.
        let result_bytes = round_trip(Vec::new(), 32, 0);
        assert!(result_bytes.is_empty());
    }
}