Skip to main content

nodedb_codec/
double_delta.rs

1//! DoubleDelta codec for timestamp columns.
2//!
3//! Timestamps are monotonically increasing with near-constant intervals
4//! (e.g., every 10s). DoubleDelta encodes the difference-of-differences:
5//!
6//! ```text
7//! value[0]              → stored raw (8 bytes)
8//! delta[0] = v[1]-v[0]  → stored raw (8 bytes)
9//! dod[i]   = delta[i] - delta[i-1]  → bit-packed (usually 0 → 1 bit)
10//! ```
11//!
12//! For constant-rate timestamps, all delta-of-deltas are 0, achieving ~1 bit
13//! per sample after the header. 4x better than Gorilla for timestamp columns.
14//!
15//! Wire format:
16//! ```text
17//! [4 bytes] sample count (LE u32)
18//! [8 bytes] first value (LE i64)
19//! [8 bytes] first delta (LE i64)  — only present if count >= 2
20//! [N bytes] bitstream of delta-of-deltas — only present if count >= 3
21//! ```
22//!
23//! DoD bit-packing uses the same bucket scheme as Gorilla timestamps:
24//! - `0`              → dod == 0
//! - `10` + 7 bits    → dod in [-64, 63]
//! - `110` + 9 bits   → dod in [-256, 255]
//! - `1110` + 12 bits → dod in [-2048, 2047]
28//! - `1111` + 64 bits → arbitrary dod
29
30use crate::error::CodecError;
31
32// ---------------------------------------------------------------------------
33// Bit I/O helpers
34// ---------------------------------------------------------------------------
35
/// Append-only bit buffer that fills each byte from its most significant bit
/// down to its least significant bit.
#[derive(Debug)]
pub(crate) struct BitWriter {
    /// Backing bytes; bits of the last byte that were never written stay zero.
    buf: Vec<u8>,
    /// Total number of bits written so far.
    bit_pos: usize,
}

impl BitWriter {
    /// Create an empty writer with 1 KiB of pre-reserved storage.
    pub(crate) fn new() -> Self {
        let buf = Vec::with_capacity(1024);
        Self { buf, bit_pos: 0 }
    }

    /// Append one bit, growing the buffer by a zeroed byte when the write
    /// position moves past the bytes already present.
    pub(crate) fn write_bit(&mut self, bit: bool) {
        let (byte_idx, offset) = (self.bit_pos / 8, self.bit_pos % 8);
        if self.buf.len() <= byte_idx {
            self.buf.push(0);
        }
        if bit {
            // Offset 0 targets the top bit of the byte, offset 7 the bottom bit.
            self.buf[byte_idx] |= 0x80u8 >> offset;
        }
        self.bit_pos += 1;
    }

    /// Append the low `num_bits` bits of `value`, highest of those bits first.
    pub(crate) fn write_bits(&mut self, value: u64, num_bits: usize) {
        (0..num_bits)
            .rev()
            .for_each(|shift| self.write_bit(((value >> shift) & 1) != 0));
    }

    /// Bytes written so far, including the partially filled trailing byte.
    pub(crate) fn as_bytes(&self) -> &[u8] {
        self.buf.as_slice()
    }

    /// Number of bits written so far.
    pub(crate) fn bit_len(&self) -> usize {
        self.bit_pos
    }
}
76
77pub(crate) struct BitReader<'a> {
78    buf: &'a [u8],
79    bit_pos: usize,
80}
81
82impl<'a> BitReader<'a> {
83    pub(crate) fn new(buf: &'a [u8]) -> Self {
84        Self { buf, bit_pos: 0 }
85    }
86
87    pub(crate) fn read_bit(&mut self) -> Result<bool, CodecError> {
88        let byte_idx = self.bit_pos / 8;
89        if byte_idx >= self.buf.len() {
90            return Err(CodecError::Truncated {
91                expected: byte_idx + 1,
92                actual: self.buf.len(),
93            });
94        }
95        let bit_idx = 7 - (self.bit_pos % 8);
96        let bit = (self.buf[byte_idx] >> bit_idx) & 1 == 1;
97        self.bit_pos += 1;
98        Ok(bit)
99    }
100
101    pub(crate) fn read_bits(&mut self, num_bits: usize) -> Result<u64, CodecError> {
102        let mut value = 0u64;
103        for _ in 0..num_bits {
104            value = (value << 1) | u64::from(self.read_bit()?);
105        }
106        Ok(value)
107    }
108}
109
110// ---------------------------------------------------------------------------
111// Public encode / decode API
112// ---------------------------------------------------------------------------
113
114/// Encode a slice of i64 values using DoubleDelta compression.
115pub fn encode(values: &[i64]) -> Vec<u8> {
116    let count = values.len() as u32;
117    let mut out = Vec::with_capacity(20 + values.len() / 4);
118
119    out.extend_from_slice(&count.to_le_bytes());
120
121    if values.is_empty() {
122        return out;
123    }
124
125    out.extend_from_slice(&values[0].to_le_bytes());
126
127    if values.len() == 1 {
128        return out;
129    }
130
131    let first_delta = values[1].wrapping_sub(values[0]);
132    out.extend_from_slice(&first_delta.to_le_bytes());
133
134    if values.len() == 2 {
135        return out;
136    }
137
138    let mut bs = BitWriter::new();
139    let mut prev_delta = first_delta;
140
141    for i in 2..values.len() {
142        let delta = values[i].wrapping_sub(values[i - 1]);
143        let dod = delta.wrapping_sub(prev_delta);
144        encode_dod(&mut bs, dod);
145        prev_delta = delta;
146    }
147
148    out.extend_from_slice(bs.as_bytes());
149    out
150}
151
152/// Decode DoubleDelta-compressed bytes back to i64 values.
153pub fn decode(data: &[u8]) -> Result<Vec<i64>, CodecError> {
154    if data.len() < 4 {
155        return Err(CodecError::Truncated {
156            expected: 4,
157            actual: data.len(),
158        });
159    }
160
161    let count = u32::from_le_bytes(data[0..4].try_into().map_err(|_| CodecError::Corrupt {
162        detail: "invalid header".into(),
163    })?) as usize;
164
165    if count == 0 {
166        return Ok(Vec::new());
167    }
168
169    if data.len() < 12 {
170        return Err(CodecError::Truncated {
171            expected: 12,
172            actual: data.len(),
173        });
174    }
175
176    let first_value =
177        i64::from_le_bytes(data[4..12].try_into().map_err(|_| CodecError::Corrupt {
178            detail: "invalid first value".into(),
179        })?);
180
181    let mut values = Vec::with_capacity(count);
182    values.push(first_value);
183
184    if count == 1 {
185        return Ok(values);
186    }
187
188    if data.len() < 20 {
189        return Err(CodecError::Truncated {
190            expected: 20,
191            actual: data.len(),
192        });
193    }
194
195    let first_delta =
196        i64::from_le_bytes(data[12..20].try_into().map_err(|_| CodecError::Corrupt {
197            detail: "invalid first delta".into(),
198        })?);
199    values.push(first_value.wrapping_add(first_delta));
200
201    if count == 2 {
202        return Ok(values);
203    }
204
205    let mut reader = BitReader::new(&data[20..]);
206    let mut prev_delta = first_delta;
207
208    for _ in 2..count {
209        let dod = decode_dod(&mut reader)?;
210        let delta = prev_delta.wrapping_add(dod);
211        let value = values[values.len() - 1].wrapping_add(delta);
212        values.push(value);
213        prev_delta = delta;
214    }
215
216    Ok(values)
217}
218
219// ---------------------------------------------------------------------------
220// DoD bit encoding / decoding
221// ---------------------------------------------------------------------------
222
223fn encode_dod(bs: &mut BitWriter, dod: i64) {
224    if dod == 0 {
225        bs.write_bit(false);
226    } else if (-64..=63).contains(&dod) {
227        bs.write_bits(0b10, 2);
228        bs.write_bits((dod as u64) & 0x7F, 7);
229    } else if (-256..=255).contains(&dod) {
230        bs.write_bits(0b110, 3);
231        bs.write_bits((dod as u64) & 0x1FF, 9);
232    } else if (-2048..=2047).contains(&dod) {
233        bs.write_bits(0b1110, 4);
234        bs.write_bits((dod as u64) & 0xFFF, 12);
235    } else {
236        bs.write_bits(0b1111, 4);
237        bs.write_bits(dod as u64, 64);
238    }
239}
240
241fn decode_dod(reader: &mut BitReader<'_>) -> Result<i64, CodecError> {
242    let bit = reader.read_bit()?;
243    if !bit {
244        return Ok(0);
245    }
246
247    let bit2 = reader.read_bit()?;
248    if !bit2 {
249        let raw = reader.read_bits(7)? as i64;
250        return Ok(sign_extend(raw, 7));
251    }
252
253    let bit3 = reader.read_bit()?;
254    if !bit3 {
255        let raw = reader.read_bits(9)? as i64;
256        return Ok(sign_extend(raw, 9));
257    }
258
259    let bit4 = reader.read_bit()?;
260    if !bit4 {
261        let raw = reader.read_bits(12)? as i64;
262        return Ok(sign_extend(raw, 12));
263    }
264
265    let raw = reader.read_bits(64)?;
266    Ok(raw as i64)
267}
268
/// Interpret the low `bits` bits of `value` as a two's-complement number and
/// widen it to a full `i64`.
fn sign_extend(value: i64, bits: u32) -> i64 {
    let unused_high_bits = 64 - bits;
    // Move the payload's sign bit to bit 63, then let the arithmetic right
    // shift copy it back down through the upper bits.
    let aligned = value << unused_high_bits;
    aligned >> unused_high_bits
}
273
274// ---------------------------------------------------------------------------
// Streaming wrapper types (kept for lib.rs consistency)
276// ---------------------------------------------------------------------------
277
278/// Streaming DoubleDelta encoder. Accumulates values and produces
279/// compressed bytes on `finish()`.
280pub struct DoubleDeltaEncoder {
281    values: Vec<i64>,
282}
283
284impl DoubleDeltaEncoder {
285    pub fn new() -> Self {
286        Self {
287            values: Vec::with_capacity(4096),
288        }
289    }
290
291    /// Append a single i64 value.
292    pub fn push(&mut self, value: i64) {
293        self.values.push(value);
294    }
295
296    /// Append a batch of i64 values.
297    pub fn push_batch(&mut self, values: &[i64]) {
298        self.values.extend_from_slice(values);
299    }
300
301    /// Number of values encoded so far.
302    pub fn count(&self) -> usize {
303        self.values.len()
304    }
305
306    /// Finish encoding and return compressed bytes.
307    pub fn finish(self) -> Vec<u8> {
308        encode(&self.values)
309    }
310}
311
312impl Default for DoubleDeltaEncoder {
313    fn default() -> Self {
314        Self::new()
315    }
316}
317
318/// Streaming DoubleDelta decoder. Wraps the batch `decode()` function.
319pub struct DoubleDeltaDecoder {
320    values: Vec<i64>,
321    pos: usize,
322}
323
324impl DoubleDeltaDecoder {
325    /// Create a decoder from compressed bytes.
326    pub fn new(data: &[u8]) -> Result<Self, CodecError> {
327        let values = decode(data)?;
328        Ok(Self { values, pos: 0 })
329    }
330
331    /// Decode all values at once.
332    pub fn decode_all(data: &[u8]) -> Result<Vec<i64>, CodecError> {
333        decode(data)
334    }
335
336    /// Next value, or None if exhausted.
337    pub fn next_value(&mut self) -> Option<i64> {
338        if self.pos < self.values.len() {
339            let v = self.values[self.pos];
340            self.pos += 1;
341            Some(v)
342        } else {
343            None
344        }
345    }
346
347    /// Remaining value count.
348    pub fn remaining(&self) -> usize {
349        self.values.len() - self.pos
350    }
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode `values`, check that decoding restores them exactly, and return
    /// the encoded bytes so callers can make size assertions.
    fn assert_roundtrip(values: &[i64]) -> Vec<u8> {
        let encoded = encode(values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
        encoded
    }

    /// Evenly spaced timestamps: `len` samples, 10_000 apart.
    fn constant_rate(len: i64) -> Vec<i64> {
        (0..len).map(|i| 1_700_000_000_000 + i * 10_000).collect()
    }

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]);
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn single_value() {
        let encoded = assert_roundtrip(&[1_700_000_000_000i64]);
        assert_eq!(encoded.len(), 12);
    }

    #[test]
    fn two_values() {
        let encoded = assert_roundtrip(&[1000i64, 2000]);
        assert_eq!(encoded.len(), 20);
    }

    #[test]
    fn constant_rate_timestamps() {
        let values = constant_rate(10_000);
        let encoded = assert_roundtrip(&values);

        let bits_per_sample = (encoded.len() as f64 * 8.0) / values.len() as f64;
        assert!(
            bits_per_sample < 2.0,
            "constant-rate should compress to ~1 bit/sample, got {bits_per_sample:.1}"
        );
    }

    #[test]
    fn monotonic_with_jitter() {
        // Deterministic LCG so the jitter sequence is reproducible.
        let mut rng: u64 = 42;
        let mut ts = 1_700_000_000_000i64;
        let values: Vec<i64> = (0..10_000)
            .map(|_| {
                let current = ts;
                rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
                let jitter = ((rng >> 33) as i64 % 101) - 50;
                ts += 10_000 + jitter;
                current
            })
            .collect();

        let encoded = assert_roundtrip(&values);

        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 2.0,
            "jittered timestamps should compress to <2 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn non_monotonic_values() {
        assert_roundtrip(&[100, 50, 200, 10, 300, 5, 1000, -500, 0, 42]);
    }

    #[test]
    fn negative_values() {
        assert_roundtrip(&[-1000, -999, -998, -997, -996]);
    }

    #[test]
    fn large_deltas() {
        assert_roundtrip(&[0, i64::MAX / 2, i64::MIN / 2, i64::MAX / 4, 0]);
    }

    #[test]
    fn boundary_values() {
        assert_roundtrip(&[i64::MIN, 0, i64::MAX, 0, i64::MIN]);
    }

    #[test]
    fn compression_better_than_raw_for_constant_rate() {
        let values = constant_rate(100_000);
        let encoded = encode(&values);
        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 5.0,
            "expected >5x compression for constant-rate, got {ratio:.1}x"
        );
    }

    #[test]
    fn streaming_encoder_matches_batch() {
        let values: Vec<i64> = (0..1000).map(|i| 1_000_000 + i * 100).collect();

        let mut enc = DoubleDeltaEncoder::new();
        values.iter().for_each(|&v| enc.push(v));
        let stream_encoded = enc.finish();

        let batch_encoded = encode(&values);
        assert_eq!(batch_encoded, stream_encoded);
    }

    #[test]
    fn streaming_decoder() {
        let values: Vec<i64> = (0..100).map(|i| 5000 + i * 10).collect();
        let encoded = encode(&values);
        let mut dec = DoubleDeltaDecoder::new(&encoded).unwrap();

        for expected in values.iter().copied() {
            assert_eq!(dec.next_value(), Some(expected));
        }
        assert_eq!(dec.next_value(), None);
        assert_eq!(dec.remaining(), 0);
    }

    #[test]
    fn truncated_input_errors() {
        // No header; count = 1 without a value; count = 2 without a delta.
        let cases: [&[u8]; 3] = [
            &[],
            &[1, 0, 0, 0],
            &[2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        ];
        for &input in &cases {
            assert!(decode(input).is_err());
        }
    }
}