// nodedb_codec/lz4.rs
1// SPDX-License-Identifier: Apache-2.0
2
3//! LZ4 block compression codec for string/log columns.
4//!
5//! Uses `lz4_flex` (pure Rust, WASM-compatible) for fast decompression
6//! with reasonable compression ratios (3-5x for typical log text).
7//!
8//! Data is split into 4KB blocks for random access: to read row N,
9//! decompress only the block containing that row, not the entire column.
10//!
11//! Wire format:
12//! ```text
13//! [4 bytes] total uncompressed size (LE u32)
14//! [4 bytes] block size (LE u32, default 4096)
15//! [4 bytes] block count (LE u32)
16//! [block_count × 4 bytes] compressed block lengths (LE u32 each)
17//! [block_count × N bytes] compressed blocks (concatenated)
18//! ```
19//!
20//! The block length table allows seeking to any block without
21//! decompressing preceding blocks.
22
23use crate::error::CodecError;
24
/// Default block size for LZ4 compression (4 KiB).
///
/// Keeps random access cheap: reading one row decompresses at most one
/// 4 KiB block (see module docs). Tunable via `encode_with_block_size`.
const DEFAULT_BLOCK_SIZE: usize = 4096;
27
28// ---------------------------------------------------------------------------
29// Public encode / decode API
30// ---------------------------------------------------------------------------
31
/// Compress raw bytes using LZ4 block compression.
///
/// Splits input into `block_size` blocks, compresses each independently.
/// Convenience wrapper over [`encode_with_block_size`] using
/// [`DEFAULT_BLOCK_SIZE`] (4 KiB).
pub fn encode(data: &[u8]) -> Vec<u8> {
    encode_with_block_size(data, DEFAULT_BLOCK_SIZE)
}
38
39/// Compress with a custom block size (useful for testing or tuning).
40pub fn encode_with_block_size(data: &[u8], block_size: usize) -> Vec<u8> {
41    let block_size = block_size.max(64); // minimum 64 bytes
42    let block_count = if data.is_empty() {
43        0
44    } else {
45        data.len().div_ceil(block_size)
46    };
47
48    // Pre-allocate: header(12) + block_lengths(4*N) + compressed_blocks.
49    let mut out = Vec::with_capacity(12 + block_count * 4 + data.len());
50
51    // Header.
52    out.extend_from_slice(&(data.len() as u32).to_le_bytes());
53    out.extend_from_slice(&(block_size as u32).to_le_bytes());
54    out.extend_from_slice(&(block_count as u32).to_le_bytes());
55
56    // Reserve space for block length table (filled in after compression).
57    let lengths_offset = out.len();
58    out.resize(lengths_offset + block_count * 4, 0);
59
60    // Compress each block.
61    for (i, chunk) in data.chunks(block_size).enumerate() {
62        let compressed = lz4_flex::compress_prepend_size(chunk);
63        let compressed_len = compressed.len() as u32;
64
65        // Write block length into the table.
66        let table_pos = lengths_offset + i * 4;
67        out[table_pos..table_pos + 4].copy_from_slice(&compressed_len.to_le_bytes());
68
69        // Append compressed block.
70        out.extend_from_slice(&compressed);
71    }
72
73    out
74}
75
76/// Decompress LZ4 block-compressed bytes back to raw data.
77pub fn decode(data: &[u8]) -> Result<Vec<u8>, CodecError> {
78    let header = read_header(data)?;
79
80    if header.block_count == 0 {
81        return Ok(Vec::new());
82    }
83
84    let mut result = Vec::with_capacity(header.uncompressed_size);
85    let mut block_offset = header.data_offset;
86
87    for i in 0..header.block_count {
88        let compressed_len = header.block_lengths[i];
89        let block_end = block_offset + compressed_len;
90
91        if block_end > data.len() {
92            return Err(CodecError::Truncated {
93                expected: block_end,
94                actual: data.len(),
95            });
96        }
97
98        let block_data = &data[block_offset..block_end];
99        let decompressed = lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
100            CodecError::DecompressFailed {
101                detail: format!("LZ4 block {i}: {e}"),
102            }
103        })?;
104
105        result.extend_from_slice(&decompressed);
106        block_offset = block_end;
107    }
108
109    if result.len() != header.uncompressed_size {
110        return Err(CodecError::Corrupt {
111            detail: format!(
112                "uncompressed size mismatch: header says {}, got {}",
113                header.uncompressed_size,
114                result.len()
115            ),
116        });
117    }
118
119    Ok(result)
120}
121
122/// Decompress a single block by index (for random access).
123///
124/// Returns the decompressed bytes of just that block.
125pub fn decode_block(data: &[u8], block_idx: usize) -> Result<Vec<u8>, CodecError> {
126    let header = read_header(data)?;
127
128    if block_idx >= header.block_count {
129        return Err(CodecError::Corrupt {
130            detail: format!(
131                "block index {block_idx} out of range (block_count={})",
132                header.block_count
133            ),
134        });
135    }
136
137    // Sum lengths of preceding blocks to find this block's offset.
138    let mut block_offset = header.data_offset;
139    for i in 0..block_idx {
140        block_offset += header.block_lengths[i];
141    }
142
143    let compressed_len = header.block_lengths[block_idx];
144    let block_end = block_offset + compressed_len;
145
146    if block_end > data.len() {
147        return Err(CodecError::Truncated {
148            expected: block_end,
149            actual: data.len(),
150        });
151    }
152
153    let block_data = &data[block_offset..block_end];
154    lz4_flex::decompress_size_prepended(block_data).map_err(|e| CodecError::DecompressFailed {
155        detail: format!("LZ4 block {block_idx}: {e}"),
156    })
157}
158
159// ---------------------------------------------------------------------------
160// Header parsing
161// ---------------------------------------------------------------------------
162
/// Parsed wire-format header (layout documented at the top of this module).
struct Lz4Header {
    /// Total uncompressed size declared in the header, in bytes.
    uncompressed_size: usize,
    /// Number of compressed blocks.
    block_count: usize,
    /// Compressed length of each block, in order (`block_count` entries).
    block_lengths: Vec<usize>,
    /// Byte offset where compressed block data starts.
    data_offset: usize,
}
170
171fn read_header(data: &[u8]) -> Result<Lz4Header, CodecError> {
172    if data.len() < 12 {
173        return Err(CodecError::Truncated {
174            expected: 12,
175            actual: data.len(),
176        });
177    }
178
179    let uncompressed_size = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
180    let _block_size = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
181    let block_count = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;
182
183    let lengths_end = 12 + block_count * 4;
184    if data.len() < lengths_end {
185        return Err(CodecError::Truncated {
186            expected: lengths_end,
187            actual: data.len(),
188        });
189    }
190
191    let block_lengths: Vec<usize> = data[12..lengths_end]
192        .chunks_exact(4)
193        .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]) as usize)
194        .collect();
195
196    Ok(Lz4Header {
197        uncompressed_size,
198        block_count,
199        block_lengths,
200        data_offset: lengths_end,
201    })
202}
203
204// ---------------------------------------------------------------------------
205// Streaming encoder / decoder types
206// ---------------------------------------------------------------------------
207
/// Streaming LZ4 encoder. Accumulates data and compresses on `finish()`.
pub struct Lz4Encoder {
    // Uncompressed bytes buffered so far.
    buf: Vec<u8>,
    // Block size handed to `encode_with_block_size` at `finish()`.
    block_size: usize,
}
213
214impl Lz4Encoder {
215    pub fn new() -> Self {
216        Self {
217            buf: Vec::with_capacity(4096),
218            block_size: DEFAULT_BLOCK_SIZE,
219        }
220    }
221
222    pub fn with_block_size(block_size: usize) -> Self {
223        Self {
224            buf: Vec::with_capacity(block_size),
225            block_size: block_size.max(64),
226        }
227    }
228
229    pub fn push(&mut self, data: &[u8]) {
230        self.buf.extend_from_slice(data);
231    }
232
233    pub fn len(&self) -> usize {
234        self.buf.len()
235    }
236
237    pub fn is_empty(&self) -> bool {
238        self.buf.is_empty()
239    }
240
241    pub fn finish(self) -> Vec<u8> {
242        encode_with_block_size(&self.buf, self.block_size)
243    }
244}
245
impl Default for Lz4Encoder {
    /// Same as [`Lz4Encoder::new`]: default 4 KiB block size.
    fn default() -> Self {
        Self::new()
    }
}
251
/// LZ4 decoder wrapper.
///
/// Stateless namespace over the free functions [`decode`] and
/// [`decode_block`], plus a header-only block count query.
pub struct Lz4Decoder;
254
impl Lz4Decoder {
    /// Decompress all blocks. Thin wrapper over [`decode`].
    pub fn decode_all(data: &[u8]) -> Result<Vec<u8>, CodecError> {
        decode(data)
    }

    /// Decompress a single block by index. Thin wrapper over [`decode_block`].
    pub fn decode_block(data: &[u8], block_idx: usize) -> Result<Vec<u8>, CodecError> {
        decode_block(data, block_idx)
    }

    /// Number of blocks in the compressed data (parses only the header,
    /// without decompressing anything).
    pub fn block_count(data: &[u8]) -> Result<usize, CodecError> {
        let header = read_header(data)?;
        Ok(header.block_count)
    }
}
272
#[cfg(test)]
mod tests {
    use super::*;

    // Zero-byte input must still produce a decodable artifact
    // (header only, block_count = 0).
    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]);
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    // Single-block round trip: input is well under one 4 KiB block.
    #[test]
    fn small_data_roundtrip() {
        let data = b"hello world, this is a log message";
        let encoded = encode(data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);
    }

    // Multi-block round trip plus a compression-ratio sanity bound.
    #[test]
    fn large_data_multiple_blocks() {
        // Generate ~40KB of log-like data (10 blocks of 4KB each).
        let mut data = Vec::new();
        for i in 0..1000 {
            let line = format!(
                "2024-01-15T10:30:{:02}.000Z INFO server.handler request_id={} method=GET path=/api/v1/metrics status=200 duration_ms={}\n",
                i % 60,
                10000 + i,
                i * 3 + 1
            );
            data.extend_from_slice(line.as_bytes());
        }

        let encoded = encode(&data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);

        // LZ4 should achieve at least 2x compression on structured logs.
        let ratio = data.len() as f64 / encoded.len() as f64;
        assert!(
            ratio > 2.0,
            "expected >2x compression for structured logs, got {ratio:.1}x"
        );
    }

    // Per-block decompression must reassemble byte-for-byte to the input.
    #[test]
    fn random_access_block() {
        let data: Vec<u8> = (0..20000).map(|i| (i % 256) as u8).collect();
        let block_size = 4096;
        let encoded = encode_with_block_size(&data, block_size);

        let block_count = Lz4Decoder::block_count(&encoded).unwrap();
        assert_eq!(block_count, data.len().div_ceil(block_size));

        // Decompress each block individually and verify.
        let mut reassembled = Vec::new();
        for i in 0..block_count {
            let block = decode_block(&encoded, i).unwrap();
            reassembled.extend_from_slice(&block);
        }
        assert_eq!(reassembled, data);
    }

    // Out-of-range block index must error, not panic.
    #[test]
    fn out_of_range_block_index() {
        let data = b"some data here";
        let encoded = encode(data);
        assert!(decode_block(&encoded, 999).is_err());
    }

    #[test]
    fn compressible_log_data() {
        // Highly repetitive log lines.
        let line = "2024-01-15 ERROR database connection timeout host=db-prod-01 retry=3\n";
        let data: Vec<u8> = line.as_bytes().repeat(500);
        let encoded = encode(&data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);

        let ratio = data.len() as f64 / encoded.len() as f64;
        assert!(
            ratio > 3.0,
            "highly repetitive logs should compress >3x, got {ratio:.1}x"
        );
    }

    #[test]
    fn incompressible_data() {
        // Random bytes — LZ4 may expand slightly but shouldn't fail.
        // Deterministic LCG keeps the test reproducible with no rand crate.
        let mut data = vec![0u8; 10_000];
        let mut rng: u64 = 9999;
        for byte in &mut data {
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            *byte = (rng >> 33) as u8;
        }
        let encoded = encode(&data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);
    }

    // Incremental pushes must yield the same logical content as one-shot.
    #[test]
    fn streaming_encoder() {
        let parts = [b"hello ".as_ref(), b"world".as_ref(), b" test".as_ref()];
        let full: Vec<u8> = parts.iter().flat_map(|p| p.iter().copied()).collect();

        let mut enc = Lz4Encoder::new();
        for part in &parts {
            enc.push(part);
        }
        let encoded = enc.finish();
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, full);
    }

    // Non-default block size: round trip plus expected block count.
    #[test]
    fn custom_block_size() {
        let data = vec![42u8; 10_000];
        let encoded = encode_with_block_size(&data, 1024);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);

        let block_count = Lz4Decoder::block_count(&encoded).unwrap();
        assert_eq!(block_count, 10); // 10000 / 1024 rounded up
    }

    // Inputs shorter than the 12-byte header must error cleanly.
    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        assert!(decode(&[0; 8]).is_err()); // too short for header
    }
}
403}