#![no_std]
#![feature(generic_arg_infer)]

use core::{cell::Cell, error::Error, fmt::Display, ops::Not};

pub use bitvec::prelude::*;
use stats::{CompressionOptions, CompressionStatsExt, NoStats};

pub mod stats;

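// Truncates a signed value to an `$n`-bit two's-complement pattern by keeping
// the low `$n - 1` magnitude bits plus the sign bit (bit 31). E.g. `-3` with
// `$n = 5` becomes the 5-bit pattern `0b11101`, i.e. `-3` in 5-bit two's
// complement. The decompressor relies on `load_le` sign-extending from the
// highest loaded bit when loading into a signed integer.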
macro_rules! sign_contract_32 {
    ($value:expr, $n:expr) => {{
        let mut result = bitarr!(u8, Lsb0; 0; 32);
        let value = $value as i32;
        value
            .to_le_bytes()
            .view_bits::<Lsb0>()
            .iter()
            .enumerate()
            .filter_map(|(i, b)| if i < $n - 1 || i == 31 { Some(b) } else { None })
            .enumerate()
            .for_each(|(i, b)| result.set(i, *b));
        result
    }};
}

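/// Returned by [`compress`] when `buf` runs out of space. `valid_bits` is the
/// longest decodable prefix that was written, and `entries_processed` is the
/// number of input entries (including the uncompressed first entry) that it
/// contains.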
pub struct CompressError<'a> {
    pub valid_bits: &'a BitSlice<u8>,
    pub entries_processed: usize,
}

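/// Compresses a time series of `(timestamp, [f32; N])` rows into `buf` using
/// delta-of-delta coding for timestamps and XOR coding for values, returning
/// the written prefix of `buf`.
///
/// Panics if `series` is empty.
///
/// A minimal usage sketch (the import path and the single-column series are
/// assumptions for illustration; `WhitepaperOptions` has `BC = 3` bins):
///
/// ```ignore
/// use crate::{compress, stats::whitepaper::WhitepaperOptions, BitView};
///
/// let series = [(0u32, [1.0f32]), (10, [1.5]), (20, [1.5])];
/// let mut buf = [0u8; 64];
/// let bits = compress::<1, 3, WhitepaperOptions>(series.iter(), buf.view_bits_mut())
///     .expect("buffer is large enough");
/// assert!(bits.len() <= buf.len() * 8);
/// ```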
pub fn compress<'a, 'b, const N: usize, const BC: usize, OPTS: CompressionOptions<BC>>(
    series: impl IntoIterator<Item = &'a (u32, [f32; N])>,
    buf: &'b mut BitSlice<u8>,
) -> Result<&'b BitSlice<u8>, CompressError<'b>> {
    compress_with_stats(series, buf, &mut NoStats::<BC, OPTS>::new())
}

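/// Like [`compress`], but additionally records bin usage and repeat/overflow
/// counts into `stats` while encoding.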
pub fn compress_with_stats<
    'a,
    'b,
    const N: usize,
    const BC: usize,
    OPTS: CompressionOptions<BC>,
>(
    series: impl IntoIterator<Item = &'a (u32, [f32; N])>,
    buf: &'b mut BitSlice<u8>,
    stats: &mut impl CompressionStatsExt<BC, OPTS>,
) -> Result<&'b BitSlice<u8>, CompressError<'b>> {
    let mut series = series.into_iter();
    let mut index = 0;
    let last_valid_index = Cell::new(0);

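    // Both writer macros bail out with a `CompressError` describing the
    // decodable prefix as soon as the output buffer cannot hold the next
    // write.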
    macro_rules! write_bits_ {
        ($b:expr, $idx:expr) => {
            if buf.len() < index + $b.len() {
                return Err(CompressError {
                    valid_bits: &buf[..last_valid_index.get()],
                    entries_processed: $idx,
                });
            }
            buf[index..(index + $b.len())].copy_from_bitslice($b);
            index += $b.len();
        };
    }
    macro_rules! write_bit_ {
        ($b:expr, $idx:expr) => {
            if buf.len() < index + 1 {
                return Err(CompressError {
                    valid_bits: &buf[..last_valid_index.get()],
                    entries_processed: $idx,
                });
            }
            buf.set(index, $b);
            index += 1;
        };
    }

    let (mut previous_time, mut previous_data) =
        *series.next().expect("Time series must not be empty");
    let mut previous_delta = 0i32;
    let mut previous_xors = [0u32; N];

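    // Header: the first entry is stored uncompressed as a 32-bit little-endian
    // timestamp followed by N raw 32-bit floats.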
    write_bits_!(previous_time.to_le_bytes().view_bits::<Lsb0>(), 0);
    for bytes in previous_data.iter().map(|it| it.to_le_bytes()) {
        write_bits_!(BitSlice::<u8, Lsb0>::from_slice(&bytes), 0);
    }
    // The header alone is a decodable prefix, so record it before the first
    // compressed row can fail.
    last_valid_index.set(index);

    for (row_index, &(time, ref data)) in series.enumerate() {
        macro_rules! write_bit {
            ($b:expr) => {
                // The header plus `row_index` complete rows are already
                // written, so `row_index + 1` entries survive a failure here.
                write_bit_!($b, row_index + 1);
            };
        }
        macro_rules! write_bits {
            ($b:expr) => {
                write_bits_!($b, row_index + 1);
            };
        }

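        // Timestamps are delta-of-delta coded: only the change in the time
        // step between consecutive rows is stored.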
        let delta = (time as i64 - previous_time as i64) as i32;
        let delta_delta = delta - previous_delta;

        if delta_delta == 0 {
            write_bits!(bits![u8, Lsb0; 0]);
            stats.increment_repeated_count();
        } else {
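            // Encode the delta-delta into the smallest bin that fits: a unary
            // prefix of `i + 1` ones plus a terminating zero selects bin `i`.
            // If no bin fits, an all-ones prefix (with no terminating zero)
            // marks a raw 32-bit value.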
            let mut fit_in_bin = false;
            for (i, bin) in OPTS::DELTA_DELTA_BINS.iter().enumerate() {
                if delta_delta >= -(1 << (bin - 1)) && delta_delta < (1 << (bin - 1)) {
                    for _ in 0..(i + 1) {
                        write_bits!(bits![u8, Lsb0; 1]);
                    }
                    write_bits!(bits![u8, Lsb0; 0]);

                    let bits = &sign_contract_32!(delta_delta, *bin as usize)[..*bin as usize];
                    write_bits!(bits);
                    fit_in_bin = true;
                    stats.increment_bin(i);
                    break;
                }
            }
            if !fit_in_bin {
                for _ in 0..(OPTS::DELTA_DELTA_BINS.len() + 1) {
                    write_bits!(bits![u8, Lsb0; 1]);
                }

                let bits = sign_contract_32!(delta_delta, 32);
                stats.increment_overflow_count();
                write_bits!(&bits);
            }
        }

        previous_delta = delta;
        previous_time = time;

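        // Values are XOR coded against the previous row: identical floats
        // cost a single zero bit, and otherwise only the "meaningful" middle
        // window of the XOR (between its leading and trailing zeros) is
        // stored.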
        for ((&d, previous_d), previous_xor) in data
            .iter()
            .zip(previous_data.iter_mut())
            .zip(previous_xors.iter_mut())
        {
            let xor = d.to_bits() ^ previous_d.to_bits();

            if xor == 0 {
                write_bits!(bits![u8, Lsb0; 0]);
                *previous_xor = 0;
                continue;
            } else {
                write_bits!(bits![u8, Lsb0; 1]);
            }

            let leading_zeros = xor.leading_zeros();
            let trailing_zeros = xor.trailing_zeros();
            let prev_leading_zeros = previous_xor.leading_zeros();
            let prev_trailing_zeros = previous_xor.trailing_zeros();

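            // Control bit 0: the meaningful bits fit inside the previous
            // window, so reuse its bounds. Control bit 1: describe a new
            // window with 4 bits of leading-zero count (capped at 15) and
            // 5 bits holding the window length minus one.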
            if leading_zeros >= prev_leading_zeros && trailing_zeros >= prev_trailing_zeros {
                write_bits!(bits![u8, Lsb0; 0]);

                let n_bits_to_write = 32 - prev_leading_zeros - prev_trailing_zeros;
                let compressed = xor
                    .view_bits::<Lsb0>()
                    .iter()
                    .skip(prev_trailing_zeros as usize)
                    .take(n_bits_to_write as usize);

                for bit in compressed {
                    let bit = *bit;
                    write_bit!(bit);
                }
            } else {
                let leading_zeros = leading_zeros.min(0b1111);

                write_bits!(bits![u8, Lsb0; 1]);

                let leading_zero_bits = leading_zeros.to_le_bytes();
                write_bits!(&leading_zero_bits.view_bits::<Lsb0>()[..4]);

                let n_meaningful_bits = 32 - leading_zeros - trailing_zeros;
                write_bits!(&(n_meaningful_bits - 1).to_le_bytes().view_bits::<Lsb0>()[..5]);

                for b in xor
                    .view_bits::<Lsb0>()
                    .iter()
                    .skip(trailing_zeros as usize)
                    .take(n_meaningful_bits as usize)
                {
                    write_bit!(*b);
                }
            }

            *previous_d = d;
            *previous_xor = xor;
        }

        last_valid_index.set(index);
    }

    Ok(buf.get(0..index).expect("checked bounds already"))
}

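/// Streaming decoder for buffers produced by [`compress`]: each call to
/// `next` yields one `(timestamp, samples)` row, or an error once the stream
/// is detected to be corrupt (after which iteration stops).
///
/// The const parameters must match the ones used for compression. A minimal
/// sketch (single-column data and the `WhitepaperOptions` preset are
/// assumptions for illustration):
///
/// ```ignore
/// let rows = Decompressor::<1, 3, WhitepaperOptions>::new(bits);
/// for row in rows {
///     let (time, samples) = row.expect("stream not corrupted");
///     // ...
/// }
/// ```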
pub struct Decompressor<'a, const N: usize, const BC: usize, OPTS: CompressionOptions<BC>> {
    buf: &'a BitSlice<u8>,
    index: usize,
    previous_time: u32,
    previous_delta: i32,
    previous_data: [f32; N],
    previous_xors: [u32; N],
    failed: bool,
    _options: core::marker::PhantomData<OPTS>,
}

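/// Decoding failures; `index` is the bit offset in the input at which the
/// error was detected.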
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecompressError {
    MissingHeader,
    CorruptedTimestamp { index: usize },
    CorruptedData { index: usize },
}

impl Display for DecompressError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            DecompressError::MissingHeader => write!(f, "Missing header in the compressed data"),
            DecompressError::CorruptedTimestamp { index } => {
                write!(f, "Corrupted timestamp at bit {index}")
            }
            DecompressError::CorruptedData { index } => {
                write!(f, "Corrupted data at bit {index}")
            }
        }
    }
}

// The overridden `source`, and the deprecated `description` and `cause`,
// all matched the trait's default implementations, so the empty impl is
// equivalent.
impl Error for DecompressError {}

impl<'a, const N: usize, const BC: usize, OPTS: CompressionOptions<BC>>
    Decompressor<'a, N, BC, OPTS>
{
    pub fn new(buf: &'a BitSlice<u8>) -> Self {
        Self {
            buf,
            index: 0,
            previous_time: 0,
            previous_delta: 0,
            previous_data: [0.0; N],
            previous_xors: [0; N],
            failed: false,
            _options: core::marker::PhantomData,
        }
    }
}

impl<'a, const SAMPLES_PER_ROW: usize, const BC: usize, OPTS: CompressionOptions<BC>> Iterator
    for Decompressor<'a, SAMPLES_PER_ROW, BC, OPTS>
{
    type Item = Result<(u32, [f32; SAMPLES_PER_ROW]), DecompressError>;

    fn next(&mut self) -> Option<Self::Item> {
        let consume = |index: &mut usize, n: usize| {
            // `BitSlice::get` already returns `None` for an out-of-range
            // span; a manual `self.buf.len() - *index` pre-check could
            // underflow once `index` has been advanced past the end.
            let bits = self.buf.get(*index..*index + n)?;
            *index += n;
            Some(bits)
        };

        if self.failed || self.index >= self.buf.len() {
            return None;
        }

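        // The very first call decodes the uncompressed header entry.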
        if self.index == 0 {
            if self.buf.len() < 32 + (SAMPLES_PER_ROW * 32) {
                self.failed = true;
                return Some(Err(DecompressError::MissingHeader));
            }

            let first_time: u32 = consume(&mut self.index, 32)
                .expect("checked bounds already")
                .load_le();
            let first_data: [f32; SAMPLES_PER_ROW] = self.buf[32..(32 + (SAMPLES_PER_ROW * 32))]
                .chunks_exact(32)
                .map(|chunk| f32::from_bits(chunk.load_le()))
                .collect::<heapless::Vec<f32, SAMPLES_PER_ROW>>()
                .into_array()
                .expect("Impossible for N f32's to not fit into an array of N f32's");

            self.index = 32 + (SAMPLES_PER_ROW * 32);
            self.previous_time = first_time;
            self.previous_data = first_data;

            return Some(Ok((self.previous_time, self.previous_data)));
        }

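        // The unary prefix selects the delta-delta bin: `n` leading ones pick
        // bin `n - 1`, and `DELTA_DELTA_BINS.len() + 1` ones (which have no
        // terminating zero) announce a raw 32-bit value, so the cursor must
        // advance by at most that many bits.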
        let timestamp_ctl_bits = self
            .buf
            .get(self.index..)
            .expect("Checked bounds already")
            .leading_ones();

        self.index += core::cmp::min(timestamp_ctl_bits + 1, OPTS::DELTA_DELTA_BINS.len() + 1);

        if timestamp_ctl_bits == 0 {
            if let Some(t) = self.previous_time.checked_add_signed(self.previous_delta) {
                self.previous_time = t;
            } else {
                self.failed = true;
                return Some(Err(DecompressError::CorruptedTimestamp {
                    index: self.index,
                }));
            }
        } else {
            let n_dd_bits = *OPTS::DELTA_DELTA_BINS
                .get(timestamp_ctl_bits - 1)
                .unwrap_or(&32) as usize;

            let delta_delta: i32 = if let Some(dd) = consume(&mut self.index, n_dd_bits) {
                dd.load_le()
            } else {
                self.failed = true;
                return Some(Err(DecompressError::CorruptedTimestamp {
                    index: self.index,
                }));
            };

            let delta = self.previous_delta + delta_delta;
            self.previous_time = self.previous_time.wrapping_add(delta as u32);
            self.previous_delta = delta;
        }

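        // Decode one XOR-compressed f32 per column, mirroring the encoder's
        // window-reuse / new-window control bits.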
        for (previous_d, previous_xor) in self
            .previous_data
            .iter_mut()
            .zip(self.previous_xors.iter_mut())
        {
            let new_data_bit = if let Some(x) = consume(&mut self.index, 1) {
                x[0]
            } else {
                self.failed = true;
                // This bit belongs to the value stream, so report a data
                // error rather than a timestamp error.
                return Some(Err(DecompressError::CorruptedData { index: self.index }));
            };

            if new_data_bit.not() {
                *previous_xor = 0;
                continue;
            } else {
                let xor_ctl_bit = if let Some(x) = consume(&mut self.index, 1) {
                    x[0]
                } else {
                    self.failed = true;
                    return Some(Err(DecompressError::CorruptedData { index: self.index }));
                };

                if xor_ctl_bit.not() {
                    // A zero previous XOR has no window to reuse (and
                    // `32 - 32 - 32` below would underflow); well-formed
                    // streams never encode this case.
                    if *previous_xor == 0 {
                        self.failed = true;
                        return Some(Err(DecompressError::CorruptedData { index: self.index }));
                    }

                    let n_bits_to_read =
                        (32 - previous_xor.leading_zeros() - previous_xor.trailing_zeros())
                            as usize;
                    let compressed = if let Some(x) = consume(&mut self.index, n_bits_to_read) {
                        x
                    } else {
                        self.failed = true;
                        return Some(Err(DecompressError::CorruptedData { index: self.index }));
                    };

                    let xor = compressed.load_le::<u32>() << previous_xor.trailing_zeros();
                    *previous_d = f32::from_bits(xor ^ previous_d.to_bits());
                    *previous_xor = xor;
                } else {
                    let leading_zero_bits = if let Some(x) = consume(&mut self.index, 4) {
                        x.load_le::<u32>()
                    } else {
                        self.failed = true;
                        return Some(Err(DecompressError::CorruptedData { index: self.index }));
                    };

                    let meaningful_bits_len = if let Some(x) = consume(&mut self.index, 5) {
                        x.load_le::<u32>() + 1
                    } else {
                        self.failed = true;
                        return Some(Err(DecompressError::CorruptedData { index: self.index }));
                    };

                    if leading_zero_bits + meaningful_bits_len > 32 {
                        self.failed = true;
                        return Some(Err(DecompressError::CorruptedData { index: self.index }));
                    }

                    let trailing_zeros = 32 - leading_zero_bits - meaningful_bits_len;

                    let meaningful_bits =
                        if let Some(x) = consume(&mut self.index, meaningful_bits_len as usize) {
                            x
                        } else {
                            self.failed = true;
                            return Some(Err(DecompressError::CorruptedData { index: self.index }));
                        };

                    let xor = meaningful_bits.load_le::<u32>() << trailing_zeros;
                    *previous_d = f32::from_bits(xor ^ previous_d.to_bits());
                    *previous_xor = xor;
                }
            }
        }

        Some(Ok((self.previous_time, self.previous_data)))
    }
}

#[cfg(test)]
extern crate std;

#[cfg(test)]
mod tests {
    use crate::stats::whitepaper::WhitepaperOptions;
    use std::{eprintln, println, vec::Vec};

    use super::*;

    #[test]
    fn compression_test() {
        const TIMESERIES: &str = include_str!("../samples.csv");
        const COLUMNS: usize =
            const_str::split!(const_str::split!(TIMESERIES, '\n')[0], ',').len();

        let timeseries: Vec<_> = TIMESERIES
            .lines()
            .map(|line| {
                let mut spl = line.split(',');
                let time = spl.next().unwrap().trim().parse::<f32>().unwrap() as u32;
                let samples: [f32; COLUMNS - 1] = spl
                    .map(|col| col.trim().parse::<f32>().unwrap())
                    .collect::<heapless::Vec<_, { COLUMNS - 1 }>>()
                    .into_array()
                    .expect("checked size already");
                (time, samples)
            })
            .collect();

        let mut buf = [0u8; 250];
        let mut entries_processed = timeseries.len();
        let compressed = match compress::<{ COLUMNS - 1 }, 3, WhitepaperOptions>(
            timeseries.iter(),
            buf.view_bits_mut(),
        ) {
            Ok(compressed) => compressed,
            Err(CompressError {
                valid_bits,
                entries_processed: entries,
            }) => {
                eprintln!(
                    "warn: Buffer too small, only compressed {entries} entries out of {}",
                    timeseries.len()
                );
                entries_processed = entries;
                valid_bits
            }
        };

        let compressed_size_bits = compressed.len();
        let data_size_bytes = entries_processed * (4 + 4 * timeseries[0].1.len());

        let decompressor = Decompressor::<_, _, WhitepaperOptions>::new(compressed);
        for (result, reference) in decompressor.zip(timeseries.iter()) {
            match result {
                Ok((time, data)) => {
                    assert_eq!(time, reference.0);
                    assert_eq!(data, reference.1);
                }
                Err(e) => {
                    panic!("Decompression failed: {e}");
                }
            }
        }

        println!(
            "Compressed size: {} bytes, originally {} bytes. Compression ratio: {:.2}%",
            compressed_size_bits / 8,
            data_size_bytes,
            compressed_size_bits as f64 / 8.0 * 100.0 / data_size_bytes as f64
        );
    }
}