// gortsz/lib.rs

1#![no_std]
2#![feature(generic_arg_infer)]
3
4use core::{cell::Cell, error::Error, fmt::Display, ops::Not};
5
6pub use bitvec::prelude::*;
7use stats::{CompressionOptions, CompressionStatsExt, NoStats};
8
9pub mod stats;
10
// Sign contraction for i32: packs the value's low `$n - 1` magnitude bits plus
// its sign bit (bit 31) contiguously into the first `$n` bits of the result.
// Callers are responsible for ensuring the value actually fits in `$n` bits.
// NB: this returns a BitArray which will be u8 aligned. Callers must
//     slice the first $n bits to get the correct result.
macro_rules! sign_contract_32 {
    ($value:expr, $n:expr) => {{
        // Only the first $n bits of `result` are meaningful; the rest stay 0.
        let mut result = bitarr!(u8, Lsb0; 0; 32);
        let value = $value as i32;
        value
            .to_le_bytes()
            .view_bits::<Lsb0>()
            .into_iter()
            .enumerate()
            // Keep the low $n - 1 bits and the sign bit (bit index 31)…
            .filter_map(|(i, b)| if i < $n - 1 || i == 31 { Some(b) } else { None })
            .enumerate()
            // …then re-enumerate so they land contiguously at bits 0..$n.
            .for_each(|(i, b)| result.set(i, *b));
        result
    }};
}
29
/// Returned by [`compress`]/[`compress_with_stats`] when the output buffer is
/// exhausted before the whole input series has been encoded.
pub struct CompressError<'a> {
    /// Prefix of the output buffer containing only fully-encoded rows.
    pub valid_bits: &'a BitSlice<u8>,
    /// Number of input rows that were completely encoded into `valid_bits`.
    pub entries_processed: usize,
}
34
/// Compresses `series` into `buf` without collecting any statistics.
///
/// Convenience wrapper around [`compress_with_stats`] using [`NoStats`].
/// On success, returns the written prefix of `buf`; if `buf` is too small,
/// returns a [`CompressError`] describing the partially-written data.
pub fn compress<'a, 'b, const N: usize, const BC: usize, OPTS: CompressionOptions<BC>>(
    series: impl IntoIterator<Item = &'a (u32, [f32; N])>,
    buf: &'b mut BitSlice<u8>,
) -> Result<&'b BitSlice<u8>, CompressError<'b>> {
    compress_with_stats(series, buf, &mut NoStats::<BC, OPTS>::new())
}
/// Facebook's Gorilla compression algorithm for time series data.
/// The timestamps are delta-to-delta encoded and the data is XOR encoded.
///
/// The first row is written uncompressed (32-bit little-endian timestamp
/// followed by N raw f32 bit patterns); every subsequent row is encoded
/// relative to the previous one. Compression counters are reported through
/// `stats`. On success returns the written prefix of `buf`; if `buf` runs out
/// of space, returns a [`CompressError`] with the last fully-written row
/// boundary and the number of rows encoded.
///
/// # Panics
/// Panics if `series` is empty.
pub fn compress_with_stats<
    'a,
    'b,
    const N: usize,
    const BC: usize,
    OPTS: CompressionOptions<BC>,
>(
    series: impl IntoIterator<Item = &'a (u32, [f32; N])>,
    buf: &'b mut BitSlice<u8>,
    stats: &mut impl CompressionStatsExt<BC, OPTS>,
) -> Result<&'b BitSlice<u8>, CompressError<'b>> {
    let mut series = series.into_iter();
    // `index` is the write cursor (bit offset into `buf`); `last_valid_index`
    // tracks the end of the last *complete* row so errors can report a clean
    // prefix.
    let mut index = 0;
    let last_valid_index = Cell::new(0);

    // Writes a whole bitslice, or bails out with a CompressError naming the
    // number of rows ($idx) fully encoded so far.
    macro_rules! write_bits_ {
        ($b:expr, $idx:expr) => {
            if buf.len() < index + $b.len() {
                return Err(CompressError {
                    valid_bits: &buf[..last_valid_index.get()],
                    entries_processed: $idx,
                });
            }
            buf[index..(index + $b.len())].copy_from_bitslice($b);
            index += $b.len();
        };
    }
    // Single-bit variant of the above.
    macro_rules! write_bit_ {
        ($b:expr, $idx:expr) => {
            if buf.len() < index + 1 {
                return Err(CompressError {
                    valid_bits: &buf[..last_valid_index.get()],
                    entries_processed: $idx,
                });
            }
            buf.set(index, $b);
            index += 1;
        };
    }

    let (mut previous_time, mut previous_data) =
        *series.next().expect("Time series must not be empty");
    let mut previous_delta = 0i32;
    let mut previous_xors = [0u32; N];

    // Header: first timestamp and first row of samples, stored uncompressed.
    write_bits_!(previous_time.to_le_bytes().view_bits::<Lsb0>(), 0);
    for bytes in previous_data.iter().map(|it| it.to_le_bytes()) {
        write_bits_!(BitSlice::<u8, Lsb0>::from_slice(&bytes), 0);
    }

    for (row_index, &(time, ref data)) in series.enumerate() {
        // Row-local shorthands that report the current row index on failure.
        macro_rules! write_bit {
            ($b:expr) => {
                write_bit_!($b, row_index);
            };
        }
        macro_rules! write_bits {
            ($b:expr) => {
                write_bits_!($b, row_index);
            };
        }

        // Delta-of-delta timestamp encoding.
        let delta = (time as i64 - previous_time as i64) as i32;
        let delta_delta = delta - previous_delta;

        if delta_delta == 0 {
            // Unchanged delta: a single 0 control bit.
            write_bits!(bits![u8, Lsb0; 0u8]);
            stats.increment_repeated_count();
        } else {
            // Find the smallest configured bin whose signed range covers
            // delta_delta; the bin index is unary-encoded as (i + 1) ones.
            let mut fit_in_bin = false;
            for (i, bin) in OPTS::DELTA_DELTA_BINS.iter().enumerate() {
                if delta_delta >= -(1 << (bin - 1)) && delta_delta < (1 << (bin - 1)) {
                    // Write i + 1 one bits then a zero bit
                    for _ in 0..(i + 1) {
                        write_bits!(bits![u8, Lsb0; 1]);
                    }
                    write_bits!(bits![u8, Lsb0; 0]);

                    // Write the delta delta
                    let bits = &sign_contract_32!(delta_delta, *bin as usize)[..*bin as usize];
                    write_bits!(&bits);
                    fit_in_bin = true;
                    stats.increment_bin(i);
                    break;
                }
            }
            if !fit_in_bin {
                // Overflow: the control prefix saturates, so no terminating
                // zero bit is needed — the decoder recognizes n+1 ones.
                // Write n+1 one bits and no zero bit
                for _ in 0..(OPTS::DELTA_DELTA_BINS.len() + 1) {
                    write_bits!(bits![u8, Lsb0; 1]);
                }

                // Write the delta delta
                let bits = sign_contract_32!(delta_delta, 32);
                stats.increment_overflow_count();
                write_bits!(&bits);
            }
        }

        previous_delta = delta;
        previous_time = time;

        // XOR-encode each sample column against its previous value.
        for ((&d, previous_d), previous_xor) in data
            .iter()
            .zip(previous_data.iter_mut())
            .zip(previous_xors.iter_mut())
        {
            let xor = d.to_bits() ^ previous_d.to_bits();

            if xor == 0 {
                // No change, write a zero bit
                write_bits!(bits![u8, Lsb0; 0]);
                *previous_xor = 0;
                continue;
            } else {
                write_bits!(bits![u8, Lsb0; 1]);
            }

            let leading_zeros = xor.leading_zeros();
            let trailing_zeros = xor.trailing_zeros();
            let prev_leading_zeros = previous_xor.leading_zeros();
            let prev_trailing_zeros = previous_xor.trailing_zeros();

            // NB: if previous_xor is zero, we will read 32 for prev_leading_zeros and prev_trailing_zeros
            //     however, this is fine as in the two cases:
            //      - xor is also zero, in which case we already handled it above
            //      - xor is not zero, in which case leading_zeros and trailing_zeros will be less than 32
            //        so we will enter the second branch below.

            if leading_zeros >= prev_leading_zeros && trailing_zeros >= prev_trailing_zeros {
                // The new data is fully contained within the previous data's leading and trailing zeros
                write_bits!(bits![u8, Lsb0; 0]);

                let n_bits_to_write = 32 - prev_leading_zeros - prev_trailing_zeros;
                let compressed = xor
                    .view_bits::<Lsb0>()
                    .iter()
                    .skip(prev_trailing_zeros as usize)
                    .take(n_bits_to_write as usize);

                for bit in compressed {
                    let bit = *bit;
                    write_bit!(bit);
                }
            } else {
                // New window: control bit 1, then 4 bits of leading-zero
                // count (clamped to 15) and 5 bits of (length - 1).
                let leading_zeros = leading_zeros.min(0b1111);

                write_bits!(bits![u8, Lsb0; 1]);

                // this is kind of a weird unexplained decision in the whitepaper,
                // why use 5 bits for a f64 (adapted to 4 bits for f32 here)?
                let leading_zero_bits = leading_zeros.to_le_bytes();
                write_bits!(&leading_zero_bits.view_bits::<Lsb0>()[..4]);

                let n_meaningful_bits = 32 - leading_zeros - trailing_zeros;
                write_bits!(&(n_meaningful_bits - 1).to_le_bytes().view_bits::<Lsb0>()[..5]);

                for b in xor
                    .view_bits::<Lsb0>()
                    .iter()
                    .skip(trailing_zeros as usize)
                    .take(n_meaningful_bits as usize)
                {
                    write_bit!(*b);
                }
            }

            *previous_d = d;
            *previous_xor = xor;
        }

        // The whole row was written; advance the clean-prefix marker.
        last_valid_index.set(index);
    }

    Ok(buf.get(0..index).expect("checked bounds already"))
}
219
/// Streaming decoder for bit streams produced by [`compress`] /
/// [`compress_with_stats`]. Yields one `(timestamp, samples)` row per
/// [`Iterator::next`] call; after the first error the iterator is fused.
pub struct Decompressor<'a, const N: usize, const BC: usize, OPTS: CompressionOptions<BC>> {
    // Compressed input bits.
    buf: &'a BitSlice<u8>,
    // Read cursor: bit offset into `buf`.
    index: usize,
    // Last decoded timestamp.
    previous_time: u32,
    // Last decoded timestamp delta (for delta-of-delta decoding).
    previous_delta: i32,
    // Last decoded row of samples.
    previous_data: [f32; N],
    // Last non-trivial XOR per column; defines the reuse window for the
    // "contained" encoding case.
    previous_xors: [u32; N],
    // Set once decoding fails; subsequent `next` calls return None.
    failed: bool,
    _options: core::marker::PhantomData<OPTS>,
}
230
/// Errors surfaced while decoding a compressed bit stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecompressError {
    /// Buffer is shorter than the uncompressed first row (header).
    MissingHeader,
    /// The stream ended or held invalid bits while decoding a timestamp;
    /// `index` is the bit offset at which decoding failed.
    CorruptedTimestamp { index: usize },
    /// The stream ended or held invalid bits while decoding sample data;
    /// `index` is the bit offset at which decoding failed.
    CorruptedData { index: usize },
}
237
238impl Display for DecompressError {
239    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
240        match self {
241            DecompressError::MissingHeader => write!(f, "Missing header in the compressed data"),
242            DecompressError::CorruptedTimestamp { index } => {
243                write!(f, "Corrupted timestamp at bit {index}")
244            }
245            DecompressError::CorruptedData { index } => {
246                write!(f, "Corrupted data at bit {index}")
247            }
248        }
249    }
250}
251
252impl Error for DecompressError {
253    fn source(&self) -> Option<&(dyn Error + 'static)> {
254        None
255    }
256
257    fn description(&self) -> &str {
258        "description() is deprecated; use Display"
259    }
260
261    fn cause(&self) -> Option<&dyn Error> {
262        self.source()
263    }
264}
265
266impl<'a, const N: usize, const BC: usize, OPTS: CompressionOptions<BC>>
267    Decompressor<'a, N, BC, OPTS>
268{
269    pub fn new(buf: &'a BitSlice<u8>) -> Self {
270        Self {
271            buf,
272            index: 0,
273            previous_time: 0,
274            previous_data: [0.0; N],
275            previous_delta: 0,
276            previous_xors: [0; N],
277            failed: false,
278            _options: core::marker::PhantomData,
279        }
280    }
281}
282
283impl<'a, const SAMPLES_PER_ROW: usize, const BC: usize, OPTS: CompressionOptions<BC>> Iterator
284    for Decompressor<'a, SAMPLES_PER_ROW, BC, OPTS>
285{
286    type Item = Result<(u32, [f32; SAMPLES_PER_ROW]), DecompressError>;
287
288    fn next(&mut self) -> Option<Self::Item> {
289        let consume = |index: &mut usize, n: usize| {
290            if self.buf.len() - *index < n {
291                return None;
292            }
293            let bits = self.buf.get(*index..*index + n)?;
294            *index += n;
295            Some(bits)
296        };
297        if self.failed || self.index >= self.buf.len() {
298            return None; // No more data to decompress
299        }
300
301        if self.index == 0 {
302            if self.buf.len() < 32 + (SAMPLES_PER_ROW * 32) {
303                self.failed = true;
304                return Some(Err(DecompressError::MissingHeader));
305            }
306
307            let first_time: u32 = consume(&mut self.index, 32)
308                .expect("checked bounds already")
309                .load_le();
310            let first_data: [f32; SAMPLES_PER_ROW] = self.buf[32..(32 + (SAMPLES_PER_ROW * 32))]
311                .chunks_exact(32)
312                .map(|chunk| f32::from_bits(chunk.load_le()))
313                .collect::<heapless::Vec<f32, SAMPLES_PER_ROW>>()
314                .into_array()
315                .expect("Impossible for N f32's to not fit into an array of N f32's");
316
317            self.index = 32 + (SAMPLES_PER_ROW * 32);
318            self.previous_time = first_time;
319            self.previous_data = first_data;
320
321            return Some(Ok((self.previous_time, self.previous_data)));
322        }
323
324        let timestamp_ctl_bits = self
325            .buf
326            .get(self.index..)
327            .expect("Checked bounds already")
328            .leading_ones();
329
330        self.index += core::cmp::min(timestamp_ctl_bits + 1, OPTS::DELTA_DELTA_BINS.len() + 1);
331
332        if timestamp_ctl_bits == 0 {
333            // No change in delta
334            if let Some(t) = self.previous_time.checked_add_signed(self.previous_delta) {
335                self.previous_time = t;
336            } else {
337                self.failed = true;
338                return Some(Err(DecompressError::CorruptedTimestamp {
339                    index: self.index,
340                }));
341            }
342        } else {
343            let n_dd_bits = *OPTS::DELTA_DELTA_BINS
344                .get(timestamp_ctl_bits - 1)
345                .unwrap_or(&32) as usize;
346
347            let delta_delta: i32 = if let Some(dd) = consume(&mut self.index, n_dd_bits) {
348                dd.load_le()
349            } else {
350                self.failed = true;
351                return Some(Err(DecompressError::CorruptedTimestamp {
352                    index: self.index,
353                }));
354            };
355
356            let delta = self.previous_delta + delta_delta;
357            self.previous_time = self.previous_time.wrapping_add(delta as u32);
358            self.previous_delta = delta;
359        };
360
361        for (previous_d, previous_xor) in self
362            .previous_data
363            .iter_mut()
364            .zip(self.previous_xors.iter_mut())
365        {
366            let new_data_bit = if let Some(x) = consume(&mut self.index, 1) {
367                x[0]
368            } else {
369                self.failed = true;
370                return Some(Err(DecompressError::CorruptedTimestamp {
371                    index: self.index,
372                }));
373            };
374
375            if new_data_bit.not() {
376                // No change, just copy the previous value
377                *previous_xor = 0;
378                continue;
379            } else {
380                let xor_ctl_bit = if let Some(x) = consume(&mut self.index, 1) {
381                    x[0]
382                } else {
383                    self.failed = true;
384                    return Some(Err(DecompressError::CorruptedData { index: self.index }));
385                };
386
387                if xor_ctl_bit.not() {
388                    // The new data is fully contained within the previous data's leading and trailing zeros
389                    let n_bits_to_read =
390                        (32 - previous_xor.leading_zeros() - previous_xor.trailing_zeros())
391                            as usize;
392                    let compressed = if let Some(x) = consume(&mut self.index, n_bits_to_read) {
393                        x
394                    } else {
395                        self.failed = true;
396                        return Some(Err(DecompressError::CorruptedData { index: self.index }));
397                    };
398
399                    let xor = compressed.load_le::<u32>() << previous_xor.trailing_zeros();
400                    *previous_d = f32::from_bits(xor ^ previous_d.to_bits());
401                    *previous_xor = xor;
402                } else {
403                    let leading_zero_bits = if let Some(x) = consume(&mut self.index, 4) {
404                        x.load_le::<u32>()
405                    } else {
406                        self.failed = true;
407                        return Some(Err(DecompressError::CorruptedData { index: self.index }));
408                    };
409
410                    let meaningful_bits_len = if let Some(x) = consume(&mut self.index, 5) {
411                        // + 1 to convert to 0-based indexing
412                        x.load_le::<u32>() + 1
413                    } else {
414                        self.failed = true;
415                        return Some(Err(DecompressError::CorruptedData { index: self.index }));
416                    };
417
418                    if leading_zero_bits + meaningful_bits_len > 32 {
419                        self.failed = true;
420                        return Some(Err(DecompressError::CorruptedData { index: self.index }));
421                    }
422
423                    let trailing_zeros = 32 - leading_zero_bits - meaningful_bits_len;
424
425                    let meaningful_bits =
426                        if let Some(x) = consume(&mut self.index, meaningful_bits_len as usize) {
427                            x
428                        } else {
429                            self.failed = true;
430                            return Some(Err(DecompressError::CorruptedData { index: self.index }));
431                        };
432
433                    let xor = meaningful_bits.load_le::<u32>() << trailing_zeros;
434                    *previous_d = f32::from_bits(xor ^ previous_d.to_bits());
435                    *previous_xor = xor;
436                }
437            }
438        }
439
440        return Some(Ok((self.previous_time, self.previous_data)));
441    }
442}
443
444#[cfg(test)]
445extern crate std;
446
447#[cfg(test)]
448mod tests {
449    use crate::stats::{CompressionStats, whitepaper::WhitepaperOptions};
450    use std::{eprintln, println, vec::Vec};
451
452    use super::*;
453
    /// Round-trip test: compress the bundled CSV sample data, decompress it,
    /// and check every decoded row is bit-exact against the source rows.
    #[test]
    fn compression_test() {
        // CSV layout: first column is the timestamp, the rest are samples.
        const TIMESERIES: &'static str = include_str!("../samples.csv");
        // Column count is derived at compile time from the first CSV line.
        const COLUMNS: usize = const_str::split!(const_str::split!(TIMESERIES, '\n')[0], ',').len();

        let timeseries: Vec<_> = TIMESERIES
            .lines()
            .map(|line| {
                let mut spl = line.split(',');
                // Timestamps are stored as floats in the CSV; truncate to u32.
                let time = spl.next().unwrap().trim().parse::<f32>().unwrap() as u32;
                let samples: [f32; COLUMNS - 1] = spl
                    .map(|col| col.trim().parse::<f32>().unwrap())
                    .collect::<heapless::Vec<_, { COLUMNS - 1 }>>()
                    .into_array()
                    .expect("checked size already");
                (time, samples)
            })
            .collect();

        // Deliberately small buffer: a partial fill is tolerated and only the
        // rows that fit are verified below.
        let mut buf = [0u8; 250];
        let mut entries_processed = timeseries.len();
        let compressed = match compress::<{ COLUMNS - 1 }, 3, WhitepaperOptions>(
            timeseries.iter(),
            buf.view_bits_mut(),
        ) {
            Ok(compressed) => compressed,
            Err(CompressError {
                valid_bits,
                entries_processed: entries,
            }) => {
                eprintln!(
                    "warn: Buffer too small, only compressed {entries} entries out of {}",
                    timeseries.len()
                );
                entries_processed = entries;
                valid_bits
            }
        };

        let compressed_size_bits = compressed.len();
        // Uncompressed size: 4 bytes timestamp + 4 bytes per sample, per row.
        let data_size_bytes = entries_processed * (4 + 4 * timeseries[0].1.len());

        // `zip` stops at the shorter side, so a partial compression only
        // verifies the rows that were actually encoded.
        let decompressor = Decompressor::<_, _, WhitepaperOptions>::new(compressed);
        for (result, reference) in decompressor.zip(timeseries.iter()) {
            match result {
                Ok((time, data)) => {
                    assert!(time == reference.0);
                    assert!(data == reference.1);
                }
                Err(e) => {
                    panic!("Decompression failed: {e}");
                }
            }
        }

        println!(
            "Compressed size: {} bytes, originally {} bytes. Compression ratio: {:.2}%",
            compressed_size_bits / 8,
            data_size_bytes,
            compressed_size_bits as f64 / 8. * 100.0 / data_size_bytes as f64
        );
    }
516}