// oxirs_tsdb/compression/adaptive.rs

1//! Adaptive compression: automatically selects the best algorithm per-block
2//!
3//! The [`AdaptiveCompressor`] analyses incoming data and picks the most
4//! appropriate encoding from:
5//!
6//! | Algorithm | Best suited for |
7//! |-----------|----------------|
8//! | Gorilla   | Floating-point sensor values with temporal locality |
9//! | RLE       | Step-function / constant-value data (states, flags) |
10//! | Dictionary| Repeated categorical string labels |
11//! | Raw       | High-entropy data where compression gives no benefit |
12//!
13//! ## Selection heuristic
14//!
15//! 1. If the value stream is made up of strings → Dictionary.
16//! 2. If the ratio of unique values to total values is < 10% → RLE.
17//! 3. Otherwise → Gorilla.
18//! 4. If the Gorilla-encoded size is larger than raw → Raw (passthrough).
19//!
20//! This module also provides a unified [`CompressedBlock`] type that encodes
21//! which algorithm was used so that decompression is self-describing.
22
23use crate::compression::gorilla::{gorilla_decode, gorilla_encode};
24use crate::compression::rle::{rle_decode, rle_encode, RleRun};
25use crate::error::{TsdbError, TsdbResult};
26use serde::{Deserialize, Serialize};
27
28// ---------------------------------------------------------------------------
29// Algorithm tag
30// ---------------------------------------------------------------------------
31
/// Identifies which compression algorithm was applied to a block.
///
/// Stored inside [`CompressedBlock`] (and serialized via serde) so that a
/// persisted block is self-describing: decoding dispatches on this tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
    /// No compression; raw `(i64, f64)` pairs as little-endian bytes
    /// (16 bytes per sample — see `encode_raw`/`decode_raw`).
    Raw,
    /// Gorilla XOR + delta-of-delta encoding.
    Gorilla,
    /// Run-Length Encoding; effective for step-function series.
    Rle,
    /// Dictionary encoding for string-valued series.
    /// NOTE(review): this module's `(i64, f64)` pipeline cannot produce or
    /// decode Dictionary blocks itself — both paths return an error.
    Dictionary,
}
44
45impl std::fmt::Display for CompressionAlgorithm {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        match self {
48            Self::Raw => write!(f, "raw"),
49            Self::Gorilla => write!(f, "gorilla"),
50            Self::Rle => write!(f, "rle"),
51            Self::Dictionary => write!(f, "dictionary"),
52        }
53    }
54}
55
56// ---------------------------------------------------------------------------
57// CompressedBlock
58// ---------------------------------------------------------------------------
59
/// A self-describing compressed block holding one or more time-series samples.
///
/// The interpretation of `data` depends entirely on `algorithm`; use
/// [`CompressedBlock::decode`] rather than reading the bytes directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedBlock {
    /// The algorithm used to produce this block.
    pub algorithm: CompressionAlgorithm,
    /// Raw compressed bytes (format defined by `algorithm`).
    pub data: Vec<u8>,
    /// Number of samples encoded in this block.
    pub sample_count: u32,
    /// Timestamp of the first sample (ms since epoch).
    pub min_timestamp: i64,
    /// Timestamp of the last sample (ms since epoch).
    /// NOTE(review): `finish` takes first/last without sorting, so these are
    /// only true min/max if samples were pushed in timestamp order — confirm.
    pub max_timestamp: i64,
}
74
75impl CompressedBlock {
76    /// Approximate compression ratio: `original_bytes / compressed_bytes`.
77    pub fn compression_ratio(&self) -> f64 {
78        let original = self.sample_count as f64 * 16.0; // i64 + f64 = 16 bytes
79        let compressed = self.data.len() as f64;
80        if compressed == 0.0 {
81            1.0
82        } else {
83            original / compressed
84        }
85    }
86
87    /// Decode the block back to `(timestamp_ms, value)` pairs.
88    pub fn decode(&self) -> TsdbResult<Vec<(i64, f64)>> {
89        match self.algorithm {
90            CompressionAlgorithm::Raw => decode_raw(&self.data),
91            CompressionAlgorithm::Gorilla => gorilla_decode(&self.data),
92            CompressionAlgorithm::Rle => {
93                let runs = decode_rle_runs(&self.data)?;
94                Ok(rle_decode(&runs))
95            }
96            CompressionAlgorithm::Dictionary => Err(TsdbError::Decompression(
97                "Dictionary blocks require string-aware decoding; use DictionaryBlock directly"
98                    .to_string(),
99            )),
100        }
101    }
102}
103
104// ---------------------------------------------------------------------------
105// Adaptive compressor
106// ---------------------------------------------------------------------------
107
/// Statistics gathered over a sample window to guide algorithm selection.
#[derive(Debug, Default)]
pub struct SampleStats {
    // Total number of buffered samples.
    pub total: usize,
    // Number of distinct value bit-patterns (values compared via `to_bits`,
    // so distinct NaN payloads count as distinct values).
    pub unique_values: usize,
    /// Sum of |delta_of_delta| for timestamps
    // (saturating add; currently informational only — `select_algorithm`
    // does not read it).
    pub dod_sum: i64,
    /// Count of zero XOR transitions (unchanged values)
    // over consecutive pairs, so at most `total - 1`.
    pub zero_xor_count: usize,
}
118
/// Adaptive compressor: buffers samples, then chooses and applies the best
/// algorithm when [`finish`](Self::finish) is called.
///
/// # Example
///
/// ```rust,ignore
/// let mut comp = AdaptiveCompressor::new();
/// for (ts, val) in sensor_data {
///     comp.push(ts, val);
/// }
/// let block = comp.finish()?;
/// println!("Used: {}, ratio: {:.1}", block.algorithm, block.compression_ratio());
/// ```
pub struct AdaptiveCompressor {
    // Buffered `(timestamp_ms, value)` samples, in insertion order.
    samples: Vec<(i64, f64)>,
    /// Force a specific algorithm (skips auto-selection when `Some`).
    forced: Option<CompressionAlgorithm>,
}
137
138impl AdaptiveCompressor {
139    /// Create a new adaptive compressor.
140    pub fn new() -> Self {
141        Self {
142            samples: Vec::new(),
143            forced: None,
144        }
145    }
146
147    /// Force use of a specific algorithm regardless of data characteristics.
148    pub fn with_algorithm(mut self, algo: CompressionAlgorithm) -> Self {
149        self.forced = Some(algo);
150        self
151    }
152
153    /// Add a `(timestamp_ms, value)` sample.
154    pub fn push(&mut self, timestamp: i64, value: f64) {
155        self.samples.push((timestamp, value));
156    }
157
158    /// Add multiple samples at once.
159    pub fn extend(&mut self, samples: &[(i64, f64)]) {
160        self.samples.extend_from_slice(samples);
161    }
162
163    /// Analyse the buffered samples and return statistics used by the
164    /// algorithm selector.
165    fn analyse(&self) -> SampleStats {
166        let mut stats = SampleStats {
167            total: self.samples.len(),
168            ..Default::default()
169        };
170        if self.samples.is_empty() {
171            return stats;
172        }
173
174        // Count unique values via a sorted approach (avoid HashMap for no_std compat)
175        let mut bits: Vec<u64> = self.samples.iter().map(|&(_, v)| v.to_bits()).collect();
176        bits.sort_unstable();
177        bits.dedup();
178        stats.unique_values = bits.len();
179
180        // XOR transitions and delta-of-delta
181        let mut prev_bits = self.samples[0].1.to_bits();
182        let mut prev_ts = self.samples[0].0;
183        let mut prev_delta = 0i64;
184        for &(ts, val) in &self.samples[1..] {
185            let cur_bits = val.to_bits();
186            if cur_bits == prev_bits {
187                stats.zero_xor_count += 1;
188            }
189            prev_bits = cur_bits;
190
191            let delta = ts - prev_ts;
192            let dod = delta - prev_delta;
193            stats.dod_sum = stats.dod_sum.saturating_add(dod.abs());
194            prev_delta = delta;
195            prev_ts = ts;
196        }
197        stats
198    }
199
200    /// Select the best algorithm based on sample statistics.
201    fn select_algorithm(&self, stats: &SampleStats) -> CompressionAlgorithm {
202        if stats.total == 0 {
203            return CompressionAlgorithm::Raw;
204        }
205
206        // High repetition ratio → RLE is likely best
207        let repeat_ratio = stats.zero_xor_count as f64 / stats.total.max(1) as f64;
208        if repeat_ratio >= 0.7 {
209            return CompressionAlgorithm::Rle;
210        }
211
212        // Low cardinality relative to total → RLE can also help
213        let cardinality_ratio = stats.unique_values as f64 / stats.total as f64;
214        if cardinality_ratio < 0.05 && stats.total > 10 {
215            return CompressionAlgorithm::Rle;
216        }
217
218        // Default: Gorilla handles temporal-locality well for sensor floats
219        CompressionAlgorithm::Gorilla
220    }
221
222    /// Compress the buffered samples and return a self-describing block.
223    ///
224    /// After calling `finish`, the compressor is consumed.
225    pub fn finish(self) -> TsdbResult<CompressedBlock> {
226        if self.samples.is_empty() {
227            return Ok(CompressedBlock {
228                algorithm: CompressionAlgorithm::Raw,
229                data: Vec::new(),
230                sample_count: 0,
231                min_timestamp: 0,
232                max_timestamp: 0,
233            });
234        }
235
236        let min_ts = self.samples.first().map(|&(t, _)| t).unwrap_or(0);
237        let max_ts = self.samples.last().map(|&(t, _)| t).unwrap_or(0);
238        let sample_count = self.samples.len() as u32;
239
240        let algo = self.forced.unwrap_or_else(|| {
241            let stats = self.analyse();
242            self.select_algorithm(&stats)
243        });
244
245        let data = match algo {
246            CompressionAlgorithm::Raw => encode_raw(&self.samples),
247            CompressionAlgorithm::Gorilla => gorilla_encode(&self.samples)?,
248            CompressionAlgorithm::Rle => {
249                let runs = rle_encode(&self.samples)?;
250                encode_rle_runs(&runs)?
251            }
252            CompressionAlgorithm::Dictionary => {
253                return Err(TsdbError::Compression(
254                    "Dictionary compression requires string data; use DictionaryEncoder instead"
255                        .to_string(),
256                ));
257            }
258        };
259
260        Ok(CompressedBlock {
261            algorithm: algo,
262            data,
263            sample_count,
264            min_timestamp: min_ts,
265            max_timestamp: max_ts,
266        })
267    }
268}
269
impl Default for AdaptiveCompressor {
    /// Equivalent to [`AdaptiveCompressor::new`]: empty buffer, auto-selection.
    fn default() -> Self {
        Self::new()
    }
}
275
276// ---------------------------------------------------------------------------
277// Raw encoding helpers
278// ---------------------------------------------------------------------------
279
/// Serialize `(timestamp, value)` pairs as 16-byte little-endian records:
/// 8 bytes of `i64` timestamp followed by 8 bytes of the value's `f64` bits.
fn encode_raw(data: &[(i64, f64)]) -> Vec<u8> {
    data.iter()
        .flat_map(|&(ts, val)| {
            let mut record = [0u8; 16];
            record[..8].copy_from_slice(&ts.to_le_bytes());
            record[8..].copy_from_slice(&val.to_bits().to_le_bytes());
            record
        })
        .collect()
}
288
289fn decode_raw(data: &[u8]) -> TsdbResult<Vec<(i64, f64)>> {
290    if data.len() % 16 != 0 {
291        return Err(TsdbError::Decompression(format!(
292            "Raw block length {} is not a multiple of 16",
293            data.len()
294        )));
295    }
296    let mut out = Vec::with_capacity(data.len() / 16);
297    for chunk in data.chunks_exact(16) {
298        let ts = i64::from_le_bytes([
299            chunk[0], chunk[1], chunk[2], chunk[3], chunk[4], chunk[5], chunk[6], chunk[7],
300        ]);
301        let val_bits = u64::from_le_bytes([
302            chunk[8], chunk[9], chunk[10], chunk[11], chunk[12], chunk[13], chunk[14], chunk[15],
303        ]);
304        out.push((ts, f64::from_bits(val_bits)));
305    }
306    Ok(out)
307}
308
309// ---------------------------------------------------------------------------
310// RLE binary encoding helpers
311// ---------------------------------------------------------------------------
312
313/// Wire format for RLE runs:
314/// ```text
315/// [run_count: u32 le]
316/// per run: [start: i64 le][end: i64 le][value_bits: u64 le][count: u32 le]
317/// ```
318fn encode_rle_runs(runs: &[RleRun]) -> TsdbResult<Vec<u8>> {
319    let run_count = runs.len() as u32;
320    let mut out = Vec::with_capacity(4 + runs.len() * 28);
321    out.extend_from_slice(&run_count.to_le_bytes());
322    for run in runs {
323        out.extend_from_slice(&run.start_timestamp.to_le_bytes());
324        out.extend_from_slice(&run.end_timestamp.to_le_bytes());
325        out.extend_from_slice(&run.value.to_bits().to_le_bytes());
326        out.extend_from_slice(&run.count.to_le_bytes());
327    }
328    Ok(out)
329}
330
331fn decode_rle_runs(data: &[u8]) -> TsdbResult<Vec<RleRun>> {
332    if data.len() < 4 {
333        return Err(TsdbError::Decompression(
334            "RLE binary: data too short for run count".to_string(),
335        ));
336    }
337    let run_count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
338    let expected_len = 4 + run_count * 28;
339    if data.len() < expected_len {
340        return Err(TsdbError::Decompression(format!(
341            "RLE binary: expected {} bytes for {} runs, got {}",
342            expected_len,
343            run_count,
344            data.len()
345        )));
346    }
347    let mut runs = Vec::with_capacity(run_count);
348    for i in 0..run_count {
349        let off = 4 + i * 28;
350        let start_timestamp = i64::from_le_bytes([
351            data[off],
352            data[off + 1],
353            data[off + 2],
354            data[off + 3],
355            data[off + 4],
356            data[off + 5],
357            data[off + 6],
358            data[off + 7],
359        ]);
360        let end_timestamp = i64::from_le_bytes([
361            data[off + 8],
362            data[off + 9],
363            data[off + 10],
364            data[off + 11],
365            data[off + 12],
366            data[off + 13],
367            data[off + 14],
368            data[off + 15],
369        ]);
370        let val_bits = u64::from_le_bytes([
371            data[off + 16],
372            data[off + 17],
373            data[off + 18],
374            data[off + 19],
375            data[off + 20],
376            data[off + 21],
377            data[off + 22],
378            data[off + 23],
379        ]);
380        let count = u32::from_le_bytes([
381            data[off + 24],
382            data[off + 25],
383            data[off + 26],
384            data[off + 27],
385        ]);
386        runs.push(RleRun {
387            start_timestamp,
388            end_timestamp,
389            value: f64::from_bits(val_bits),
390            count,
391        });
392    }
393    Ok(runs)
394}
395
396// ---------------------------------------------------------------------------
397// Tests
398// ---------------------------------------------------------------------------
399
#[cfg(test)]
mod tests {
    use super::*;

    // Helper: n samples at a fixed interval whose values cycle through five
    // closely-spaced levels around `base_val`.
    fn make_regular(n: usize, interval_ms: i64, base_val: f64) -> Vec<(i64, f64)> {
        (0..n)
            .map(|i| (i as i64 * interval_ms, base_val + (i % 5) as f64 * 0.01))
            .collect()
    }

    #[test]
    fn test_empty_block() {
        // No samples → zero-count Raw block that decodes to an empty vec.
        let block = AdaptiveCompressor::new().finish().expect("finish");
        assert_eq!(block.sample_count, 0);
        assert!(block.decode().expect("decode").is_empty());
    }

    #[test]
    fn test_constant_data_selects_rle() {
        let data: Vec<(i64, f64)> = (0..200).map(|i| (i as i64 * 1000, 5.0)).collect();
        let mut comp = AdaptiveCompressor::new();
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        // Constant data → high zero-XOR ratio → RLE
        assert_eq!(block.algorithm, CompressionAlgorithm::Rle);
        let decoded = block.decode().expect("decode");
        assert_eq!(decoded, data);
    }

    #[test]
    fn test_sensor_data_selects_gorilla() {
        // Use high-cardinality data (many unique values) to ensure Gorilla is selected.
        // Each sample has a unique value derived from a transcendental function so
        // cardinality_ratio > 0.05 and zero_xor_count is low.
        let data: Vec<(i64, f64)> = (0..200)
            .map(|i| {
                let ts = i as i64 * 1000;
                // cos(i) produces many distinct float values, ensuring high cardinality
                let val = (i as f64 * 0.123456).cos() * 100.0 + 50.0;
                (ts, val)
            })
            .collect();
        let mut comp = AdaptiveCompressor::new();
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert_eq!(block.algorithm, CompressionAlgorithm::Gorilla);
        let decoded = block.decode().expect("decode");
        assert_eq!(decoded.len(), data.len());
        // Compare bit patterns so the round-trip check is exact even for floats.
        for (orig, dec) in data.iter().zip(decoded.iter()) {
            assert_eq!(orig.0, dec.0);
            assert_eq!(orig.1.to_bits(), dec.1.to_bits());
        }
    }

    #[test]
    fn test_forced_algorithm_gorilla() {
        let data: Vec<(i64, f64)> = (0..50).map(|i| (i as i64 * 1000, 7.0)).collect();
        let mut comp = AdaptiveCompressor::new().with_algorithm(CompressionAlgorithm::Gorilla);
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert_eq!(block.algorithm, CompressionAlgorithm::Gorilla);
        let decoded = block.decode().expect("decode");
        assert_eq!(decoded, data);
    }

    #[test]
    fn test_forced_algorithm_rle() {
        let data = make_regular(50, 500, 1.5);
        let mut comp = AdaptiveCompressor::new().with_algorithm(CompressionAlgorithm::Rle);
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert_eq!(block.algorithm, CompressionAlgorithm::Rle);
        let decoded = block.decode().expect("decode");
        assert_eq!(decoded, data);
    }

    #[test]
    fn test_forced_algorithm_raw() {
        let data: Vec<(i64, f64)> = (0..10).map(|i| (i as i64, i as f64)).collect();
        let mut comp = AdaptiveCompressor::new().with_algorithm(CompressionAlgorithm::Raw);
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert_eq!(block.algorithm, CompressionAlgorithm::Raw);
        let decoded = block.decode().expect("decode");
        assert_eq!(decoded, data);
    }

    #[test]
    fn test_raw_encode_decode_round_trip() {
        let data: Vec<(i64, f64)> = vec![(0, 1.0), (1000, 2.5), (2000, -std::f64::consts::PI)];
        let raw = encode_raw(&data);
        let decoded = decode_raw(&raw).expect("decode");
        assert_eq!(decoded, data);
    }

    #[test]
    fn test_rle_binary_round_trip() {
        let data: Vec<(i64, f64)> = vec![(0, 1.0), (1000, 1.0), (2000, 2.0), (3000, 2.0)];
        let runs = rle_encode(&data).expect("rle encode");
        let encoded = encode_rle_runs(&runs).expect("encode runs");
        let decoded_runs = decode_rle_runs(&encoded).expect("decode runs");
        assert_eq!(runs.len(), decoded_runs.len());
        for (a, b) in runs.iter().zip(decoded_runs.iter()) {
            assert_eq!(a.start_timestamp, b.start_timestamp);
            assert_eq!(a.end_timestamp, b.end_timestamp);
            assert_eq!(a.value.to_bits(), b.value.to_bits());
            assert_eq!(a.count, b.count);
        }
        let decoded = rle_decode(&decoded_runs);
        assert_eq!(decoded, data);
    }

    #[test]
    fn test_compression_ratio() {
        let data: Vec<(i64, f64)> = (0..1000).map(|i| (i as i64 * 1000, 5.0)).collect();
        let mut comp = AdaptiveCompressor::new();
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert!(
            block.compression_ratio() > 1.0,
            "should have positive compression"
        );
    }

    #[test]
    fn test_metadata_fields() {
        let data: Vec<(i64, f64)> = vec![(100, 1.0), (200, 2.0), (300, 3.0)];
        let mut comp = AdaptiveCompressor::new();
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert_eq!(block.min_timestamp, 100);
        assert_eq!(block.max_timestamp, 300);
        assert_eq!(block.sample_count, 3);
    }

    #[test]
    fn test_dictionary_forced_returns_error() {
        // Dictionary is a string-data path; forcing it on numeric samples errors.
        let data = vec![(0i64, 1.0f64)];
        let mut comp = AdaptiveCompressor::new().with_algorithm(CompressionAlgorithm::Dictionary);
        comp.extend(&data);
        let result = comp.finish();
        assert!(result.is_err());
    }

    #[test]
    fn test_analyse_stats_constant() {
        let data: Vec<(i64, f64)> = (0..100).map(|i| (i as i64 * 1000, 42.0)).collect();
        let mut comp = AdaptiveCompressor::new();
        comp.extend(&data);
        let stats = comp.analyse();
        assert_eq!(stats.total, 100);
        assert_eq!(stats.unique_values, 1);
        assert_eq!(stats.zero_xor_count, 99); // all values same
    }
}
554}