oxirs_tsdb/storage/
compression.rs

//! Compression algorithms for time-series data.
//!
//! Implements Gorilla XOR compression (Facebook, VLDB 2015) for floating-point values
//! and delta-of-delta encoding for timestamps.
5
6use crate::error::{TsdbError, TsdbResult};
7use bit_vec::BitVec;
8use std::ops::Range;
9
10/// Gorilla compressor for floating-point time series
11///
12/// Based on Facebook's Gorilla: A Fast, Scalable, In-Memory Time Series Database
13/// Reference: <http://www.vldb.org/pvldb/vol8/p1816-teller.pdf>
14///
15/// # Algorithm
16///
17/// 1. XOR current value with previous value
18/// 2. If XOR == 0: store single '0' bit (value unchanged)
19/// 3. If XOR != 0: store '1' + compressed XOR using variable-length encoding
20///
21/// # Compression Ratio
22///
23/// - Temperature sensors: 100:1 (stable readings)
24/// - Vibration sensors: 10:1 (high variance)
25/// - Average IoT workloads: 30-50:1
26pub struct GorillaCompressor {
27    /// Previous value for XOR
28    prev_value: f64,
29
30    /// Previous XOR's leading zeros count
31    prev_leading_zeros: u8,
32
33    /// Previous XOR's trailing zeros count
34    prev_trailing_zeros: u8,
35
36    /// Compressed bit stream
37    bits: BitVec,
38
39    /// Number of values compressed (including first)
40    count: u32,
41}
42
43impl GorillaCompressor {
44    /// Create new compressor with first value
45    ///
46    /// The first value is stored uncompressed (64 bits).
47    /// A 32-bit count header is prepended when finish() is called.
48    pub fn new(first_value: f64) -> Self {
49        let mut bits = BitVec::new();
50
51        // Store first value uncompressed
52        let value_bits = first_value.to_bits();
53        for i in (0..64).rev() {
54            bits.push((value_bits >> i) & 1 == 1);
55        }
56
57        Self {
58            prev_value: first_value,
59            prev_leading_zeros: 64, // Initialize to max so first XOR always uses new block
60            prev_trailing_zeros: 64,
61            bits,
62            count: 1, // First value already compressed
63        }
64    }
65
66    /// Compress a single value
67    pub fn compress(&mut self, value: f64) {
68        let xor = value.to_bits() ^ self.prev_value.to_bits();
69
70        if xor == 0 {
71            // Value unchanged: store single '0' bit
72            self.bits.push(false);
73        } else {
74            self.bits.push(true); // Value changed
75
76            let leading_zeros = xor.leading_zeros() as u8;
77            let trailing_zeros = xor.trailing_zeros() as u8;
78
79            // Check if we can reuse the previous block window
80            // Only if current XOR fits within the previous window
81            if self.prev_leading_zeros < 64
82                && leading_zeros >= self.prev_leading_zeros
83                && trailing_zeros >= self.prev_trailing_zeros
84            {
85                // Use previous block size: store '0' + meaningful bits
86                self.bits.push(false);
87
88                let meaningful_bits = 64 - self.prev_leading_zeros - self.prev_trailing_zeros;
89                let shifted_xor = xor >> self.prev_trailing_zeros;
90
91                for i in (0..meaningful_bits).rev() {
92                    self.bits.push((shifted_xor >> i) & 1 == 1);
93                }
94            } else {
95                // New block size: store '1' + leading zeros (5 bits) + block size (6 bits) + meaningful bits
96                self.bits.push(true);
97
98                // Encode leading zeros (5 bits, capped at 31)
99                let leading_zeros_capped = leading_zeros.min(31);
100                for i in (0..5).rev() {
101                    self.bits.push((leading_zeros_capped >> i) & 1 == 1);
102                }
103
104                // Encode meaningful bits count (6 bits)
105                // meaningful_bits = 64 - leading_zeros - trailing_zeros
106                // If leading_zeros was capped, adjust trailing_zeros
107                let actual_leading = leading_zeros_capped;
108                let meaningful_bits = (64 - leading_zeros - trailing_zeros).max(1);
109                for i in (0..6).rev() {
110                    self.bits.push((meaningful_bits >> i) & 1 == 1);
111                }
112
113                // Encode meaningful bits
114                let shifted_xor = xor >> trailing_zeros;
115                for i in (0..meaningful_bits).rev() {
116                    self.bits.push((shifted_xor >> i) & 1 == 1);
117                }
118
119                self.prev_leading_zeros = actual_leading;
120                self.prev_trailing_zeros = 64 - actual_leading - meaningful_bits;
121            }
122        }
123
124        self.prev_value = value;
125        self.count += 1;
126    }
127
128    /// Finish compression and return compressed bytes
129    ///
130    /// Returns: [count (4 bytes BE)] + [compressed bit stream]
131    pub fn finish(self) -> Vec<u8> {
132        let mut result = Vec::with_capacity(4 + (self.bits.len() + 7) / 8);
133        // Prepend count as big-endian u32
134        result.extend_from_slice(&self.count.to_be_bytes());
135        result.extend_from_slice(&self.bits.to_bytes());
136        result
137    }
138
139    /// Get current compression ratio
140    pub fn compression_ratio(&self, original_count: usize) -> f64 {
141        let original_bytes = original_count * 8; // f64 = 8 bytes
142        let compressed_bytes = 4 + (self.bits.len() + 7) / 8; // 4 byte header + bits
143        original_bytes as f64 / compressed_bytes as f64
144    }
145}
146
147/// Gorilla decompressor for floating-point time series
148pub struct GorillaDecompressor {
149    /// Compressed bit stream
150    bits: BitVec,
151
152    /// Current position in bit stream
153    pos: usize,
154
155    /// Previous value for XOR
156    prev_value: f64,
157
158    /// Previous XOR's leading zeros
159    prev_leading_zeros: u8,
160
161    /// Previous XOR's trailing zeros
162    prev_trailing_zeros: u8,
163
164    /// Total number of values to decompress
165    total_count: u32,
166
167    /// Number of values already decompressed
168    decompressed_count: u32,
169}
170
171impl GorillaDecompressor {
172    /// Create new decompressor from compressed bytes
173    ///
174    /// Expected format: [count (4 bytes BE)] + [compressed bit stream]
175    pub fn new(compressed: &[u8]) -> TsdbResult<Self> {
176        if compressed.len() < 4 {
177            return Err(TsdbError::Decompression(
178                "Compressed data too short (missing count header)".to_string(),
179            ));
180        }
181
182        // Read count header (first 4 bytes)
183        let total_count =
184            u32::from_be_bytes([compressed[0], compressed[1], compressed[2], compressed[3]]);
185
186        // Create BitVec from remaining bytes (after count header)
187        let bits = BitVec::from_bytes(&compressed[4..]);
188
189        if bits.len() < 64 {
190            return Err(TsdbError::Decompression(
191                "Compressed data too short (missing first value)".to_string(),
192            ));
193        }
194
195        // Read first value (uncompressed, 64 bits)
196        let first_value_bits = Self::read_bits_as_u64(&bits, 0..64);
197        let first_value = f64::from_bits(first_value_bits);
198
199        Ok(Self {
200            bits,
201            pos: 64,
202            prev_value: first_value,
203            prev_leading_zeros: 64, // Initialize to max to match compressor
204            prev_trailing_zeros: 64,
205            total_count,
206            decompressed_count: 1, // First value already read
207        })
208    }
209
210    /// Get the first value (always available)
211    pub fn first_value(&self) -> f64 {
212        self.prev_value
213    }
214
215    /// Get total number of values
216    pub fn total_count(&self) -> u32 {
217        self.total_count
218    }
219
220    /// Decompress next value
221    ///
222    /// Returns the next decompressed value, or None if all values have been read.
223    pub fn next_value(&mut self) -> Option<f64> {
224        // Check if we've decompressed all values
225        if self.decompressed_count >= self.total_count {
226            return None;
227        }
228
229        if self.pos >= self.bits.len() {
230            return None;
231        }
232
233        let changed = self.bits[self.pos];
234        self.pos += 1;
235
236        if !changed {
237            // Value unchanged
238            self.decompressed_count += 1;
239            return Some(self.prev_value);
240        }
241
242        if self.pos >= self.bits.len() {
243            return None;
244        }
245
246        let use_prev_block = !self.bits[self.pos]; // Note: '0' means use prev block, '1' means new block
247        self.pos += 1;
248
249        let meaningful_bits = if use_prev_block && self.prev_leading_zeros < 64 {
250            // Use previous block size
251            64 - self.prev_leading_zeros - self.prev_trailing_zeros
252        } else {
253            // Read new block size
254            if self.pos + 11 > self.bits.len() {
255                return None; // Not enough bits
256            }
257
258            let leading_zeros = Self::read_bits_as_u8(&self.bits, self.pos..self.pos + 5);
259            self.pos += 5;
260
261            let block_size = Self::read_bits_as_u8(&self.bits, self.pos..self.pos + 6);
262            self.pos += 6;
263
264            // block_size is the number of meaningful bits stored
265            let trailing_zeros = (64 - leading_zeros).saturating_sub(block_size);
266            self.prev_leading_zeros = leading_zeros;
267            self.prev_trailing_zeros = trailing_zeros;
268
269            block_size
270        };
271
272        if meaningful_bits == 0 {
273            // Edge case: no meaningful bits (shouldn't happen in normal data)
274            self.decompressed_count += 1;
275            return Some(self.prev_value);
276        }
277
278        if self.pos + meaningful_bits as usize > self.bits.len() {
279            return None; // Not enough bits
280        }
281
282        // Read meaningful bits
283        let xor_shifted =
284            Self::read_bits_as_u64(&self.bits, self.pos..self.pos + meaningful_bits as usize);
285        self.pos += meaningful_bits as usize;
286
287        // Shift back to original position - use checked shift to avoid overflow
288        let xor = if self.prev_trailing_zeros >= 64 {
289            0
290        } else {
291            xor_shifted << self.prev_trailing_zeros
292        };
293        let value_bits = self.prev_value.to_bits() ^ xor;
294        let value = f64::from_bits(value_bits);
295
296        self.prev_value = value;
297        self.decompressed_count += 1;
298        Some(value)
299    }
300
301    /// Decompress all values
302    pub fn decompress_all(mut self) -> Vec<f64> {
303        let first = self.first_value();
304        let mut values = vec![first];
305
306        while let Some(value) = self.next_value() {
307            values.push(value);
308        }
309
310        values
311    }
312
313    /// Read bits from range as u64
314    fn read_bits_as_u64(bits: &BitVec, range: Range<usize>) -> u64 {
315        let mut result: u64 = 0;
316        let range_len = range.len();
317        for (i, bit_idx) in range.enumerate() {
318            if bit_idx < bits.len() && bits[bit_idx] {
319                result |= 1 << (range_len - 1 - i);
320            }
321        }
322        result
323    }
324
325    /// Read bits from range as u8
326    fn read_bits_as_u8(bits: &BitVec, range: Range<usize>) -> u8 {
327        Self::read_bits_as_u64(bits, range) as u8
328    }
329}
330
331/// Delta-of-delta compressor for timestamps
332///
333/// Exploits regularity in sensor sampling intervals.
334/// For 1Hz regular sampling, compresses timestamps to <2 bits per sample.
335pub struct DeltaOfDeltaCompressor {
336    /// Previous timestamp (milliseconds since epoch)
337    prev_timestamp: i64,
338
339    /// Previous delta
340    prev_delta: i64,
341
342    /// Compressed bit stream
343    bits: BitVec,
344
345    /// Number of timestamps compressed (including first)
346    count: u32,
347}
348
349impl DeltaOfDeltaCompressor {
350    /// Create new compressor with first timestamp
351    ///
352    /// A 32-bit count header is prepended when finish() is called.
353    pub fn new(first_timestamp: i64) -> Self {
354        let mut bits = BitVec::new();
355
356        // Store first timestamp uncompressed (64 bits)
357        for i in (0..64).rev() {
358            bits.push((first_timestamp >> i) & 1 == 1);
359        }
360
361        Self {
362            prev_timestamp: first_timestamp,
363            prev_delta: 0,
364            bits,
365            count: 1, // First timestamp already compressed
366        }
367    }
368
369    /// Compress a timestamp
370    pub fn compress(&mut self, timestamp: i64) {
371        let delta = timestamp - self.prev_timestamp;
372        let delta_of_delta = delta - self.prev_delta;
373
374        // Variable-length encoding based on delta_of_delta magnitude
375        match delta_of_delta {
376            0 => {
377                // No change: single '0' bit
378                self.bits.push(false);
379            }
380            -63..=64 => {
381                // Small delta: '10' + 7-bit signed value
382                self.bits.push(true);
383                self.bits.push(false);
384                self.encode_signed(delta_of_delta, 7);
385            }
386            -255..=256 => {
387                // Medium delta: '110' + 9-bit signed value
388                self.bits.push(true);
389                self.bits.push(true);
390                self.bits.push(false);
391                self.encode_signed(delta_of_delta, 9);
392            }
393            -2047..=2048 => {
394                // Large delta: '1110' + 12-bit signed value
395                self.bits.push(true);
396                self.bits.push(true);
397                self.bits.push(true);
398                self.bits.push(false);
399                self.encode_signed(delta_of_delta, 12);
400            }
401            _ => {
402                // Huge delta: '1111' + 64-bit value
403                self.bits.push(true);
404                self.bits.push(true);
405                self.bits.push(true);
406                self.bits.push(true);
407                self.encode_signed(delta_of_delta, 64);
408            }
409        }
410
411        self.prev_timestamp = timestamp;
412        self.prev_delta = delta;
413        self.count += 1;
414    }
415
416    /// Encode signed integer with specified bit count
417    fn encode_signed(&mut self, value: i64, bit_count: usize) {
418        for i in (0..bit_count).rev() {
419            self.bits.push((value >> i) & 1 == 1);
420        }
421    }
422
423    /// Finish compression and return compressed bytes
424    ///
425    /// Returns: [count (4 bytes BE)] + [compressed bit stream]
426    pub fn finish(self) -> Vec<u8> {
427        let mut result = Vec::with_capacity(4 + (self.bits.len() + 7) / 8);
428        // Prepend count as big-endian u32
429        result.extend_from_slice(&self.count.to_be_bytes());
430        result.extend_from_slice(&self.bits.to_bytes());
431        result
432    }
433
434    /// Get current compression ratio
435    pub fn compression_ratio(&self, original_count: usize) -> f64 {
436        let original_bytes = original_count * 8; // i64 = 8 bytes
437        let compressed_bytes = 4 + (self.bits.len() + 7) / 8; // 4 byte header + bits
438        original_bytes as f64 / compressed_bytes as f64
439    }
440}
441
442/// Delta-of-delta decompressor for timestamps
443pub struct DeltaOfDeltaDecompressor {
444    /// Compressed bit stream
445    bits: BitVec,
446
447    /// Current position in bit stream
448    pos: usize,
449
450    /// Previous timestamp
451    prev_timestamp: i64,
452
453    /// Previous delta
454    prev_delta: i64,
455
456    /// Total number of timestamps to decompress
457    total_count: u32,
458
459    /// Number of timestamps already decompressed
460    decompressed_count: u32,
461}
462
463impl DeltaOfDeltaDecompressor {
464    /// Create new decompressor from compressed bytes
465    ///
466    /// Expected format: [count (4 bytes BE)] + [compressed bit stream]
467    pub fn new(compressed: &[u8]) -> TsdbResult<Self> {
468        if compressed.len() < 4 {
469            return Err(TsdbError::Decompression(
470                "Compressed timestamp data too short (missing count header)".to_string(),
471            ));
472        }
473
474        // Read count header (first 4 bytes)
475        let total_count =
476            u32::from_be_bytes([compressed[0], compressed[1], compressed[2], compressed[3]]);
477
478        // Create BitVec from remaining bytes (after count header)
479        let bits = BitVec::from_bytes(&compressed[4..]);
480
481        if bits.len() < 64 {
482            return Err(TsdbError::Decompression(
483                "Compressed timestamp data too short (missing first timestamp)".to_string(),
484            ));
485        }
486
487        // Read first timestamp (uncompressed, 64 bits)
488        let first_timestamp = Self::read_bits_as_i64(&bits, 0..64);
489
490        Ok(Self {
491            bits,
492            pos: 64,
493            prev_timestamp: first_timestamp,
494            prev_delta: 0,
495            total_count,
496            decompressed_count: 1, // First timestamp already read
497        })
498    }
499
500    /// Get first timestamp
501    pub fn first_timestamp(&self) -> i64 {
502        self.prev_timestamp
503    }
504
505    /// Get total number of timestamps
506    pub fn total_count(&self) -> u32 {
507        self.total_count
508    }
509
510    /// Decompress next timestamp
511    ///
512    /// Returns the next decompressed timestamp, or None if all timestamps have been read.
513    pub fn next_timestamp(&mut self) -> Option<i64> {
514        // Check if we've decompressed all timestamps
515        if self.decompressed_count >= self.total_count {
516            return None;
517        }
518
519        if self.pos >= self.bits.len() {
520            return None;
521        }
522
523        // Read prefix to determine encoding
524        let prefix = self.read_prefix();
525
526        let delta_of_delta = match prefix {
527            0 => 0, // '0' - no change
528            1 => {
529                // '10' - 7-bit signed
530                if self.pos + 7 > self.bits.len() {
531                    return None;
532                }
533                let value = Self::read_bits_as_i64(&self.bits, self.pos..self.pos + 7);
534                self.pos += 7;
535                Self::sign_extend(value, 7)
536            }
537            2 => {
538                // '110' - 9-bit signed
539                if self.pos + 9 > self.bits.len() {
540                    return None;
541                }
542                let value = Self::read_bits_as_i64(&self.bits, self.pos..self.pos + 9);
543                self.pos += 9;
544                Self::sign_extend(value, 9)
545            }
546            3 => {
547                // '1110' - 12-bit signed
548                if self.pos + 12 > self.bits.len() {
549                    return None;
550                }
551                let value = Self::read_bits_as_i64(&self.bits, self.pos..self.pos + 12);
552                self.pos += 12;
553                Self::sign_extend(value, 12)
554            }
555            4 => {
556                // '1111' - 64-bit signed
557                if self.pos + 64 > self.bits.len() {
558                    return None;
559                }
560                let value = Self::read_bits_as_i64(&self.bits, self.pos..self.pos + 64);
561                self.pos += 64;
562                value
563            }
564            _ => return None,
565        };
566
567        let delta = self.prev_delta + delta_of_delta;
568        let timestamp = self.prev_timestamp + delta;
569
570        self.prev_timestamp = timestamp;
571        self.prev_delta = delta;
572        self.decompressed_count += 1;
573
574        Some(timestamp)
575    }
576
577    /// Read variable-length prefix
578    fn read_prefix(&mut self) -> u8 {
579        if self.pos >= self.bits.len() || !self.bits[self.pos] {
580            self.pos += 1;
581            return 0; // '0'
582        }
583
584        self.pos += 1;
585        if self.pos >= self.bits.len() || !self.bits[self.pos] {
586            self.pos += 1;
587            return 1; // '10'
588        }
589
590        self.pos += 1;
591        if self.pos >= self.bits.len() || !self.bits[self.pos] {
592            self.pos += 1;
593            return 2; // '110'
594        }
595
596        self.pos += 1;
597        if self.pos >= self.bits.len() || !self.bits[self.pos] {
598            self.pos += 1;
599            return 3; // '1110'
600        }
601
602        self.pos += 1;
603        4 // '1111'
604    }
605
606    /// Read bits from range as i64
607    fn read_bits_as_i64(bits: &BitVec, range: Range<usize>) -> i64 {
608        let mut result: i64 = 0;
609        let range_len = range.len();
610        for (i, bit_idx) in range.enumerate() {
611            if bit_idx < bits.len() && bits[bit_idx] {
612                result |= 1 << (range_len - 1 - i);
613            }
614        }
615        result
616    }
617
618    /// Sign-extend a value from specified bit count
619    fn sign_extend(value: i64, bits: usize) -> i64 {
620        let sign_bit = 1i64 << (bits - 1);
621        if value & sign_bit != 0 {
622            // Negative: fill upper bits with 1
623            value | (!0i64 << bits)
624        } else {
625            // Positive: already correct
626            value
627        }
628    }
629
630    /// Decompress all timestamps
631    pub fn decompress_all(mut self) -> Vec<i64> {
632        let first = self.first_timestamp();
633        let mut timestamps = vec![first];
634
635        while let Some(timestamp) = self.next_timestamp() {
636            timestamps.push(timestamp);
637        }
638
639        timestamps
640    }
641}
642
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_gorilla_compression_unchanged() {
        // A flat series of 1000 readings, long enough to amortise the fixed overhead.
        let mut encoder = GorillaCompressor::new(25.0);
        (0..999).for_each(|_| encoder.compress(25.0));

        let packed = encoder.finish();
        let raw_size: usize = 1000 * 8; // 1000 values × 8 bytes
        let packed_size = packed.len();

        println!(
            "Unchanged values (1000): {} bytes → {} bytes (ratio: {:.1}:1)",
            raw_size,
            packed_size,
            raw_size as f64 / packed_size as f64
        );

        // 4-byte header + 8-byte first value + 999 single '0' bits ≈ 137 bytes,
        // i.e. roughly 58:1.
        assert!(packed_size < raw_size / 50);
    }

    #[test]
    fn test_gorilla_round_trip() {
        // Temperature-like readings wandering by ±0.1.
        let readings = [20.0, 20.1, 20.2, 20.1, 20.0, 20.1, 20.2, 20.3, 20.2, 20.1];

        let mut encoder = GorillaCompressor::new(readings[0]);
        for reading in readings.iter().skip(1) {
            encoder.compress(*reading);
        }

        let decoded = GorillaDecompressor::new(&encoder.finish())
            .unwrap()
            .decompress_all();

        assert_eq!(readings.len(), decoded.len());
        // Lossless: every value must come back bit-for-bit.
        assert_eq!(&readings[..], decoded.as_slice());
    }

    #[test]
    fn test_delta_of_delta_regular_sampling() {
        // 1 Hz sampling: every interval is exactly 1000 ms.
        let base = 1_640_000_000_000_i64; // Jan 2022
        let stamps: Vec<i64> = (0..1000).map(|k| base + k * 1000).collect();

        let mut encoder = DeltaOfDeltaCompressor::new(stamps[0]);
        stamps[1..].iter().for_each(|&ts| encoder.compress(ts));

        let packed = encoder.finish();
        let raw_size: usize = 1000 * 8; // 1000 timestamps × 8 bytes
        let packed_size = packed.len();

        println!(
            "Regular 1Hz sampling: {} bytes → {} bytes (ratio: {:.1}:1)",
            raw_size,
            packed_size,
            raw_size as f64 / packed_size as f64
        );

        // Constant interval => delta-of-delta is 0 => one '0' bit per sample.
        assert!(packed_size < raw_size / 30);

        // The stream must decode back to the exact input.
        let decoded = DeltaOfDeltaDecompressor::new(&packed)
            .unwrap()
            .decompress_all();
        assert_eq!(stamps, decoded);
    }

    #[test]
    fn test_delta_of_delta_irregular_sampling() {
        // Mostly 1000 ms apart, with an early sample at 3100 and a gap up to 5000.
        let stamps: Vec<i64> = vec![1000, 2000, 3000, 3100, 5000, 6000, 7000];

        let mut encoder = DeltaOfDeltaCompressor::new(stamps[0]);
        for &ts in stamps.iter().skip(1) {
            encoder.compress(ts);
        }

        let decoded = DeltaOfDeltaDecompressor::new(&encoder.finish())
            .unwrap()
            .decompress_all();
        assert_eq!(stamps, decoded);
    }

    #[test]
    fn test_compression_ratio_calculation() {
        // A slow ramp of 100 values starting at 100.0.
        let mut encoder = GorillaCompressor::new(100.0);
        (0..99)
            .map(|step| 100.0 + (step as f64 * 0.1))
            .for_each(|value| encoder.compress(value));

        let ratio = encoder.compression_ratio(100);
        assert!(ratio > 1.0); // some compression must occur
        println!("Compression ratio: {:.1}:1", ratio);
    }
}