Skip to main content

lance_encoding/encodings/physical/
byte_stream_split.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! # Byte Stream Split (BSS) Miniblock Format
5//!
6//! Byte Stream Split is a data transformation technique that improves compression
7//! by reorganizing multi-byte values to group bytes from the same position together.
8//! This is particularly effective for data where some byte positions have low entropy.
9//!
10//! ## How It Works
11//!
12//! BSS splits multi-byte values by byte position, creating separate streams
13//! for each byte position across all values. This transformation is most beneficial
14//! when certain byte positions have low entropy (e.g., high-order bytes that are
15//! mostly zeros, sign-extended bytes, or floating-point sign/exponent bytes that
16//! cluster around common values).
17//!
18//! ### Example
19//!
20//! Input data (f32): `[1.0, 2.0, 3.0, 4.0]`
21//!
22//! In little-endian bytes:
23//! - 1.0 = `[00, 00, 80, 3F]`
24//! - 2.0 = `[00, 00, 00, 40]`
25//! - 3.0 = `[00, 00, 40, 40]`
26//! - 4.0 = `[00, 00, 80, 40]`
27//!
28//! After BSS transformation:
29//! - Byte stream 0: `[00, 00, 00, 00]` (all first bytes)
30//! - Byte stream 1: `[00, 00, 00, 00]` (all second bytes)
31//! - Byte stream 2: `[80, 00, 40, 80]` (all third bytes)
32//! - Byte stream 3: `[3F, 40, 40, 40]` (all fourth bytes)
33//!
34//! Output: `[00, 00, 00, 00, 00, 00, 00, 00, 80, 00, 40, 80, 3F, 40, 40, 40]`
35//!
36//! ## Compression Benefits
37//!
38//! BSS itself doesn't compress data - it reorders it. The compression benefit
39//! comes when BSS is combined with general-purpose compression (e.g., LZ4):
40//!
41//! 1. **Timestamps**: Sequential timestamps have similar high-order bytes
42//! 2. **Sensor data**: Readings often vary in a small range, sharing exponent bits
43//! 3. **Financial data**: Prices may cluster around certain values
44//!
45//! ## Supported Types
46//!
47//! - 32-bit floating point (f32)
48//! - 64-bit floating point (f64)
49//!
50//! ## Chunk Handling
51//!
52//! - Maximum chunk size depends on data type:
53//!   - f32: 1024 values (4KB per chunk)
54//!   - f64: 512 values (4KB per chunk)
55//! - All chunks share a single global buffer
56//! - Non-last chunks always contain power-of-2 values
57
58use std::fmt::Debug;
59
60use crate::buffer::LanceBuffer;
61use crate::compression::MiniBlockDecompressor;
62use crate::compression_config::BssMode;
63use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
64use crate::encodings::logical::primitive::miniblock::{
65    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
66};
67use crate::format::ProtobufUtils21;
68use crate::format::pb21::CompressiveEncoding;
69use crate::statistics::{GetStat, Stat};
70use arrow_array::{cast::AsArray, types::UInt64Type};
71use lance_core::Result;
72
/// Byte Stream Split encoder for floating point values
///
/// This encoding splits floating point values by byte position and stores
/// each byte stream separately. This improves compression ratios for
/// floating point data with similar patterns.
#[derive(Debug, Clone)]
pub struct ByteStreamSplitEncoder {
    bits_per_value: usize,
}

impl ByteStreamSplitEncoder {
    /// Creates an encoder for values that are `bits_per_value` bits wide.
    ///
    /// # Panics
    ///
    /// Panics unless `bits_per_value` is 32 or 64.
    pub fn new(bits_per_value: usize) -> Self {
        assert!(
            matches!(bits_per_value, 32 | 64),
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of a single value in bytes (4 for f32, 8 for f64).
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / 8
    }

    /// Largest number of values placed in a single chunk.
    ///
    /// A chunk occupies `bytes_per_value * chunk_size` bytes and must fit in
    /// MAX_MINIBLOCK_BYTES (8186) while staying a power of two:
    /// - f32 (4 bytes): 8186 / 4 = 2046, rounded down to 1024
    /// - f64 (8 bytes): 8186 / 8 = 1023, rounded down to 512
    fn max_chunk_size(&self) -> usize {
        match self.bits_per_value {
            32 => 1024,
            64 => 512,
            _ => unreachable!("ByteStreamSplit only supports 32 or 64 bit values"),
        }
    }
}
108
impl MiniBlockCompressor for ByteStreamSplitEncoder {
    /// Compresses a page of fixed-width values by splitting each chunk into
    /// per-byte-position streams (see the module docs for the layout).
    ///
    /// All chunks are written into one shared buffer; each chunk's region is
    /// self-contained so the decompressor can transpose a chunk knowing only
    /// that chunk's value count.
    ///
    /// # Errors
    ///
    /// Returns an error for any non-`FixedWidth` data block.
    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
        match page {
            DataBlock::FixedWidth(data) => {
                let num_values = data.num_values;
                let bytes_per_value = self.bytes_per_value();

                // Empty page: no buffers, no chunks, but still a valid encoding
                // description so readers can interpret the (empty) page.
                if num_values == 0 {
                    return Ok((
                        MiniBlockCompressed {
                            data: vec![],
                            chunks: vec![],
                            num_values: 0,
                        },
                        ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
                            self.bits_per_value as u64,
                            None,
                        )),
                    ));
                }

                // BSS is a pure permutation: output size == input size.
                let total_size = num_values as usize * bytes_per_value;
                let mut global_buffer = vec![0u8; total_size];

                let mut chunks = Vec::new();
                let data_slice = data.data.as_ref();
                let mut processed_values = 0usize;
                let max_chunk_size = self.max_chunk_size();

                while processed_values < num_values as usize {
                    // Full-size (power-of-two) chunks until the tail; the last
                    // chunk takes whatever remains.
                    let chunk_size = (num_values as usize - processed_values).min(max_chunk_size);
                    // Byte offset of this chunk's region within the global buffer.
                    let chunk_offset = processed_values * bytes_per_value;

                    // Create chunk-local byte streams: within this chunk's
                    // region, byte position j of every value is stored
                    // contiguously (stream j starts at j * chunk_size).
                    for i in 0..chunk_size {
                        let src_offset = (processed_values + i) * bytes_per_value;
                        for j in 0..bytes_per_value {
                            // Store in chunk-local byte stream format
                            let dst_offset = chunk_offset + j * chunk_size + i;
                            global_buffer[dst_offset] = data_slice[src_offset + j];
                        }
                    }

                    let chunk_bytes = chunk_size * bytes_per_value;
                    // Miniblock convention: log_num_values == 0 marks the last
                    // chunk (its count is derived from the page total); every
                    // other chunk holds an exact power-of-two value count.
                    let log_num_values = if processed_values + chunk_size == num_values as usize {
                        0 // Last chunk
                    } else {
                        chunk_size.ilog2() as u8
                    };

                    debug_assert!(chunk_bytes > 0);
                    chunks.push(MiniBlockChunk {
                        buffer_sizes: vec![chunk_bytes as u32],
                        log_num_values,
                    });

                    processed_values += chunk_size;
                }

                // Single shared buffer for all chunks.
                let data_buffers = vec![LanceBuffer::from(global_buffer)];

                // TODO: Should support underlying compression
                let encoding = ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
                    self.bits_per_value as u64,
                    None,
                ));

                Ok((
                    MiniBlockCompressed {
                        data: data_buffers,
                        chunks,
                        num_values,
                    },
                    encoding,
                ))
            }
            _ => Err(lance_core::Error::invalid_input_source(
                "ByteStreamSplit encoding only supports FixedWidth data blocks".into(),
            )),
        }
    }
}
191
/// Byte Stream Split decompressor
#[derive(Debug)]
pub struct ByteStreamSplitDecompressor {
    bits_per_value: usize,
}

impl ByteStreamSplitDecompressor {
    /// Creates a decompressor for values that are `bits_per_value` bits wide.
    ///
    /// # Panics
    ///
    /// Panics unless `bits_per_value` is 32 or 64.
    pub fn new(bits_per_value: usize) -> Self {
        assert!(
            matches!(bits_per_value, 32 | 64),
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of a single value in bytes (4 for f32, 8 for f64).
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value / 8
    }
}
211
impl MiniBlockDecompressor for ByteStreamSplitDecompressor {
    /// Reverses the byte-stream-split transposition for one chunk.
    ///
    /// Expects exactly one buffer containing `num_values` transposed values
    /// (stream j of byte position j occupies bytes `[j*num_values, (j+1)*num_values)`),
    /// which matches the chunk-local layout written by the encoder.
    ///
    /// # Errors
    ///
    /// Returns an error if the buffer count is not 1 or the buffer length
    /// does not equal `num_values * bytes_per_value`.
    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
        // Empty chunk: nothing to transpose; return an empty block of the
        // declared width.
        if num_values == 0 {
            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
                data: LanceBuffer::empty(),
                bits_per_value: self.bits_per_value as u64,
                num_values: 0,
                block_info: BlockInfo::new(),
            }));
        }

        let bytes_per_value = self.bytes_per_value();
        let total_bytes = num_values as usize * bytes_per_value;

        // The encoder emits a single buffer per chunk.
        if data.len() != 1 {
            return Err(lance_core::Error::invalid_input_source(
                format!(
                    "ByteStreamSplit decompression expects 1 buffer, but got {}",
                    data.len()
                )
                .into(),
            ));
        }

        let input_buffer = &data[0];

        // BSS is a pure permutation, so the input must be exactly the size
        // of the decoded output.
        if input_buffer.len() != total_bytes {
            return Err(lance_core::Error::invalid_input_source(
                format!(
                    "Expected {} bytes for decompression, but got {}",
                    total_bytes,
                    input_buffer.len()
                )
                .into(),
            ));
        }

        let mut output = vec![0u8; total_bytes];

        // Input buffer contains chunk-local byte streams: gather byte j of
        // value i from stream j and interleave back into value order.
        for i in 0..num_values as usize {
            for j in 0..bytes_per_value {
                let src_offset = j * num_values as usize + i;
                output[i * bytes_per_value + j] = input_buffer[src_offset];
            }
        }

        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(output),
            bits_per_value: self.bits_per_value as u64,
            num_values,
            block_info: BlockInfo::new(),
        }))
    }
}
267
268/// Determine if BSS should be used based on mode and data characteristics
269pub fn should_use_bss(data: &FixedWidthDataBlock, mode: BssMode) -> bool {
270    // Only support 32-bit and 64-bit values
271    // BSS is most effective for these common types (floats, timestamps, etc.)
272    // 16-bit values have limited benefit with only 2 streams
273    if data.bits_per_value != 32 && data.bits_per_value != 64 {
274        return false;
275    }
276
277    let sensitivity = mode.to_sensitivity();
278
279    // Fast paths
280    if sensitivity <= 0.0 {
281        return false;
282    }
283    if sensitivity >= 1.0 {
284        return true;
285    }
286
287    // Auto mode: check byte position entropy
288    evaluate_entropy_for_bss(data, sensitivity)
289}
290
291/// Evaluate if BSS should be used based on byte position entropy
292fn evaluate_entropy_for_bss(data: &FixedWidthDataBlock, sensitivity: f32) -> bool {
293    // Get the precomputed entropy statistics
294    let Some(entropy_stat) = data.get_stat(Stat::BytePositionEntropy) else {
295        return false; // No entropy data available
296    };
297
298    let entropies = entropy_stat.as_primitive::<UInt64Type>();
299    if entropies.is_empty() {
300        return false;
301    }
302
303    // Calculate average entropy across all byte positions
304    let sum: u64 = entropies.values().iter().sum();
305    let avg_entropy = sum as f64 / entropies.len() as f64 / 1000.0; // Scale back from integer
306
307    // Entropy threshold based on sensitivity
308    // sensitivity = 0.5 (default auto) -> threshold = 4.0 bits
309    // sensitivity = 0.0 (off) -> threshold = 0.0 (never use)
310    // sensitivity = 1.0 (on) -> threshold = 8.0 (always use)
311    let entropy_threshold = sensitivity as f64 * 8.0;
312
313    // Use BSS if average entropy is below threshold
314    // Lower entropy means more repetitive byte patterns
315    avg_entropy < entropy_threshold
316}
317
#[cfg(test)]
mod tests {
    use super::*;

    /// Compresses `bytes` (raw little-endian values, `bits` wide) and
    /// decompresses the result, returning the reconstructed raw bytes.
    fn round_trip(bits: usize, bytes: Vec<u8>, num_values: u64) -> Vec<u8> {
        let encoder = ByteStreamSplitEncoder::new(bits);
        let decompressor = ByteStreamSplitDecompressor::new(bits);

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(bytes),
            bits_per_value: bits as u64,
            num_values,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = encoder.compress(block).unwrap();
        let decompressed = decompressor
            .decompress(compressed.data, num_values)
            .unwrap();
        let DataBlock::FixedWidth(fixed) = &decompressed else {
            panic!("Expected FixedWidth DataBlock")
        };
        assert_eq!(fixed.num_values, num_values);
        fixed.data.as_ref().to_vec()
    }

    #[test]
    fn test_round_trip_f32() {
        let values: Vec<f32> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f32::INFINITY,
            f32::NEG_INFINITY,
        ];
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let restored = round_trip(32, bytes, values.len() as u64);

        let result_values: Vec<f32> = restored
            .chunks_exact(4)
            .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
            .collect();
        assert_eq!(values, result_values);
    }

    #[test]
    fn test_round_trip_f64() {
        let values: Vec<f64> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f64::INFINITY,
            f64::NEG_INFINITY,
        ];
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let restored = round_trip(64, bytes, values.len() as u64);

        let result_values: Vec<f64> = restored
            .chunks_exact(8)
            .map(|chunk| f64::from_le_bytes(chunk.try_into().unwrap()))
            .collect();
        assert_eq!(values, result_values);
    }

    #[test]
    fn test_empty_data() {
        // Compressing and decompressing zero values yields an empty block.
        let restored = round_trip(32, Vec::new(), 0);
        assert!(restored.is_empty());
    }
}