Skip to main content

nodedb_codec/
double_delta.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! DoubleDelta codec for timestamp columns.
4//!
5//! Timestamps are monotonically increasing with near-constant intervals
6//! (e.g., every 10s). DoubleDelta encodes the difference-of-differences:
7//!
8//! ```text
9//! value[0]              → stored raw (8 bytes)
10//! delta[0] = v[1]-v[0]  → stored raw (8 bytes)
11//! dod[i]   = delta[i] - delta[i-1]  → bit-packed (usually 0 → 1 bit)
12//! ```
13//!
14//! For constant-rate timestamps, all delta-of-deltas are 0, achieving ~1 bit
15//! per sample after the header. 4x better than Gorilla for timestamp columns.
16//!
17//! Wire format:
18//! ```text
19//! [4 bytes] sample count (LE u32)
20//! [8 bytes] first value (LE i64)
21//! [8 bytes] first delta (LE i64)  — only present if count >= 2
22//! [N bytes] bitstream of delta-of-deltas — only present if count >= 3
23//! ```
24//!
25//! DoD bit-packing uses the same bucket scheme as Gorilla timestamps:
26//! - `0`              → dod == 0
27//! - `10` + 7 bits    → dod in [-63, 64]
28//! - `110` + 9 bits   → dod in [-255, 256]
29//! - `1110` + 12 bits → dod in [-2047, 2048]
30//! - `1111` + 64 bits → arbitrary dod
31
32use crate::error::CodecError;
33
34// ---------------------------------------------------------------------------
35// Bit I/O helpers
36// ---------------------------------------------------------------------------
37
38#[derive(Debug)]
39pub(crate) struct BitWriter {
40    buf: Vec<u8>,
41    bit_pos: usize,
42}
43
44impl BitWriter {
45    pub(crate) fn new() -> Self {
46        Self {
47            buf: Vec::with_capacity(1024),
48            bit_pos: 0,
49        }
50    }
51
52    pub(crate) fn write_bit(&mut self, bit: bool) {
53        let byte_idx = self.bit_pos / 8;
54        let bit_idx = 7 - (self.bit_pos % 8);
55        if byte_idx >= self.buf.len() {
56            self.buf.push(0);
57        }
58        if bit {
59            self.buf[byte_idx] |= 1 << bit_idx;
60        }
61        self.bit_pos += 1;
62    }
63
64    pub(crate) fn write_bits(&mut self, value: u64, num_bits: usize) {
65        for i in (0..num_bits).rev() {
66            self.write_bit((value >> i) & 1 == 1);
67        }
68    }
69
70    pub(crate) fn as_bytes(&self) -> &[u8] {
71        &self.buf
72    }
73
74    pub(crate) fn bit_len(&self) -> usize {
75        self.bit_pos
76    }
77}
78
79pub(crate) struct BitReader<'a> {
80    buf: &'a [u8],
81    bit_pos: usize,
82}
83
84impl<'a> BitReader<'a> {
85    pub(crate) fn new(buf: &'a [u8]) -> Self {
86        Self { buf, bit_pos: 0 }
87    }
88
89    pub(crate) fn read_bit(&mut self) -> Result<bool, CodecError> {
90        let byte_idx = self.bit_pos / 8;
91        if byte_idx >= self.buf.len() {
92            return Err(CodecError::Truncated {
93                expected: byte_idx + 1,
94                actual: self.buf.len(),
95            });
96        }
97        let bit_idx = 7 - (self.bit_pos % 8);
98        let bit = (self.buf[byte_idx] >> bit_idx) & 1 == 1;
99        self.bit_pos += 1;
100        Ok(bit)
101    }
102
103    pub(crate) fn read_bits(&mut self, num_bits: usize) -> Result<u64, CodecError> {
104        let mut value = 0u64;
105        for _ in 0..num_bits {
106            value = (value << 1) | u64::from(self.read_bit()?);
107        }
108        Ok(value)
109    }
110}
111
112// ---------------------------------------------------------------------------
113// Public encode / decode API
114// ---------------------------------------------------------------------------
115
116/// Encode a slice of i64 values using DoubleDelta compression.
117pub fn encode(values: &[i64]) -> Vec<u8> {
118    let count = values.len() as u32;
119    let mut out = Vec::with_capacity(20 + values.len() / 4);
120
121    out.extend_from_slice(&count.to_le_bytes());
122
123    if values.is_empty() {
124        return out;
125    }
126
127    out.extend_from_slice(&values[0].to_le_bytes());
128
129    if values.len() == 1 {
130        return out;
131    }
132
133    let first_delta = values[1].wrapping_sub(values[0]);
134    out.extend_from_slice(&first_delta.to_le_bytes());
135
136    if values.len() == 2 {
137        return out;
138    }
139
140    let mut bs = BitWriter::new();
141    let mut prev_delta = first_delta;
142
143    for i in 2..values.len() {
144        let delta = values[i].wrapping_sub(values[i - 1]);
145        let dod = delta.wrapping_sub(prev_delta);
146        encode_dod(&mut bs, dod);
147        prev_delta = delta;
148    }
149
150    out.extend_from_slice(bs.as_bytes());
151    out
152}
153
154/// Decode DoubleDelta-compressed bytes back to i64 values.
155pub fn decode(data: &[u8]) -> Result<Vec<i64>, CodecError> {
156    if data.len() < 4 {
157        return Err(CodecError::Truncated {
158            expected: 4,
159            actual: data.len(),
160        });
161    }
162
163    let count = u32::from_le_bytes(data[0..4].try_into().map_err(|_| CodecError::Corrupt {
164        detail: "invalid header".into(),
165    })?) as usize;
166
167    if count == 0 {
168        return Ok(Vec::new());
169    }
170
171    if data.len() < 12 {
172        return Err(CodecError::Truncated {
173            expected: 12,
174            actual: data.len(),
175        });
176    }
177
178    let first_value =
179        i64::from_le_bytes(data[4..12].try_into().map_err(|_| CodecError::Corrupt {
180            detail: "invalid first value".into(),
181        })?);
182
183    let mut values = Vec::with_capacity(count);
184    values.push(first_value);
185
186    if count == 1 {
187        return Ok(values);
188    }
189
190    if data.len() < 20 {
191        return Err(CodecError::Truncated {
192            expected: 20,
193            actual: data.len(),
194        });
195    }
196
197    let first_delta =
198        i64::from_le_bytes(data[12..20].try_into().map_err(|_| CodecError::Corrupt {
199            detail: "invalid first delta".into(),
200        })?);
201    values.push(first_value.wrapping_add(first_delta));
202
203    if count == 2 {
204        return Ok(values);
205    }
206
207    let mut reader = BitReader::new(&data[20..]);
208    let mut prev_delta = first_delta;
209
210    for _ in 2..count {
211        let dod = decode_dod(&mut reader)?;
212        let delta = prev_delta.wrapping_add(dod);
213        let value = values[values.len() - 1].wrapping_add(delta);
214        values.push(value);
215        prev_delta = delta;
216    }
217
218    Ok(values)
219}
220
221// ---------------------------------------------------------------------------
222// DoD bit encoding / decoding
223// ---------------------------------------------------------------------------
224
225fn encode_dod(bs: &mut BitWriter, dod: i64) {
226    if dod == 0 {
227        bs.write_bit(false);
228    } else if (-64..=63).contains(&dod) {
229        bs.write_bits(0b10, 2);
230        bs.write_bits((dod as u64) & 0x7F, 7);
231    } else if (-256..=255).contains(&dod) {
232        bs.write_bits(0b110, 3);
233        bs.write_bits((dod as u64) & 0x1FF, 9);
234    } else if (-2048..=2047).contains(&dod) {
235        bs.write_bits(0b1110, 4);
236        bs.write_bits((dod as u64) & 0xFFF, 12);
237    } else {
238        bs.write_bits(0b1111, 4);
239        bs.write_bits(dod as u64, 64);
240    }
241}
242
243fn decode_dod(reader: &mut BitReader<'_>) -> Result<i64, CodecError> {
244    let bit = reader.read_bit()?;
245    if !bit {
246        return Ok(0);
247    }
248
249    let bit2 = reader.read_bit()?;
250    if !bit2 {
251        let raw = reader.read_bits(7)? as i64;
252        return Ok(sign_extend(raw, 7));
253    }
254
255    let bit3 = reader.read_bit()?;
256    if !bit3 {
257        let raw = reader.read_bits(9)? as i64;
258        return Ok(sign_extend(raw, 9));
259    }
260
261    let bit4 = reader.read_bit()?;
262    if !bit4 {
263        let raw = reader.read_bits(12)? as i64;
264        return Ok(sign_extend(raw, 12));
265    }
266
267    let raw = reader.read_bits(64)?;
268    Ok(raw as i64)
269}
270
271fn sign_extend(value: i64, bits: u32) -> i64 {
272    let shift = 64 - bits;
273    (value << shift) >> shift
274}
275
276// ---------------------------------------------------------------------------
277// Re-export types for lib.rs consistency
278// ---------------------------------------------------------------------------
279
280/// Streaming DoubleDelta encoder. Accumulates values and produces
281/// compressed bytes on `finish()`.
282pub struct DoubleDeltaEncoder {
283    values: Vec<i64>,
284}
285
286impl DoubleDeltaEncoder {
287    pub fn new() -> Self {
288        Self {
289            values: Vec::with_capacity(4096),
290        }
291    }
292
293    /// Append a single i64 value.
294    pub fn push(&mut self, value: i64) {
295        self.values.push(value);
296    }
297
298    /// Append a batch of i64 values.
299    pub fn push_batch(&mut self, values: &[i64]) {
300        self.values.extend_from_slice(values);
301    }
302
303    /// Number of values encoded so far.
304    pub fn count(&self) -> usize {
305        self.values.len()
306    }
307
308    /// Finish encoding and return compressed bytes.
309    pub fn finish(self) -> Vec<u8> {
310        encode(&self.values)
311    }
312}
313
314impl Default for DoubleDeltaEncoder {
315    fn default() -> Self {
316        Self::new()
317    }
318}
319
320/// Streaming DoubleDelta decoder. Wraps the batch `decode()` function.
321pub struct DoubleDeltaDecoder {
322    values: Vec<i64>,
323    pos: usize,
324}
325
326impl DoubleDeltaDecoder {
327    /// Create a decoder from compressed bytes.
328    pub fn new(data: &[u8]) -> Result<Self, CodecError> {
329        let values = decode(data)?;
330        Ok(Self { values, pos: 0 })
331    }
332
333    /// Decode all values at once.
334    pub fn decode_all(data: &[u8]) -> Result<Vec<i64>, CodecError> {
335        decode(data)
336    }
337
338    /// Next value, or None if exhausted.
339    pub fn next_value(&mut self) -> Option<i64> {
340        if self.pos < self.values.len() {
341            let v = self.values[self.pos];
342            self.pos += 1;
343            Some(v)
344        } else {
345            None
346        }
347    }
348
349    /// Remaining value count.
350    pub fn remaining(&self) -> usize {
351        self.values.len() - self.pos
352    }
353}
354
355#[cfg(test)]
356mod tests {
357    use super::*;
358
359    #[test]
360    fn empty_roundtrip() {
361        let encoded = encode(&[]);
362        let decoded = decode(&encoded).unwrap();
363        assert!(decoded.is_empty());
364    }
365
366    #[test]
367    fn single_value() {
368        let encoded = encode(&[1_700_000_000_000i64]);
369        let decoded = decode(&encoded).unwrap();
370        assert_eq!(decoded, vec![1_700_000_000_000i64]);
371        assert_eq!(encoded.len(), 12);
372    }
373
374    #[test]
375    fn two_values() {
376        let values = vec![1000i64, 2000];
377        let encoded = encode(&values);
378        let decoded = decode(&encoded).unwrap();
379        assert_eq!(decoded, values);
380        assert_eq!(encoded.len(), 20);
381    }
382
383    #[test]
384    fn constant_rate_timestamps() {
385        let values: Vec<i64> = (0..10_000)
386            .map(|i| 1_700_000_000_000 + i * 10_000)
387            .collect();
388        let encoded = encode(&values);
389        let decoded = decode(&encoded).unwrap();
390        assert_eq!(decoded, values);
391
392        let bits_per_sample = (encoded.len() as f64 * 8.0) / values.len() as f64;
393        assert!(
394            bits_per_sample < 2.0,
395            "constant-rate should compress to ~1 bit/sample, got {bits_per_sample:.1}"
396        );
397    }
398
399    #[test]
400    fn monotonic_with_jitter() {
401        let mut values = Vec::with_capacity(10_000);
402        let mut ts = 1_700_000_000_000i64;
403        let mut rng: u64 = 42;
404        for _ in 0..10_000 {
405            values.push(ts);
406            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
407            let jitter = ((rng >> 33) as i64 % 101) - 50;
408            ts += 10_000 + jitter;
409        }
410        let encoded = encode(&values);
411        let decoded = decode(&encoded).unwrap();
412        assert_eq!(decoded, values);
413
414        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
415        assert!(
416            bytes_per_sample < 2.0,
417            "jittered timestamps should compress to <2 bytes/sample, got {bytes_per_sample:.2}"
418        );
419    }
420
421    #[test]
422    fn non_monotonic_values() {
423        let values: Vec<i64> = vec![100, 50, 200, 10, 300, 5, 1000, -500, 0, 42];
424        let encoded = encode(&values);
425        let decoded = decode(&encoded).unwrap();
426        assert_eq!(decoded, values);
427    }
428
429    #[test]
430    fn negative_values() {
431        let values: Vec<i64> = vec![-1000, -999, -998, -997, -996];
432        let encoded = encode(&values);
433        let decoded = decode(&encoded).unwrap();
434        assert_eq!(decoded, values);
435    }
436
437    #[test]
438    fn large_deltas() {
439        let values: Vec<i64> = vec![0, i64::MAX / 2, i64::MIN / 2, i64::MAX / 4, 0];
440        let encoded = encode(&values);
441        let decoded = decode(&encoded).unwrap();
442        assert_eq!(decoded, values);
443    }
444
445    #[test]
446    fn boundary_values() {
447        let values: Vec<i64> = vec![i64::MIN, 0, i64::MAX, 0, i64::MIN];
448        let encoded = encode(&values);
449        let decoded = decode(&encoded).unwrap();
450        assert_eq!(decoded, values);
451    }
452
453    #[test]
454    fn compression_better_than_raw_for_constant_rate() {
455        let values: Vec<i64> = (0..100_000)
456            .map(|i| 1_700_000_000_000 + i * 10_000)
457            .collect();
458        let encoded = encode(&values);
459        let raw_size = values.len() * 8;
460        let ratio = raw_size as f64 / encoded.len() as f64;
461        assert!(
462            ratio > 5.0,
463            "expected >5x compression for constant-rate, got {ratio:.1}x"
464        );
465    }
466
467    #[test]
468    fn streaming_encoder_matches_batch() {
469        let values: Vec<i64> = (0..1000).map(|i| 1_000_000 + i * 100).collect();
470        let batch_encoded = encode(&values);
471
472        let mut enc = DoubleDeltaEncoder::new();
473        for &v in &values {
474            enc.push(v);
475        }
476        let stream_encoded = enc.finish();
477
478        assert_eq!(batch_encoded, stream_encoded);
479    }
480
481    #[test]
482    fn streaming_decoder() {
483        let values: Vec<i64> = (0..100).map(|i| 5000 + i * 10).collect();
484        let encoded = encode(&values);
485        let mut dec = DoubleDeltaDecoder::new(&encoded).unwrap();
486
487        for &expected in &values {
488            assert_eq!(dec.next_value(), Some(expected));
489        }
490        assert_eq!(dec.next_value(), None);
491        assert_eq!(dec.remaining(), 0);
492    }
493
494    #[test]
495    fn truncated_input_errors() {
496        assert!(decode(&[]).is_err());
497        assert!(decode(&[1, 0, 0, 0]).is_err());
498        assert!(decode(&[2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).is_err());
499    }
500}