Skip to main content

reddb_server/storage/timeseries/
compression.rs

1//! Timestamp and value compression for time-series data
2//!
3//! - Delta-of-delta encoding for timestamps (Facebook Gorilla paper)
4//! - XOR-based compression for floating-point values
5
/// Delta-of-delta encode a list of timestamps (Gorilla-style).
///
/// The returned vector has the same length as the input:
/// - index 0: the first timestamp, bit-reinterpreted as `i64`;
/// - index 1: the first delta, `t[1] - t[0]`;
/// - index `i >= 2`: the delta-of-delta, `(t[i] - t[i-1]) - (t[i-1] - t[i-2])`.
///
/// All arithmetic wraps modulo 2^64. A wrapping decoder therefore reproduces
/// *any* `u64` sequence exactly, sorted or not and including values above
/// `i64::MAX`, and the encoder never panics on overflow in debug builds.
/// Sorted, regularly spaced input yields mostly zeros after index 1.
pub fn delta_encode_timestamps(timestamps: &[u64]) -> Vec<i64> {
    let mut encoded = Vec::with_capacity(timestamps.len());
    let first = match timestamps.first() {
        Some(&t) => t,
        None => return encoded,
    };
    // `as` reinterprets the bits; the decoder casts back, so nothing is lost.
    encoded.push(first as i64);

    // Seeding the "previous delta" with 0 makes the first pushed value the
    // plain delta and every later one a delta-of-delta, with no special case.
    let mut prev_delta: i64 = 0;
    for pair in timestamps.windows(2) {
        let delta = (pair[1] as i64).wrapping_sub(pair[0] as i64);
        encoded.push(delta.wrapping_sub(prev_delta));
        prev_delta = delta;
    }
    encoded
}
32
/// Decode delta-of-delta encoded timestamps.
///
/// Expected layout:
/// - index 0: the first timestamp;
/// - index 1: the first delta;
/// - every later entry: a delta-of-delta.
///
/// Deltas and values are accumulated with wrapping arithmetic modulo 2^64, so
/// extreme or malformed input never panics. A wrapping encoder's output
/// round-trips bit-for-bit.
pub fn delta_decode_timestamps(encoded: &[i64]) -> Vec<u64> {
    let mut decoded = Vec::with_capacity(encoded.len());
    let mut entries = encoded.iter();
    let mut value = match entries.next() {
        Some(&first) => first,
        None => return decoded,
    };
    decoded.push(value as u64);

    // Starting from a delta of 0 lets the plain delta (index 1) and every
    // later delta-of-delta be accumulated by the same two lines.
    let mut delta: i64 = 0;
    for &dod in entries {
        delta = delta.wrapping_add(dod);
        value = value.wrapping_add(delta);
        decoded.push(value as u64);
    }
    decoded
}
57
/// Gorilla-style XOR encoding of a run of `f64` samples.
///
/// The first output word is the raw IEEE-754 bit pattern of `values[0]`.
/// Each later word is the XOR of a sample's bits with its predecessor's, so
/// slowly changing series produce words dominated by zero bits. An empty
/// input yields an empty vector.
pub fn xor_encode_values(values: &[f64]) -> Vec<u64> {
    match values.first() {
        None => Vec::new(),
        Some(head) => std::iter::once(head.to_bits())
            .chain(
                values
                    .windows(2)
                    .map(|pair| pair[0].to_bits() ^ pair[1].to_bits()),
            )
            .collect(),
    }
}
74
/// Rebuild `f64` samples from Gorilla-style XOR words.
///
/// A running bit pattern starts at zero and is XOR-ed with every word in turn.
/// Since `0 ^ w == w`, the first word is taken verbatim and each later word is
/// applied to the previous sample's bits. An empty input yields an empty vector.
pub fn xor_decode_values(encoded: &[u64]) -> Vec<f64> {
    let mut bits: u64 = 0;
    encoded
        .iter()
        .map(|&word| {
            bits ^= word;
            f64::from_bits(bits)
        })
        .collect()
}
90
91// =============================================================================
92// T64 — bit-packing for integers drawn from a narrow range.
93// =============================================================================
94//
95// For a sequence of `i64`s that all fit into `k` bits of unsigned
96// magnitude (after subtracting the minimum), you only need `k` bits
97// per value instead of 64. The on-wire layout is:
98//
99//   [min: i64] [max: i64] [bit_width: u8] [packed payload bits...]
100//
101// When every value equals `min` (zero bit_width), the payload is
102// empty. Callers reconstruct via `t64_decode` which enforces the
103// bit-width range (0..=64) and the declared length.
104
/// Encode a slice of i64s into a compact byte vector using T64
/// bit-packing. Returns `(bytes, length)` — `length` is the number
/// of values so decode knows how many to emit (bit-packed payloads
/// don't self-describe length).
///
/// Layout:
/// - `min` as a little-endian `i64`;
/// - `max` as a little-endian `i64`;
/// - one `bit_width` byte;
/// - each `value - min` packed least-significant-bit first in `bit_width` bits.
///
/// A constant input has `bit_width == 0` and no payload. An empty input
/// returns `(vec![], 0)` with no header at all.
pub fn t64_encode(values: &[i64]) -> (Vec<u8>, usize) {
    const HEADER_LEN: usize = 17;

    if values.is_empty() {
        return (Vec::new(), 0);
    }
    let (min, max) = values
        .iter()
        .fold((i64::MAX, i64::MIN), |(lo, hi), &v| (lo.min(v), hi.max(v)));
    // Widened so `max - min` cannot overflow. The span is always < 2^64, so
    // the bit count below is at most 64 (and 0 when every value equals `min`).
    let range = (max as i128 - min as i128) as u128;
    let bit_width = (128 - range.leading_zeros()) as u8;

    // Reserve exactly header + ceil(len * bit_width / 8) bytes. The returned
    // Vec keeps its capacity, so over-reserving would pin memory far beyond
    // the compressed size. Splitting `len` into whole groups of 8 values
    // keeps the arithmetic overflow-free.
    let width = bit_width as usize;
    let n = values.len();
    let payload_len = n / 8 * width + (n % 8 * width + 7) / 8;
    let mut out: Vec<u8> = Vec::with_capacity(HEADER_LEN + payload_len);
    out.extend_from_slice(&min.to_le_bytes());
    out.extend_from_slice(&max.to_le_bytes());
    out.push(bit_width);

    if bit_width == 0 {
        return (out, n);
    }

    // Accumulate bits LSB-first and flush whole bytes. At most 7 bits are
    // pending when a (<= 64-bit) offset is OR-ed in, so 71 bits fit in u128.
    let mut bit_buf: u128 = 0;
    let mut bits_in_buf: u32 = 0;
    for &v in values {
        let offset = (v as i128 - min as i128) as u128;
        bit_buf |= offset << bits_in_buf;
        bits_in_buf += u32::from(bit_width);
        while bits_in_buf >= 8 {
            out.push(bit_buf as u8);
            bit_buf >>= 8;
            bits_in_buf -= 8;
        }
    }
    if bits_in_buf > 0 {
        out.push(bit_buf as u8);
    }
    debug_assert_eq!(out.len(), HEADER_LEN + payload_len);
    (out, n)
}
149
/// Inverse of [`t64_encode`]. `length` must match the value passed
/// at encode-time.
///
/// Returns `None` when `bytes` cannot be a valid T64 encoding:
/// - the header is shorter than 17 bytes;
/// - `bit_width > 64`;
/// - `min > max`;
/// - the payload is too short to hold `length` values;
/// - a decoded value exceeds the header's `max`.
///
/// The payload-size check runs before anything is allocated, so a corrupted
/// `length` cannot trigger a huge reservation. A constant sequence
/// (`bit_width == 0`) carries no payload, so its `length` is trusted as-is.
pub fn t64_decode(bytes: &[u8], length: usize) -> Option<Vec<i64>> {
    if length == 0 {
        return Some(Vec::new());
    }
    if bytes.len() < 17 {
        return None;
    }
    let min = i64::from_le_bytes(bytes[0..8].try_into().ok()?);
    let max = i64::from_le_bytes(bytes[8..16].try_into().ok()?);
    let bit_width = bytes[16];
    if min > max || bit_width > 64 {
        return None;
    }
    if bit_width == 0 {
        return Some(vec![min; length]);
    }

    let width = u32::from(bit_width);
    let payload = &bytes[17..];
    // `length` values need ceil(length * width / 8) payload bytes. Checked
    // math turns an absurd `length` into `None` instead of an overflow.
    let needed_bits = length.checked_mul(width as usize)?;
    let needed_bytes = needed_bits / 8 + usize::from(needed_bits % 8 != 0);
    if payload.len() < needed_bytes {
        return None;
    }

    // width <= 64, so this is at most u64::MAX and the shift cannot overflow.
    let mask: u128 = (1u128 << width) - 1;
    let mut out = Vec::with_capacity(length);
    let mut bit_buf: u128 = 0;
    let mut bits_in_buf: u32 = 0;
    let mut payload_bytes = payload.iter().copied();
    for _ in 0..length {
        while bits_in_buf < width {
            // Cannot run dry after the size check above; kept as a guard.
            let byte = payload_bytes.next()?;
            bit_buf |= u128::from(byte) << bits_in_buf;
            bits_in_buf += 8;
        }
        let offset = bit_buf & mask;
        bit_buf >>= width;
        bits_in_buf -= width;
        // offset < 2^64, so the i128 sum cannot overflow. Anything above the
        // recorded `max` means the payload is corrupt.
        let value = i128::from(min) + offset as i128;
        if value > i128::from(max) {
            return None;
        }
        out.push(value as i64);
    }
    Some(out)
}
195
196// =============================================================================
197// Chunk-wide ZSTD fallback — for payloads that compress poorly with
198// the Delta / XOR / T64 codecs above, apply a final zstd pass.
199// =============================================================================
200
201/// Compress arbitrary bytes with zstd at level 3 (good-enough balance
202/// between ratio and cpu). Small inputs short-circuit: we return the
203/// original bytes with a `0x00` leading marker so decode knows not to
204/// feed them to zstd.
205pub fn zstd_compress(bytes: &[u8]) -> Vec<u8> {
206    zstd_compress_at(bytes, 3)
207}
208
209/// Variant that lets the caller pick the zstd level. Level is
210/// clamped to `1..=22`.
211pub fn zstd_compress_at(bytes: &[u8], level: i32) -> Vec<u8> {
212    if bytes.len() < 64 {
213        // Smaller than a cache line — compression overhead outweighs
214        // any win. Prefix `0` and emit the raw buffer.
215        let mut out = Vec::with_capacity(bytes.len() + 1);
216        out.push(0u8);
217        out.extend_from_slice(bytes);
218        return out;
219    }
220    let clamped = level.clamp(1, 22);
221    match zstd::bulk::compress(bytes, clamped) {
222        Ok(compressed) => {
223            let mut out = Vec::with_capacity(compressed.len() + 1);
224            out.push(1u8);
225            out.extend_from_slice(&compressed);
226            out
227        }
228        Err(_) => {
229            // zstd shouldn't fail on valid slices; fall back to raw
230            // so roundtrip is still correct.
231            let mut out = Vec::with_capacity(bytes.len() + 1);
232            out.push(0u8);
233            out.extend_from_slice(bytes);
234            out
235        }
236    }
237}
238
239/// Inverse of [`zstd_compress`]. Returns `None` for truncated or
240/// malformed inputs.
241pub fn zstd_decompress(bytes: &[u8]) -> Option<Vec<u8>> {
242    if bytes.is_empty() {
243        return None;
244    }
245    match bytes[0] {
246        0 => Some(bytes[1..].to_vec()),
247        1 => zstd::bulk::decompress(&bytes[1..], 1 << 28).ok(),
248        _ => None,
249    }
250}
251
252// =============================================================================
253// Auto-selector — picks the cheapest codec for a given input shape.
254// =============================================================================
255
/// Integer codecs the time-series layer can choose between.
///
/// Intended to stay in sync with the `CODEC(...)` DDL surface.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TsIntCodec {
    /// No compression: each value is stored as a full `i64`. The fallback.
    Raw,
    /// Delta-of-delta encoding; suited to monotonic timestamp columns.
    DeltaOfDelta,
    /// T64 bit-packing; suited to integers confined to a narrow range.
    T64,
}
268
269/// Pick a codec for an integer series based on its shape. A strictly
270/// monotonic series with small deltas wins with delta-of-delta; a
271/// narrow-range series (regardless of order) wins with T64; anything
272/// else falls back to Raw + zstd fallback at the chunk layer.
273pub fn select_int_codec(values: &[i64]) -> TsIntCodec {
274    if values.len() < 4 {
275        return TsIntCodec::Raw;
276    }
277    // Heuristic 1: monotonic non-decreasing → DeltaOfDelta.
278    let monotonic = values.windows(2).all(|w| w[1] >= w[0]);
279    if monotonic {
280        return TsIntCodec::DeltaOfDelta;
281    }
282    // Heuristic 2: narrow range (< 20 bits) → T64.
283    let min = *values.iter().min().unwrap();
284    let max = *values.iter().max().unwrap();
285    let range = (max as i128 - min as i128).max(0) as u128;
286    let bits = if range == 0 {
287        0
288    } else {
289        128 - range.leading_zeros()
290    };
291    if bits <= 20 {
292        return TsIntCodec::T64;
293    }
294    TsIntCodec::Raw
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Delta-of-delta timestamps -------------------------------------

    #[test]
    fn test_delta_encode_decode() {
        let timestamps: Vec<u64> = vec![1000, 1060, 1120, 1180, 1240, 1300];
        let encoded = delta_encode_timestamps(&timestamps);
        let decoded = delta_decode_timestamps(&encoded);
        assert_eq!(timestamps, decoded);
    }

    #[test]
    fn test_delta_irregular() {
        let timestamps: Vec<u64> = vec![100, 200, 250, 400, 405, 500];
        let encoded = delta_encode_timestamps(&timestamps);
        // Layout: first value, first delta, then delta-of-deltas.
        // Deltas are [100, 50, 150, 5, 95], so the delta-of-deltas are
        // [-50, 100, -145, 90].
        assert_eq!(encoded, vec![100, 100, -50, 100, -145, 90]);
        let decoded = delta_decode_timestamps(&encoded);
        assert_eq!(timestamps, decoded);
    }

    #[test]
    fn test_delta_single() {
        let timestamps: Vec<u64> = vec![42];
        let encoded = delta_encode_timestamps(&timestamps);
        let decoded = delta_decode_timestamps(&encoded);
        assert_eq!(timestamps, decoded);
    }

    #[test]
    fn test_delta_empty() {
        let timestamps: Vec<u64> = vec![];
        let encoded = delta_encode_timestamps(&timestamps);
        assert!(encoded.is_empty());
        let decoded = delta_decode_timestamps(&encoded);
        assert!(decoded.is_empty());
    }

    #[test]
    fn test_delta_compression_ratio() {
        // Regular 1-second intervals — should compress very well
        let timestamps: Vec<u64> = (0..1000).map(|i| 1_000_000 + i * 1000).collect();
        let encoded = delta_encode_timestamps(&timestamps);

        // After first two values, all delta-of-deltas should be 0
        for &dod in &encoded[2..] {
            assert_eq!(dod, 0, "Regular intervals should have zero delta-of-delta");
        }
    }

    // ---- XOR floats ----------------------------------------------------

    #[test]
    fn test_xor_encode_decode() {
        let values = vec![72.5, 72.6, 72.55, 72.7, 72.65, 72.8];
        let encoded = xor_encode_values(&values);
        let decoded = xor_decode_values(&encoded);
        assert_eq!(values, decoded);
    }

    #[test]
    fn test_xor_compression_similar_values() {
        // Every value lies in [64, 128), so neighbours share the sign bit and
        // the 11-bit exponent: each XOR word has at least 12 leading zeros.
        let values: Vec<f64> = (0..100).map(|i| 95.0 + (i as f64) * 0.01).collect();
        let encoded = xor_encode_values(&values);
        assert_eq!(encoded.len(), values.len());
        for (i, &x) in encoded[1..].iter().enumerate() {
            assert!(
                x.leading_zeros() >= 12,
                "xor #{} = {:#018x} should share sign and exponent bits",
                i + 1,
                x
            );
        }
        assert_eq!(xor_decode_values(&encoded), values);
    }

    #[test]
    fn test_xor_empty() {
        assert!(xor_encode_values(&[]).is_empty());
        assert!(xor_decode_values(&[]).is_empty());
    }

    // ---- T64 tests ----------------------------------------------------

    #[test]
    fn t64_round_trips_narrow_range() {
        let values: Vec<i64> = (0..1024).map(|i| 1000 + (i % 128)).collect();
        let (bytes, len) = t64_encode(&values);
        let decoded = t64_decode(&bytes, len).unwrap();
        assert_eq!(values, decoded);
        // Compression ratio sanity: 7 bits per value + 17-byte header
        // is way under 8 bytes/value.
        assert!(bytes.len() < values.len() * 8 / 4);
    }

    #[test]
    fn t64_handles_constant_sequence_with_zero_bit_width() {
        let values = vec![42i64; 100];
        let (bytes, len) = t64_encode(&values);
        assert_eq!(bytes.len(), 17); // header only
        let decoded = t64_decode(&bytes, len).unwrap();
        assert_eq!(values, decoded);
    }

    #[test]
    fn t64_empty_returns_empty() {
        let (bytes, len) = t64_encode(&[]);
        assert!(bytes.is_empty());
        assert_eq!(len, 0);
        assert_eq!(t64_decode(&[], 0).unwrap(), Vec::<i64>::new());
    }

    #[test]
    fn t64_handles_negative_values() {
        let values = vec![-1000, -500, 0, 500, 1000, -750, 250];
        let (bytes, len) = t64_encode(&values);
        let decoded = t64_decode(&bytes, len).unwrap();
        assert_eq!(values, decoded);
    }

    #[test]
    fn t64_rejects_corrupted_payload() {
        // Length claim exceeds the bytes available.
        let (bytes, _) = t64_encode(&[1i64, 2, 3, 4]);
        assert!(t64_decode(&bytes[..18], 100).is_none());
    }

    // ---- ZSTD fallback tests ------------------------------------------

    #[test]
    fn zstd_small_input_passes_through_uncompressed() {
        let data = b"short";
        let compressed = zstd_compress(data);
        // Header (1 byte) + raw data.
        assert_eq!(compressed[0], 0);
        assert_eq!(&compressed[1..], data);
        assert_eq!(zstd_decompress(&compressed).unwrap(), data.to_vec());
    }

    #[test]
    fn zstd_large_input_compresses_and_round_trips() {
        let data: Vec<u8> = (0..4096).map(|i| (i % 8) as u8).collect();
        let compressed = zstd_compress(&data);
        assert_eq!(compressed[0], 1);
        assert!(
            compressed.len() < data.len() / 2,
            "zstd should compress ≥2x on repetitive input"
        );
        let decompressed = zstd_decompress(&compressed).unwrap();
        assert_eq!(decompressed, data);
    }

    #[test]
    fn zstd_decompress_rejects_unknown_marker() {
        assert!(zstd_decompress(&[0xff, 0, 1, 2]).is_none());
        assert!(zstd_decompress(&[]).is_none());
    }

    // ---- select_int_codec -------------------------------------------

    #[test]
    fn select_int_codec_picks_delta_for_monotonic_timestamps() {
        let ts: Vec<i64> = (0..1000).map(|i| 1_000_000 + i * 1000).collect();
        assert_eq!(select_int_codec(&ts), TsIntCodec::DeltaOfDelta);
    }

    #[test]
    fn select_int_codec_picks_t64_for_narrow_range() {
        // Random-looking but bounded in [0, 1024] — fits T64's 20-bit
        // threshold easily and is not monotonic.
        let vals: Vec<i64> = (0..500).map(|i| ((i * 13 + 7) % 1024) as i64).collect();
        assert_eq!(select_int_codec(&vals), TsIntCodec::T64);
    }

    #[test]
    fn select_int_codec_falls_back_to_raw_on_wide_non_monotonic() {
        let vals = vec![1_000_000_000i64, -1, 500_000_000, 42, i64::MAX / 2];
        assert_eq!(select_int_codec(&vals), TsIntCodec::Raw);
    }

    #[test]
    fn select_int_codec_returns_raw_for_tiny_inputs() {
        assert_eq!(select_int_codec(&[]), TsIntCodec::Raw);
        assert_eq!(select_int_codec(&[1, 2, 3]), TsIntCodec::Raw);
    }
}