// lance_encoding/encodings/physical/byte_stream_split.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! # Byte Stream Split (BSS) Miniblock Format
5//!
6//! Byte Stream Split is a data transformation technique that improves compression
7//! by reorganizing multi-byte values to group bytes from the same position together.
8//! This is particularly effective for data where some byte positions have low entropy.
9//!
10//! ## How It Works
11//!
12//! BSS splits multi-byte values by byte position, creating separate streams
13//! for each byte position across all values. This transformation is most beneficial
14//! when certain byte positions have low entropy (e.g., high-order bytes that are
15//! mostly zeros, sign-extended bytes, or floating-point sign/exponent bytes that
16//! cluster around common values).
17//!
18//! ### Example
19//!
20//! Input data (f32): `[1.0, 2.0, 3.0, 4.0]`
21//!
22//! In little-endian bytes:
23//! - 1.0 = `[00, 00, 80, 3F]`
24//! - 2.0 = `[00, 00, 00, 40]`
25//! - 3.0 = `[00, 00, 40, 40]`
26//! - 4.0 = `[00, 00, 80, 40]`
27//!
28//! After BSS transformation:
29//! - Byte stream 0: `[00, 00, 00, 00]` (all first bytes)
30//! - Byte stream 1: `[00, 00, 00, 00]` (all second bytes)
31//! - Byte stream 2: `[80, 00, 40, 80]` (all third bytes)
32//! - Byte stream 3: `[3F, 40, 40, 40]` (all fourth bytes)
33//!
34//! Output: `[00, 00, 00, 00, 00, 00, 00, 00, 80, 00, 40, 80, 3F, 40, 40, 40]`
35//!
36//! ## Compression Benefits
37//!
38//! BSS itself doesn't compress data - it reorders it. The compression benefit
39//! comes when BSS is combined with general-purpose compression (e.g., LZ4):
40//!
41//! 1. **Timestamps**: Sequential timestamps have similar high-order bytes
42//! 2. **Sensor data**: Readings often vary in a small range, sharing exponent bits
43//! 3. **Financial data**: Prices may cluster around certain values
44//!
45//! ## Supported Types
46//!
47//! - 32-bit floating point (f32)
48//! - 64-bit floating point (f64)
49//!
50//! ## Chunk Handling
51//!
52//! - Maximum chunk size depends on data type:
53//!   - f32: 1024 values (4KB per chunk)
54//!   - f64: 512 values (4KB per chunk)
55//! - All chunks share a single global buffer
56//! - Non-last chunks always contain power-of-2 values
57
58use std::fmt::Debug;
59
60use crate::buffer::LanceBuffer;
61use crate::compression::MiniBlockDecompressor;
62use crate::compression_config::BssMode;
63use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
64use crate::encodings::logical::primitive::miniblock::{
65    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
66};
67use crate::format::pb21::CompressiveEncoding;
68use crate::format::ProtobufUtils21;
69use crate::statistics::{GetStat, Stat};
70use arrow_array::{cast::AsArray, types::UInt64Type};
71use lance_core::Result;
72use snafu::location;
73
/// Byte Stream Split encoder for floating point values
///
/// This encoding splits floating point values by byte position and stores
/// each byte stream separately. This improves compression ratios for
/// floating point data with similar patterns.
#[derive(Debug, Clone)]
pub struct ByteStreamSplitEncoder {
    // Width of each value in bits; restricted to 32 (f32) or 64 (f64) by `new`.
    bits_per_value: usize,
}
83
84impl ByteStreamSplitEncoder {
85    pub fn new(bits_per_value: usize) -> Self {
86        assert!(
87            bits_per_value == 32 || bits_per_value == 64,
88            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
89        );
90        Self { bits_per_value }
91    }
92
93    fn bytes_per_value(&self) -> usize {
94        self.bits_per_value / 8
95    }
96
97    fn max_chunk_size(&self) -> usize {
98        // For ByteStreamSplit, total bytes = bytes_per_value * chunk_size
99        // MAX_MINIBLOCK_BYTES = 8186
100        // For f32 (4 bytes): 8186 / 4 = 2046, so max chunk = 1024 (power of 2)
101        // For f64 (8 bytes): 8186 / 8 = 1023, so max chunk = 512 (power of 2)
102        match self.bits_per_value {
103            32 => 1024,
104            64 => 512,
105            _ => unreachable!("ByteStreamSplit only supports 32 or 64 bit values"),
106        }
107    }
108}
109
110impl MiniBlockCompressor for ByteStreamSplitEncoder {
111    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, CompressiveEncoding)> {
112        match page {
113            DataBlock::FixedWidth(data) => {
114                let num_values = data.num_values;
115                let bytes_per_value = self.bytes_per_value();
116
117                if num_values == 0 {
118                    return Ok((
119                        MiniBlockCompressed {
120                            data: vec![],
121                            chunks: vec![],
122                            num_values: 0,
123                        },
124                        ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
125                            self.bits_per_value as u64,
126                            None,
127                        )),
128                    ));
129                }
130
131                let total_size = num_values as usize * bytes_per_value;
132                let mut global_buffer = vec![0u8; total_size];
133
134                let mut chunks = Vec::new();
135                let data_slice = data.data.as_ref();
136                let mut processed_values = 0usize;
137                let max_chunk_size = self.max_chunk_size();
138
139                while processed_values < num_values as usize {
140                    let chunk_size = (num_values as usize - processed_values).min(max_chunk_size);
141                    let chunk_offset = processed_values * bytes_per_value;
142
143                    // Create chunk-local byte streams
144                    for i in 0..chunk_size {
145                        let src_offset = (processed_values + i) * bytes_per_value;
146                        for j in 0..bytes_per_value {
147                            // Store in chunk-local byte stream format
148                            let dst_offset = chunk_offset + j * chunk_size + i;
149                            global_buffer[dst_offset] = data_slice[src_offset + j];
150                        }
151                    }
152
153                    let chunk_bytes = chunk_size * bytes_per_value;
154                    let log_num_values = if processed_values + chunk_size == num_values as usize {
155                        0 // Last chunk
156                    } else {
157                        chunk_size.ilog2() as u8
158                    };
159
160                    chunks.push(MiniBlockChunk {
161                        buffer_sizes: vec![chunk_bytes as u16],
162                        log_num_values,
163                    });
164
165                    processed_values += chunk_size;
166                }
167
168                let data_buffers = vec![LanceBuffer::from(global_buffer)];
169
170                // TODO: Should support underlying compression
171                let encoding = ProtobufUtils21::byte_stream_split(ProtobufUtils21::flat(
172                    self.bits_per_value as u64,
173                    None,
174                ));
175
176                Ok((
177                    MiniBlockCompressed {
178                        data: data_buffers,
179                        chunks,
180                        num_values,
181                    },
182                    encoding,
183                ))
184            }
185            _ => Err(lance_core::Error::InvalidInput {
186                source: "ByteStreamSplit encoding only supports FixedWidth data blocks".into(),
187                location: location!(),
188            }),
189        }
190    }
191}
192
/// Byte Stream Split decompressor
// Reverses the encoder's transform for a single miniblock chunk,
// reassembling values from their per-position byte streams.
#[derive(Debug)]
pub struct ByteStreamSplitDecompressor {
    // Width of each value in bits; restricted to 32 or 64 by `new`.
    bits_per_value: usize,
}
198
199impl ByteStreamSplitDecompressor {
200    pub fn new(bits_per_value: usize) -> Self {
201        assert!(
202            bits_per_value == 32 || bits_per_value == 64,
203            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
204        );
205        Self { bits_per_value }
206    }
207
208    fn bytes_per_value(&self) -> usize {
209        self.bits_per_value / 8
210    }
211}
212
213impl MiniBlockDecompressor for ByteStreamSplitDecompressor {
214    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
215        if num_values == 0 {
216            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
217                data: LanceBuffer::empty(),
218                bits_per_value: self.bits_per_value as u64,
219                num_values: 0,
220                block_info: BlockInfo::new(),
221            }));
222        }
223
224        let bytes_per_value = self.bytes_per_value();
225        let total_bytes = num_values as usize * bytes_per_value;
226
227        if data.len() != 1 {
228            return Err(lance_core::Error::InvalidInput {
229                source: format!(
230                    "ByteStreamSplit decompression expects 1 buffer, but got {}",
231                    data.len()
232                )
233                .into(),
234                location: location!(),
235            });
236        }
237
238        let input_buffer = &data[0];
239
240        if input_buffer.len() != total_bytes {
241            return Err(lance_core::Error::InvalidInput {
242                source: format!(
243                    "Expected {} bytes for decompression, but got {}",
244                    total_bytes,
245                    input_buffer.len()
246                )
247                .into(),
248                location: location!(),
249            });
250        }
251
252        let mut output = vec![0u8; total_bytes];
253
254        // Input buffer contains chunk-local byte streams
255        for i in 0..num_values as usize {
256            for j in 0..bytes_per_value {
257                let src_offset = j * num_values as usize + i;
258                output[i * bytes_per_value + j] = input_buffer[src_offset];
259            }
260        }
261
262        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
263            data: LanceBuffer::from(output),
264            bits_per_value: self.bits_per_value as u64,
265            num_values,
266            block_info: BlockInfo::new(),
267        }))
268    }
269}
270
271/// Determine if BSS should be used based on mode and data characteristics
272pub fn should_use_bss(data: &FixedWidthDataBlock, mode: BssMode) -> bool {
273    // Only support 32-bit and 64-bit values
274    // BSS is most effective for these common types (floats, timestamps, etc.)
275    // 16-bit values have limited benefit with only 2 streams
276    if data.bits_per_value != 32 && data.bits_per_value != 64 {
277        return false;
278    }
279
280    let sensitivity = mode.to_sensitivity();
281
282    // Fast paths
283    if sensitivity <= 0.0 {
284        return false;
285    }
286    if sensitivity >= 1.0 {
287        return true;
288    }
289
290    // Auto mode: check byte position entropy
291    evaluate_entropy_for_bss(data, sensitivity)
292}
293
294/// Evaluate if BSS should be used based on byte position entropy
295fn evaluate_entropy_for_bss(data: &FixedWidthDataBlock, sensitivity: f32) -> bool {
296    // Get the precomputed entropy statistics
297    let Some(entropy_stat) = data.get_stat(Stat::BytePositionEntropy) else {
298        return false; // No entropy data available
299    };
300
301    let entropies = entropy_stat.as_primitive::<UInt64Type>();
302    if entropies.is_empty() {
303        return false;
304    }
305
306    // Calculate average entropy across all byte positions
307    let sum: u64 = entropies.values().iter().sum();
308    let avg_entropy = sum as f64 / entropies.len() as f64 / 1000.0; // Scale back from integer
309
310    // Entropy threshold based on sensitivity
311    // sensitivity = 0.5 (default auto) -> threshold = 4.0 bits
312    // sensitivity = 0.0 (off) -> threshold = 0.0 (never use)
313    // sensitivity = 1.0 (on) -> threshold = 8.0 (always use)
314    let entropy_threshold = sensitivity as f64 * 8.0;
315
316    // Use BSS if average entropy is below threshold
317    // Lower entropy means more repetitive byte patterns
318    avg_entropy < entropy_threshold
319}
320
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps raw little-endian bytes in a FixedWidth data block.
    fn make_block(bytes: Vec<u8>, bits: u64, count: u64) -> DataBlock {
        DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(bytes),
            bits_per_value: bits,
            num_values: count,
            block_info: BlockInfo::new(),
        })
    }

    #[test]
    fn test_round_trip_f32() {
        let values: Vec<f32> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f32::INFINITY,
            f32::NEG_INFINITY,
        ];
        let le_bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
        let block = make_block(le_bytes, 32, values.len() as u64);

        let (compressed, _encoding) = ByteStreamSplitEncoder::new(32).compress(block).unwrap();
        let decoded = ByteStreamSplitDecompressor::new(32)
            .decompress(compressed.data, values.len() as u64)
            .unwrap();
        let DataBlock::FixedWidth(fixed) = &decoded else {
            panic!("Expected FixedWidth DataBlock")
        };

        // Reconstruct the floats and compare against the input.
        let round_tripped: Vec<f32> = fixed
            .data
            .as_ref()
            .chunks_exact(4)
            .map(|b| f32::from_le_bytes(b.try_into().unwrap()))
            .collect();
        assert_eq!(values, round_tripped);
    }

    #[test]
    fn test_round_trip_f64() {
        let values: Vec<f64> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f64::INFINITY,
            f64::NEG_INFINITY,
        ];
        let le_bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
        let block = make_block(le_bytes, 64, values.len() as u64);

        let (compressed, _encoding) = ByteStreamSplitEncoder::new(64).compress(block).unwrap();
        let decoded = ByteStreamSplitDecompressor::new(64)
            .decompress(compressed.data, values.len() as u64)
            .unwrap();
        let DataBlock::FixedWidth(fixed) = &decoded else {
            panic!("Expected FixedWidth DataBlock")
        };

        // Reconstruct the floats and compare against the input.
        let round_tripped: Vec<f64> = fixed
            .data
            .as_ref()
            .chunks_exact(8)
            .map(|b| f64::from_le_bytes(b.try_into().unwrap()))
            .collect();
        assert_eq!(values, round_tripped);
    }

    #[test]
    fn test_empty_data() {
        // Zero values must round-trip to an empty block without errors.
        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::empty(),
            bits_per_value: 32,
            num_values: 0,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = ByteStreamSplitEncoder::new(32).compress(block).unwrap();
        let decoded = ByteStreamSplitDecompressor::new(32)
            .decompress(compressed.data, 0)
            .unwrap();
        let DataBlock::FixedWidth(fixed) = &decoded else {
            panic!("Expected FixedWidth DataBlock")
        };

        assert_eq!(fixed.num_values, 0);
        assert_eq!(fixed.data.len(), 0);
    }
}
441}