tsz_compress/
compress.rs

1use bitvec::prelude::*;
2pub use tsz_macro::*;
3
4///
5/// Use `u8` as the storage type for a `BitVec` to represent
6/// bits for easier conversion to and from bytes.
7///
8pub type BitBuffer = BitVec<u8, Lsb0>;
9pub type BitBufferSlice = BitSlice<u8, Lsb0>;
10
11///
12/// A `Compressor` instance holds the state of the compression process.
13///
14/// Implement the `Compress` trait for your data type.
15/// Create a `Compressor` instance.
16/// Call `compress` for each row of data, handing off your `Compressor` instance.
17/// Call `finish` to get the compressed data.
18///
19#[derive(Default)]
20pub struct Compressor<T: Compress> {
21    pub output: BitBuffer,
22    pub row_n: Option<T>,
23    pub row_n1: Option<T>,
24}
25
26///
27/// A `Decompressor` instance holds the state of the decompression process.
28///
29/// Implement the `Decompress` trait for your data type.
30/// Create a `Decompressor` instance.
31/// Call `decompress` to iterate over the decompressed rows, handing off your `Decompressor` instance.
32///
33pub struct Decompressor<'de> {
34    pub input: &'de BitBufferSlice,
35}
36
37///
38/// Implement this trait for your data type to be able to compress it.
39///
40/// Derive the `Compressible` trait to get a default implementation.
41/// Derive the `DeltaEncodable` trait to get a default implementation for a `RowDelta`, which will be used as the `Delta` type.
42///
43pub trait Compress: Copy + Sized {
44    /// Full and Delta may differ in signedness or storage
45    /// Full is the representation of the value as a whole
46    /// Delta is the representation of the difference between the value and the previous value or the difference between differences
47    type Full: IntoCompressBits;
48    type Delta: IntoCompressBits;
49
50    fn into_full(self) -> Self::Full;
51    fn into_delta(self, prev_row: &Self) -> Self::Delta;
52    fn into_deltadelta(self, prev_prev_row: &Self, prev_row: &Self) -> Self::Delta;
53}
54
55///
56/// Implement this trait for your data type to be able to decompress it.
57///
58/// Derive the `Decompressible` trait to get a default implementation.
59/// Derive the `DeltaEncodable` trait to get a default implementation for a `RowDelta`, which will be used as the `Delta` type.
60///
61pub trait Decompress: Copy + Sized {
62    type Full: FromCompressBits;
63    type Delta: FromCompressBits;
64
65    fn from_full(bits: &BitBufferSlice) -> Result<(Self, &BitBufferSlice), &'static str>;
66    fn from_delta<'a>(
67        bits: &'a BitBufferSlice,
68        prev_row: &Self,
69    ) -> Result<(Self, &'a BitBufferSlice), &'static str>;
70    fn from_deltadelta<'a>(
71        bits: &'a BitBufferSlice,
72        prev_row: &Self,
73        prev_prev_row: &Self,
74    ) -> Result<(Self, &'a BitBufferSlice), &'static str>;
75}
76
77pub trait IntoCompressBits: Sized {
78    fn into_bits(self, out: &mut BitBuffer);
79}
80
81pub trait FromCompressBits: Sized {
82    fn from_bits(input: &BitBufferSlice) -> Result<(Self, &BitBufferSlice), &'static str>;
83}
84
85impl<T: Compress> Compressor<T> {
86    ///
87    /// Create a new `Compressor` instance.
88    ///
89    /// # Arguments
90    /// * `size_hint` - The number of bytes estimated to be in the final compressed data.
91    ///
92    pub fn new(size_hint: usize) -> Self {
93        Self {
94            output: BitBuffer::with_capacity(size_hint * 8),
95            row_n: None,
96            row_n1: None,
97        }
98    }
99
100    ///
101    /// Compress a row of data.
102    ///
103    /// Handles the first two rows differently to subsequent rows.
104    ///
105    pub fn compress(&mut self, row: T) {
106        let Some(row_n) = self.row_n.take() else {
107            self.row_n = Some(row);
108
109            // The first row is represented as the each value
110            // Encoded to unsigned VLQ
111            let representation = row.into_full();
112            representation.into_bits(&mut self.output);
113
114            return;
115        };
116
117        let Some(row_n1) = self.row_n1.take() else {
118            self.row_n = Some(row_n);
119            self.row_n1 = Some(row);
120
121            // The second row is represented as the difference between the first row and the second row
122            // Encoded to Gorilla Delta-Delta Encoding
123            let representation = row.into_delta(&row_n);
124            representation.into_bits(&mut self.output);
125
126            return;
127        };
128
129        // Each subsequent row is represented as the deltadelta = (row - row_n1) - (row_n1 - row_n)
130        // Encoded to Gorilla Delta-Delta Encoding
131        let representation = row.into_deltadelta(&row_n, &row_n1);
132        representation.into_bits(&mut self.output);
133
134        // Move the rows along
135
136        self.row_n = Some(row_n1);
137        self.row_n1 = Some(row);
138    }
139
140    ///
141    /// Check the number of bytes in the compressed data.
142    ///
143    pub fn len(&self) -> usize {
144        // Round up to the nearest byte
145        let num_bits = self.output.len();
146        (num_bits + 7) / 8
147    }
148
149    ///
150    /// Check if the compressed data is empty.
151    ///
152    pub fn is_empty(&self) -> bool {
153        self.output.is_empty()
154    }
155
156    ///
157    /// Take the compressed data, suitable for constructing a `Decompressor` instance.
158    ///
159    pub fn finish(self) -> BitBuffer {
160        self.output
161    }
162}
163
164impl<'de> Decompressor<'de> {
165    ///
166    /// Create a new `Decompressor` instance that will decompress bits from the given data.
167    ///
168    pub fn new(input: &'de BitBufferSlice) -> Self {
169        Self { input }
170    }
171
172    ///
173    /// Decompress the data into an iterator over the rows.
174    ///
175    pub fn decompress<T: Decompress>(&mut self) -> DecompressIter<'_, T> {
176        DecompressIter {
177            input: self.input,
178            finished: false,
179            first_row: None,
180            second_row: None,
181            t_prev_prev: None,
182            t_prev: None,
183        }
184    }
185}
186
187///
188/// An iterator over the decompressed data.
189///
190#[derive(Clone)]
191pub struct DecompressIter<'a, T> {
192    input: &'a BitBufferSlice,
193    finished: bool,
194    first_row: Option<T>,
195    second_row: Option<T>,
196    t_prev_prev: Option<T>,
197    t_prev: Option<T>,
198}
199
200///
201/// This iterator is returned on a decompress call to a Decompressor instance.
202///
203impl<'a, T> Iterator for DecompressIter<'a, T>
204where
205    T: Decompress,
206{
207    type Item = Result<T, &'static str>;
208
209    fn next(&mut self) -> Option<Self::Item> {
210        if self.finished {
211            return None;
212        }
213
214        let Some(first_row) = self.first_row.as_ref() else {
215            // Base case, no data
216            if self.input.is_empty() {
217                self.finished = true;
218                return None;
219            }
220
221            // The first row is represented as the each value
222            // Encoded to unsigned VLQ
223            let (first_row, trailing) = match T::from_full(self.input) {
224                Ok(x) => x,
225                Err(e) => {
226                    self.finished = true;
227                    return Some(Err(e));
228                }
229            };
230            self.input = trailing;
231            self.first_row = Some(first_row);
232            return Some(Ok(first_row));
233        };
234
235        let Some(_second_row) = self.second_row.as_ref() else {
236            // Base case, one row
237            if self.input.is_empty() {
238                self.finished = true;
239                return None;
240            }
241
242            // The second row is represented as the difference between the first row and the second row
243            // Encoded to Gorilla Delta-Delta Encoding
244            let (second_row, trailing) = match T::from_delta(self.input, first_row) {
245                Ok(x) => x,
246                Err(e) => {
247                    self.finished = true;
248                    return Some(Err(e));
249                }
250            };
251            self.input = trailing;
252            self.second_row = Some(second_row);
253            self.t_prev_prev = Some(*first_row);
254            self.t_prev = Some(second_row);
255            return Some(Ok(second_row));
256        };
257
258        // Each subsequent row is represented as the deltadelta = (row - row_n1) - (row_n1 - row_n)
259        // Encoded to Gorilla Delta-Delta Encoding
260
261        if self.input.is_empty() {
262            self.finished = true;
263            return None;
264        }
265
266        let t_prev = self.t_prev.take().unwrap();
267        let t_prev_prev = self.t_prev_prev.take().unwrap();
268
269        // Read the deltadelta, D, and reconstruct the row, t
270        let (row, trailing) = match T::from_deltadelta(self.input, &t_prev, &t_prev_prev) {
271            Ok(x) => x,
272            Err(e) => {
273                self.finished = true;
274                return Some(Err(e));
275            }
276        };
277        self.input = trailing;
278        self.t_prev_prev = Some(t_prev);
279        self.t_prev = Some(row);
280
281        Some(Ok(row))
282    }
283}
284
#[cfg(test)]
mod tests {
    use core::ops::Add;
    use core::ops::Sub;

    use crate::delta::*;
    use crate::svlq::*;
    use crate::uvlq::*;

    use super::*;

    #[test]
    fn test_compress() {
        // A row of data with a timestamp
        #[derive(Debug, Copy, Clone, PartialEq, Eq)]
        struct TestRow {
            ts: u64,
            v8: u8,
            v16: u16,
            v32: u32,
            v64: u64,
            vi8: i8,
            vi16: i16,
            vi32: i32,
            vi64: i64,
        }

        // A row to capture the difference between two rows
        #[derive(Debug, Copy, Clone)]
        struct TestRowDelta {
            ts: i128,
            v8: i16,
            v16: i32,
            v32: i64,
            v64: i128,
            vi8: i16,
            vi16: i32,
            vi32: i64,
            vi64: i128,
        }

        // How to take the difference between two rows
        impl Sub for TestRow {
            type Output = TestRowDelta;

            fn sub(self, rhs: Self) -> Self::Output {
                Self::Output {
                    ts: self.ts as i128 - rhs.ts as i128,
                    v8: self.v8 as i16 - rhs.v8 as i16,
                    v16: self.v16 as i32 - rhs.v16 as i32,
                    v32: self.v32 as i64 - rhs.v32 as i64,
                    v64: self.v64 as i128 - rhs.v64 as i128,
                    vi8: self.vi8 as i16 - rhs.vi8 as i16,
                    vi16: self.vi16 as i32 - rhs.vi16 as i32,
                    vi32: self.vi32 as i64 - rhs.vi32 as i64,
                    vi64: self.vi64 as i128 - rhs.vi64 as i128,
                }
            }
        }

        // How to add a delta to a row to get another row
        impl Add<TestRowDelta> for TestRow {
            type Output = TestRow;

            fn add(self, rhs: TestRowDelta) -> Self::Output {
                Self::Output {
                    ts: (self.ts as i128 + rhs.ts) as u64,
                    v8: (self.v8 as i16 + rhs.v8) as u8,
                    v16: (self.v16 as i32 + rhs.v16) as u16,
                    v32: (self.v32 as i64 + rhs.v32) as u32,
                    v64: (self.v64 as i128 + rhs.v64) as u64,
                    vi8: (self.vi8 as i16 + rhs.vi8) as i8,
                    vi16: (self.vi16 as i32 + rhs.vi16) as i16,
                    vi32: (self.vi32 as i64 + rhs.vi32) as i32,
                    vi64: (self.vi64 as i128 + rhs.vi64) as i64,
                }
            }
        }

        // How to take the difference between two deltas
        impl Sub for TestRowDelta {
            type Output = TestRowDelta;

            fn sub(self, rhs: Self) -> Self::Output {
                Self::Output {
                    ts: self.ts - rhs.ts,
                    v8: self.v8 - rhs.v8,
                    v16: self.v16 - rhs.v16,
                    v32: self.v32 - rhs.v32,
                    v64: self.v64 - rhs.v64,
                    vi8: self.vi8 - rhs.vi8,
                    vi16: self.vi16 - rhs.vi16,
                    vi32: self.vi32 - rhs.vi32,
                    vi64: self.vi64 - rhs.vi64,
                }
            }
        }

        // How to bit pack a row
        impl IntoCompressBits for TestRow {
            fn into_bits(self, out: &mut BitBuffer) {
                out.extend(Uvlq::from(self.ts).bits);
                out.extend(Uvlq::from(self.v8).bits);
                out.extend(Uvlq::from(self.v16).bits);
                out.extend(Uvlq::from(self.v32).bits);
                out.extend(Uvlq::from(self.v64).bits);
                out.extend(Svlq::from(self.vi8).bits);
                out.extend(Svlq::from(self.vi16).bits);
                out.extend(Svlq::from(self.vi32).bits);
                out.extend(Svlq::from(self.vi64).bits);
            }
        }

        // How to bit pack a delta.
        //
        // The 128-bit fields are encoded as 64-bit deltas; fail loudly instead of silently
        // truncating a delta that does not fit.
        impl IntoCompressBits for TestRowDelta {
            fn into_bits(self, out: &mut BitBuffer) {
                let ts = i64::try_from(self.ts).expect("ts delta does not fit in i64");
                encode_delta_i64(ts, out);

                encode_delta_i16(self.v8, out);
                encode_delta_i32(self.v16, out);
                encode_delta_i64(self.v32, out);

                let v64 = i64::try_from(self.v64).expect("v64 delta does not fit in i64");
                encode_delta_i64(v64, out);

                encode_delta_i16(self.vi8, out);
                encode_delta_i32(self.vi16, out);
                encode_delta_i64(self.vi32, out);

                let vi64 = i64::try_from(self.vi64).expect("vi64 delta does not fit in i64");
                encode_delta_i64(vi64, out);
            }
        }

        // How to unmarshal a row from a bit slice
        impl FromCompressBits for TestRow {
            fn from_bits(input: &BitBufferSlice) -> Result<(Self, &BitBufferSlice), &'static str> {
                let (ts, ts_bits) = <(u64, usize)>::try_from(UvlqRef(input))?;
                let input = &input[ts_bits..];
                let (v8, v8_bits) = <(u8, usize)>::try_from(UvlqRef(input))?;
                let input = &input[v8_bits..];
                let (v16, v16_bits) = <(u16, usize)>::try_from(UvlqRef(input))?;
                let input = &input[v16_bits..];
                let (v32, v32_bits) = <(u32, usize)>::try_from(UvlqRef(input))?;
                let input = &input[v32_bits..];
                let (v64, v64_bits) = <(u64, usize)>::try_from(UvlqRef(input))?;
                let input = &input[v64_bits..];

                let (vi8, vi8_bits) = <(i8, usize)>::try_from(SvlqRef(input))?;
                let input = &input[vi8_bits..];
                let (vi16, vi16_bits) = <(i16, usize)>::try_from(SvlqRef(input))?;
                let input = &input[vi16_bits..];
                let (vi32, vi32_bits) = <(i32, usize)>::try_from(SvlqRef(input))?;
                let input = &input[vi32_bits..];
                let (vi64, vi64_bits) = <(i64, usize)>::try_from(SvlqRef(input))?;
                let input = &input[vi64_bits..];

                Ok((
                    Self {
                        ts,
                        v8,
                        v16,
                        v32,
                        v64,
                        vi8,
                        vi16,
                        vi32,
                        vi64,
                    },
                    input,
                ))
            }
        }

        // How to unmarshal a delta from a bit slice
        impl FromCompressBits for TestRowDelta {
            fn from_bits(input: &BitBufferSlice) -> Result<(Self, &BitBufferSlice), &'static str> {
                let (ts, input) = decode_delta_i64(input)?;
                let Some(input) = input else {
                    return Err("Early EOF");
                };
                let (v8, input) = decode_delta_i16(input)?;
                let Some(input) = input else {
                    return Err("Early EOF");
                };
                let (v16, input) = decode_delta_i32(input)?;
                let Some(input) = input else {
                    return Err("Early EOF");
                };
                let (v32, input) = decode_delta_i64(input)?;
                let Some(input) = input else {
                    return Err("Early EOF");
                };
                let (v64, input) = decode_delta_i64(input)?;
                let Some(input) = input else {
                    return Err("Early EOF");
                };
                let (vi8, input) = decode_delta_i16(input)?;
                let Some(input) = input else {
                    return Err("Early EOF");
                };
                let (vi16, input) = decode_delta_i32(input)?;
                let Some(input) = input else {
                    return Err("Early EOF");
                };
                let (vi32, input) = decode_delta_i64(input)?;
                let Some(input) = input else {
                    return Err("Early EOF");
                };
                let (vi64, input) = decode_delta_i64(input)?;
                // The last field of the last row may leave no bits behind.
                let input = input.unwrap_or_default();

                Ok((
                    Self {
                        ts: ts as i128,
                        v8,
                        v16,
                        v32,
                        v64: v64 as i128,
                        vi8,
                        vi16,
                        vi32,
                        vi64: vi64 as i128,
                    },
                    input,
                ))
            }
        }

        // How to compute the representations for a series of rows
        impl Compress for TestRow {
            type Full = TestRow;

            type Delta = TestRowDelta;

            fn into_full(self) -> Self::Full {
                self
            }

            fn into_delta(self, prev_row: &Self) -> Self::Delta {
                self - *prev_row
            }

            fn into_deltadelta(self, prev_prev_row: &Self, prev_row: &Self) -> Self::Delta {
                (self - *prev_row) - (*prev_row - *prev_prev_row)
            }
        }

        impl Decompress for TestRow {
            type Full = TestRow;
            type Delta = TestRowDelta;

            fn from_full<'a>(
                bits: &'a BitBufferSlice,
            ) -> Result<(Self, &'a BitBufferSlice), &'static str> {
                TestRow::from_bits(bits).map_err(|_| "failed to unmarshal full row")
            }

            fn from_delta<'a>(
                bits: &'a BitBufferSlice,
                prev_row: &Self,
            ) -> Result<(Self, &'a BitBufferSlice), &'static str> {
                let delta =
                    TestRowDelta::from_bits(bits).map_err(|_| "failed to unmarshal delta row")?;
                Ok((*prev_row + delta.0, delta.1))
            }

            fn from_deltadelta<'a>(
                bits: &'a BitBufferSlice,
                prev_row: &Self,
                prev_prev_row: &Self,
            ) -> Result<(Self, &'a BitBufferSlice), &'static str> {
                // t = D + (t_prev - t_prev_prev) + t_prev
                let deltadelta = TestRowDelta::from_bits(bits)
                    .map_err(|_| "failed to unmarshal deltadelta row")?;
                Ok((
                    *prev_row + (*prev_row - *prev_prev_row) + deltadelta.0,
                    deltadelta.1,
                ))
            }
        }

        // Every field steps by one per row, so the unsigned fields start at zero and the
        // signed fields cross from negative to positive.
        let lower: isize = -32;
        let rows: Vec<TestRow> = (lower..10)
            .map(|i| {
                let offset = i - lower;
                TestRow {
                    ts: offset as u64,
                    v8: offset as u8,
                    v16: offset as u16,
                    v32: offset as u32,
                    v64: offset as u64,
                    vi8: i as i8,
                    vi16: i as i16,
                    vi32: i as i32,
                    vi64: i as i64,
                }
            })
            .collect();

        let mut compressor = Compressor::new(100);
        for row in &rows {
            compressor.compress(*row);
        }
        let encoded = compressor.finish();

        // Round trip: every row must decode back to exactly what was compressed.
        let decoded = Decompressor::new(&encoded)
            .decompress::<TestRow>()
            .collect::<Result<Vec<_>, _>>()
            .expect("decompression failed");
        assert_eq!(decoded, rows);
    }
}