Skip to main content

nodedb_codec/
delta.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Delta codec for monotonic counter columns.
4//!
5//! Monotonic counters (bytes_sent, request_count) have small positive deltas
6//! that compress well with simple delta encoding + varint packing.
7//!
8//! Wire format:
9//! ```text
10//! [4 bytes] sample count (LE u32)
11//! [8 bytes] first value (LE i64)
12//! [N bytes] varint-encoded deltas (ZigZag + LEB128)
13//! ```
14//!
15//! Varint encoding uses ZigZag to handle negative deltas (counter resets)
16//! efficiently: small absolute values → 1-2 bytes regardless of sign.
17//!
18//! Compression: monotonic counters with small increments → ~1-2 bytes/sample.
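//! Non-monotonic data: cost grows with delta magnitude. Small fluctuations still take ~1-2
//! bytes/sample, but deltas with |delta| ≳ 2^55 need 9-10 varint bytes (worse than raw 8 bytes).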
20
21use crate::error::CodecError;
22
23// ---------------------------------------------------------------------------
24// ZigZag + LEB128 varint encoding
25// ---------------------------------------------------------------------------
26
/// Map a signed `i64` onto an unsigned `u64` using ZigZag encoding.
///
/// Values near zero, of either sign, become small unsigned numbers, which keeps
/// their LEB128 form short: 0→0, -1→1, 1→2, -2→3, 2→4, ...
#[inline]
fn zigzag_encode(v: i64) -> u64 {
    // Arithmetic shift: all ones for negative inputs, all zeros otherwise.
    let sign_mask = v >> 63;
    let doubled = v << 1;
    (doubled ^ sign_mask) as u64
}
35
/// Invert [`zigzag_encode`]: map a ZigZag-encoded `u64` back to its `i64`.
#[inline]
fn zigzag_decode(v: u64) -> i64 {
    let magnitude = (v >> 1) as i64;
    // The low bit is the sign. Negating it gives 0 (positive) or an all-ones mask (negative).
    let sign_bit = (v & 1) as i64;
    magnitude ^ -sign_bit
}
41
/// Append `value` to `buf` as an unsigned LEB128 varint.
///
/// Each output byte carries 7 payload bits, least-significant group first. The
/// high bit is set on every byte except the last. Existing contents of `buf`
/// are left untouched.
fn write_varint(buf: &mut Vec<u8>, mut value: u64) {
    // Emit continuation bytes while more than 7 bits remain.
    while value >= 0x80 {
        buf.push((value as u8) | 0x80);
        value >>= 7;
    }
    // Final byte: fits in 7 bits, continuation bit clear.
    buf.push(value as u8);
}
56
57/// Read a varint (LEB128-encoded u64) from a byte slice.
58///
59/// Returns `(value, bytes_consumed)`.
60fn read_varint(data: &[u8]) -> Result<(u64, usize), CodecError> {
61    let mut value: u64 = 0;
62    let mut shift: u32 = 0;
63
64    for (i, &byte) in data.iter().enumerate() {
65        if shift >= 70 {
66            return Err(CodecError::Corrupt {
67                detail: "varint too long (>10 bytes)".into(),
68            });
69        }
70
71        value |= ((byte & 0x7F) as u64) << shift;
72        shift += 7;
73
74        if byte & 0x80 == 0 {
75            return Ok((value, i + 1));
76        }
77    }
78
79    Err(CodecError::Truncated {
80        expected: data.len() + 1,
81        actual: data.len(),
82    })
83}
84
85// ---------------------------------------------------------------------------
86// Public encode / decode API
87// ---------------------------------------------------------------------------
88
89/// Encode a slice of i64 values using Delta + ZigZag-varint compression.
90pub fn encode(values: &[i64]) -> Vec<u8> {
91    let count = values.len() as u32;
92    // Estimate: header(4) + first_value(8) + ~2 bytes per delta.
93    let mut out = Vec::with_capacity(12 + values.len() * 2);
94
95    out.extend_from_slice(&count.to_le_bytes());
96
97    if values.is_empty() {
98        return out;
99    }
100
101    out.extend_from_slice(&values[0].to_le_bytes());
102
103    for i in 1..values.len() {
104        let delta = values[i].wrapping_sub(values[i - 1]);
105        write_varint(&mut out, zigzag_encode(delta));
106    }
107
108    out
109}
110
111/// Decode Delta-compressed bytes back to i64 values.
112pub fn decode(data: &[u8]) -> Result<Vec<i64>, CodecError> {
113    if data.len() < 4 {
114        return Err(CodecError::Truncated {
115            expected: 4,
116            actual: data.len(),
117        });
118    }
119
120    let count = u32::from_le_bytes(data[0..4].try_into().map_err(|_| CodecError::Corrupt {
121        detail: "invalid header".into(),
122    })?) as usize;
123
124    if count == 0 {
125        return Ok(Vec::new());
126    }
127
128    if data.len() < 12 {
129        return Err(CodecError::Truncated {
130            expected: 12,
131            actual: data.len(),
132        });
133    }
134
135    let first_value =
136        i64::from_le_bytes(data[4..12].try_into().map_err(|_| CodecError::Corrupt {
137            detail: "invalid first value".into(),
138        })?);
139
140    let mut values = Vec::with_capacity(count);
141    values.push(first_value);
142
143    let mut offset = 12;
144    for _ in 1..count {
145        if offset >= data.len() {
146            return Err(CodecError::Truncated {
147                expected: offset + 1,
148                actual: data.len(),
149            });
150        }
151        let (encoded_delta, consumed) = read_varint(&data[offset..])?;
152        let delta = zigzag_decode(encoded_delta);
153        let value = values[values.len() - 1].wrapping_add(delta);
154        values.push(value);
155        offset += consumed;
156    }
157
158    Ok(values)
159}
160
161// ---------------------------------------------------------------------------
162// Streaming encoder / decoder types
163// ---------------------------------------------------------------------------
164
/// Buffering Delta encoder.
///
/// Values are accumulated in memory via `push`/`push_batch` and compressed
/// all at once by `finish()`. The result is the same bytes that `encode`
/// produces for the same sequence.
#[derive(Debug, Clone)]
pub struct DeltaEncoder {
    /// Samples pushed so far, in insertion order.
    values: Vec<i64>,
}
170
171impl DeltaEncoder {
172    pub fn new() -> Self {
173        Self {
174            values: Vec::with_capacity(4096),
175        }
176    }
177
178    pub fn push(&mut self, value: i64) {
179        self.values.push(value);
180    }
181
182    pub fn push_batch(&mut self, values: &[i64]) {
183        self.values.extend_from_slice(values);
184    }
185
186    pub fn count(&self) -> usize {
187        self.values.len()
188    }
189
190    pub fn finish(self) -> Vec<u8> {
191        encode(&self.values)
192    }
193}
194
195impl Default for DeltaEncoder {
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
/// Cursor-style Delta decoder.
///
/// The whole input is decoded eagerly when the decoder is constructed.
/// `next_value` then hands the samples out one at a time.
#[derive(Debug, Clone)]
pub struct DeltaDecoder {
    /// All decoded samples.
    values: Vec<i64>,
    /// Index of the next sample that `next_value` will return.
    pos: usize,
}
206
207impl DeltaDecoder {
208    pub fn new(data: &[u8]) -> Result<Self, CodecError> {
209        let values = decode(data)?;
210        Ok(Self { values, pos: 0 })
211    }
212
213    pub fn decode_all(data: &[u8]) -> Result<Vec<i64>, CodecError> {
214        decode(data)
215    }
216
217    pub fn next_value(&mut self) -> Option<i64> {
218        if self.pos < self.values.len() {
219            let v = self.values[self.pos];
220            self.pos += 1;
221            Some(v)
222        } else {
223            None
224        }
225    }
226
227    pub fn remaining(&self) -> usize {
228        self.values.len() - self.pos
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn zigzag_roundtrip() {
        for v in [0i64, 1, -1, 2, -2, 63, -63, 127, -128, i64::MAX, i64::MIN] {
            assert_eq!(zigzag_decode(zigzag_encode(v)), v, "zigzag failed for {v}");
        }
    }

    #[test]
    fn varint_roundtrip() {
        for v in [0u64, 1, 127, 128, 255, 16383, 16384, u64::MAX / 2, u64::MAX] {
            let mut buf = Vec::new();
            write_varint(&mut buf, v);
            let (decoded, consumed) = read_varint(&buf).unwrap();
            assert_eq!(decoded, v, "varint failed for {v}");
            assert_eq!(consumed, buf.len());
        }
    }

    #[test]
    fn overlong_varint_is_rejected() {
        // 11 bytes that all carry the continuation bit can never encode a u64.
        assert!(read_varint(&[0x80; 11]).is_err());
    }

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]);
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn single_value() {
        let encoded = encode(&[42i64]);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, vec![42i64]);
        assert_eq!(encoded.len(), 12); // 4 + 8
    }

    #[test]
    fn monotonic_counter() {
        // Bytes sent: monotonically increasing by ~1000 each step.
        let values: Vec<i64> = (0..10_000).map(|i| i * 1000).collect();
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        // All deltas are exactly 1000 → zigzag(1000) = 2000 → 2 bytes each.
        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 3.0,
            "monotonic counter should compress to <3 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn counter_with_small_increments() {
        // Request count: increment by 1 each step.
        let values: Vec<i64> = (0..10_000).collect();
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        // Delta = 1 → zigzag(1) = 2 → 1 byte each.
        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 2.0,
            "unit-increment counter should compress to <2 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn counter_reset() {
        // Counter with a reset (wrap-around): monotonic then drops to 0.
        let mut values: Vec<i64> = (0..500).map(|i| i * 100).collect();
        values.push(0); // reset
        values.extend((1..500).map(|i| i * 100));

        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn non_monotonic_gauge() {
        // CPU gauge: fluctuates around a value.
        let mut values = Vec::with_capacity(10_000);
        let mut val = 50i64;
        let mut rng: u64 = 12345;
        for _ in 0..10_000 {
            values.push(val);
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            let delta = ((rng >> 33) as i64 % 11) - 5; // -5 to +5
            val += delta;
        }
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        // Small deltas → 1 byte each → ~2 bytes/sample.
        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 3.0,
            "small-delta gauge should compress to <3 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn negative_values() {
        let values: Vec<i64> = vec![-1000, -999, -998, -997, -996];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn large_values() {
        let values: Vec<i64> = vec![i64::MAX, i64::MAX - 1, i64::MAX - 2];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn boundary_values() {
        let values: Vec<i64> = vec![i64::MIN, 0, i64::MAX];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn streaming_encoder_matches_batch() {
        let values: Vec<i64> = (0..1000).map(|i| i * 7).collect();
        let batch = encode(&values);

        let mut enc = DeltaEncoder::new();
        for &v in &values {
            enc.push(v);
        }
        assert_eq!(enc.finish(), batch);
    }

    #[test]
    fn streaming_encoder_push_batch_matches_batch() {
        let values: Vec<i64> = (0..1000).map(|i| i * 3 - 500).collect();
        let mut enc = DeltaEncoder::default();
        enc.push_batch(&values[..400]);
        enc.push_batch(&values[400..]);
        assert_eq!(enc.count(), values.len());
        assert_eq!(enc.finish(), encode(&values));
    }

    #[test]
    fn streaming_decoder() {
        let values: Vec<i64> = (0..100).map(|i| i * 10).collect();
        let encoded = encode(&values);
        let mut dec = DeltaDecoder::new(&encoded).unwrap();

        for &expected in &values {
            assert_eq!(dec.next_value(), Some(expected));
        }
        assert_eq!(dec.next_value(), None);
    }

    #[test]
    fn streaming_decoder_tracks_remaining() {
        let values: Vec<i64> = vec![5, 3, 8, -2];
        let encoded = encode(&values);
        assert_eq!(DeltaDecoder::decode_all(&encoded).unwrap(), values);

        let mut dec = DeltaDecoder::new(&encoded).unwrap();
        assert_eq!(dec.remaining(), 4);
        assert_eq!(dec.next_value(), Some(5));
        assert_eq!(dec.remaining(), 3);
        while dec.next_value().is_some() {}
        assert_eq!(dec.remaining(), 0);
    }

    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        assert!(decode(&[1, 0, 0, 0]).is_err()); // count=1, no value
    }

    #[test]
    fn truncated_delta_stream_errors() {
        // Drop the final (single-byte) delta varint.
        let encoded = encode(&(0..10).collect::<Vec<i64>>());
        assert!(decode(&encoded[..encoded.len() - 1]).is_err());

        // Cut a multi-byte varint in half: delta 1000 → zigzag 2000 → 2 bytes.
        let encoded = encode(&[0, 1000]);
        assert_eq!(encoded.len(), 14);
        assert!(decode(&encoded[..13]).is_err());
    }

    #[test]
    fn compression_vs_raw() {
        let values: Vec<i64> = (0..100_000).map(|i| i * 1000).collect();
        let encoded = encode(&values);
        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 3.0,
            "expected >3x compression for monotonic counter, got {ratio:.1}x"
        );
    }
}