// nodedb_codec/lz4.rs
//! LZ4 block compression codec for string/log columns.
//!
//! Uses `lz4_flex` (pure Rust, WASM-compatible) for fast decompression
//! with reasonable compression ratios (3-5x for typical log text).
//!
//! Data is split into 4KB blocks for random access: to read row N,
//! decompress only the block containing that row, not the entire column.
//!
//! Wire format:
//! ```text
//! [4 bytes] total uncompressed size (LE u32)
//! [4 bytes] block size (LE u32, default 4096)
//! [4 bytes] block count (LE u32)
//! [block_count × 4 bytes] compressed block lengths (LE u32 each)
//! [block_count × N bytes] compressed blocks (concatenated)
//! ```
//!
//! The block length table allows seeking to any block without
//! decompressing preceding blocks.

use crate::error::CodecError;

/// Default block size for LZ4 compression (4 KiB).
///
/// Small enough that random access to one row decompresses little data,
/// large enough for LZ4 to find useful matches in log text.
const DEFAULT_BLOCK_SIZE: usize = 4096;

// ---------------------------------------------------------------------------
// Public encode / decode API
// ---------------------------------------------------------------------------

30/// Compress raw bytes using LZ4 block compression.
31///
32/// Splits input into `block_size` blocks, compresses each independently.
33pub fn encode(data: &[u8]) -> Vec<u8> {
34    encode_with_block_size(data, DEFAULT_BLOCK_SIZE)
35}
36
37/// Compress with a custom block size (useful for testing or tuning).
38pub fn encode_with_block_size(data: &[u8], block_size: usize) -> Vec<u8> {
39    let block_size = block_size.max(64); // minimum 64 bytes
40    let block_count = if data.is_empty() {
41        0
42    } else {
43        data.len().div_ceil(block_size)
44    };
45
46    // Pre-allocate: header(12) + block_lengths(4*N) + compressed_blocks.
47    let mut out = Vec::with_capacity(12 + block_count * 4 + data.len());
48
49    // Header.
50    out.extend_from_slice(&(data.len() as u32).to_le_bytes());
51    out.extend_from_slice(&(block_size as u32).to_le_bytes());
52    out.extend_from_slice(&(block_count as u32).to_le_bytes());
53
54    // Reserve space for block length table (filled in after compression).
55    let lengths_offset = out.len();
56    out.resize(lengths_offset + block_count * 4, 0);
57
58    // Compress each block.
59    for (i, chunk) in data.chunks(block_size).enumerate() {
60        let compressed = lz4_flex::compress_prepend_size(chunk);
61        let compressed_len = compressed.len() as u32;
62
63        // Write block length into the table.
64        let table_pos = lengths_offset + i * 4;
65        out[table_pos..table_pos + 4].copy_from_slice(&compressed_len.to_le_bytes());
66
67        // Append compressed block.
68        out.extend_from_slice(&compressed);
69    }
70
71    out
72}
73
74/// Decompress LZ4 block-compressed bytes back to raw data.
75pub fn decode(data: &[u8]) -> Result<Vec<u8>, CodecError> {
76    let header = read_header(data)?;
77
78    if header.block_count == 0 {
79        return Ok(Vec::new());
80    }
81
82    let mut result = Vec::with_capacity(header.uncompressed_size);
83    let mut block_offset = header.data_offset;
84
85    for i in 0..header.block_count {
86        let compressed_len = header.block_lengths[i];
87        let block_end = block_offset + compressed_len;
88
89        if block_end > data.len() {
90            return Err(CodecError::Truncated {
91                expected: block_end,
92                actual: data.len(),
93            });
94        }
95
96        let block_data = &data[block_offset..block_end];
97        let decompressed = lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
98            CodecError::DecompressFailed {
99                detail: format!("LZ4 block {i}: {e}"),
100            }
101        })?;
102
103        result.extend_from_slice(&decompressed);
104        block_offset = block_end;
105    }
106
107    if result.len() != header.uncompressed_size {
108        return Err(CodecError::Corrupt {
109            detail: format!(
110                "uncompressed size mismatch: header says {}, got {}",
111                header.uncompressed_size,
112                result.len()
113            ),
114        });
115    }
116
117    Ok(result)
118}
119
120/// Decompress a single block by index (for random access).
121///
122/// Returns the decompressed bytes of just that block.
123pub fn decode_block(data: &[u8], block_idx: usize) -> Result<Vec<u8>, CodecError> {
124    let header = read_header(data)?;
125
126    if block_idx >= header.block_count {
127        return Err(CodecError::Corrupt {
128            detail: format!(
129                "block index {block_idx} out of range (block_count={})",
130                header.block_count
131            ),
132        });
133    }
134
135    // Sum lengths of preceding blocks to find this block's offset.
136    let mut block_offset = header.data_offset;
137    for i in 0..block_idx {
138        block_offset += header.block_lengths[i];
139    }
140
141    let compressed_len = header.block_lengths[block_idx];
142    let block_end = block_offset + compressed_len;
143
144    if block_end > data.len() {
145        return Err(CodecError::Truncated {
146            expected: block_end,
147            actual: data.len(),
148        });
149    }
150
151    let block_data = &data[block_offset..block_end];
152    lz4_flex::decompress_size_prepended(block_data).map_err(|e| CodecError::DecompressFailed {
153        detail: format!("LZ4 block {block_idx}: {e}"),
154    })
155}
156
// ---------------------------------------------------------------------------
// Header parsing
// ---------------------------------------------------------------------------

/// Parsed wire-format header (see module docs for the layout).
struct Lz4Header {
    /// Total uncompressed size claimed by the header, in bytes.
    uncompressed_size: usize,
    /// Number of compressed blocks.
    block_count: usize,
    /// Compressed length of each block, in table order.
    block_lengths: Vec<usize>,
    /// Byte offset where compressed block data starts.
    data_offset: usize,
}

169fn read_header(data: &[u8]) -> Result<Lz4Header, CodecError> {
170    if data.len() < 12 {
171        return Err(CodecError::Truncated {
172            expected: 12,
173            actual: data.len(),
174        });
175    }
176
177    let uncompressed_size = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
178    let _block_size = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
179    let block_count = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;
180
181    let lengths_end = 12 + block_count * 4;
182    if data.len() < lengths_end {
183        return Err(CodecError::Truncated {
184            expected: lengths_end,
185            actual: data.len(),
186        });
187    }
188
189    let block_lengths: Vec<usize> = data[12..lengths_end]
190        .chunks_exact(4)
191        .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]) as usize)
192        .collect();
193
194    Ok(Lz4Header {
195        uncompressed_size,
196        block_count,
197        block_lengths,
198        data_offset: lengths_end,
199    })
200}
201
// ---------------------------------------------------------------------------
// Streaming encoder / decoder types
// ---------------------------------------------------------------------------

206/// Streaming LZ4 encoder. Accumulates data and compresses on `finish()`.
207pub struct Lz4Encoder {
208    buf: Vec<u8>,
209    block_size: usize,
210}
211
212impl Lz4Encoder {
213    pub fn new() -> Self {
214        Self {
215            buf: Vec::with_capacity(4096),
216            block_size: DEFAULT_BLOCK_SIZE,
217        }
218    }
219
220    pub fn with_block_size(block_size: usize) -> Self {
221        Self {
222            buf: Vec::with_capacity(block_size),
223            block_size: block_size.max(64),
224        }
225    }
226
227    pub fn push(&mut self, data: &[u8]) {
228        self.buf.extend_from_slice(data);
229    }
230
231    pub fn len(&self) -> usize {
232        self.buf.len()
233    }
234
235    pub fn is_empty(&self) -> bool {
236        self.buf.is_empty()
237    }
238
239    pub fn finish(self) -> Vec<u8> {
240        encode_with_block_size(&self.buf, self.block_size)
241    }
242}
243
244impl Default for Lz4Encoder {
245    fn default() -> Self {
246        Self::new()
247    }
248}
249
250/// LZ4 decoder wrapper.
251pub struct Lz4Decoder;
252
253impl Lz4Decoder {
254    /// Decompress all blocks.
255    pub fn decode_all(data: &[u8]) -> Result<Vec<u8>, CodecError> {
256        decode(data)
257    }
258
259    /// Decompress a single block by index.
260    pub fn decode_block(data: &[u8], block_idx: usize) -> Result<Vec<u8>, CodecError> {
261        decode_block(data, block_idx)
262    }
263
264    /// Number of blocks in the compressed data.
265    pub fn block_count(data: &[u8]) -> Result<usize, CodecError> {
266        let header = read_header(data)?;
267        Ok(header.block_count)
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]);
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn small_data_roundtrip() {
        let data = b"hello world, this is a log message";
        let encoded = encode(data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);
    }

    #[test]
    fn large_data_multiple_blocks() {
        // Generate ~40KB of log-like data (10 blocks of 4KB each).
        let mut data = Vec::new();
        for i in 0..1000 {
            let line = format!(
                "2024-01-15T10:30:{:02}.000Z INFO server.handler request_id={} method=GET path=/api/v1/metrics status=200 duration_ms={}\n",
                i % 60,
                10000 + i,
                i * 3 + 1
            );
            data.extend_from_slice(line.as_bytes());
        }

        let encoded = encode(&data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);

        // LZ4 should achieve at least 2x compression on structured logs.
        let ratio = data.len() as f64 / encoded.len() as f64;
        assert!(
            ratio > 2.0,
            "expected >2x compression for structured logs, got {ratio:.1}x"
        );
    }

    #[test]
    fn random_access_block() {
        let data: Vec<u8> = (0..20000).map(|i| (i % 256) as u8).collect();
        let block_size = 4096;
        let encoded = encode_with_block_size(&data, block_size);

        let block_count = Lz4Decoder::block_count(&encoded).unwrap();
        assert_eq!(block_count, data.len().div_ceil(block_size));

        // Decompress each block individually and verify.
        let mut reassembled = Vec::new();
        for i in 0..block_count {
            let block = decode_block(&encoded, i).unwrap();
            reassembled.extend_from_slice(&block);
        }
        assert_eq!(reassembled, data);
    }

    #[test]
    fn out_of_range_block_index() {
        let data = b"some data here";
        let encoded = encode(data);
        assert!(decode_block(&encoded, 999).is_err());
    }

    #[test]
    fn compressible_log_data() {
        // Highly repetitive log lines.
        let line = "2024-01-15 ERROR database connection timeout host=db-prod-01 retry=3\n";
        let data: Vec<u8> = line.as_bytes().repeat(500);
        let encoded = encode(&data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);

        let ratio = data.len() as f64 / encoded.len() as f64;
        assert!(
            ratio > 3.0,
            "highly repetitive logs should compress >3x, got {ratio:.1}x"
        );
    }

    #[test]
    fn incompressible_data() {
        // Random bytes — LZ4 may expand slightly but shouldn't fail.
        let mut data = vec![0u8; 10_000];
        let mut rng: u64 = 9999;
        for byte in &mut data {
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            *byte = (rng >> 33) as u8;
        }
        let encoded = encode(&data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);
    }

    #[test]
    fn streaming_encoder() {
        let parts = [b"hello ".as_ref(), b"world".as_ref(), b" test".as_ref()];
        let full: Vec<u8> = parts.iter().flat_map(|p| p.iter().copied()).collect();

        let mut enc = Lz4Encoder::new();
        for part in &parts {
            enc.push(part);
        }
        let encoded = enc.finish();
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, full);
    }

    #[test]
    fn custom_block_size() {
        let data = vec![42u8; 10_000];
        let encoded = encode_with_block_size(&data, 1024);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);

        let block_count = Lz4Decoder::block_count(&encoded).unwrap();
        assert_eq!(block_count, 10); // 10000 / 1024 rounded up
    }

    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        assert!(decode(&[0; 8]).is_err()); // too short for header
    }
}