1use bitvec::prelude::*;
2pub use tsz_macro::*;
3
4pub type BitBuffer = BitVec<u8, Lsb0>;
9pub type BitBufferSlice = BitSlice<u8, Lsb0>;
10
11#[derive(Default)]
20pub struct Compressor<T: Compress> {
21 pub output: BitBuffer,
22 pub row_n: Option<T>,
23 pub row_n1: Option<T>,
24}
25
26pub struct Decompressor<'de> {
34 pub input: &'de BitBufferSlice,
35}
36
37pub trait Compress: Copy + Sized {
44 type Full: IntoCompressBits;
48 type Delta: IntoCompressBits;
49
50 fn into_full(self) -> Self::Full;
51 fn into_delta(self, prev_row: &Self) -> Self::Delta;
52 fn into_deltadelta(self, prev_prev_row: &Self, prev_row: &Self) -> Self::Delta;
53}
54
55pub trait Decompress: Copy + Sized {
62 type Full: FromCompressBits;
63 type Delta: FromCompressBits;
64
65 fn from_full(bits: &BitBufferSlice) -> Result<(Self, &BitBufferSlice), &'static str>;
66 fn from_delta<'a>(
67 bits: &'a BitBufferSlice,
68 prev_row: &Self,
69 ) -> Result<(Self, &'a BitBufferSlice), &'static str>;
70 fn from_deltadelta<'a>(
71 bits: &'a BitBufferSlice,
72 prev_row: &Self,
73 prev_prev_row: &Self,
74 ) -> Result<(Self, &'a BitBufferSlice), &'static str>;
75}
76
77pub trait IntoCompressBits: Sized {
78 fn into_bits(self, out: &mut BitBuffer);
79}
80
81pub trait FromCompressBits: Sized {
82 fn from_bits(input: &BitBufferSlice) -> Result<(Self, &BitBufferSlice), &'static str>;
83}
84
85impl<T: Compress> Compressor<T> {
86 pub fn new(size_hint: usize) -> Self {
93 Self {
94 output: BitBuffer::with_capacity(size_hint * 8),
95 row_n: None,
96 row_n1: None,
97 }
98 }
99
100 pub fn compress(&mut self, row: T) {
106 let Some(row_n) = self.row_n.take() else {
107 self.row_n = Some(row);
108
109 let representation = row.into_full();
112 representation.into_bits(&mut self.output);
113
114 return;
115 };
116
117 let Some(row_n1) = self.row_n1.take() else {
118 self.row_n = Some(row_n);
119 self.row_n1 = Some(row);
120
121 let representation = row.into_delta(&row_n);
124 representation.into_bits(&mut self.output);
125
126 return;
127 };
128
129 let representation = row.into_deltadelta(&row_n, &row_n1);
132 representation.into_bits(&mut self.output);
133
134 self.row_n = Some(row_n1);
137 self.row_n1 = Some(row);
138 }
139
140 pub fn len(&self) -> usize {
144 let num_bits = self.output.len();
146 (num_bits + 7) / 8
147 }
148
149 pub fn is_empty(&self) -> bool {
153 self.output.is_empty()
154 }
155
156 pub fn finish(self) -> BitBuffer {
160 self.output
161 }
162}
163
164impl<'de> Decompressor<'de> {
165 pub fn new(input: &'de BitBufferSlice) -> Self {
169 Self { input }
170 }
171
172 pub fn decompress<T: Decompress>(&mut self) -> DecompressIter<'_, T> {
176 DecompressIter {
177 input: self.input,
178 finished: false,
179 first_row: None,
180 second_row: None,
181 t_prev_prev: None,
182 t_prev: None,
183 }
184 }
185}
186
187#[derive(Clone)]
191pub struct DecompressIter<'a, T> {
192 input: &'a BitBufferSlice,
193 finished: bool,
194 first_row: Option<T>,
195 second_row: Option<T>,
196 t_prev_prev: Option<T>,
197 t_prev: Option<T>,
198}
199
200impl<'a, T> Iterator for DecompressIter<'a, T>
204where
205 T: Decompress,
206{
207 type Item = Result<T, &'static str>;
208
209 fn next(&mut self) -> Option<Self::Item> {
210 if self.finished {
211 return None;
212 }
213
214 let Some(first_row) = self.first_row.as_ref() else {
215 if self.input.is_empty() {
217 self.finished = true;
218 return None;
219 }
220
221 let (first_row, trailing) = match T::from_full(self.input) {
224 Ok(x) => x,
225 Err(e) => {
226 self.finished = true;
227 return Some(Err(e));
228 }
229 };
230 self.input = trailing;
231 self.first_row = Some(first_row);
232 return Some(Ok(first_row));
233 };
234
235 let Some(_second_row) = self.second_row.as_ref() else {
236 if self.input.is_empty() {
238 self.finished = true;
239 return None;
240 }
241
242 let (second_row, trailing) = match T::from_delta(self.input, first_row) {
245 Ok(x) => x,
246 Err(e) => {
247 self.finished = true;
248 return Some(Err(e));
249 }
250 };
251 self.input = trailing;
252 self.second_row = Some(second_row);
253 self.t_prev_prev = Some(*first_row);
254 self.t_prev = Some(second_row);
255 return Some(Ok(second_row));
256 };
257
258 if self.input.is_empty() {
262 self.finished = true;
263 return None;
264 }
265
266 let t_prev = self.t_prev.take().unwrap();
267 let t_prev_prev = self.t_prev_prev.take().unwrap();
268
269 let (row, trailing) = match T::from_deltadelta(self.input, &t_prev, &t_prev_prev) {
271 Ok(x) => x,
272 Err(e) => {
273 self.finished = true;
274 return Some(Err(e));
275 }
276 };
277 self.input = trailing;
278 self.t_prev_prev = Some(t_prev);
279 self.t_prev = Some(row);
280
281 Some(Ok(row))
282 }
283}
284
285#[cfg(test)]
286mod tests {
287 use core::ops::Add;
288 use core::ops::Sub;
289 use rand::Rng;
290
291 use crate::delta::*;
292 use crate::svlq::*;
293 use crate::uvlq::*;
294
295 use super::*;
296
297 #[test]
298 fn test_compress() {
299 #[derive(Debug, Copy, Clone)]
301 struct TestRow {
302 ts: u64,
303 v8: u8,
304 v16: u16,
305 v32: u32,
306 v64: u64,
307 vi8: i8,
308 vi16: i16,
309 vi32: i32,
310 vi64: i64,
311 }
312
313 #[derive(Debug, Copy, Clone)]
315 struct TestRowDelta {
316 ts: i128,
317 v8: i16,
318 v16: i32,
319 v32: i64,
320 v64: i128,
321 vi8: i16,
322 vi16: i32,
323 vi32: i64,
324 vi64: i128,
325 }
326
327 impl Sub for TestRow {
329 type Output = TestRowDelta;
330
331 fn sub(self, rhs: Self) -> Self::Output {
332 Self::Output {
333 ts: self.ts as i128 - rhs.ts as i128,
334 v8: self.v8 as i16 - rhs.v8 as i16,
335 v16: self.v16 as i32 - rhs.v16 as i32,
336 v32: self.v32 as i64 - rhs.v32 as i64,
337 v64: self.v64 as i128 - rhs.v64 as i128,
338 vi8: self.vi8 as i16 - rhs.vi8 as i16,
339 vi16: self.vi16 as i32 - rhs.vi16 as i32,
340 vi32: self.vi32 as i64 - rhs.vi32 as i64,
341 vi64: self.vi64 as i128 - rhs.vi64 as i128,
342 }
343 }
344 }
345
346 impl Add<TestRowDelta> for TestRow {
348 type Output = TestRow;
349
350 fn add(self, rhs: TestRowDelta) -> Self::Output {
351 Self::Output {
352 ts: (self.ts as i128 + rhs.ts) as u64,
353 v8: (self.v8 as i16 + rhs.v8) as u8,
354 v16: (self.v16 as i32 + rhs.v16) as u16,
355 v32: (self.v32 as i64 + rhs.v32) as u32,
356 v64: (self.v64 as i128 + rhs.v64) as u64,
357 vi8: (self.vi8 as i16 + rhs.vi8) as i8,
358 vi16: (self.vi16 as i32 + rhs.vi16) as i16,
359 vi32: (self.vi32 as i64 + rhs.vi32) as i32,
360 vi64: (self.vi64 as i128 + rhs.vi64) as i64,
361 }
362 }
363 }
364
365 impl Sub for TestRowDelta {
367 type Output = TestRowDelta;
368
369 fn sub(self, rhs: Self) -> Self::Output {
370 Self::Output {
371 ts: self.ts - rhs.ts,
372 v8: self.v8 - rhs.v8,
373 v16: self.v16 - rhs.v16,
374 v32: self.v32 - rhs.v32,
375 v64: self.v64 - rhs.v64,
376 vi8: self.vi8 - rhs.vi8,
377 vi16: self.vi16 - rhs.vi16,
378 vi32: self.vi32 - rhs.vi32,
379 vi64: self.vi64 - rhs.vi64,
380 }
381 }
382 }
383
384 impl IntoCompressBits for TestRow {
386 fn into_bits(self, out: &mut BitBuffer) {
387 out.extend(Uvlq::from(self.ts).bits);
388 out.extend(Uvlq::from(self.v8).bits);
389 out.extend(Uvlq::from(self.v16).bits);
390 out.extend(Uvlq::from(self.v32).bits);
391 out.extend(Uvlq::from(self.v64).bits);
392 out.extend(Svlq::from(self.vi8).bits);
393 out.extend(Svlq::from(self.vi16).bits);
394 out.extend(Svlq::from(self.vi32).bits);
395 out.extend(Svlq::from(self.vi64).bits);
396 }
397 }
398
399 impl IntoCompressBits for TestRowDelta {
401 fn into_bits(self, out: &mut BitBuffer) {
402 if self.ts < i64::MIN as i128 && self.ts > i64::MAX as i128 {
403 unimplemented!()
404 }
405 encode_delta_i64(self.ts as i64, out);
406
407 encode_delta_i16(self.v8, out);
408 encode_delta_i32(self.v16, out);
409 encode_delta_i64(self.v32, out);
410
411 if self.v64 < i128::MIN as i128 && self.v64 > i128::MAX as i128 {
412 unimplemented!()
413 }
414 encode_delta_i64(self.v64 as i64, out);
415
416 encode_delta_i16(self.vi8, out);
417 encode_delta_i32(self.vi16, out);
418 encode_delta_i64(self.vi32, out);
419
420 if self.vi64 < i64::MIN as i128 && self.vi64 > i64::MAX as i128 {
421 unimplemented!()
422 }
423 encode_delta_i64(self.vi64 as i64, out);
424 }
425 }
426
427 impl FromCompressBits for TestRow {
429 fn from_bits(input: &BitBufferSlice) -> Result<(Self, &BitBufferSlice), &'static str> {
430 let (ts, ts_bits) = <(u64, usize)>::try_from(UvlqRef(input))?;
431 let input = &input[ts_bits..];
432 let (v8, v8_bits) = <(u8, usize)>::try_from(UvlqRef(input))?;
433 let input = &input[v8_bits..];
434 let (v16, v16_bits) = <(u16, usize)>::try_from(UvlqRef(input))?;
435 let input = &input[v16_bits..];
436 let (v32, v32_bits) = <(u32, usize)>::try_from(UvlqRef(input))?;
437 let input = &input[v32_bits..];
438 let (v64, v64_bits) = <(u64, usize)>::try_from(UvlqRef(input))?;
439 let input = &input[v64_bits..];
440
441 let (vi8, vi8_bits) = <(i8, usize)>::try_from(SvlqRef(input))?;
442 let input = &input[vi8_bits..];
443 let (vi16, vi16_bits) = <(i16, usize)>::try_from(SvlqRef(input))?;
444 let input = &input[vi16_bits..];
445 let (vi32, vi32_bits) = <(i32, usize)>::try_from(SvlqRef(input))?;
446 let input = &input[vi32_bits..];
447 let (vi64, vi64_bits) = <(i64, usize)>::try_from(SvlqRef(input))?;
448 let input = &input[vi64_bits..];
449
450 Ok((
451 Self {
452 ts,
453 v8,
454 v16,
455 v32,
456 v64,
457 vi8,
458 vi16,
459 vi32,
460 vi64,
461 },
462 input,
463 ))
464 }
465 }
466
467 impl FromCompressBits for TestRowDelta {
469 fn from_bits(input: &BitBufferSlice) -> Result<(Self, &BitBufferSlice), &'static str> {
470 let (ts, input) = decode_delta_i64(input)?;
471 let Some(input) = input else {
472 return Err("Early EOF");
473 };
474 let (v8, input) = decode_delta_i16(input)?;
475 let Some(input) = input else {
476 return Err("Early EOF");
477 };
478 let (v16, input) = decode_delta_i32(input)?;
479 let Some(input) = input else {
480 return Err("Early EOF");
481 };
482 let (v32, input) = decode_delta_i64(input)?;
483 let Some(input) = input else {
484 return Err("Early EOF");
485 };
486 let (v64, input) = decode_delta_i64(input)?;
487 let Some(input) = input else {
488 return Err("Early EOF");
489 };
490 let (vi8, input) = decode_delta_i16(input)?;
491 let Some(input) = input else {
492 return Err("Early EOF");
493 };
494 let (vi16, input) = decode_delta_i32(input)?;
495 let Some(input) = input else {
496 return Err("Early EOF");
497 };
498 let (vi32, input) = decode_delta_i64(input)?;
499 let Some(input) = input else {
500 return Err("Early EOF");
501 };
502 let (vi64, input) = decode_delta_i64(input)?;
503 let input = input.unwrap_or_default();
504
505 Ok((
506 Self {
507 ts: ts as i128,
508 v8,
509 v16,
510 v32,
511 v64: v64 as i128,
512 vi8,
513 vi16,
514 vi32,
515 vi64: vi64 as i128,
516 },
517 input,
518 ))
519 }
520 }
521
522 impl Compress for TestRow {
524 type Full = TestRow;
525
526 type Delta = TestRowDelta;
527
528 fn into_full(self) -> Self::Full {
529 self
530 }
531
532 fn into_delta(self, prev_row: &Self) -> Self::Delta {
533 let r = self - *prev_row;
534 r
535 }
536
537 fn into_deltadelta(self, prev_prev_row: &Self, prev_row: &Self) -> Self::Delta {
538 (self - *prev_row) - (*prev_row - *prev_prev_row)
539 }
540 }
541
542 impl Decompress for TestRow {
543 type Full = TestRow;
544 type Delta = TestRowDelta;
545
546 fn from_full<'a>(
547 bits: &'a BitBufferSlice,
548 ) -> Result<(Self, &'a BitBufferSlice), &'static str> {
549 TestRow::from_bits(bits).map_err(|_| "failed to unmarshal full row")
550 }
551
552 fn from_delta<'a>(
553 bits: &'a BitBufferSlice,
554 prev_row: &Self,
555 ) -> Result<(Self, &'a BitBufferSlice), &'static str> {
556 let delta =
557 TestRowDelta::from_bits(bits).map_err(|_| "failed to unmarshal delta row")?;
558 Ok((*prev_row + delta.0, delta.1))
559 }
560
561 fn from_deltadelta<'a>(
562 bits: &'a BitBufferSlice,
563 prev_row: &Self,
564 prev_prev_row: &Self,
565 ) -> Result<(Self, &'a BitBufferSlice), &'static str> {
566 let deltadelta = TestRowDelta::from_bits(bits)
568 .map_err(|_| "failed to unmarshal deltadelta row")?;
569 Ok((
570 *prev_row + (*prev_row - *prev_prev_row) + deltadelta.0,
571 deltadelta.1,
572 ))
573 }
574 }
575
576 let mut compressor = Compressor::new(100);
577
578 let lower = -32;
579 let j = 0;
580 for i in lower..10isize {
581 let row = TestRow {
582 ts: (j + i - lower) as u64,
583 v8: (j + i - lower) as u8,
584 v16: (j + i - lower) as u16,
585 v32: (j + i - lower) as u32,
586 v64: (j + i - lower) as u64,
587 vi8: (j + i) as i8,
588 vi16: (j + i) as i16,
589 vi32: (j + i) as i32,
590 vi64: (j + i) as i64,
591 };
592 println!("compressing row {:?}", row);
594 compressor.compress(row);
595 }
596
597 let encoded = compressor.finish();
598 println!("{:?}", encoded);
599
600 let mut decompressor = Decompressor::new(&encoded);
601 for (idx, row) in decompressor.decompress::<TestRow>().enumerate() {
602 println!("{:?}: {:?}", idx, row);
603 }
604 }
605}