Skip to main content

aegis_timeseries/
compression.rs

1//! Aegis Time Series Compression
2//!
3//! Gorilla-style compression for time series data using delta-of-delta
4//! encoding for timestamps and XOR-based encoding for values.
5//!
6//! @version 0.1.0
7//! @author AutomataNexus Development Team
8
9use crate::types::DataPoint;
10use chrono::DateTime;
11#[cfg(test)]
12use chrono::Utc;
13
14// =============================================================================
15// Bit Writer
16// =============================================================================
17
/// Accumulates individual bits into a byte buffer, most significant bit first.
///
/// Bits are staged in `current_byte` and moved into `buffer` each time eight
/// of them have been collected; `bit_position` counts the bits staged so far.
struct BitWriter {
    buffer: Vec<u8>,
    current_byte: u8,
    bit_position: u8,
}

impl BitWriter {
    /// Creates a writer with an empty buffer and no staged bits.
    fn new() -> Self {
        Self {
            buffer: Vec::new(),
            current_byte: 0,
            bit_position: 0,
        }
    }

    /// Appends one bit, flushing the staged byte once it holds eight bits.
    fn write_bit(&mut self, bit: bool) {
        // The first bit written lands in bit 7 of the staged byte, the eighth in bit 0.
        let mask = u8::from(bit) << (7 - self.bit_position);
        self.current_byte |= mask;
        self.bit_position += 1;

        if self.bit_position < 8 {
            return;
        }

        // Byte complete: move it into the buffer and reset the staging area.
        self.buffer.push(std::mem::take(&mut self.current_byte));
        self.bit_position = 0;
    }

    /// Appends the low `num_bits` bits of `value`, highest of those bits first.
    ///
    /// Callers in this file pass at most 64 for `num_bits`.
    fn write_bits(&mut self, value: u64, num_bits: u8) {
        let mut shift = num_bits;
        while shift > 0 {
            shift -= 1;
            self.write_bit((value >> shift) & 1 != 0);
        }
    }

    /// Consumes the writer and returns the bytes written so far.
    ///
    /// A partially filled final byte is included; its unused low bits are zero.
    fn finish(self) -> Vec<u8> {
        let Self {
            mut buffer,
            current_byte,
            bit_position,
        } = self;

        if bit_position != 0 {
            buffer.push(current_byte);
        }
        buffer
    }
}
60
61// =============================================================================
62// Bit Reader
63// =============================================================================
64
/// Cursor that hands out the bits of a byte slice, most significant bit first.
///
/// `byte_position` indexes the byte currently being read and `bit_position`
/// counts how many of its bits have already been consumed.
struct BitReader<'a> {
    data: &'a [u8],
    byte_position: usize,
    bit_position: u8,
}

impl<'a> BitReader<'a> {
    /// Creates a reader positioned at the first bit of `data`.
    fn new(data: &'a [u8]) -> Self {
        Self {
            data,
            byte_position: 0,
            bit_position: 0,
        }
    }

    /// Returns the next bit, or `None` once every byte has been consumed.
    fn read_bit(&mut self) -> Option<bool> {
        let byte = *self.data.get(self.byte_position)?;
        let shift = 7 - self.bit_position;

        if self.bit_position == 7 {
            // Last bit of this byte: advance to the start of the next one.
            self.byte_position += 1;
            self.bit_position = 0;
        } else {
            self.bit_position += 1;
        }

        Some((byte >> shift) & 1 != 0)
    }

    /// Reads `num_bits` bits into the low end of a `u64`, first bit read highest.
    ///
    /// Returns `None` if the data ends first; bits read before the end stay
    /// consumed. Reading zero bits yields `Some(0)`.
    fn read_bits(&mut self, num_bits: u8) -> Option<u64> {
        (0..num_bits).try_fold(0u64, |acc, _| {
            self.read_bit().map(|bit| (acc << 1) | u64::from(bit))
        })
    }
}
105
106// =============================================================================
107// Compressor
108// =============================================================================
109
110/// Gorilla-style time series compressor.
111pub struct Compressor {
112    writer: BitWriter,
113    first_timestamp: Option<i64>,
114    prev_timestamp: i64,
115    prev_delta: i64,
116    prev_value_bits: u64,
117    prev_leading_zeros: u8,
118    prev_trailing_zeros: u8,
119    count: usize,
120}
121
122impl Compressor {
123    pub fn new() -> Self {
124        Self {
125            writer: BitWriter::new(),
126            first_timestamp: None,
127            prev_timestamp: 0,
128            prev_delta: 0,
129            prev_value_bits: 0,
130            prev_leading_zeros: 64,
131            prev_trailing_zeros: 64,
132            count: 0,
133        }
134    }
135
136    /// Compress a data point.
137    pub fn compress(&mut self, point: &DataPoint) {
138        let timestamp = point.timestamp_millis();
139        let value_bits = point.value.to_bits();
140
141        if self.first_timestamp.is_none() {
142            self.first_timestamp = Some(timestamp);
143            self.writer.write_bits(timestamp as u64, 64);
144            self.writer.write_bits(value_bits, 64);
145            self.prev_timestamp = timestamp;
146            self.prev_value_bits = value_bits;
147            self.count = 1;
148            return;
149        }
150
151        self.compress_timestamp(timestamp);
152        self.compress_value(value_bits);
153
154        self.prev_timestamp = timestamp;
155        self.prev_value_bits = value_bits;
156        self.count += 1;
157    }
158
159    fn compress_timestamp(&mut self, timestamp: i64) {
160        let delta = timestamp - self.prev_timestamp;
161        let delta_of_delta = delta - self.prev_delta;
162
163        if delta_of_delta == 0 {
164            self.writer.write_bit(false);
165        } else if (-63..=64).contains(&delta_of_delta) {
166            self.writer.write_bits(0b10, 2);
167            self.writer.write_bits((delta_of_delta + 63) as u64, 7);
168        } else if (-255..=256).contains(&delta_of_delta) {
169            self.writer.write_bits(0b110, 3);
170            self.writer.write_bits((delta_of_delta + 255) as u64, 9);
171        } else if (-2047..=2048).contains(&delta_of_delta) {
172            self.writer.write_bits(0b1110, 4);
173            self.writer.write_bits((delta_of_delta + 2047) as u64, 12);
174        } else {
175            self.writer.write_bits(0b1111, 4);
176            self.writer.write_bits(delta_of_delta as u64, 64);
177        }
178
179        self.prev_delta = delta;
180    }
181
182    fn compress_value(&mut self, value_bits: u64) {
183        let xor = self.prev_value_bits ^ value_bits;
184
185        if xor == 0 {
186            self.writer.write_bit(false);
187            return;
188        }
189
190        self.writer.write_bit(true);
191
192        let leading_zeros = xor.leading_zeros() as u8;
193        let trailing_zeros = xor.trailing_zeros() as u8;
194
195        if leading_zeros >= self.prev_leading_zeros && trailing_zeros >= self.prev_trailing_zeros {
196            self.writer.write_bit(false);
197            let meaningful_bits = 64 - self.prev_leading_zeros - self.prev_trailing_zeros;
198            let shifted = xor >> self.prev_trailing_zeros;
199            self.writer.write_bits(shifted, meaningful_bits);
200        } else {
201            self.writer.write_bit(true);
202            self.writer.write_bits(leading_zeros as u64, 6);
203
204            let meaningful_bits = 64 - leading_zeros - trailing_zeros;
205            self.writer.write_bits(meaningful_bits as u64, 6);
206
207            let shifted = xor >> trailing_zeros;
208            self.writer.write_bits(shifted, meaningful_bits);
209
210            self.prev_leading_zeros = leading_zeros;
211            self.prev_trailing_zeros = trailing_zeros;
212        }
213    }
214
215    /// Finish compression and return the compressed data.
216    pub fn finish(self) -> CompressedBlock {
217        let data = self.writer.finish();
218        let checksum = crc32fast::hash(&data);
219        CompressedBlock {
220            data,
221            first_timestamp: self.first_timestamp.unwrap_or(0),
222            last_timestamp: self.prev_timestamp,
223            count: self.count,
224            checksum,
225        }
226    }
227}
228
229impl Default for Compressor {
230    fn default() -> Self {
231        Self::new()
232    }
233}
234
235// =============================================================================
236// Compressed Block
237// =============================================================================
238
239/// A compressed block of time series data.
240#[derive(Debug, Clone)]
241pub struct CompressedBlock {
242    pub data: Vec<u8>,
243    pub first_timestamp: i64,
244    pub last_timestamp: i64,
245    pub count: usize,
246    pub checksum: u32,
247}
248
249impl CompressedBlock {
250    /// Get compression ratio.
251    pub fn compression_ratio(&self) -> f64 {
252        let uncompressed_size = self.count * 16;
253        if self.data.is_empty() {
254            return 1.0;
255        }
256        uncompressed_size as f64 / self.data.len() as f64
257    }
258
259    /// Verify block integrity using CRC32 checksum.
260    pub fn verify_checksum(&self) -> bool {
261        crc32fast::hash(&self.data) == self.checksum
262    }
263}
264
265// =============================================================================
266// Decompressor
267// =============================================================================
268
269/// Gorilla-style time series decompressor.
270#[allow(dead_code)]
271pub struct Decompressor<'a> {
272    reader: BitReader<'a>,
273    first_timestamp: i64,
274    prev_timestamp: i64,
275    prev_delta: i64,
276    prev_value_bits: u64,
277    prev_leading_zeros: u8,
278    prev_trailing_zeros: u8,
279    remaining: usize,
280    first_read: bool,
281}
282
283impl<'a> Decompressor<'a> {
284    pub fn new(block: &'a CompressedBlock) -> Self {
285        Self {
286            reader: BitReader::new(&block.data),
287            first_timestamp: block.first_timestamp,
288            prev_timestamp: 0,
289            prev_delta: 0,
290            prev_value_bits: 0,
291            prev_leading_zeros: 0,
292            prev_trailing_zeros: 0,
293            remaining: block.count,
294            first_read: true,
295        }
296    }
297
298    /// Decompress the next data point.
299    pub fn next(&mut self) -> Option<DataPoint> {
300        if self.remaining == 0 {
301            return None;
302        }
303
304        self.remaining -= 1;
305
306        if self.first_read {
307            self.first_read = false;
308            let timestamp = self.reader.read_bits(64)? as i64;
309            let value_bits = self.reader.read_bits(64)?;
310            self.prev_timestamp = timestamp;
311            self.prev_value_bits = value_bits;
312
313            return Some(DataPoint {
314                timestamp: DateTime::from_timestamp_millis(timestamp)?,
315                value: f64::from_bits(value_bits),
316            });
317        }
318
319        let timestamp = self.decompress_timestamp()?;
320        let value_bits = self.decompress_value()?;
321
322        self.prev_timestamp = timestamp;
323        self.prev_value_bits = value_bits;
324
325        Some(DataPoint {
326            timestamp: DateTime::from_timestamp_millis(timestamp)?,
327            value: f64::from_bits(value_bits),
328        })
329    }
330
331    fn decompress_timestamp(&mut self) -> Option<i64> {
332        let delta_of_delta = if !self.reader.read_bit()? {
333            0i64
334        } else if !self.reader.read_bit()? {
335            self.reader.read_bits(7)? as i64 - 63
336        } else if !self.reader.read_bit()? {
337            self.reader.read_bits(9)? as i64 - 255
338        } else if !self.reader.read_bit()? {
339            self.reader.read_bits(12)? as i64 - 2047
340        } else {
341            self.reader.read_bits(64)? as i64
342        };
343
344        self.prev_delta += delta_of_delta;
345        Some(self.prev_timestamp + self.prev_delta)
346    }
347
348    fn decompress_value(&mut self) -> Option<u64> {
349        if !self.reader.read_bit()? {
350            return Some(self.prev_value_bits);
351        }
352
353        let xor = if !self.reader.read_bit()? {
354            let meaningful_bits = 64 - self.prev_leading_zeros - self.prev_trailing_zeros;
355            let value = self.reader.read_bits(meaningful_bits)?;
356            value << self.prev_trailing_zeros
357        } else {
358            let leading_zeros = self.reader.read_bits(6)? as u8;
359            let meaningful_bits = self.reader.read_bits(6)? as u8;
360            let trailing_zeros = 64 - leading_zeros - meaningful_bits;
361
362            self.prev_leading_zeros = leading_zeros;
363            self.prev_trailing_zeros = trailing_zeros;
364
365            let value = self.reader.read_bits(meaningful_bits)?;
366            value << trailing_zeros
367        };
368
369        Some(self.prev_value_bits ^ xor)
370    }
371
372    /// Decompress all points.
373    pub fn decompress_all(&mut self) -> Vec<DataPoint> {
374        let mut points = Vec::with_capacity(self.remaining);
375        while let Some(point) = self.next() {
376            points.push(point);
377        }
378        points
379    }
380}
381
382// =============================================================================
383// Tests
384// =============================================================================
385
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// Round-trips 100 evenly spaced points and checks that the point count,
    /// checksum, timestamps (at millisecond resolution) and values survive.
    #[test]
    fn test_compress_decompress() {
        let mut compressor = Compressor::new();

        let base_time = Utc::now();
        let points: Vec<DataPoint> = (0..100)
            .map(|i| DataPoint {
                timestamp: base_time + Duration::seconds(i),
                value: 42.0 + (i as f64 * 0.1),
            })
            .collect();

        for point in &points {
            compressor.compress(point);
        }

        let block = compressor.finish();
        assert!(block.compression_ratio() > 1.0);
        assert_eq!(block.count, points.len());
        assert!(block.verify_checksum());

        let mut decompressor = Decompressor::new(&block);
        let decompressed = decompressor.decompress_all();

        assert_eq!(decompressed.len(), points.len());
        for (original, decoded) in points.iter().zip(decompressed.iter()) {
            // The stream stores milliseconds, so compare at that resolution;
            // `Utc::now()` carries sub-millisecond precision that is not kept.
            assert_eq!(original.timestamp_millis(), decoded.timestamp_millis());
            assert_eq!(original.value, decoded.value);
        }
    }

    /// Regular one-second spacing with a repeating value pattern should
    /// compress to well under half of the raw 16 bytes per point.
    #[test]
    fn test_compression_ratio() {
        let mut compressor = Compressor::new();

        let base_time = Utc::now();
        for i in 0..1000 {
            compressor.compress(&DataPoint {
                timestamp: base_time + Duration::seconds(i),
                value: 100.0 + (i as f64 % 10.0),
            });
        }

        let block = compressor.finish();
        let ratio = block.compression_ratio();

        assert!(ratio > 2.0, "Expected compression ratio > 2, got {}", ratio);
    }
}