1use crate::error::{TsdbError, TsdbResult};
7use bit_vec::BitVec;
8use std::ops::Range;
9
10pub struct GorillaCompressor {
27 prev_value: f64,
29
30 prev_leading_zeros: u8,
32
33 prev_trailing_zeros: u8,
35
36 bits: BitVec,
38
39 count: u32,
41}
42
43impl GorillaCompressor {
44 pub fn new(first_value: f64) -> Self {
49 let mut bits = BitVec::new();
50
51 let value_bits = first_value.to_bits();
53 for i in (0..64).rev() {
54 bits.push((value_bits >> i) & 1 == 1);
55 }
56
57 Self {
58 prev_value: first_value,
59 prev_leading_zeros: 64, prev_trailing_zeros: 64,
61 bits,
62 count: 1, }
64 }
65
66 pub fn compress(&mut self, value: f64) {
68 let xor = value.to_bits() ^ self.prev_value.to_bits();
69
70 if xor == 0 {
71 self.bits.push(false);
73 } else {
74 self.bits.push(true); let leading_zeros = xor.leading_zeros() as u8;
77 let trailing_zeros = xor.trailing_zeros() as u8;
78
79 if self.prev_leading_zeros < 64
82 && leading_zeros >= self.prev_leading_zeros
83 && trailing_zeros >= self.prev_trailing_zeros
84 {
85 self.bits.push(false);
87
88 let meaningful_bits = 64 - self.prev_leading_zeros - self.prev_trailing_zeros;
89 let shifted_xor = xor >> self.prev_trailing_zeros;
90
91 for i in (0..meaningful_bits).rev() {
92 self.bits.push((shifted_xor >> i) & 1 == 1);
93 }
94 } else {
95 self.bits.push(true);
97
98 let leading_zeros_capped = leading_zeros.min(31);
100 for i in (0..5).rev() {
101 self.bits.push((leading_zeros_capped >> i) & 1 == 1);
102 }
103
104 let actual_leading = leading_zeros_capped;
108 let meaningful_bits = (64 - leading_zeros - trailing_zeros).max(1);
109 for i in (0..6).rev() {
110 self.bits.push((meaningful_bits >> i) & 1 == 1);
111 }
112
113 let shifted_xor = xor >> trailing_zeros;
115 for i in (0..meaningful_bits).rev() {
116 self.bits.push((shifted_xor >> i) & 1 == 1);
117 }
118
119 self.prev_leading_zeros = actual_leading;
120 self.prev_trailing_zeros = 64 - actual_leading - meaningful_bits;
121 }
122 }
123
124 self.prev_value = value;
125 self.count += 1;
126 }
127
128 pub fn finish(self) -> Vec<u8> {
132 let mut result = Vec::with_capacity(4 + (self.bits.len() + 7) / 8);
133 result.extend_from_slice(&self.count.to_be_bytes());
135 result.extend_from_slice(&self.bits.to_bytes());
136 result
137 }
138
139 pub fn compression_ratio(&self, original_count: usize) -> f64 {
141 let original_bytes = original_count * 8; let compressed_bytes = 4 + (self.bits.len() + 7) / 8; original_bytes as f64 / compressed_bytes as f64
144 }
145}
146
147pub struct GorillaDecompressor {
149 bits: BitVec,
151
152 pos: usize,
154
155 prev_value: f64,
157
158 prev_leading_zeros: u8,
160
161 prev_trailing_zeros: u8,
163
164 total_count: u32,
166
167 decompressed_count: u32,
169}
170
171impl GorillaDecompressor {
172 pub fn new(compressed: &[u8]) -> TsdbResult<Self> {
176 if compressed.len() < 4 {
177 return Err(TsdbError::Decompression(
178 "Compressed data too short (missing count header)".to_string(),
179 ));
180 }
181
182 let total_count =
184 u32::from_be_bytes([compressed[0], compressed[1], compressed[2], compressed[3]]);
185
186 let bits = BitVec::from_bytes(&compressed[4..]);
188
189 if bits.len() < 64 {
190 return Err(TsdbError::Decompression(
191 "Compressed data too short (missing first value)".to_string(),
192 ));
193 }
194
195 let first_value_bits = Self::read_bits_as_u64(&bits, 0..64);
197 let first_value = f64::from_bits(first_value_bits);
198
199 Ok(Self {
200 bits,
201 pos: 64,
202 prev_value: first_value,
203 prev_leading_zeros: 64, prev_trailing_zeros: 64,
205 total_count,
206 decompressed_count: 1, })
208 }
209
210 pub fn first_value(&self) -> f64 {
212 self.prev_value
213 }
214
215 pub fn total_count(&self) -> u32 {
217 self.total_count
218 }
219
220 pub fn next_value(&mut self) -> Option<f64> {
224 if self.decompressed_count >= self.total_count {
226 return None;
227 }
228
229 if self.pos >= self.bits.len() {
230 return None;
231 }
232
233 let changed = self.bits[self.pos];
234 self.pos += 1;
235
236 if !changed {
237 self.decompressed_count += 1;
239 return Some(self.prev_value);
240 }
241
242 if self.pos >= self.bits.len() {
243 return None;
244 }
245
246 let use_prev_block = !self.bits[self.pos]; self.pos += 1;
248
249 let meaningful_bits = if use_prev_block && self.prev_leading_zeros < 64 {
250 64 - self.prev_leading_zeros - self.prev_trailing_zeros
252 } else {
253 if self.pos + 11 > self.bits.len() {
255 return None; }
257
258 let leading_zeros = Self::read_bits_as_u8(&self.bits, self.pos..self.pos + 5);
259 self.pos += 5;
260
261 let block_size = Self::read_bits_as_u8(&self.bits, self.pos..self.pos + 6);
262 self.pos += 6;
263
264 let trailing_zeros = (64 - leading_zeros).saturating_sub(block_size);
266 self.prev_leading_zeros = leading_zeros;
267 self.prev_trailing_zeros = trailing_zeros;
268
269 block_size
270 };
271
272 if meaningful_bits == 0 {
273 self.decompressed_count += 1;
275 return Some(self.prev_value);
276 }
277
278 if self.pos + meaningful_bits as usize > self.bits.len() {
279 return None; }
281
282 let xor_shifted =
284 Self::read_bits_as_u64(&self.bits, self.pos..self.pos + meaningful_bits as usize);
285 self.pos += meaningful_bits as usize;
286
287 let xor = if self.prev_trailing_zeros >= 64 {
289 0
290 } else {
291 xor_shifted << self.prev_trailing_zeros
292 };
293 let value_bits = self.prev_value.to_bits() ^ xor;
294 let value = f64::from_bits(value_bits);
295
296 self.prev_value = value;
297 self.decompressed_count += 1;
298 Some(value)
299 }
300
301 pub fn decompress_all(mut self) -> Vec<f64> {
303 let first = self.first_value();
304 let mut values = vec![first];
305
306 while let Some(value) = self.next_value() {
307 values.push(value);
308 }
309
310 values
311 }
312
313 fn read_bits_as_u64(bits: &BitVec, range: Range<usize>) -> u64 {
315 let mut result: u64 = 0;
316 let range_len = range.len();
317 for (i, bit_idx) in range.enumerate() {
318 if bit_idx < bits.len() && bits[bit_idx] {
319 result |= 1 << (range_len - 1 - i);
320 }
321 }
322 result
323 }
324
325 fn read_bits_as_u8(bits: &BitVec, range: Range<usize>) -> u8 {
327 Self::read_bits_as_u64(bits, range) as u8
328 }
329}
330
331pub struct DeltaOfDeltaCompressor {
336 prev_timestamp: i64,
338
339 prev_delta: i64,
341
342 bits: BitVec,
344
345 count: u32,
347}
348
349impl DeltaOfDeltaCompressor {
350 pub fn new(first_timestamp: i64) -> Self {
354 let mut bits = BitVec::new();
355
356 for i in (0..64).rev() {
358 bits.push((first_timestamp >> i) & 1 == 1);
359 }
360
361 Self {
362 prev_timestamp: first_timestamp,
363 prev_delta: 0,
364 bits,
365 count: 1, }
367 }
368
369 pub fn compress(&mut self, timestamp: i64) {
371 let delta = timestamp - self.prev_timestamp;
372 let delta_of_delta = delta - self.prev_delta;
373
374 match delta_of_delta {
376 0 => {
377 self.bits.push(false);
379 }
380 -63..=64 => {
381 self.bits.push(true);
383 self.bits.push(false);
384 self.encode_signed(delta_of_delta, 7);
385 }
386 -255..=256 => {
387 self.bits.push(true);
389 self.bits.push(true);
390 self.bits.push(false);
391 self.encode_signed(delta_of_delta, 9);
392 }
393 -2047..=2048 => {
394 self.bits.push(true);
396 self.bits.push(true);
397 self.bits.push(true);
398 self.bits.push(false);
399 self.encode_signed(delta_of_delta, 12);
400 }
401 _ => {
402 self.bits.push(true);
404 self.bits.push(true);
405 self.bits.push(true);
406 self.bits.push(true);
407 self.encode_signed(delta_of_delta, 64);
408 }
409 }
410
411 self.prev_timestamp = timestamp;
412 self.prev_delta = delta;
413 self.count += 1;
414 }
415
416 fn encode_signed(&mut self, value: i64, bit_count: usize) {
418 for i in (0..bit_count).rev() {
419 self.bits.push((value >> i) & 1 == 1);
420 }
421 }
422
423 pub fn finish(self) -> Vec<u8> {
427 let mut result = Vec::with_capacity(4 + (self.bits.len() + 7) / 8);
428 result.extend_from_slice(&self.count.to_be_bytes());
430 result.extend_from_slice(&self.bits.to_bytes());
431 result
432 }
433
434 pub fn compression_ratio(&self, original_count: usize) -> f64 {
436 let original_bytes = original_count * 8; let compressed_bytes = 4 + (self.bits.len() + 7) / 8; original_bytes as f64 / compressed_bytes as f64
439 }
440}
441
442pub struct DeltaOfDeltaDecompressor {
444 bits: BitVec,
446
447 pos: usize,
449
450 prev_timestamp: i64,
452
453 prev_delta: i64,
455
456 total_count: u32,
458
459 decompressed_count: u32,
461}
462
463impl DeltaOfDeltaDecompressor {
464 pub fn new(compressed: &[u8]) -> TsdbResult<Self> {
468 if compressed.len() < 4 {
469 return Err(TsdbError::Decompression(
470 "Compressed timestamp data too short (missing count header)".to_string(),
471 ));
472 }
473
474 let total_count =
476 u32::from_be_bytes([compressed[0], compressed[1], compressed[2], compressed[3]]);
477
478 let bits = BitVec::from_bytes(&compressed[4..]);
480
481 if bits.len() < 64 {
482 return Err(TsdbError::Decompression(
483 "Compressed timestamp data too short (missing first timestamp)".to_string(),
484 ));
485 }
486
487 let first_timestamp = Self::read_bits_as_i64(&bits, 0..64);
489
490 Ok(Self {
491 bits,
492 pos: 64,
493 prev_timestamp: first_timestamp,
494 prev_delta: 0,
495 total_count,
496 decompressed_count: 1, })
498 }
499
500 pub fn first_timestamp(&self) -> i64 {
502 self.prev_timestamp
503 }
504
505 pub fn total_count(&self) -> u32 {
507 self.total_count
508 }
509
510 pub fn next_timestamp(&mut self) -> Option<i64> {
514 if self.decompressed_count >= self.total_count {
516 return None;
517 }
518
519 if self.pos >= self.bits.len() {
520 return None;
521 }
522
523 let prefix = self.read_prefix();
525
526 let delta_of_delta = match prefix {
527 0 => 0, 1 => {
529 if self.pos + 7 > self.bits.len() {
531 return None;
532 }
533 let value = Self::read_bits_as_i64(&self.bits, self.pos..self.pos + 7);
534 self.pos += 7;
535 Self::sign_extend(value, 7)
536 }
537 2 => {
538 if self.pos + 9 > self.bits.len() {
540 return None;
541 }
542 let value = Self::read_bits_as_i64(&self.bits, self.pos..self.pos + 9);
543 self.pos += 9;
544 Self::sign_extend(value, 9)
545 }
546 3 => {
547 if self.pos + 12 > self.bits.len() {
549 return None;
550 }
551 let value = Self::read_bits_as_i64(&self.bits, self.pos..self.pos + 12);
552 self.pos += 12;
553 Self::sign_extend(value, 12)
554 }
555 4 => {
556 if self.pos + 64 > self.bits.len() {
558 return None;
559 }
560 let value = Self::read_bits_as_i64(&self.bits, self.pos..self.pos + 64);
561 self.pos += 64;
562 value
563 }
564 _ => return None,
565 };
566
567 let delta = self.prev_delta + delta_of_delta;
568 let timestamp = self.prev_timestamp + delta;
569
570 self.prev_timestamp = timestamp;
571 self.prev_delta = delta;
572 self.decompressed_count += 1;
573
574 Some(timestamp)
575 }
576
577 fn read_prefix(&mut self) -> u8 {
579 if self.pos >= self.bits.len() || !self.bits[self.pos] {
580 self.pos += 1;
581 return 0; }
583
584 self.pos += 1;
585 if self.pos >= self.bits.len() || !self.bits[self.pos] {
586 self.pos += 1;
587 return 1; }
589
590 self.pos += 1;
591 if self.pos >= self.bits.len() || !self.bits[self.pos] {
592 self.pos += 1;
593 return 2; }
595
596 self.pos += 1;
597 if self.pos >= self.bits.len() || !self.bits[self.pos] {
598 self.pos += 1;
599 return 3; }
601
602 self.pos += 1;
603 4 }
605
606 fn read_bits_as_i64(bits: &BitVec, range: Range<usize>) -> i64 {
608 let mut result: i64 = 0;
609 let range_len = range.len();
610 for (i, bit_idx) in range.enumerate() {
611 if bit_idx < bits.len() && bits[bit_idx] {
612 result |= 1 << (range_len - 1 - i);
613 }
614 }
615 result
616 }
617
618 fn sign_extend(value: i64, bits: usize) -> i64 {
620 let sign_bit = 1i64 << (bits - 1);
621 if value & sign_bit != 0 {
622 value | (!0i64 << bits)
624 } else {
625 value
627 }
628 }
629
630 pub fn decompress_all(mut self) -> Vec<i64> {
632 let first = self.first_timestamp();
633 let mut timestamps = vec![first];
634
635 while let Some(timestamp) = self.next_timestamp() {
636 timestamps.push(timestamp);
637 }
638
639 timestamps
640 }
641}
642
#[cfg(test)]
mod tests {
    use super::*;

    /// A constant series should cost roughly one bit per repeated sample.
    #[test]
    fn test_gorilla_compression_unchanged() {
        let mut compressor = GorillaCompressor::new(25.0);
        for _ in 0..999 {
            compressor.compress(25.0);
        }

        let compressed = compressor.finish();
        let original_size = 1000 * 8;
        let compressed_size = compressed.len();

        println!(
            "Unchanged values (1000): {} bytes → {} bytes (ratio: {:.1}:1)",
            original_size,
            compressed_size,
            original_size as f64 / compressed_size as f64
        );

        assert!(compressed_size < original_size / 50);
    }

    /// Slowly varying values must survive a compress/decompress cycle exactly.
    #[test]
    fn test_gorilla_round_trip() {
        let values = vec![20.0, 20.1, 20.2, 20.1, 20.0, 20.1, 20.2, 20.3, 20.2, 20.1];

        let mut compressor = GorillaCompressor::new(values[0]);
        for &value in &values[1..] {
            compressor.compress(value);
        }

        let compressed = compressor.finish();
        let decompressor = GorillaDecompressor::new(&compressed).unwrap();
        let decompressed = decompressor.decompress_all();

        assert_eq!(values.len(), decompressed.len());

        for (original, decompressed_val) in values.iter().zip(decompressed.iter()) {
            assert_eq!(original, decompressed_val);
        }
    }

    /// Evenly spaced timestamps should compress well and round-trip exactly.
    #[test]
    fn test_delta_of_delta_regular_sampling() {
        let base = 1640000000000i64;
        let timestamps: Vec<i64> = (0..1000).map(|i| base + i * 1000).collect();

        let mut compressor = DeltaOfDeltaCompressor::new(timestamps[0]);
        for &ts in &timestamps[1..] {
            compressor.compress(ts);
        }

        let compressed = compressor.finish();
        let original_size = 1000 * 8;
        let compressed_size = compressed.len();

        println!(
            "Regular 1Hz sampling: {} bytes → {} bytes (ratio: {:.1}:1)",
            original_size,
            compressed_size,
            original_size as f64 / compressed_size as f64
        );

        assert!(compressed_size < original_size / 30);

        let decompressor = DeltaOfDeltaDecompressor::new(&compressed).unwrap();
        let decompressed = decompressor.decompress_all();

        assert_eq!(timestamps, decompressed);
    }

    /// Unevenly spaced timestamps must still round-trip exactly.
    #[test]
    fn test_delta_of_delta_irregular_sampling() {
        let timestamps: Vec<i64> = vec![1000, 2000, 3000, 3100, 5000, 6000, 7000];

        let mut compressor = DeltaOfDeltaCompressor::new(timestamps[0]);
        for &ts in &timestamps[1..] {
            compressor.compress(ts);
        }

        let compressed = compressor.finish();

        let decompressor = DeltaOfDeltaDecompressor::new(&compressed).unwrap();
        let decompressed = decompressor.decompress_all();

        assert_eq!(timestamps, decompressed);
    }

    /// The reported ratio should exceed 1 for a smoothly increasing series.
    #[test]
    fn test_compression_ratio_calculation() {
        let mut compressor = GorillaCompressor::new(100.0);
        for i in 0..99 {
            compressor.compress(100.0 + (i as f64 * 0.1));
        }

        let ratio = compressor.compression_ratio(100);
        assert!(ratio > 1.0);
        println!("Compression ratio: {:.1}:1", ratio);
    }
}