nodedb-codec 0.0.0

Compression codecs for NodeDB timeseries columnar storage
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
//! Interleaved rANS (Asymmetric Numeral Systems) entropy coder.
//!
//! Compresses byte streams to the Shannon entropy limit — optimal
//! compression ratio at Huffman-like speed. Used as the terminal
//! compressor for cold/S3 tier partitions where ratio matters more
//! than decompression speed.
//!
//! 4-stream interleaving breaks the sequential dependency chain:
//! the CPU processes all streams simultaneously, achieving high
//! throughput despite the inherently sequential nature of ANS.
//!
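//! Wire format:
//! ```text
//! [4 bytes] uncompressed size (LE u32)
//! [256 × 4 bytes] normalized frequency table (LE u32 per byte value)
//! [4 bytes] compressed payload size (LE u32), counting everything below
//! [4 × 4 bytes] per-stream sizes (LE u32 each)
//! [N bytes] the 4 rANS streams stored back to back; each stream ends
//!           with its 4-byte final state (LE u32)
//! ```
//!
//! Empty input is encoded as an all-zero header with no payload.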

use crate::error::CodecError;

/// Number of interleaved streams.
///
/// Input byte `i` is handled by stream `i % NUM_STREAMS`.
const NUM_STREAMS: usize = 4;

/// rANS probability scale (power of 2 for fast division).
///
/// `PROB_BITS` is the base-2 logarithm of the scale.
const PROB_BITS: u32 = 14;
/// Total that the normalized frequency table sums to (`1 << PROB_BITS`).
const PROB_SCALE: u32 = 1 << PROB_BITS;

/// rANS state lower bound.
///
/// Encoder states start at this value, and the decoder pulls in more bytes
/// whenever a state drops below it.
const RANS_L: u32 = 1 << 23;

/// Frequency table header size: 256 × 4 bytes = 1024 bytes.
const FREQ_TABLE_SIZE: usize = 256 * 4;

/// Full header: 4 (uncomp size) + 1024 (freq table) + 4 (comp size).
///
/// The per-stream size fields that follow the header are counted as part of
/// the compressed payload, not as part of this header.
const HEADER_SIZE: usize = 4 + FREQ_TABLE_SIZE + 4;

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

/// Compress bytes using interleaved rANS.
///
/// Byte `i` of `data` is coded by stream `i % NUM_STREAMS`. The output is the
/// fixed header (uncompressed size, normalized frequency table, payload size)
/// followed by the per-stream sizes and the stream bytes. Empty input yields an
/// all-zero header with no payload.
///
/// # Panics
///
/// Panics if the input length, a stream length, or the payload length does not
/// fit in the `u32` length fields of the wire format. These values used to be
/// truncated silently, which produced output that could not be decoded back to
/// the original data.
pub fn encode(data: &[u8]) -> Vec<u8> {
    /// Serializes a length as the LE `u32` used by the wire format, refusing
    /// values that would otherwise be truncated.
    fn wire_u32(len: usize) -> [u8; 4] {
        assert!(
            len <= u32::MAX as usize,
            "rANS wire format stores lengths as u32, got {}",
            len
        );
        // Lossless: the assertion above guarantees the value fits.
        (len as u32).to_le_bytes()
    }

    if data.is_empty() {
        // uncompressed_size = 0, freq table = all zeros, compressed_size = 0
        return vec![0u8; HEADER_SIZE];
    }

    // Validate the input length before doing any work. This also guarantees
    // that the u32 histogram counters below cannot overflow.
    let uncompressed_size = wire_u32(data.len());

    // Build frequency table.
    let mut freqs = [0u32; 256];
    for &b in data {
        freqs[usize::from(b)] += 1;
    }

    // Normalize frequencies to sum to PROB_SCALE.
    let norm_freqs = normalize_frequencies(&freqs, data.len());

    // Build cumulative frequency table.
    let (cum_freqs, sym_freqs) = build_cum_table(&norm_freqs);

    // Encode using 4 interleaved streams.
    // Each stream processes every 4th byte: stream 0 gets bytes 0,4,8,...
    let mut streams: [Vec<u8>; NUM_STREAMS] = std::array::from_fn(|_| Vec::new());
    let mut states = [RANS_L; NUM_STREAMS];

    // Encode in REVERSE order (rANS encodes backward, decodes forward).
    for (i, &byte) in data.iter().enumerate().rev() {
        let stream_idx = i % NUM_STREAMS;
        let sym = usize::from(byte);
        let freq = sym_freqs[sym];

        if freq == 0 {
            // Defensive only: normalization gives every present symbol a
            // frequency of at least 1, and a zero frequency must never reach
            // `rans_encode_symbol` (it divides by `freq`).
            continue;
        }

        rans_encode_symbol(
            &mut states[stream_idx],
            &mut streams[stream_idx],
            cum_freqs[sym],
            freq,
        );
    }

    // Flush final states: each stream ends with its 4-byte LE state, which is
    // the first thing the decoder reads.
    for (stream, state) in streams.iter_mut().zip(states.iter()) {
        stream.extend_from_slice(&state.to_le_bytes());
    }

    // Build output.
    let total_compressed: usize = streams.iter().map(Vec::len).sum();
    let comp_payload_size = total_compressed + NUM_STREAMS * 4; // streams + per-stream sizes
    let mut out = Vec::with_capacity(HEADER_SIZE + comp_payload_size);

    // Header: uncompressed size.
    out.extend_from_slice(&uncompressed_size);

    // Normalized frequency table (the decoder rebuilds its tables from it).
    for f in &norm_freqs {
        out.extend_from_slice(&f.to_le_bytes());
    }

    // Compressed payload size.
    out.extend_from_slice(&wire_u32(comp_payload_size));

    // Per-stream sizes (4 bytes each).
    for s in &streams {
        out.extend_from_slice(&wire_u32(s.len()));
    }

    // Stream data, stored in the order it was produced; the decoder consumes
    // each stream from its end toward its start.
    for s in &streams {
        out.extend_from_slice(s);
    }

    out
}

/// Decompress interleaved rANS data.
pub fn decode(data: &[u8]) -> Result<Vec<u8>, CodecError> {
    if data.len() < HEADER_SIZE {
        return Err(CodecError::Truncated {
            expected: HEADER_SIZE,
            actual: data.len(),
        });
    }

    let uncompressed_size = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
    if uncompressed_size == 0 {
        return Ok(Vec::new());
    }

    // Read frequency table.
    let mut norm_freqs = [0u32; 256];
    for (i, freq) in norm_freqs.iter_mut().enumerate() {
        let pos = 4 + i * 4;
        *freq = u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]);
    }

    let (cum_freqs, sym_freqs) = build_cum_table(&norm_freqs);

    // Build reverse lookup table for decoding.
    let lookup = build_decode_table(&cum_freqs, &sym_freqs);

    let _comp_size = u32::from_le_bytes([
        data[HEADER_SIZE - 4],
        data[HEADER_SIZE - 3],
        data[HEADER_SIZE - 2],
        data[HEADER_SIZE - 1],
    ]) as usize;

    // Read per-stream sizes.
    let mut pos = HEADER_SIZE;
    if pos + NUM_STREAMS * 4 > data.len() {
        return Err(CodecError::Truncated {
            expected: pos + NUM_STREAMS * 4,
            actual: data.len(),
        });
    }

    let mut stream_sizes = [0usize; NUM_STREAMS];
    for size in stream_sizes.iter_mut() {
        *size =
            u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
        pos += 4;
    }

    // Read streams.
    let mut stream_data: [Vec<u8>; NUM_STREAMS] = std::array::from_fn(|_| Vec::new());
    for i in 0..NUM_STREAMS {
        let end = pos + stream_sizes[i];
        if end > data.len() {
            return Err(CodecError::Truncated {
                expected: end,
                actual: data.len(),
            });
        }
        stream_data[i] = data[pos..end].to_vec();
        pos += stream_sizes[i];
    }

    // Initialize states from the end of each stream.
    let mut states = [0u32; NUM_STREAMS];
    let mut stream_pos = [0usize; NUM_STREAMS];
    for i in 0..NUM_STREAMS {
        let s = &stream_data[i];
        if s.len() < 4 {
            return Err(CodecError::Corrupt {
                detail: format!("rANS stream {i} too short for state"),
            });
        }
        let end = s.len();
        states[i] = u32::from_le_bytes([s[end - 4], s[end - 3], s[end - 2], s[end - 1]]);
        stream_pos[i] = end - 4;
    }

    // Decode forward.
    let mut output = vec![0u8; uncompressed_size];
    for (i, out_byte) in output.iter_mut().enumerate() {
        let stream_idx = i % NUM_STREAMS;
        let (sym, new_state) =
            rans_decode_symbol(states[stream_idx], &lookup, &cum_freqs, &sym_freqs);
        *out_byte = sym;
        states[stream_idx] = rans_decode_renorm(
            new_state,
            &stream_data[stream_idx],
            &mut stream_pos[stream_idx],
        );
    }

    Ok(output)
}

// ---------------------------------------------------------------------------
// rANS core operations
// ---------------------------------------------------------------------------

/// Pushes one symbol, described by its cumulative `start` and its `freq`, onto
/// the rANS `state`.
///
/// Low bytes of the state are appended to `bitstream` until the state is below
/// the renormalization limit for `freq`; the symbol is then folded in.
/// `freq` must be non-zero because the state is divided by it.
fn rans_encode_symbol(state: &mut u32, bitstream: &mut Vec<u8>, start: u32, freq: u32) {
    let limit = ((RANS_L >> PROB_BITS) << 8) * freq;

    // Shed the least significant byte while the state is too large.
    let mut x = *state;
    while x >= limit {
        bitstream.push(x.to_le_bytes()[0]);
        x >>= 8;
    }

    // Fold the symbol in: quotient moves to the high bits, the remainder plus
    // `start` selects the slot inside the symbol's range.
    let quotient = x / freq;
    let remainder = x % freq;
    *state = (quotient << PROB_BITS) + remainder + start;
}

/// Pops one symbol from the rANS `state`.
///
/// The low `PROB_BITS` bits of the state pick a slot, `lookup` maps that slot
/// to a symbol, and the state is advanced using the symbol's frequency and
/// cumulative start. Returns the symbol and the advanced state, which has not
/// been renormalized yet.
fn rans_decode_symbol(
    state: u32,
    lookup: &[u8; PROB_SCALE as usize],
    cum_freqs: &[u32; 257],
    sym_freqs: &[u32; 256],
) -> (u8, u32) {
    // PROB_SCALE is a power of two, so the remainder is the low PROB_BITS bits.
    let slot = state % PROB_SCALE;
    let symbol = lookup[slot as usize];
    let index = usize::from(symbol);

    let quotient = state >> PROB_BITS;
    let advanced = sym_freqs[index] * quotient + slot - cum_freqs[index];
    (symbol, advanced)
}

/// Refills `state` from `stream` until it reaches `RANS_L` or the stream is
/// exhausted.
///
/// Bytes are consumed backward: `pos` is decremented and the byte at the new
/// position is shifted into the low end of the state. Returns the refilled
/// state; `pos` is left at the last byte consumed.
fn rans_decode_renorm(mut state: u32, stream: &[u8], pos: &mut usize) -> u32 {
    loop {
        if state >= RANS_L || *pos == 0 {
            return state;
        }
        *pos -= 1;
        state = (state << 8) | u32::from(stream[*pos]);
    }
}

// ---------------------------------------------------------------------------
// Frequency table operations
// ---------------------------------------------------------------------------

/// Normalize raw frequencies to sum to PROB_SCALE.
///
/// Each non-zero count is scaled by `PROB_SCALE / total`, rounded, and clamped
/// to at least 1 so that every present symbol stays encodable. The rounding
/// error is then corrected one unit at a time: surplus is taken from the
/// largest entry above 1, and a shortfall is given to the largest entry. When
/// several entries tie for largest, the one with the highest index is used.
///
/// A table with no non-zero counts is returned as all zeros.
fn normalize_frequencies(freqs: &[u32; 256], total: usize) -> [u32; 256] {
    /// Index of the last largest entry accepted by `eligible`, or 0 when no
    /// entry is accepted.
    fn last_argmax(table: &[u32; 256], eligible: fn(u32) -> bool) -> usize {
        let mut best: Option<(usize, u32)> = None;
        for (idx, &value) in table.iter().enumerate() {
            if !eligible(value) {
                continue;
            }
            match best {
                Some((_, top)) if value < top => {}
                // Equal values replace the current best, so later indices win.
                _ => best = Some((idx, value)),
            }
        }
        best.map_or(0, |(idx, _)| idx)
    }

    let denom = total as f64;
    let mut scaled = [0u32; 256];
    let mut assigned = 0u32;

    // Proportional scaling, with a floor of 1 for every present symbol.
    for (slot, &count) in scaled.iter_mut().zip(freqs.iter()) {
        if count == 0 {
            continue;
        }
        let share = (count as f64 / denom * PROB_SCALE as f64).round() as u32;
        *slot = share.max(1);
        assigned += *slot;
    }

    if assigned == 0 {
        return scaled;
    }

    // Walk the total to exactly PROB_SCALE, one unit per step.
    while assigned != PROB_SCALE {
        if assigned > PROB_SCALE {
            let victim = last_argmax(&scaled, |f| f > 1);
            scaled[victim] -= 1;
            assigned -= 1;
        } else {
            let winner = last_argmax(&scaled, |_| true);
            scaled[winner] += 1;
            assigned += 1;
        }
    }

    scaled
}

/// Build cumulative frequency table.
///
/// Returns `(cum, sym_freqs)`, where `cum[s]` is the sum of the frequencies of
/// all symbols below `s` (so `cum[0]` is 0 and `cum[256]` is the grand total)
/// and `sym_freqs` is a copy of `freqs`.
fn build_cum_table(freqs: &[u32; 256]) -> ([u32; 257], [u32; 256]) {
    let mut cum = [0u32; 257];
    let mut running = 0u32;
    for (slot, &freq) in cum[1..].iter_mut().zip(freqs.iter()) {
        running += freq;
        *slot = running;
    }
    (cum, *freqs)
}

/// Build decode lookup table: for each slot in [0, PROB_SCALE), which symbol?
///
/// Symbol `s` owns the slots `cum_freqs[s]..cum_freqs[s + 1]`. Slots owned by
/// no symbol keep the initial value 0. `_sym_freqs` is accepted but not used.
fn build_decode_table(
    cum_freqs: &[u32; 257],
    _sym_freqs: &[u32; 256],
) -> [u8; PROB_SCALE as usize] {
    let mut table = [0u8; PROB_SCALE as usize];
    // 257 cumulative entries give exactly 256 adjacent pairs, one per symbol.
    for (sym, bounds) in cum_freqs.windows(2).enumerate() {
        let (lo, hi) = (bounds[0] as usize, bounds[1] as usize);
        table[lo..hi].fill(sym as u8);
    }
    table
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `data`, checks that decoding restores it exactly, and hands
    /// back the encoded bytes for size assertions.
    fn roundtrip(data: &[u8]) -> Vec<u8> {
        let encoded = encode(data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, data);
        encoded
    }

    /// Raw size divided by encoded size.
    fn compression_ratio(raw: &[u8], encoded: &[u8]) -> f64 {
        raw.len() as f64 / encoded.len() as f64
    }

    #[test]
    fn empty_roundtrip() {
        let decoded = decode(&encode(&[])).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn single_byte() {
        let decoded = decode(&encode(&[42])).unwrap();
        assert_eq!(decoded, vec![42]);
    }

    #[test]
    fn repeated_bytes() {
        let data = vec![0u8; 10_000];
        let encoded = roundtrip(&data);

        // A single repeated symbol carries almost no information.
        let ratio = compression_ratio(&data, &encoded);
        assert!(
            ratio > 2.0,
            "repeated bytes should compress >2x, got {ratio:.1}x"
        );
    }

    #[test]
    fn text_data() {
        let phrase = b"the quick brown fox jumps over the lazy dog. ";
        let data: Vec<u8> = phrase.iter().copied().cycle().take(10_000).collect();
        let encoded = roundtrip(&data);

        let ratio = compression_ratio(&data, &encoded);
        assert!(ratio > 1.5, "text should compress >1.5x, got {ratio:.1}x");
    }

    #[test]
    fn uniform_random_data() {
        // Pseudo-random bytes from a fixed-seed LCG: only the round trip is
        // checked, since no size reduction is expected.
        let mut rng: u64 = 12345;
        let mut data = vec![0u8; 5000];
        for byte in data.iter_mut() {
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            *byte = (rng >> 33) as u8;
        }
        roundtrip(&data);
    }

    #[test]
    fn all_byte_values() {
        // Every one of the 256 byte values occurs.
        let data: Vec<u8> = (0..=255u8).cycle().take(4096).collect();
        roundtrip(&data);
    }

    #[test]
    fn skewed_distribution() {
        // Every tenth byte is a one, the rest are zeros.
        let mut data = vec![0u8; 10_000];
        for slot in data.iter_mut().step_by(10) {
            *slot = 1;
        }
        let encoded = roundtrip(&data);

        let ratio = compression_ratio(&data, &encoded);
        assert!(
            ratio > 1.5,
            "skewed data should compress >1.5x, got {ratio:.1}x"
        );
    }

    #[test]
    fn better_than_raw_on_structured() {
        // Only 16 distinct values, cycling: about 4 bits of entropy per byte.
        let data: Vec<u8> = (0..10_000).map(|i| (i % 16) as u8).collect();
        let encoded = roundtrip(&data);

        let ratio = compression_ratio(&data, &encoded);
        assert!(
            ratio > 1.5,
            "low-entropy data should compress >1.5x, got {ratio:.1}x"
        );
    }

    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        // Four bytes hold the size field only; the frequency table is missing.
        assert!(decode(&[1, 0, 0, 0]).is_err());
    }
}