// nodedb_codec/delta.rs

//! Delta codec for monotonic counter columns.
//!
//! Monotonic counters (bytes_sent, request_count) have small positive deltas
//! that compress well with simple delta encoding + varint packing.
//!
//! Wire format:
//! ```text
//! [4 bytes] sample count (LE u32)
//! [8 bytes] first value (LE i64; omitted when the count is 0)
//! [N bytes] count - 1 varint-encoded deltas (ZigZag + LEB128)
//! ```
//!
//! Varint encoding uses ZigZag to handle negative deltas (counter resets)
//! efficiently: small absolute values → 1-2 bytes regardless of sign.
//!
//! Compression: monotonic counters with small increments → ~1-2 bytes/sample.
//! Non-monotonic data with moderate deltas → ~2-4 bytes/sample. Deltas that
//! span a large part of the `i64` range can take up to 10 bytes each, which
//! is worse than the raw 8 bytes.

19use crate::error::CodecError;

// ---------------------------------------------------------------------------
// ZigZag + LEB128 varint encoding
// ---------------------------------------------------------------------------

/// Map a signed `i64` onto an unsigned `u64` using ZigZag encoding.
///
/// Values near zero, of either sign, become small unsigned codes so that
/// their varint form stays short: 0→0, -1→1, 1→2, -2→3, 2→4, ...
#[inline]
fn zigzag_encode(v: i64) -> u64 {
    // Arithmetic shift: all ones for negative inputs, all zeros otherwise.
    let sign_mask = v >> 63;
    let doubled = v << 1;
    (doubled ^ sign_mask) as u64
}

/// Invert [`zigzag_encode`]: turn a ZigZag code back into its signed value.
#[inline]
fn zigzag_decode(v: u64) -> i64 {
    let magnitude = (v >> 1) as i64;
    let sign_bit = (v & 1) as i64;
    // `-sign_bit` is 0 (leave bits alone) or -1 (flip every bit).
    magnitude ^ -sign_bit
}

/// Append `value` to `buf` as an unsigned LEB128 varint.
///
/// Seven payload bits go in each byte, least-significant group first. The
/// high bit of every byte except the last is set to mark continuation.
fn write_varint(buf: &mut Vec<u8>, mut value: u64) {
    // While more than 7 bits remain, emit the low group with the flag set.
    while value >= 0x80 {
        buf.push((value as u8) | 0x80);
        value >>= 7;
    }
    // Final group (< 0x80): continuation bit clear.
    buf.push(value as u8);
}

55/// Read a varint (LEB128-encoded u64) from a byte slice.
56///
57/// Returns `(value, bytes_consumed)`.
58fn read_varint(data: &[u8]) -> Result<(u64, usize), CodecError> {
59    let mut value: u64 = 0;
60    let mut shift: u32 = 0;
61
62    for (i, &byte) in data.iter().enumerate() {
63        if shift >= 70 {
64            return Err(CodecError::Corrupt {
65                detail: "varint too long (>10 bytes)".into(),
66            });
67        }
68
69        value |= ((byte & 0x7F) as u64) << shift;
70        shift += 7;
71
72        if byte & 0x80 == 0 {
73            return Ok((value, i + 1));
74        }
75    }
76
77    Err(CodecError::Truncated {
78        expected: data.len() + 1,
79        actual: data.len(),
80    })
81}
82
// ---------------------------------------------------------------------------
// Public encode / decode API
// ---------------------------------------------------------------------------

87/// Encode a slice of i64 values using Delta + ZigZag-varint compression.
88pub fn encode(values: &[i64]) -> Vec<u8> {
89    let count = values.len() as u32;
90    // Estimate: header(4) + first_value(8) + ~2 bytes per delta.
91    let mut out = Vec::with_capacity(12 + values.len() * 2);
92
93    out.extend_from_slice(&count.to_le_bytes());
94
95    if values.is_empty() {
96        return out;
97    }
98
99    out.extend_from_slice(&values[0].to_le_bytes());
100
101    for i in 1..values.len() {
102        let delta = values[i].wrapping_sub(values[i - 1]);
103        write_varint(&mut out, zigzag_encode(delta));
104    }
105
106    out
107}
108
109/// Decode Delta-compressed bytes back to i64 values.
110pub fn decode(data: &[u8]) -> Result<Vec<i64>, CodecError> {
111    if data.len() < 4 {
112        return Err(CodecError::Truncated {
113            expected: 4,
114            actual: data.len(),
115        });
116    }
117
118    let count = u32::from_le_bytes(data[0..4].try_into().map_err(|_| CodecError::Corrupt {
119        detail: "invalid header".into(),
120    })?) as usize;
121
122    if count == 0 {
123        return Ok(Vec::new());
124    }
125
126    if data.len() < 12 {
127        return Err(CodecError::Truncated {
128            expected: 12,
129            actual: data.len(),
130        });
131    }
132
133    let first_value =
134        i64::from_le_bytes(data[4..12].try_into().map_err(|_| CodecError::Corrupt {
135            detail: "invalid first value".into(),
136        })?);
137
138    let mut values = Vec::with_capacity(count);
139    values.push(first_value);
140
141    let mut offset = 12;
142    for _ in 1..count {
143        if offset >= data.len() {
144            return Err(CodecError::Truncated {
145                expected: offset + 1,
146                actual: data.len(),
147            });
148        }
149        let (encoded_delta, consumed) = read_varint(&data[offset..])?;
150        let delta = zigzag_decode(encoded_delta);
151        let value = values[values.len() - 1].wrapping_add(delta);
152        values.push(value);
153        offset += consumed;
154    }
155
156    Ok(values)
157}
158
// ---------------------------------------------------------------------------
// Streaming encoder / decoder types
// ---------------------------------------------------------------------------

163/// Streaming Delta encoder. Accumulates values and produces compressed
164/// bytes on `finish()`.
165pub struct DeltaEncoder {
166    values: Vec<i64>,
167}
168
169impl DeltaEncoder {
170    pub fn new() -> Self {
171        Self {
172            values: Vec::with_capacity(4096),
173        }
174    }
175
176    pub fn push(&mut self, value: i64) {
177        self.values.push(value);
178    }
179
180    pub fn push_batch(&mut self, values: &[i64]) {
181        self.values.extend_from_slice(values);
182    }
183
184    pub fn count(&self) -> usize {
185        self.values.len()
186    }
187
188    pub fn finish(self) -> Vec<u8> {
189        encode(&self.values)
190    }
191}
192
193impl Default for DeltaEncoder {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
199/// Streaming Delta decoder.
200pub struct DeltaDecoder {
201    values: Vec<i64>,
202    pos: usize,
203}
204
205impl DeltaDecoder {
206    pub fn new(data: &[u8]) -> Result<Self, CodecError> {
207        let values = decode(data)?;
208        Ok(Self { values, pos: 0 })
209    }
210
211    pub fn decode_all(data: &[u8]) -> Result<Vec<i64>, CodecError> {
212        decode(data)
213    }
214
215    pub fn next_value(&mut self) -> Option<i64> {
216        if self.pos < self.values.len() {
217            let v = self.values[self.pos];
218            self.pos += 1;
219            Some(v)
220        } else {
221            None
222        }
223    }
224
225    pub fn remaining(&self) -> usize {
226        self.values.len() - self.pos
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn zigzag_roundtrip() {
        for v in [0i64, 1, -1, 2, -2, 63, -63, 127, -128, i64::MAX, i64::MIN] {
            assert_eq!(zigzag_decode(zigzag_encode(v)), v, "zigzag failed for {v}");
        }
    }

    #[test]
    fn zigzag_maps_small_magnitudes_to_small_codes() {
        for (v, code) in [(0i64, 0u64), (-1, 1), (1, 2), (-2, 3), (2, 4)] {
            assert_eq!(zigzag_encode(v), code, "unexpected zigzag code for {v}");
        }
    }

    #[test]
    fn varint_roundtrip() {
        for v in [0u64, 1, 127, 128, 255, 16383, 16384, u64::MAX / 2, u64::MAX] {
            let mut buf = Vec::new();
            write_varint(&mut buf, v);
            let (decoded, consumed) = read_varint(&buf).unwrap();
            assert_eq!(decoded, v, "varint failed for {v}");
            assert_eq!(consumed, buf.len());
        }
    }

    #[test]
    fn varint_lengths() {
        // Each LEB128 byte carries 7 payload bits.
        for (v, len) in [
            (0u64, 1usize),
            (127, 1),
            (128, 2),
            (16383, 2),
            (16384, 3),
            (u64::MAX, 10),
        ] {
            let mut buf = Vec::new();
            write_varint(&mut buf, v);
            assert_eq!(buf.len(), len, "varint length wrong for {v}");
        }
    }

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]);
        assert_eq!(encoded.len(), 4); // count header only
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn single_value() {
        let encoded = encode(&[42i64]);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, vec![42i64]);
        assert_eq!(encoded.len(), 12); // 4 + 8
    }

    #[test]
    fn monotonic_counter() {
        // Bytes sent: increases by exactly 1000 each step.
        let values: Vec<i64> = (0..10_000).map(|i| i * 1000).collect();
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        // All deltas are exactly 1000 → zigzag(1000) = 2000 → 2 bytes each.
        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 3.0,
            "monotonic counter should compress to <3 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn counter_with_small_increments() {
        // Request count: increment by 1 each step.
        let values: Vec<i64> = (0..10_000).collect();
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        // Delta = 1 → zigzag(1) = 2 → 1 byte each.
        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 2.0,
            "unit-increment counter should compress to <2 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn counter_reset() {
        // Counter with a reset (wrap-around): monotonic then drops to 0.
        let mut values: Vec<i64> = (0..500).map(|i| i * 100).collect();
        values.push(0); // reset
        values.extend((1..500).map(|i| i * 100));

        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn non_monotonic_gauge() {
        // CPU gauge: fluctuates around a value.
        let mut values = Vec::with_capacity(10_000);
        let mut val = 50i64;
        let mut rng: u64 = 12345;
        for _ in 0..10_000 {
            values.push(val);
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            let delta = ((rng >> 33) as i64 % 11) - 5; // -5 to +5
            val += delta;
        }
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        // Deltas lie in [-5, 5] → zigzag codes ≤ 10 → 1 byte each, so the
        // stream costs ~1 byte/sample on top of the 12-byte header.
        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 1.5,
            "small-delta gauge should compress to <1.5 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn negative_values() {
        let values: Vec<i64> = vec![-1000, -999, -998, -997, -996];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn large_values() {
        let values: Vec<i64> = vec![i64::MAX, i64::MAX - 1, i64::MAX - 2];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn boundary_values() {
        let values: Vec<i64> = vec![i64::MIN, 0, i64::MAX];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn streaming_encoder_matches_batch() {
        let values: Vec<i64> = (0..1000).map(|i| i * 7).collect();
        let batch = encode(&values);

        let mut enc = DeltaEncoder::new();
        for &v in &values {
            enc.push(v);
        }
        assert_eq!(enc.finish(), batch);

        let mut batched = DeltaEncoder::new();
        batched.push_batch(&values);
        assert_eq!(batched.count(), values.len());
        assert_eq!(batched.finish(), batch);
    }

    #[test]
    fn streaming_decoder() {
        let values: Vec<i64> = (0..100).map(|i| i * 10).collect();
        let encoded = encode(&values);
        let mut dec = DeltaDecoder::new(&encoded).unwrap();
        assert_eq!(dec.remaining(), values.len());

        for &expected in &values {
            assert_eq!(dec.next_value(), Some(expected));
        }
        assert_eq!(dec.next_value(), None);
        assert_eq!(dec.remaining(), 0);
    }

    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        assert!(decode(&[1, 0, 0, 0]).is_err()); // count=1, no value
        assert!(decode(&[1, 0, 0, 0, 0]).is_err()); // first value cut short

        // count=2 but the single delta is missing.
        let mut missing_delta = encode(&[7]);
        missing_delta[0] = 2;
        assert!(decode(&missing_delta).is_err());

        // Delta varint with its continuation bit set and no following byte.
        let mut open_varint = encode(&[7]);
        open_varint[0] = 2;
        open_varint.push(0x80);
        assert!(decode(&open_varint).is_err());
    }

    #[test]
    fn compression_vs_raw() {
        let values: Vec<i64> = (0..100_000).map(|i| i * 1000).collect();
        let encoded = encode(&values);
        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 3.0,
            "expected >3x compression for monotonic counter, got {ratio:.1}x"
        );
    }
}