Skip to main content

oxirs_tsdb/compression/
rle.rs

//! Run-Length Encoding (RLE) compression for time-series data
//!
//! RLE is highly effective for step-function data such as IoT device states,
//! alarm flags, or any series that holds a constant value for extended periods.
//!
//! ## Encoding
//!
//! Each *run* describes a contiguous sequence of identical values:
//!
//! ```text
//! RleRun { start_timestamp, end_timestamp, value, count }
//! ```
//!
//! The encoder records the timestamps of the first and last sample of each run;
//! the timestamps of the samples in between are not stored.  On decoding they
//! are reconstructed by spacing the run's `count` samples evenly between
//! `start_timestamp` and `end_timestamp`.  The round trip is therefore exact
//! only when the samples within a run are uniformly spaced.  For irregularly
//! sampled (sparse) data the interior timestamps are approximations, while the
//! values and the number of samples are always preserved.
18
19use crate::error::{TsdbError, TsdbResult};
20use serde::{Deserialize, Serialize};
21
22// ---------------------------------------------------------------------------
23// Types
24// ---------------------------------------------------------------------------
25
26/// A single RLE run: one or more consecutive samples that share the same value.
27#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
28pub struct RleRun {
29    /// Timestamp (ms since epoch) of the first sample in this run.
30    pub start_timestamp: i64,
31    /// Timestamp (ms since epoch) of the last sample in this run.
32    pub end_timestamp: i64,
33    /// The common value of all samples in this run.
34    ///
35    /// Two f64 values are considered equal when their bit patterns match
36    /// (i.e. `a.to_bits() == b.to_bits()`), so that `NaN != NaN` does *not*
37    /// collapse different NaN payloads, and `+0.0 != -0.0`.
38    pub value: f64,
39    /// Number of samples in this run (>= 1).
40    pub count: u32,
41}
42
43impl RleRun {
44    /// Return the compressed bit-pattern equality of two f64 values.
45    #[inline]
46    fn values_equal(a: f64, b: f64) -> bool {
47        a.to_bits() == b.to_bits()
48    }
49}
50
51// ---------------------------------------------------------------------------
52// Encoder
53// ---------------------------------------------------------------------------
54
55/// Streaming RLE encoder for `(timestamp_ms, value)` pairs.
56///
57/// Feed samples in time-ascending order via [`push`](Self::push),
58/// then call [`finish`](Self::finish) to obtain the run list.
59pub struct RleEncoder {
60    current_value: Option<f64>,
61    run_length: u32,
62    current_start: i64,
63    current_last: i64,
64    encoded: Vec<RleRun>,
65}
66
67impl RleEncoder {
68    /// Create a new encoder.
69    pub fn new() -> Self {
70        Self {
71            current_value: None,
72            run_length: 0,
73            current_start: 0,
74            current_last: 0,
75            encoded: Vec::new(),
76        }
77    }
78
79    /// Push the next `(timestamp_ms, value)` sample.
80    ///
81    /// Samples should arrive in non-decreasing timestamp order.
82    pub fn push(&mut self, timestamp: i64, value: f64) -> TsdbResult<()> {
83        match self.current_value {
84            None => {
85                // First sample
86                self.current_value = Some(value);
87                self.run_length = 1;
88                self.current_start = timestamp;
89                self.current_last = timestamp;
90            }
91            Some(prev) if RleRun::values_equal(prev, value) => {
92                // Continue existing run
93                if timestamp < self.current_last {
94                    return Err(TsdbError::Compression(format!(
95                        "RLE: timestamps must be non-decreasing, got {} after {}",
96                        timestamp, self.current_last
97                    )));
98                }
99                self.run_length += 1;
100                self.current_last = timestamp;
101            }
102            Some(prev) => {
103                // Value changed: flush current run
104                if timestamp < self.current_last {
105                    return Err(TsdbError::Compression(format!(
106                        "RLE: timestamps must be non-decreasing, got {} after {}",
107                        timestamp, self.current_last
108                    )));
109                }
110                self.encoded.push(RleRun {
111                    start_timestamp: self.current_start,
112                    end_timestamp: self.current_last,
113                    value: prev,
114                    count: self.run_length,
115                });
116                self.current_value = Some(value);
117                self.run_length = 1;
118                self.current_start = timestamp;
119                self.current_last = timestamp;
120            }
121        }
122        Ok(())
123    }
124
125    /// Flush any remaining run and return the complete list of [`RleRun`]s.
126    pub fn finish(mut self) -> Vec<RleRun> {
127        if let Some(v) = self.current_value {
128            self.encoded.push(RleRun {
129                start_timestamp: self.current_start,
130                end_timestamp: self.current_last,
131                value: v,
132                count: self.run_length,
133            });
134        }
135        self.encoded
136    }
137}
138
139impl Default for RleEncoder {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145// ---------------------------------------------------------------------------
146// Convenience functions
147// ---------------------------------------------------------------------------
148
149/// Encode a slice of `(timestamp_ms, value)` pairs into a list of [`RleRun`]s.
150///
151/// Equivalent to constructing an [`RleEncoder`], pushing all samples, and
152/// calling [`RleEncoder::finish`].
153pub fn rle_encode(data: &[(i64, f64)]) -> TsdbResult<Vec<RleRun>> {
154    let mut encoder = RleEncoder::new();
155    for &(ts, val) in data {
156        encoder.push(ts, val)?;
157    }
158    Ok(encoder.finish())
159}
160
161/// Decode a list of [`RleRun`]s back into `(timestamp_ms, value)` pairs.
162///
163/// For each run, the encoder stored `count` samples.  When the sampling
164/// interval is uniform within a run, we distribute the timestamps linearly
165/// between `start_timestamp` and `end_timestamp`.  When a run has `count == 1`,
166/// only `start_timestamp` is emitted.
167///
168/// # Panics
169/// Never panics (uses `?` internally).
170pub fn rle_decode(runs: &[RleRun]) -> Vec<(i64, f64)> {
171    let total: usize = runs.iter().map(|r| r.count as usize).sum();
172    let mut out = Vec::with_capacity(total);
173
174    for run in runs {
175        if run.count == 0 {
176            continue;
177        }
178        if run.count == 1 {
179            out.push((run.start_timestamp, run.value));
180        } else {
181            // Distribute timestamps evenly across the run
182            let span = run.end_timestamp - run.start_timestamp;
183            let step = span / (run.count as i64 - 1);
184            for i in 0..run.count as i64 {
185                let ts = if i == run.count as i64 - 1 {
186                    // Use exact end timestamp for last sample to avoid rounding drift
187                    run.end_timestamp
188                } else {
189                    run.start_timestamp + i * step
190                };
191                out.push((ts, run.value));
192            }
193        }
194    }
195    out
196}
197
198// ---------------------------------------------------------------------------
199// RleBlock: a self-contained serialisable compressed block
200// ---------------------------------------------------------------------------
201
202/// A self-contained, serialisable RLE-compressed block.
203#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
204pub struct RleBlock {
205    /// The encoded runs.
206    pub runs: Vec<RleRun>,
207    /// Total number of original samples represented by this block.
208    pub total_samples: u64,
209}
210
211impl RleBlock {
212    /// Build a block from raw data.
213    pub fn from_data(data: &[(i64, f64)]) -> TsdbResult<Self> {
214        let runs = rle_encode(data)?;
215        let total_samples = data.len() as u64;
216        Ok(Self {
217            runs,
218            total_samples,
219        })
220    }
221
222    /// Decode back to `(timestamp_ms, value)` pairs.
223    pub fn decode(&self) -> Vec<(i64, f64)> {
224        rle_decode(&self.runs)
225    }
226
227    /// Approximate compression ratio (original bytes / encoded bytes).
228    ///
229    /// Assumes each original sample costs 16 bytes (i64 + f64), and each
230    /// [`RleRun`] costs approximately 28 bytes on the wire.
231    pub fn compression_ratio(&self) -> f64 {
232        let original_bytes = self.total_samples as f64 * 16.0;
233        let encoded_bytes = self.runs.len() as f64 * 28.0 + 8.0; // +8 for header
234        if encoded_bytes == 0.0 {
235            1.0
236        } else {
237            original_bytes / encoded_bytes
238        }
239    }
240
241    /// Number of distinct runs (lower is better compression).
242    pub fn run_count(&self) -> usize {
243        self.runs.len()
244    }
245}
246
247// ---------------------------------------------------------------------------
248// Tests
249// ---------------------------------------------------------------------------
250
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_empty_encode_decode() {
        let runs = rle_encode(&[]).expect("encode failed");
        assert_eq!(runs.len(), 0);
        let samples = rle_decode(&runs);
        assert_eq!(samples.len(), 0);
    }

    #[test]
    fn test_single_sample() {
        let sample = (1000i64, 42.0f64);
        let runs = rle_encode(&[sample]).expect("encode");
        assert_eq!(runs.len(), 1);
        let run = &runs[0];
        assert_eq!(run.count, 1);
        assert_eq!(run.value, 42.0);
        assert_eq!(rle_decode(&runs), vec![sample]);
    }

    #[test]
    fn test_constant_series_compresses_to_one_run() {
        let series: Vec<(i64, f64)> = (0..1000i64).map(|t| (t * 1000, 7.5)).collect();
        let runs = rle_encode(&series).expect("encode");
        assert_eq!(
            runs.len(),
            1,
            "constant series should produce exactly one run"
        );
        let run = &runs[0];
        assert_eq!(run.count, 1000);
        assert_eq!(run.value, 7.5);
        assert_eq!(run.start_timestamp, 0);
        assert_eq!(run.end_timestamp, 999 * 1000);
    }

    #[test]
    fn test_alternating_values_round_trip() {
        // 0.0, 1.0, 0.0, 1.0, ...: every sample differs from its predecessor.
        let series: Vec<(i64, f64)> = (0..10i64)
            .map(|t| (t * 100, (t % 2) as f64))
            .collect();
        let runs = rle_encode(&series).expect("encode");
        assert_eq!(runs.len(), 10);
        assert_eq!(rle_decode(&runs), series);
    }

    #[test]
    fn test_step_function() {
        let levels = [0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 2.0, 2.0];
        let series: Vec<(i64, f64)> = (0i64..).zip(levels.iter().copied()).collect();
        let runs = rle_encode(&series).expect("encode");
        let counts: Vec<u32> = runs.iter().map(|r| r.count).collect();
        assert_eq!(counts, vec![3, 3, 2, 2]);
        assert_eq!(rle_decode(&runs), series);
    }

    #[test]
    fn test_nan_not_coalesced() {
        // Distinct NaN payloads are distinct bit patterns and must stay apart.
        let quiet_nan = |payload: u64| f64::from_bits(0x7FF8_0000_0000_0000 | payload);
        let series = vec![(0i64, quiet_nan(1)), (1000, quiet_nan(2))];
        let runs = rle_encode(&series).expect("encode");
        assert_eq!(runs.len(), 2, "different NaN payloads should not be merged");
    }

    #[test]
    fn test_positive_zero_vs_negative_zero() {
        // +0.0 and -0.0 differ only in the sign bit, so they form separate runs.
        let (pos_zero, neg_zero) = (0.0f64, -0.0f64);
        assert_ne!(pos_zero.to_bits(), neg_zero.to_bits());
        let runs = rle_encode(&[(0i64, pos_zero), (1000, neg_zero)]).expect("encode");
        assert_eq!(runs.len(), 2);
    }

    #[test]
    fn test_out_of_order_timestamps_error() {
        let mut encoder = RleEncoder::new();
        encoder.push(2000, 1.0).expect("push ok");
        let backwards = encoder.push(1000, 1.0);
        backwards.expect_err("should fail: ts goes backward");
    }

    #[test]
    fn test_rle_block_compression_ratio() {
        // 100 plateaus of 100 samples each.
        let series: Vec<(i64, f64)> = (0..10_000i64)
            .map(|i| (i * 100, (i / 100) as f64))
            .collect();
        let block = RleBlock::from_data(&series).expect("build block");
        assert_eq!(block.run_count(), 100); // 100 distinct constant regions
        let ratio = block.compression_ratio();
        assert!(
            ratio > 1.0,
            "should have positive compression: {:.2}",
            ratio
        );
    }

    #[test]
    fn test_rle_block_decode_round_trip() {
        let series: Vec<(i64, f64)> = [10.0, 10.0, 10.0, 20.0, 20.0, 30.0]
            .iter()
            .enumerate()
            .map(|(i, &v)| (i as i64 * 1000, v))
            .collect();
        let block = RleBlock::from_data(&series).expect("build block");
        assert_eq!(block.decode(), series);
    }

    #[test]
    fn test_encoder_default() {
        let runs = RleEncoder::default().finish();
        assert!(runs.is_empty());
    }

    #[test]
    fn test_large_constant_block() {
        const N: usize = 100_000;
        let series: Vec<(i64, f64)> = (0..N as i64)
            .map(|i| (i * 10, std::f64::consts::PI))
            .collect();
        let runs = rle_encode(&series).expect("encode");
        assert_eq!(runs.len(), 1);
        assert_eq!(runs[0].count, N as u32);
    }
}