// lance_encoding/encodings/physical/byte_stream_split.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! # Byte Stream Split (BSS) Miniblock Format
5//!
6//! Byte Stream Split is a data transformation technique optimized for floating-point
7//! data compression. It improves compression ratios by reorganizing data to group
8//! similar byte patterns together.
9//!
10//! ## How It Works
11//!
12//! BSS splits floating-point values by byte position, creating separate streams
13//! for each byte position across all values. This transformation exploits the
14//! fact that floating-point data often has patterns in specific byte positions
15//! (e.g., similar exponents or mantissa patterns).
16//!
17//! ### Example
18//!
19//! Input data (f32): `[1.0, 2.0, 3.0, 4.0]`
20//!
21//! In little-endian bytes:
22//! - 1.0 = `[00, 00, 80, 3F]`
23//! - 2.0 = `[00, 00, 00, 40]`
24//! - 3.0 = `[00, 00, 40, 40]`
25//! - 4.0 = `[00, 00, 80, 40]`
26//!
27//! After BSS transformation:
28//! - Byte stream 0: `[00, 00, 00, 00]` (all first bytes)
29//! - Byte stream 1: `[00, 00, 00, 00]` (all second bytes)
30//! - Byte stream 2: `[80, 00, 40, 80]` (all third bytes)
31//! - Byte stream 3: `[3F, 40, 40, 40]` (all fourth bytes)
32//!
33//! Output: `[00, 00, 00, 00, 00, 00, 00, 00, 80, 00, 40, 80, 3F, 40, 40, 40]`
34//!
35//! ## Compression Benefits
36//!
37//! BSS itself doesn't compress data - it reorders it. The compression benefit
38//! comes when BSS is combined with general-purpose compression (e.g., LZ4):
39//!
40//! 1. **Timestamps**: Sequential timestamps have similar high-order bytes
41//! 2. **Sensor data**: Readings often vary in a small range, sharing exponent bits
42//! 3. **Financial data**: Prices may cluster around certain values
43//!
44//! ## Supported Types
45//!
46//! - 32-bit floating point (f32)
47//! - 64-bit floating point (f64)
48//!
49//! ## Chunk Handling
50//!
51//! - Maximum chunk size depends on data type:
52//!   - f32: 1024 values (4KB per chunk)
53//!   - f64: 512 values (4KB per chunk)
54//! - All chunks share a single global buffer
55//! - Non-last chunks always contain power-of-2 values
56
57use std::fmt::Debug;
58
59use crate::buffer::LanceBuffer;
60use crate::compression::MiniBlockDecompressor;
61use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
62use crate::encodings::logical::primitive::miniblock::{
63    MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor,
64};
65use crate::format::pb;
66use crate::format::ProtobufUtils;
67use lance_core::Result;
68use snafu::location;
69
/// Byte Stream Split encoder for floating point values
///
/// This encoding splits floating point values by byte position and stores
/// each byte stream separately. This improves compression ratios for
/// floating point data with similar patterns.
#[derive(Debug, Clone)]
pub struct ByteStreamSplitEncoder {
    bits_per_value: usize,
}

impl ByteStreamSplitEncoder {
    /// Creates an encoder for values of the given bit width.
    ///
    /// # Panics
    ///
    /// Panics if `bits_per_value` is anything other than 32 or 64.
    pub fn new(bits_per_value: usize) -> Self {
        assert!(
            matches!(bits_per_value, 32 | 64),
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of a single value in bytes (4 for f32, 8 for f64).
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value >> 3
    }

    /// Largest number of values placed in one miniblock chunk.
    ///
    /// Chosen as the biggest power of two whose byte footprint fits in
    /// MAX_MINIBLOCK_BYTES (8186):
    /// - f32 (4 bytes/value): 8186 / 4 = 2046, so 1024 values (4KB/chunk)
    /// - f64 (8 bytes/value): 8186 / 8 = 1023, so 512 values (4KB/chunk)
    fn max_chunk_size(&self) -> usize {
        // `new` guarantees bits_per_value is 32 or 64.
        if self.bits_per_value == 32 {
            1024
        } else {
            512
        }
    }
}
105
106impl MiniBlockCompressor for ByteStreamSplitEncoder {
107    fn compress(&self, page: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
108        match page {
109            DataBlock::FixedWidth(data) => {
110                let num_values = data.num_values;
111                let bytes_per_value = self.bytes_per_value();
112
113                if num_values == 0 {
114                    return Ok((
115                        MiniBlockCompressed {
116                            data: vec![],
117                            chunks: vec![],
118                            num_values: 0,
119                        },
120                        ProtobufUtils::byte_stream_split(self.bits_per_value as u64),
121                    ));
122                }
123
124                let total_size = num_values as usize * bytes_per_value;
125                let mut global_buffer = vec![0u8; total_size];
126
127                let mut chunks = Vec::new();
128                let data_slice = data.data.as_ref();
129                let mut processed_values = 0usize;
130                let max_chunk_size = self.max_chunk_size();
131
132                while processed_values < num_values as usize {
133                    let chunk_size = (num_values as usize - processed_values).min(max_chunk_size);
134                    let chunk_offset = processed_values * bytes_per_value;
135
136                    // Create chunk-local byte streams
137                    for i in 0..chunk_size {
138                        let src_offset = (processed_values + i) * bytes_per_value;
139                        for j in 0..bytes_per_value {
140                            // Store in chunk-local byte stream format
141                            let dst_offset = chunk_offset + j * chunk_size + i;
142                            global_buffer[dst_offset] = data_slice[src_offset + j];
143                        }
144                    }
145
146                    let chunk_bytes = chunk_size * bytes_per_value;
147                    let log_num_values = if processed_values + chunk_size == num_values as usize {
148                        0 // Last chunk
149                    } else {
150                        chunk_size.ilog2() as u8
151                    };
152
153                    chunks.push(MiniBlockChunk {
154                        buffer_sizes: vec![chunk_bytes as u16],
155                        log_num_values,
156                    });
157
158                    processed_values += chunk_size;
159                }
160
161                let data_buffers = vec![LanceBuffer::from(global_buffer)];
162
163                let encoding = ProtobufUtils::byte_stream_split(self.bits_per_value as u64);
164
165                Ok((
166                    MiniBlockCompressed {
167                        data: data_buffers,
168                        chunks,
169                        num_values,
170                    },
171                    encoding,
172                ))
173            }
174            _ => Err(lance_core::Error::InvalidInput {
175                source: "ByteStreamSplit encoding only supports FixedWidth data blocks".into(),
176                location: location!(),
177            }),
178        }
179    }
180}
181
/// Byte Stream Split decompressor
#[derive(Debug)]
pub struct ByteStreamSplitDecompressor {
    bits_per_value: usize,
}

impl ByteStreamSplitDecompressor {
    /// Creates a decompressor for values of the given bit width.
    ///
    /// # Panics
    ///
    /// Panics if `bits_per_value` is anything other than 32 or 64.
    pub fn new(bits_per_value: usize) -> Self {
        assert!(
            matches!(bits_per_value, 32 | 64),
            "ByteStreamSplit only supports 32-bit (f32) or 64-bit (f64) values"
        );
        Self { bits_per_value }
    }

    /// Width of a single value in bytes (4 for f32, 8 for f64).
    fn bytes_per_value(&self) -> usize {
        self.bits_per_value >> 3
    }
}
201
202impl MiniBlockDecompressor for ByteStreamSplitDecompressor {
203    fn decompress(&self, data: Vec<LanceBuffer>, num_values: u64) -> Result<DataBlock> {
204        if num_values == 0 {
205            return Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
206                data: LanceBuffer::empty(),
207                bits_per_value: self.bits_per_value as u64,
208                num_values: 0,
209                block_info: BlockInfo::new(),
210            }));
211        }
212
213        let bytes_per_value = self.bytes_per_value();
214        let total_bytes = num_values as usize * bytes_per_value;
215
216        if data.len() != 1 {
217            return Err(lance_core::Error::InvalidInput {
218                source: format!(
219                    "ByteStreamSplit decompression expects 1 buffer, but got {}",
220                    data.len()
221                )
222                .into(),
223                location: location!(),
224            });
225        }
226
227        let input_buffer = &data[0];
228
229        if input_buffer.len() != total_bytes {
230            return Err(lance_core::Error::InvalidInput {
231                source: format!(
232                    "Expected {} bytes for decompression, but got {}",
233                    total_bytes,
234                    input_buffer.len()
235                )
236                .into(),
237                location: location!(),
238            });
239        }
240
241        let mut output = vec![0u8; total_bytes];
242
243        // Input buffer contains chunk-local byte streams
244        for i in 0..num_values as usize {
245            for j in 0..bytes_per_value {
246                let src_offset = j * num_values as usize + i;
247                output[i * bytes_per_value + j] = input_buffer[src_offset];
248            }
249        }
250
251        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
252            data: LanceBuffer::from(output),
253            bits_per_value: self.bits_per_value as u64,
254            num_values,
255            block_info: BlockInfo::new(),
256        }))
257    }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Pushes little-endian value bytes through compress then decompress and
    /// returns the reconstructed byte sequence.
    fn round_trip(bits_per_value: usize, le_bytes: Vec<u8>, num_values: u64) -> Vec<u8> {
        let encoder = ByteStreamSplitEncoder::new(bits_per_value);
        let decompressor = ByteStreamSplitDecompressor::new(bits_per_value);

        let block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: LanceBuffer::from(le_bytes),
            bits_per_value: bits_per_value as u64,
            num_values,
            block_info: BlockInfo::new(),
        });

        let (compressed, _encoding) = encoder.compress(block).unwrap();
        let decompressed = decompressor
            .decompress(compressed.data, num_values)
            .unwrap();

        match decompressed {
            DataBlock::FixedWidth(fixed) => {
                assert_eq!(fixed.num_values, num_values);
                fixed.data.as_ref().to_vec()
            }
            _ => panic!("Expected FixedWidth DataBlock"),
        }
    }

    #[test]
    fn test_round_trip_f32() {
        // Includes signed zeros and infinities to cover special bit patterns.
        let values: Vec<f32> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f32::INFINITY,
            f32::NEG_INFINITY,
        ];
        let input: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let output = round_trip(32, input, values.len() as u64);

        let decoded: Vec<f32> = output
            .chunks_exact(4)
            .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
            .collect();
        assert_eq!(values, decoded);
    }

    #[test]
    fn test_round_trip_f64() {
        // Includes signed zeros and infinities to cover special bit patterns.
        let values: Vec<f64> = vec![
            1.0,
            2.5,
            -3.7,
            4.2,
            0.0,
            -0.0,
            f64::INFINITY,
            f64::NEG_INFINITY,
        ];
        let input: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        let output = round_trip(64, input, values.len() as u64);

        let decoded: Vec<f64> = output
            .chunks_exact(8)
            .map(|chunk| f64::from_le_bytes(chunk.try_into().unwrap()))
            .collect();
        assert_eq!(values, decoded);
    }

    #[test]
    fn test_empty_data() {
        // A zero-value page must round-trip to an empty block.
        let output = round_trip(32, Vec::new(), 0);
        assert!(output.is_empty());
    }
}