Skip to main content

nodedb_codec/
gorilla.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Gorilla XOR encoding for floating-point timeseries metrics.
4//!
5//! Implements the Facebook Gorilla paper's XOR-based compression for
6//! double-precision floating-point values. Achieves ~1.5 bytes per
7//! 16-byte (timestamp + value) sample by exploiting temporal locality
8//! in metric streams.
9//!
10//! Also usable for non-monotonic i64 values by casting through f64 bits.
11//!
12//! Wire format:
13//! ```text
14//! [4 bytes] sample count (LE u32)
15//! [N bytes] bitstream: first sample raw (64+64 bits), then delta-of-delta
16//!           timestamps + XOR-compressed values
17//! ```
18//!
19//! Reference: "Gorilla: A Fast, Scalable, In-Memory Time Series Database"
20//! (Pelkonen et al., VLDB 2015)
21
22use crate::double_delta::{BitReader, BitWriter};
23use crate::error::CodecError;
24
25// ---------------------------------------------------------------------------
26// Encoder
27// ---------------------------------------------------------------------------
28
29/// Gorilla XOR encoder for (timestamp, f64) sample streams.
30///
31/// Timestamps use delta-of-delta encoding. Values use XOR with
32/// leading/trailing zero compression.
33#[derive(Debug)]
34pub struct GorillaEncoder {
35    buf: BitWriter,
36    prev_ts: i64,
37    prev_delta: i64,
38    prev_value: u64,
39    prev_leading: u8,
40    prev_trailing: u8,
41    count: u64,
42}
43
44impl GorillaEncoder {
45    pub fn new() -> Self {
46        Self {
47            buf: BitWriter::new(),
48            prev_ts: 0,
49            prev_delta: 0,
50            prev_value: 0,
51            prev_leading: u8::MAX,
52            prev_trailing: 0,
53            count: 0,
54        }
55    }
56
57    /// Encode a (timestamp_ms, value) sample.
58    pub fn encode(&mut self, timestamp_ms: i64, value: f64) {
59        let value_bits = value.to_bits();
60
61        if self.count == 0 {
62            self.buf.write_bits(timestamp_ms as u64, 64);
63            self.buf.write_bits(value_bits, 64);
64            self.prev_ts = timestamp_ms;
65            self.prev_value = value_bits;
66            self.count = 1;
67            return;
68        }
69
70        // Timestamp: delta-of-delta.
71        let delta = timestamp_ms - self.prev_ts;
72        let dod = delta - self.prev_delta;
73        self.encode_timestamp_dod(dod);
74        self.prev_ts = timestamp_ms;
75        self.prev_delta = delta;
76
77        // Value: XOR.
78        let xor = self.prev_value ^ value_bits;
79        self.encode_value_xor(xor);
80        self.prev_value = value_bits;
81
82        self.count += 1;
83    }
84
85    fn encode_timestamp_dod(&mut self, dod: i64) {
86        if dod == 0 {
87            self.buf.write_bit(false);
88        } else if (-64..=63).contains(&dod) {
89            self.buf.write_bits(0b10, 2);
90            self.buf.write_bits((dod as u64) & 0x7F, 7);
91        } else if (-256..=255).contains(&dod) {
92            self.buf.write_bits(0b110, 3);
93            self.buf.write_bits((dod as u64) & 0x1FF, 9);
94        } else if (-2048..=2047).contains(&dod) {
95            self.buf.write_bits(0b1110, 4);
96            self.buf.write_bits((dod as u64) & 0xFFF, 12);
97        } else {
98            self.buf.write_bits(0b1111, 4);
99            self.buf.write_bits(dod as u64, 64);
100        }
101    }
102
103    fn encode_value_xor(&mut self, xor: u64) {
104        if xor == 0 {
105            self.buf.write_bit(false);
106            return;
107        }
108
109        self.buf.write_bit(true);
110
111        let leading = xor.leading_zeros() as u8;
112        let trailing = xor.trailing_zeros() as u8;
113
114        if self.prev_leading != u8::MAX
115            && leading >= self.prev_leading
116            && trailing >= self.prev_trailing
117        {
118            // Fits within previous window.
119            self.buf.write_bit(false);
120            let meaningful_bits = 64 - self.prev_leading - self.prev_trailing;
121            self.buf
122                .write_bits(xor >> self.prev_trailing, meaningful_bits as usize);
123        } else {
124            // New window.
125            self.buf.write_bit(true);
126            self.buf.write_bits(leading as u64, 6);
127            let meaningful_bits = 64 - leading - trailing;
128            self.buf.write_bits((meaningful_bits - 1) as u64, 6);
129            self.buf
130                .write_bits(xor >> trailing, meaningful_bits as usize);
131            self.prev_leading = leading;
132            self.prev_trailing = trailing;
133        }
134    }
135
136    /// Finish encoding and return compressed bytes.
137    ///
138    /// Prepends a 4-byte LE sample count header.
139    pub fn finish(self) -> Vec<u8> {
140        let count_bytes = (self.count as u32).to_le_bytes();
141        let bitstream = self.buf.as_bytes();
142        let mut out = Vec::with_capacity(4 + bitstream.len());
143        out.extend_from_slice(&count_bytes);
144        out.extend_from_slice(bitstream);
145        out
146    }
147
148    pub fn count(&self) -> u64 {
149        self.count
150    }
151
152    pub fn compressed_size(&self) -> usize {
153        self.buf.bit_len().div_ceil(8)
154    }
155}
156
157impl Default for GorillaEncoder {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163// ---------------------------------------------------------------------------
164// Decoder
165// ---------------------------------------------------------------------------
166
167/// Gorilla XOR decoder for (timestamp, f64) sample streams.
168pub struct GorillaDecoder<'a> {
169    reader: BitReader<'a>,
170    prev_ts: i64,
171    prev_delta: i64,
172    prev_value: u64,
173    prev_leading: u8,
174    prev_trailing: u8,
175    count: u64,
176    total: u64,
177    first: bool,
178}
179
180impl<'a> GorillaDecoder<'a> {
181    /// Create a decoder from compressed bytes.
182    ///
183    /// Expects a 4-byte LE sample count header followed by the bitstream.
184    pub fn new(buf: &'a [u8]) -> Self {
185        if buf.len() < 4 {
186            return Self {
187                reader: BitReader::new(&[]),
188                prev_ts: 0,
189                prev_delta: 0,
190                prev_value: 0,
191                prev_leading: 0,
192                prev_trailing: 0,
193                count: 0,
194                total: 0,
195                first: true,
196            };
197        }
198        let total = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as u64;
199        Self {
200            reader: BitReader::new(&buf[4..]),
201            prev_ts: 0,
202            prev_delta: 0,
203            prev_value: 0,
204            prev_leading: 0,
205            prev_trailing: 0,
206            count: 0,
207            total,
208            first: true,
209        }
210    }
211
212    /// Decode the next sample, or None if all samples decoded.
213    pub fn next_sample(&mut self) -> Option<(i64, f64)> {
214        if self.count >= self.total {
215            return None;
216        }
217
218        if self.first {
219            self.first = false;
220            let ts = self.reader.read_bits(64).ok()? as i64;
221            let val = self.reader.read_bits(64).ok()?;
222            self.prev_ts = ts;
223            self.prev_value = val;
224            self.count = 1;
225            return Some((ts, f64::from_bits(val)));
226        }
227
228        let ts = self.decode_timestamp().ok()?;
229        let val = self.decode_value().ok()?;
230        self.count += 1;
231        Some((ts, f64::from_bits(val)))
232    }
233
234    fn decode_timestamp(&mut self) -> Result<i64, CodecError> {
235        let bit = self.reader.read_bit()?;
236        let dod = if !bit {
237            0i64
238        } else {
239            let bit2 = self.reader.read_bit()?;
240            if !bit2 {
241                let raw = self.reader.read_bits(7)? as i64;
242                sign_extend(raw, 7)
243            } else {
244                let bit3 = self.reader.read_bit()?;
245                if !bit3 {
246                    let raw = self.reader.read_bits(9)? as i64;
247                    sign_extend(raw, 9)
248                } else {
249                    let bit4 = self.reader.read_bit()?;
250                    if !bit4 {
251                        let raw = self.reader.read_bits(12)? as i64;
252                        sign_extend(raw, 12)
253                    } else {
254                        self.reader.read_bits(64)? as i64
255                    }
256                }
257            }
258        };
259
260        let delta = self.prev_delta + dod;
261        let ts = self.prev_ts + delta;
262        self.prev_ts = ts;
263        self.prev_delta = delta;
264        Ok(ts)
265    }
266
267    fn decode_value(&mut self) -> Result<u64, CodecError> {
268        let bit = self.reader.read_bit()?;
269        if !bit {
270            return Ok(self.prev_value);
271        }
272
273        let bit2 = self.reader.read_bit()?;
274        let xor = if !bit2 {
275            let meaningful_bits = 64 - self.prev_leading - self.prev_trailing;
276            let bits = self.reader.read_bits(meaningful_bits as usize)?;
277            bits << self.prev_trailing
278        } else {
279            let leading = self.reader.read_bits(6)? as u8;
280            let meaningful_bits = self.reader.read_bits(6)? as u8 + 1;
281            let trailing = 64 - leading - meaningful_bits;
282            let bits = self.reader.read_bits(meaningful_bits as usize)?;
283            self.prev_leading = leading;
284            self.prev_trailing = trailing;
285            bits << trailing
286        };
287
288        let val = self.prev_value ^ xor;
289        self.prev_value = val;
290        Ok(val)
291    }
292
293    /// Decode all remaining samples.
294    pub fn decode_all(&mut self) -> Vec<(i64, f64)> {
295        let mut samples = Vec::new();
296        while let Some(s) = self.next_sample() {
297            samples.push(s);
298        }
299        samples
300    }
301}
302
/// Sign-extend the low `bits` bits of `value` to a full `i64`.
///
/// Bits above position `bits - 1` are discarded, and bit `bits - 1` is treated
/// as the sign bit. `bits` must be in `1..=64`.
fn sign_extend(value: i64, bits: u32) -> i64 {
    let unused_high_bits = i64::BITS - bits;
    let left_aligned = value << unused_high_bits;
    // An arithmetic right shift on a signed integer replicates the sign bit.
    left_aligned >> unused_high_bits
}
307
308// ---------------------------------------------------------------------------
309// Convenience functions for pure-value encoding (no timestamps)
310// ---------------------------------------------------------------------------
311
312/// Encode a slice of f64 values using Gorilla XOR compression.
313///
314/// Uses synthetic sequential timestamps (0, 1, 2, ...) so the timestamp
315/// channel compresses to near-zero overhead.
316pub fn encode_f64(values: &[f64]) -> Vec<u8> {
317    let mut enc = GorillaEncoder::new();
318    for (i, &v) in values.iter().enumerate() {
319        enc.encode(i as i64, v);
320    }
321    enc.finish()
322}
323
324/// Decode Gorilla-compressed f64 values (encoded with `encode_f64`).
325pub fn decode_f64(data: &[u8]) -> Result<Vec<f64>, CodecError> {
326    let mut dec = GorillaDecoder::new(data);
327    let samples = dec.decode_all();
328    if samples.len() != dec.total as usize {
329        return Err(CodecError::Truncated {
330            expected: dec.total as usize,
331            actual: samples.len(),
332        });
333    }
334    Ok(samples.into_iter().map(|(_, v)| v).collect())
335}
336
337/// Encode a slice of i64 timestamps using Gorilla (value channel unused).
338///
339/// For timestamps, prefer `DoubleDelta` codec — it compresses ~4x better.
340/// This function exists for backward compatibility with V1 segments.
341pub fn encode_timestamps(timestamps: &[i64]) -> Vec<u8> {
342    let mut enc = GorillaEncoder::new();
343    for &ts in timestamps {
344        enc.encode(ts, 0.0);
345    }
346    enc.finish()
347}
348
349/// Decode Gorilla-encoded timestamps.
350pub fn decode_timestamps(data: &[u8]) -> Result<Vec<i64>, CodecError> {
351    let mut dec = GorillaDecoder::new(data);
352    let samples = dec.decode_all();
353    if samples.len() != dec.total as usize {
354        return Err(CodecError::Truncated {
355            expected: dec.total as usize,
356            actual: samples.len(),
357        });
358    }
359    Ok(samples.into_iter().map(|(ts, _)| ts).collect())
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    /// Gorilla is lossless, so decoded values must match bit-for-bit.
    fn assert_same_bits(expected: &[f64], actual: &[f64]) {
        assert_eq!(expected.len(), actual.len(), "sample count mismatch");
        for (i, (e, a)) in expected.iter().zip(actual).enumerate() {
            assert_eq!(e.to_bits(), a.to_bits(), "sample {i}: expected {e}, got {a}");
        }
    }

    #[test]
    fn empty_encoder() {
        let enc = GorillaEncoder::new();
        assert_eq!(enc.count(), 0);
        let data = enc.finish();
        assert_eq!(data.len(), 4);
        assert_eq!(u32::from_le_bytes(data[0..4].try_into().unwrap()), 0);
    }

    #[test]
    fn empty_stream_decodes_to_nothing() {
        let data = GorillaEncoder::new().finish();
        assert!(GorillaDecoder::new(&data).next_sample().is_none());
        assert!(decode_f64(&data).unwrap().is_empty());
        assert!(decode_timestamps(&data).unwrap().is_empty());
    }

    #[test]
    fn buffer_shorter_than_header_decodes_to_nothing() {
        let mut dec = GorillaDecoder::new(&[0x01, 0x00]);
        assert!(dec.next_sample().is_none());
    }

    #[test]
    fn missing_bitstream_is_reported_as_error() {
        let encoded = encode_f64(&[1.0, 2.0, 3.0]);
        // Keep only the 4-byte header, which still advertises three samples.
        assert!(decode_f64(&encoded[..4]).is_err());
        assert!(decode_timestamps(&encoded[..4]).is_err());
    }

    #[test]
    fn single_sample_roundtrip() {
        let mut enc = GorillaEncoder::new();
        enc.encode(1000, 42.5);
        let data = enc.finish();

        let mut dec = GorillaDecoder::new(&data);
        let (ts, val) = dec.next_sample().unwrap();
        assert_eq!(ts, 1000);
        assert_eq!(val.to_bits(), 42.5_f64.to_bits());
        assert!(dec.next_sample().is_none());
    }

    #[test]
    fn monotonic_timestamps_compress_well() {
        let samples: Vec<(i64, f64)> = (0..1000)
            .map(|i| (1_000_000 + i * 10_000, 100.0 + (i as f64) * 0.001))
            .collect();
        let mut enc = GorillaEncoder::new();
        for &(ts, v) in &samples {
            enc.encode(ts, v);
        }
        let data = enc.finish();

        assert!(
            data.len() < 8000,
            "expected good compression, got {} bytes for 1000 samples",
            data.len()
        );

        let decoded = GorillaDecoder::new(&data).decode_all();
        assert_eq!(decoded.len(), samples.len());
        for ((ts, v), (dts, dv)) in samples.iter().zip(&decoded) {
            assert_eq!(ts, dts);
            assert_eq!(v.to_bits(), dv.to_bits());
        }
    }

    #[test]
    fn identical_values_compress_minimally() {
        let mut enc = GorillaEncoder::new();
        for i in 0..100 {
            enc.encode(1000 + i * 1000, 42.0);
        }
        let data = enc.finish();

        assert!(
            data.len() < 100,
            "identical values should compress well, got {} bytes",
            data.len()
        );

        let samples = GorillaDecoder::new(&data).decode_all();
        assert_eq!(samples.len(), 100);
        for (i, &(ts, v)) in (0_i64..).zip(&samples) {
            assert_eq!(ts, 1000 + i * 1000);
            assert_eq!(v.to_bits(), 42.0_f64.to_bits());
        }
    }

    #[test]
    fn f64_batch_roundtrip() {
        let values: Vec<f64> = (0..500).map(|i| 42.0 + i as f64 * 0.1).collect();
        let encoded = encode_f64(&values);
        let decoded = decode_f64(&encoded).unwrap();
        assert_same_bits(&values, &decoded);
    }

    #[test]
    fn timestamp_batch_roundtrip() {
        let timestamps: Vec<i64> = (0..1000).map(|i| 1_700_000_000_000 + i * 10_000).collect();
        let encoded = encode_timestamps(&timestamps);
        let decoded = decode_timestamps(&encoded).unwrap();
        assert_eq!(timestamps, decoded);
    }

    #[test]
    fn timestamp_dod_bucket_boundaries_roundtrip() {
        // Each delta-of-delta sits on or just past a bucket edge, in both
        // directions, so every control prefix is exercised.
        let dods = [
            0,
            63,
            -64,
            64,
            -65,
            255,
            -256,
            256,
            -257,
            2047,
            -2048,
            2048,
            -2049,
            1 << 40,
            -(1 << 40),
        ];
        let mut timestamps = vec![1_700_000_000_000_i64];
        let mut delta = 0_i64;
        for dod in dods {
            delta += dod;
            let next = timestamps.last().unwrap() + delta;
            timestamps.push(next);
        }
        let encoded = encode_timestamps(&timestamps);
        assert_eq!(decode_timestamps(&encoded).unwrap(), timestamps);
    }

    #[test]
    fn varying_values_roundtrip() {
        let test_values = [
            0.0,
            -0.0,
            1.0,
            -1.0,
            f64::MAX,
            f64::MIN,
            f64::MIN_POSITIVE,
            std::f64::consts::PI,
            1e-300,
            1e300,
            f64::INFINITY,
            f64::NEG_INFINITY,
            f64::NAN,
        ];
        let mut enc = GorillaEncoder::new();
        for (i, &val) in test_values.iter().enumerate() {
            enc.encode(i as i64 * 1000, val);
        }
        let data = enc.finish();

        let samples = GorillaDecoder::new(&data).decode_all();
        assert_eq!(samples.len(), test_values.len());
        for (i, &(ts, _)) in samples.iter().enumerate() {
            assert_eq!(ts, i as i64 * 1000);
        }
        let values: Vec<f64> = samples.iter().map(|&(_, v)| v).collect();
        assert_same_bits(&test_values, &values);
    }

    #[test]
    fn compression_ratio() {
        let mut rng_state: u64 = 12345;
        let values: Vec<f64> = (0..10_000)
            .map(|_| {
                rng_state = rng_state.wrapping_mul(6364136223846793005).wrapping_add(1);
                let jitter = ((rng_state >> 33) as f64) / (u32::MAX as f64) * 2.0 - 1.0;
                50.0 + jitter * 5.0
            })
            .collect();
        let mut enc = GorillaEncoder::new();
        for (i, &value) in (0_i64..).zip(&values) {
            enc.encode(1_700_000_000_000 + i * 10_000, value);
        }
        let data = enc.finish();

        let raw_size = values.len() * 16;
        let ratio = raw_size as f64 / data.len() as f64;
        assert!(
            ratio > 2.0,
            "compression ratio {ratio:.1}:1 too low (expected >2:1)"
        );

        let decoded: Vec<f64> = GorillaDecoder::new(&data)
            .decode_all()
            .into_iter()
            .map(|(_, v)| v)
            .collect();
        assert_same_bits(&values, &decoded);
    }
}