nodedb-codec 0.1.0

Compression codecs for NodeDB timeseries columnar storage
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
// SPDX-License-Identifier: Apache-2.0

//! LZ4 block compression codec for string/log columns.
//!
//! Uses `lz4_flex` (pure Rust, WASM-compatible) for fast decompression
//! with reasonable compression ratios (3-5x for typical log text).
//!
//! Data is split into 4KB blocks for random access: to read row N,
//! decompress only the block containing that row, not the entire column.
//!
//! Wire format:
//! ```text
//! [4 bytes] total uncompressed size (LE u32)
//! [4 bytes] block size (LE u32, default 4096)
//! [4 bytes] block count (LE u32)
//! [block_count × 4 bytes] compressed block lengths (LE u32 each)
//! [block_count × N bytes] compressed blocks (concatenated)
//! ```
//!
//! The block length table allows seeking to any block without
//! decompressing preceding blocks.

use crate::error::CodecError;

/// Default block size for LZ4 compression (4 KiB).
///
/// Blocks are compressed independently so that a single row can be read
/// by decompressing only its containing block (see module docs); 4 KiB
/// is the default trade-off between seek granularity and ratio.
const DEFAULT_BLOCK_SIZE: usize = 4096;

// ---------------------------------------------------------------------------
// Public encode / decode API
// ---------------------------------------------------------------------------

/// Compress raw bytes using LZ4 block compression.
///
/// Splits input into `block_size` blocks, compresses each independently.
/// Convenience wrapper around [`encode_with_block_size`] using
/// [`DEFAULT_BLOCK_SIZE`] (4 KiB).
pub fn encode(data: &[u8]) -> Vec<u8> {
    encode_with_block_size(data, DEFAULT_BLOCK_SIZE)
}

/// Compress with a custom block size (useful for testing or tuning).
///
/// Splits `data` into independent chunks of `block_size` bytes (clamped
/// to a 64-byte minimum), LZ4-compresses each, and emits the wire
/// format described in the module docs: a 12-byte header, the block
/// length table, then the concatenated compressed blocks.
///
/// # Panics
///
/// Panics if `data` is longer than `u32::MAX` bytes: the header stores
/// the uncompressed size as a LE `u32`, so a larger input cannot be
/// represented (previously it was silently truncated by an `as u32`
/// cast, producing a corrupt column).
pub fn encode_with_block_size(data: &[u8], block_size: usize) -> Vec<u8> {
    // Refuse unrepresentable inputs instead of silently truncating.
    let total_len = u32::try_from(data.len())
        .expect("input exceeds u32::MAX bytes and cannot fit the LZ4 header size field");

    let block_size = block_size.max(64); // minimum 64 bytes
    let block_count = if data.is_empty() {
        0
    } else {
        data.len().div_ceil(block_size)
    };

    // Pre-allocate: header(12) + block_lengths(4*N) + compressed_blocks
    // (approximated by the input length; LZ4 rarely expands much).
    let mut out = Vec::with_capacity(12 + block_count * 4 + data.len());

    // Header: uncompressed size, block size, block count (all LE u32).
    // `block_count <= data.len()` so it fits in u32 whenever the input
    // length does (checked above).
    out.extend_from_slice(&total_len.to_le_bytes());
    out.extend_from_slice(&(block_size as u32).to_le_bytes());
    out.extend_from_slice(&(block_count as u32).to_le_bytes());

    // Reserve zeroed space for the block length table; each entry is
    // patched once its block's compressed size is known.
    let lengths_offset = out.len();
    out.resize(lengths_offset + block_count * 4, 0);

    // Compress each block independently so decoders can seek.
    for (i, chunk) in data.chunks(block_size).enumerate() {
        let compressed = lz4_flex::compress_prepend_size(chunk);
        let compressed_len = compressed.len() as u32;

        // Patch this block's entry in the length table.
        let table_pos = lengths_offset + i * 4;
        out[table_pos..table_pos + 4].copy_from_slice(&compressed_len.to_le_bytes());

        // Append compressed block.
        out.extend_from_slice(&compressed);
    }

    out
}

/// Decompress LZ4 block-compressed bytes back to raw data.
///
/// Walks every block in order, decompressing each and verifying at the
/// end that the reassembled output matches the size recorded in the
/// header.
///
/// # Errors
///
/// Returns [`CodecError::Truncated`] when the input ends before the
/// blocks it declares, [`CodecError::DecompressFailed`] when a block is
/// not valid LZ4, and [`CodecError::Corrupt`] when offsets overflow or
/// the total output size disagrees with the header.
pub fn decode(data: &[u8]) -> Result<Vec<u8>, CodecError> {
    let header = read_header(data)?;

    if header.block_count == 0 {
        return Ok(Vec::new());
    }

    // Capacity is only a hint; cap it so a hostile header claiming a
    // huge uncompressed size cannot force a giant upfront allocation.
    let mut result = Vec::with_capacity(header.uncompressed_size.min(64 * 1024 * 1024));
    let mut block_offset = header.data_offset;

    for i in 0..header.block_count {
        let compressed_len = header.block_lengths[i];

        // checked_add: on 32-bit targets (e.g. wasm32, which this crate
        // explicitly supports) a hostile length table could wrap `usize`
        // and defeat the bounds check below.
        let block_end = block_offset
            .checked_add(compressed_len)
            .ok_or_else(|| CodecError::Corrupt {
                detail: format!("block {i} length {compressed_len} overflows offset"),
            })?;

        if block_end > data.len() {
            return Err(CodecError::Truncated {
                expected: block_end,
                actual: data.len(),
            });
        }

        let block_data = &data[block_offset..block_end];
        let decompressed = lz4_flex::decompress_size_prepended(block_data).map_err(|e| {
            CodecError::DecompressFailed {
                detail: format!("LZ4 block {i}: {e}"),
            }
        })?;

        result.extend_from_slice(&decompressed);
        block_offset = block_end;
    }

    // Cross-check against the header so silent corruption (e.g. a block
    // that decompressed to the wrong length) is surfaced to the caller.
    if result.len() != header.uncompressed_size {
        return Err(CodecError::Corrupt {
            detail: format!(
                "uncompressed size mismatch: header says {}, got {}",
                header.uncompressed_size,
                result.len()
            ),
        });
    }

    Ok(result)
}

/// Decompress a single block by index (for random access).
///
/// Returns the decompressed bytes of just that block. Only the target
/// block is decompressed; preceding blocks are skipped by summing the
/// length table in the header.
///
/// # Errors
///
/// Returns [`CodecError::Corrupt`] for an out-of-range index or an
/// offset that overflows, [`CodecError::Truncated`] when the input ends
/// before the block, and [`CodecError::DecompressFailed`] for bad LZ4.
pub fn decode_block(data: &[u8], block_idx: usize) -> Result<Vec<u8>, CodecError> {
    let header = read_header(data)?;

    if block_idx >= header.block_count {
        return Err(CodecError::Corrupt {
            detail: format!(
                "block index {block_idx} out of range (block_count={})",
                header.block_count
            ),
        });
    }

    // Sum lengths of preceding blocks to find this block's offset.
    // Checked arithmetic: a hostile length table must not be able to
    // wrap `usize` (realistic on 32-bit targets such as wasm32) and
    // defeat the bounds check below.
    let mut block_offset = header.data_offset;
    for &len in &header.block_lengths[..block_idx] {
        block_offset = block_offset
            .checked_add(len)
            .ok_or_else(|| CodecError::Corrupt {
                detail: format!("block offsets overflow before block {block_idx}"),
            })?;
    }

    let compressed_len = header.block_lengths[block_idx];
    let block_end = block_offset
        .checked_add(compressed_len)
        .ok_or_else(|| CodecError::Corrupt {
            detail: format!("block {block_idx} length {compressed_len} overflows offset"),
        })?;

    if block_end > data.len() {
        return Err(CodecError::Truncated {
            expected: block_end,
            actual: data.len(),
        });
    }

    let block_data = &data[block_offset..block_end];
    lz4_flex::decompress_size_prepended(block_data).map_err(|e| CodecError::DecompressFailed {
        detail: format!("LZ4 block {block_idx}: {e}"),
    })
}

// ---------------------------------------------------------------------------
// Header parsing
// ---------------------------------------------------------------------------

/// Parsed view of the 12-byte wire-format header plus its length table.
struct Lz4Header {
    /// Total size of the original (uncompressed) data, from the header.
    uncompressed_size: usize,
    /// Number of compressed blocks that follow the length table.
    block_count: usize,
    /// Compressed length of each block, in order (one entry per block).
    block_lengths: Vec<usize>,
    /// Byte offset where compressed block data starts.
    data_offset: usize,
}

/// Parse the wire-format header: uncompressed size, block size (stored
/// but currently unused by the decoder), block count, and the per-block
/// compressed length table.
///
/// # Errors
///
/// Returns [`CodecError::Truncated`] when `data` is shorter than the
/// header or length table it declares, and [`CodecError::Corrupt`] when
/// the declared block count makes the table size overflow `usize`.
fn read_header(data: &[u8]) -> Result<Lz4Header, CodecError> {
    if data.len() < 12 {
        return Err(CodecError::Truncated {
            expected: 12,
            actual: data.len(),
        });
    }

    let uncompressed_size = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
    let _block_size = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
    let block_count = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;

    // Checked arithmetic: on 32-bit targets (e.g. wasm32) a hostile
    // block_count near u32::MAX makes `12 + block_count * 4` wrap,
    // which would bypass the truncation check below.
    let lengths_end = block_count
        .checked_mul(4)
        .and_then(|table| table.checked_add(12))
        .ok_or_else(|| CodecError::Corrupt {
            detail: format!("block count {block_count} overflows header size"),
        })?;
    if data.len() < lengths_end {
        return Err(CodecError::Truncated {
            expected: lengths_end,
            actual: data.len(),
        });
    }

    let block_lengths: Vec<usize> = data[12..lengths_end]
        .chunks_exact(4)
        .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]) as usize)
        .collect();

    Ok(Lz4Header {
        uncompressed_size,
        block_count,
        block_lengths,
        data_offset: lengths_end,
    })
}

// ---------------------------------------------------------------------------
// Streaming encoder / decoder types
// ---------------------------------------------------------------------------

/// Streaming LZ4 encoder. Accumulates data and compresses on `finish()`.
///
/// Note: "streaming" here refers only to the push API — all pushed data
/// is buffered in memory and compressed in one pass at `finish()`.
pub struct Lz4Encoder {
    // Uncompressed bytes accumulated via `push()`.
    buf: Vec<u8>,
    // Block size handed to `encode_with_block_size` at `finish()`.
    block_size: usize,
}

impl Lz4Encoder {
    pub fn new() -> Self {
        Self {
            buf: Vec::with_capacity(4096),
            block_size: DEFAULT_BLOCK_SIZE,
        }
    }

    pub fn with_block_size(block_size: usize) -> Self {
        Self {
            buf: Vec::with_capacity(block_size),
            block_size: block_size.max(64),
        }
    }

    pub fn push(&mut self, data: &[u8]) {
        self.buf.extend_from_slice(data);
    }

    pub fn len(&self) -> usize {
        self.buf.len()
    }

    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    pub fn finish(self) -> Vec<u8> {
        encode_with_block_size(&self.buf, self.block_size)
    }
}

impl Default for Lz4Encoder {
    /// Equivalent to [`Lz4Encoder::new`] (default 4 KiB block size).
    fn default() -> Self {
        Self::new()
    }
}

/// LZ4 decoder wrapper — a thin namespace over the free `decode` /
/// `decode_block` / `read_header` functions, mirroring `Lz4Encoder`
/// for API symmetry.
pub struct Lz4Decoder;

impl Lz4Decoder {
    /// Number of blocks recorded in the compressed header.
    pub fn block_count(data: &[u8]) -> Result<usize, CodecError> {
        read_header(data).map(|header| header.block_count)
    }

    /// Decompress the entire payload (all blocks, in order).
    pub fn decode_all(data: &[u8]) -> Result<Vec<u8>, CodecError> {
        decode(data)
    }

    /// Decompress only the block at `block_idx` (random access).
    pub fn decode_block(data: &[u8], block_idx: usize) -> Result<Vec<u8>, CodecError> {
        decode_block(data, block_idx)
    }
}

// Round-trip, random-access, and error-path coverage for the codec.
#[cfg(test)]
mod tests {
    use super::*;

    // Zero-byte input must encode to a valid (header-only) payload.
    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]);
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    // Input smaller than one block: single-block round trip.
    #[test]
    fn small_data_roundtrip() {
        let data = b"hello world, this is a log message";
        let encoded = encode(data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);
    }

    #[test]
    fn large_data_multiple_blocks() {
        // Generate ~40KB of log-like data (10 blocks of 4KB each).
        let mut data = Vec::new();
        for i in 0..1000 {
            let line = format!(
                "2024-01-15T10:30:{:02}.000Z INFO server.handler request_id={} method=GET path=/api/v1/metrics status=200 duration_ms={}\n",
                i % 60,
                10000 + i,
                i * 3 + 1
            );
            data.extend_from_slice(line.as_bytes());
        }

        let encoded = encode(&data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);

        // LZ4 should achieve at least 2x compression on structured logs.
        let ratio = data.len() as f64 / encoded.len() as f64;
        assert!(
            ratio > 2.0,
            "expected >2x compression for structured logs, got {ratio:.1}x"
        );
    }

    // Per-block decompression must reassemble to the original bytes.
    #[test]
    fn random_access_block() {
        let data: Vec<u8> = (0..20000).map(|i| (i % 256) as u8).collect();
        let block_size = 4096;
        let encoded = encode_with_block_size(&data, block_size);

        let block_count = Lz4Decoder::block_count(&encoded).unwrap();
        assert_eq!(block_count, data.len().div_ceil(block_size));

        // Decompress each block individually and verify.
        let mut reassembled = Vec::new();
        for i in 0..block_count {
            let block = decode_block(&encoded, i).unwrap();
            reassembled.extend_from_slice(&block);
        }
        assert_eq!(reassembled, data);
    }

    // An index past the last block must error, not panic.
    #[test]
    fn out_of_range_block_index() {
        let data = b"some data here";
        let encoded = encode(data);
        assert!(decode_block(&encoded, 999).is_err());
    }

    #[test]
    fn compressible_log_data() {
        // Highly repetitive log lines.
        let line = "2024-01-15 ERROR database connection timeout host=db-prod-01 retry=3\n";
        let data: Vec<u8> = line.as_bytes().repeat(500);
        let encoded = encode(&data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);

        let ratio = data.len() as f64 / encoded.len() as f64;
        assert!(
            ratio > 3.0,
            "highly repetitive logs should compress >3x, got {ratio:.1}x"
        );
    }

    #[test]
    fn incompressible_data() {
        // Random bytes — LZ4 may expand slightly but shouldn't fail.
        // Deterministic LCG so the test is reproducible without a rng crate.
        let mut data = vec![0u8; 10_000];
        let mut rng: u64 = 9999;
        for byte in &mut data {
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            *byte = (rng >> 33) as u8;
        }
        let encoded = encode(&data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);
    }

    // Pushing in fragments must be equivalent to encoding the
    // concatenation in one call.
    #[test]
    fn streaming_encoder() {
        let parts = [b"hello ".as_ref(), b"world".as_ref(), b" test".as_ref()];
        let full: Vec<u8> = parts.iter().flat_map(|p| p.iter().copied()).collect();

        let mut enc = Lz4Encoder::new();
        for part in &parts {
            enc.push(part);
        }
        let encoded = enc.finish();
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, full);
    }

    #[test]
    fn custom_block_size() {
        let data = vec![42u8; 10_000];
        let encoded = encode_with_block_size(&data, 1024);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);

        let block_count = Lz4Decoder::block_count(&encoded).unwrap();
        assert_eq!(block_count, 10); // 10000 / 1024 rounded up
    }

    // Inputs shorter than the fixed 12-byte header must error cleanly.
    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        assert!(decode(&[0; 8]).is_err()); // too short for header
    }
}