Skip to main content

nodedb_codec/
gorilla.rs

1//! Gorilla XOR encoding for floating-point timeseries metrics.
2//!
3//! Implements the Facebook Gorilla paper's XOR-based compression for
4//! double-precision floating-point values. Achieves ~1.5 bytes per
5//! 16-byte (timestamp + value) sample by exploiting temporal locality
6//! in metric streams.
7//!
8//! Also usable for non-monotonic i64 values by casting through f64 bits.
9//!
10//! Wire format:
11//! ```text
12//! [4 bytes] sample count (LE u32)
13//! [N bytes] bitstream: first sample raw (64+64 bits), then delta-of-delta
14//!           timestamps + XOR-compressed values
15//! ```
16//!
17//! Reference: "Gorilla: A Fast, Scalable, In-Memory Time Series Database"
18//! (Pelkonen et al., VLDB 2015)
19
20use crate::double_delta::{BitReader, BitWriter};
21use crate::error::CodecError;
22
23// ---------------------------------------------------------------------------
24// Encoder
25// ---------------------------------------------------------------------------
26
/// Gorilla XOR encoder for (timestamp, f64) sample streams.
///
/// Timestamps use delta-of-delta encoding. Values use XOR with
/// leading/trailing zero compression.
///
/// Feed samples with `encode`, then call `finish` to obtain the bytes
/// (a 4-byte LE sample count followed by the bitstream).
#[derive(Debug)]
pub struct GorillaEncoder {
    // Bitstream written so far; the count header is only prepended in `finish`.
    buf: BitWriter,
    // Timestamp of the most recently encoded sample.
    prev_ts: i64,
    // Difference between the two most recent timestamps (0 until a second sample arrives).
    prev_delta: i64,
    // Raw IEEE-754 bits of the most recently encoded value.
    prev_value: u64,
    // Leading-zero count of the current XOR window; `u8::MAX` means no window
    // has been emitted yet, which forces the first non-zero XOR to open one.
    prev_leading: u8,
    // Trailing-zero count of the current XOR window.
    prev_trailing: u8,
    // Number of samples encoded so far.
    count: u64,
}
41
42impl GorillaEncoder {
43    pub fn new() -> Self {
44        Self {
45            buf: BitWriter::new(),
46            prev_ts: 0,
47            prev_delta: 0,
48            prev_value: 0,
49            prev_leading: u8::MAX,
50            prev_trailing: 0,
51            count: 0,
52        }
53    }
54
55    /// Encode a (timestamp_ms, value) sample.
56    pub fn encode(&mut self, timestamp_ms: i64, value: f64) {
57        let value_bits = value.to_bits();
58
59        if self.count == 0 {
60            self.buf.write_bits(timestamp_ms as u64, 64);
61            self.buf.write_bits(value_bits, 64);
62            self.prev_ts = timestamp_ms;
63            self.prev_value = value_bits;
64            self.count = 1;
65            return;
66        }
67
68        // Timestamp: delta-of-delta.
69        let delta = timestamp_ms - self.prev_ts;
70        let dod = delta - self.prev_delta;
71        self.encode_timestamp_dod(dod);
72        self.prev_ts = timestamp_ms;
73        self.prev_delta = delta;
74
75        // Value: XOR.
76        let xor = self.prev_value ^ value_bits;
77        self.encode_value_xor(xor);
78        self.prev_value = value_bits;
79
80        self.count += 1;
81    }
82
83    fn encode_timestamp_dod(&mut self, dod: i64) {
84        if dod == 0 {
85            self.buf.write_bit(false);
86        } else if (-64..=63).contains(&dod) {
87            self.buf.write_bits(0b10, 2);
88            self.buf.write_bits((dod as u64) & 0x7F, 7);
89        } else if (-256..=255).contains(&dod) {
90            self.buf.write_bits(0b110, 3);
91            self.buf.write_bits((dod as u64) & 0x1FF, 9);
92        } else if (-2048..=2047).contains(&dod) {
93            self.buf.write_bits(0b1110, 4);
94            self.buf.write_bits((dod as u64) & 0xFFF, 12);
95        } else {
96            self.buf.write_bits(0b1111, 4);
97            self.buf.write_bits(dod as u64, 64);
98        }
99    }
100
101    fn encode_value_xor(&mut self, xor: u64) {
102        if xor == 0 {
103            self.buf.write_bit(false);
104            return;
105        }
106
107        self.buf.write_bit(true);
108
109        let leading = xor.leading_zeros() as u8;
110        let trailing = xor.trailing_zeros() as u8;
111
112        if self.prev_leading != u8::MAX
113            && leading >= self.prev_leading
114            && trailing >= self.prev_trailing
115        {
116            // Fits within previous window.
117            self.buf.write_bit(false);
118            let meaningful_bits = 64 - self.prev_leading - self.prev_trailing;
119            self.buf
120                .write_bits(xor >> self.prev_trailing, meaningful_bits as usize);
121        } else {
122            // New window.
123            self.buf.write_bit(true);
124            self.buf.write_bits(leading as u64, 6);
125            let meaningful_bits = 64 - leading - trailing;
126            self.buf.write_bits((meaningful_bits - 1) as u64, 6);
127            self.buf
128                .write_bits(xor >> trailing, meaningful_bits as usize);
129            self.prev_leading = leading;
130            self.prev_trailing = trailing;
131        }
132    }
133
134    /// Finish encoding and return compressed bytes.
135    ///
136    /// Prepends a 4-byte LE sample count header.
137    pub fn finish(self) -> Vec<u8> {
138        let count_bytes = (self.count as u32).to_le_bytes();
139        let bitstream = self.buf.as_bytes();
140        let mut out = Vec::with_capacity(4 + bitstream.len());
141        out.extend_from_slice(&count_bytes);
142        out.extend_from_slice(bitstream);
143        out
144    }
145
146    pub fn count(&self) -> u64 {
147        self.count
148    }
149
150    pub fn compressed_size(&self) -> usize {
151        self.buf.bit_len().div_ceil(8)
152    }
153}
154
155impl Default for GorillaEncoder {
156    fn default() -> Self {
157        Self::new()
158    }
159}
160
161// ---------------------------------------------------------------------------
162// Decoder
163// ---------------------------------------------------------------------------
164
/// Gorilla XOR decoder for (timestamp, f64) sample streams.
///
/// Borrows the compressed bytes for `'a` and yields samples one at a time
/// via `next_sample`, or all at once via `decode_all`.
pub struct GorillaDecoder<'a> {
    // Bit-level cursor over the bitstream that follows the 4-byte header.
    reader: BitReader<'a>,
    // Timestamp of the most recently decoded sample.
    prev_ts: i64,
    // Difference between the two most recently decoded timestamps.
    prev_delta: i64,
    // Raw IEEE-754 bits of the most recently decoded value.
    prev_value: u64,
    // Leading-zero count of the current XOR window.
    prev_leading: u8,
    // Trailing-zero count of the current XOR window.
    prev_trailing: u8,
    // Samples consumed so far; decoding stops once this reaches `total`.
    count: u64,
    // Sample count read from the header (0 when the input is shorter than 4 bytes).
    total: u64,
    // True until the first (raw, uncompressed) sample has been read.
    first: bool,
}
177
178impl<'a> GorillaDecoder<'a> {
179    /// Create a decoder from compressed bytes.
180    ///
181    /// Expects a 4-byte LE sample count header followed by the bitstream.
182    pub fn new(buf: &'a [u8]) -> Self {
183        if buf.len() < 4 {
184            return Self {
185                reader: BitReader::new(&[]),
186                prev_ts: 0,
187                prev_delta: 0,
188                prev_value: 0,
189                prev_leading: 0,
190                prev_trailing: 0,
191                count: 0,
192                total: 0,
193                first: true,
194            };
195        }
196        let total = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as u64;
197        Self {
198            reader: BitReader::new(&buf[4..]),
199            prev_ts: 0,
200            prev_delta: 0,
201            prev_value: 0,
202            prev_leading: 0,
203            prev_trailing: 0,
204            count: 0,
205            total,
206            first: true,
207        }
208    }
209
210    /// Decode the next sample, or None if all samples decoded.
211    pub fn next_sample(&mut self) -> Option<(i64, f64)> {
212        if self.count >= self.total {
213            return None;
214        }
215
216        if self.first {
217            self.first = false;
218            let ts = self.reader.read_bits(64).ok()? as i64;
219            let val = self.reader.read_bits(64).ok()?;
220            self.prev_ts = ts;
221            self.prev_value = val;
222            self.count = 1;
223            return Some((ts, f64::from_bits(val)));
224        }
225
226        let ts = self.decode_timestamp().ok()?;
227        let val = self.decode_value().ok()?;
228        self.count += 1;
229        Some((ts, f64::from_bits(val)))
230    }
231
232    fn decode_timestamp(&mut self) -> Result<i64, CodecError> {
233        let bit = self.reader.read_bit()?;
234        let dod = if !bit {
235            0i64
236        } else {
237            let bit2 = self.reader.read_bit()?;
238            if !bit2 {
239                let raw = self.reader.read_bits(7)? as i64;
240                sign_extend(raw, 7)
241            } else {
242                let bit3 = self.reader.read_bit()?;
243                if !bit3 {
244                    let raw = self.reader.read_bits(9)? as i64;
245                    sign_extend(raw, 9)
246                } else {
247                    let bit4 = self.reader.read_bit()?;
248                    if !bit4 {
249                        let raw = self.reader.read_bits(12)? as i64;
250                        sign_extend(raw, 12)
251                    } else {
252                        self.reader.read_bits(64)? as i64
253                    }
254                }
255            }
256        };
257
258        let delta = self.prev_delta + dod;
259        let ts = self.prev_ts + delta;
260        self.prev_ts = ts;
261        self.prev_delta = delta;
262        Ok(ts)
263    }
264
265    fn decode_value(&mut self) -> Result<u64, CodecError> {
266        let bit = self.reader.read_bit()?;
267        if !bit {
268            return Ok(self.prev_value);
269        }
270
271        let bit2 = self.reader.read_bit()?;
272        let xor = if !bit2 {
273            let meaningful_bits = 64 - self.prev_leading - self.prev_trailing;
274            let bits = self.reader.read_bits(meaningful_bits as usize)?;
275            bits << self.prev_trailing
276        } else {
277            let leading = self.reader.read_bits(6)? as u8;
278            let meaningful_bits = self.reader.read_bits(6)? as u8 + 1;
279            let trailing = 64 - leading - meaningful_bits;
280            let bits = self.reader.read_bits(meaningful_bits as usize)?;
281            self.prev_leading = leading;
282            self.prev_trailing = trailing;
283            bits << trailing
284        };
285
286        let val = self.prev_value ^ xor;
287        self.prev_value = val;
288        Ok(val)
289    }
290
291    /// Decode all remaining samples.
292    pub fn decode_all(&mut self) -> Vec<(i64, f64)> {
293        let mut samples = Vec::new();
294        while let Some(s) = self.next_sample() {
295            samples.push(s);
296        }
297        samples
298    }
299}
300
/// Interpret the low `bits` bits of `value` as a two's-complement number
/// and widen it to `i64`.
///
/// Bits above position `bits - 1` are discarded. The left shift moves the
/// field's sign bit into bit 63; the arithmetic right shift then copies it
/// back down across the upper bits.
fn sign_extend(value: i64, bits: u32) -> i64 {
    let unused = i64::BITS - bits;
    let aligned = value << unused;
    aligned >> unused
}
305
306// ---------------------------------------------------------------------------
307// Convenience functions for pure-value encoding (no timestamps)
308// ---------------------------------------------------------------------------
309
310/// Encode a slice of f64 values using Gorilla XOR compression.
311///
312/// Uses synthetic sequential timestamps (0, 1, 2, ...) so the timestamp
313/// channel compresses to near-zero overhead.
314pub fn encode_f64(values: &[f64]) -> Vec<u8> {
315    let mut enc = GorillaEncoder::new();
316    for (i, &v) in values.iter().enumerate() {
317        enc.encode(i as i64, v);
318    }
319    enc.finish()
320}
321
322/// Decode Gorilla-compressed f64 values (encoded with `encode_f64`).
323pub fn decode_f64(data: &[u8]) -> Result<Vec<f64>, CodecError> {
324    let mut dec = GorillaDecoder::new(data);
325    let samples = dec.decode_all();
326    if samples.len() != dec.total as usize {
327        return Err(CodecError::Truncated {
328            expected: dec.total as usize,
329            actual: samples.len(),
330        });
331    }
332    Ok(samples.into_iter().map(|(_, v)| v).collect())
333}
334
335/// Encode a slice of i64 timestamps using Gorilla (value channel unused).
336///
337/// For timestamps, prefer `DoubleDelta` codec — it compresses ~4x better.
338/// This function exists for backward compatibility with V1 segments.
339pub fn encode_timestamps(timestamps: &[i64]) -> Vec<u8> {
340    let mut enc = GorillaEncoder::new();
341    for &ts in timestamps {
342        enc.encode(ts, 0.0);
343    }
344    enc.finish()
345}
346
347/// Decode Gorilla-encoded timestamps.
348pub fn decode_timestamps(data: &[u8]) -> Result<Vec<i64>, CodecError> {
349    let mut dec = GorillaDecoder::new(data);
350    let samples = dec.decode_all();
351    if samples.len() != dec.total as usize {
352        return Err(CodecError::Truncated {
353            expected: dec.total as usize,
354            actual: samples.len(),
355        });
356    }
357    Ok(samples.into_iter().map(|(ts, _)| ts).collect())
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// An encoder that never saw a sample emits only a zero count header.
    #[test]
    fn empty_encoder() {
        let encoder = GorillaEncoder::new();
        assert_eq!(encoder.count(), 0);

        let bytes = encoder.finish();
        assert_eq!(bytes.len(), 4);

        let header: [u8; 4] = bytes[0..4].try_into().unwrap();
        assert_eq!(u32::from_le_bytes(header), 0);
    }

    /// One sample survives an encode/decode cycle and the stream then ends.
    #[test]
    fn single_sample_roundtrip() {
        let mut encoder = GorillaEncoder::new();
        encoder.encode(1000, 42.5);
        let bytes = encoder.finish();

        let mut decoder = GorillaDecoder::new(&bytes);
        let (timestamp, value) = decoder.next_sample().unwrap();
        assert_eq!(timestamp, 1000);
        assert!((value - 42.5).abs() < f64::EPSILON);
        assert!(decoder.next_sample().is_none());
    }

    /// Evenly spaced timestamps with slowly drifting values stay under
    /// 8 bytes per sample.
    #[test]
    fn monotonic_timestamps_compress_well() {
        let mut encoder = GorillaEncoder::new();
        (0..1000).for_each(|step| {
            encoder.encode(1_000_000 + step * 10_000, 100.0 + (step as f64) * 0.001);
        });
        let bytes = encoder.finish();

        assert!(
            bytes.len() < 8000,
            "expected good compression, got {} bytes for 1000 samples",
            bytes.len()
        );

        let decoded = GorillaDecoder::new(&bytes).decode_all();
        assert_eq!(decoded.len(), 1000);
        assert_eq!(decoded[0].0, 1_000_000);
    }

    /// A constant value at a constant interval costs under one byte per sample.
    #[test]
    fn identical_values_compress_minimally() {
        let mut encoder = GorillaEncoder::new();
        (0..100).for_each(|step| encoder.encode(1000 + step * 1000, 42.0));
        let bytes = encoder.finish();

        assert!(
            bytes.len() < 100,
            "identical values should compress well, got {} bytes",
            bytes.len()
        );

        let decoded = GorillaDecoder::new(&bytes).decode_all();
        assert_eq!(decoded.len(), 100);
        for (_, value) in &decoded {
            assert!((value - 42.0).abs() < f64::EPSILON);
        }
    }

    /// `encode_f64` / `decode_f64` reproduce every value bit-for-bit.
    #[test]
    fn f64_batch_roundtrip() {
        let original: Vec<f64> = (0..500).map(|i| 42.0 + i as f64 * 0.1).collect();
        let restored = decode_f64(&encode_f64(&original)).unwrap();

        assert_eq!(original.len(), restored.len());
        for (want, got) in original.iter().zip(restored.iter()) {
            assert_eq!(want.to_bits(), got.to_bits());
        }
    }

    /// `encode_timestamps` / `decode_timestamps` reproduce the input exactly.
    #[test]
    fn timestamp_batch_roundtrip() {
        let original: Vec<i64> = (0..1000).map(|i| 1_700_000_000_000 + i * 10_000).collect();
        let restored = decode_timestamps(&encode_timestamps(&original)).unwrap();
        assert_eq!(original, restored);
    }

    /// Values spanning the whole f64 range round-trip bit-for-bit.
    #[test]
    fn varying_values_roundtrip() {
        let test_values = [
            0.0,
            1.0,
            -1.0,
            f64::MAX,
            f64::MIN,
            std::f64::consts::PI,
            1e-300,
            1e300,
        ];

        let mut encoder = GorillaEncoder::new();
        for (index, &value) in test_values.iter().enumerate() {
            encoder.encode(index as i64 * 1000, value);
        }
        let bytes = encoder.finish();

        let decoded = GorillaDecoder::new(&bytes).decode_all();
        assert_eq!(decoded.len(), test_values.len());
        for (sample, &expected) in decoded.iter().zip(test_values.iter()) {
            assert_eq!(sample.1.to_bits(), expected.to_bits());
        }
    }

    /// Noisy values around 50.0 still compress better than 2:1 against
    /// 16 raw bytes per sample.
    #[test]
    fn compression_ratio() {
        let mut encoder = GorillaEncoder::new();
        // Deterministic LCG so the test needs no external RNG.
        let mut rng_state: u64 = 12345;
        for step in 0..10_000 {
            rng_state = rng_state.wrapping_mul(6364136223846793005).wrapping_add(1);
            let unit = ((rng_state >> 33) as f64) / (u32::MAX as f64);
            let jitter = unit * 2.0 - 1.0;
            let value = 50.0 + jitter * 5.0;
            encoder.encode(1_700_000_000_000 + step * 10_000, value);
        }
        let bytes = encoder.finish();

        let raw_size = 10_000 * 16;
        let ratio = raw_size as f64 / bytes.len() as f64;
        assert!(
            ratio > 2.0,
            "compression ratio {ratio:.1}:1 too low (expected >2:1)"
        );
    }
}