1use crate::types::DataPoint;
10use chrono::DateTime;
11#[cfg(test)]
12use chrono::Utc;
13
/// Packs individual bits, most significant bit first, into a growing byte buffer.
struct BitWriter {
    /// Bytes that have been completely filled.
    buffer: Vec<u8>,
    /// Partially filled byte; the next bit lands at bit index `7 - bit_position`.
    current_byte: u8,
    /// Number of bits already placed in `current_byte` (0..=7).
    bit_position: u8,
}

impl BitWriter {
    /// Creates a writer with no bits written.
    fn new() -> Self {
        BitWriter {
            buffer: Vec::new(),
            current_byte: 0,
            bit_position: 0,
        }
    }

    /// Appends one bit, moving `current_byte` into the buffer once all eight
    /// of its bits are used.
    fn write_bit(&mut self, bit: bool) {
        self.current_byte |= u8::from(bit) << (7 - self.bit_position);
        self.bit_position += 1;

        if self.bit_position == 8 {
            // `take` hands over the full byte and resets the accumulator to 0.
            self.buffer.push(std::mem::take(&mut self.current_byte));
            self.bit_position = 0;
        }
    }

    /// Appends the low `num_bits` bits of `value`, most significant first.
    fn write_bits(&mut self, value: u64, num_bits: u8) {
        (0..num_bits)
            .rev()
            .map(|shift| (value >> shift) & 1 == 1)
            .for_each(|bit| self.write_bit(bit));
    }

    /// Consumes the writer and returns the bytes, flushing a trailing partial
    /// byte whose unused low bits are zero.
    fn finish(self) -> Vec<u8> {
        let BitWriter {
            mut buffer,
            current_byte,
            bit_position,
        } = self;
        if bit_position != 0 {
            buffer.push(current_byte);
        }
        buffer
    }
}
60
/// Reads bits, most significant bit first, from a borrowed byte slice.
struct BitReader<'a> {
    /// Source bytes.
    data: &'a [u8],
    /// Index of the byte currently being consumed.
    byte_position: usize,
    /// Number of bits of the current byte already consumed (0..=7).
    bit_position: u8,
}

impl<'a> BitReader<'a> {
    /// Creates a reader positioned at the first bit of `data`.
    fn new(data: &'a [u8]) -> Self {
        BitReader {
            data,
            byte_position: 0,
            bit_position: 0,
        }
    }

    /// Returns the next bit, or `None` once every byte has been consumed.
    fn read_bit(&mut self) -> Option<bool> {
        let byte = *self.data.get(self.byte_position)?;
        let bit = (byte & (0x80 >> self.bit_position)) != 0;

        self.bit_position += 1;
        if self.bit_position == 8 {
            self.bit_position = 0;
            self.byte_position += 1;
        }
        Some(bit)
    }

    /// Reads `num_bits` bits as a big-endian unsigned integer.
    ///
    /// Returns `None` if the input runs out first; bits read before that
    /// point remain consumed.
    fn read_bits(&mut self, num_bits: u8) -> Option<u64> {
        (0..num_bits).try_fold(0u64, |acc, _| {
            let bit = self.read_bit()?;
            Some((acc << 1) | u64::from(bit))
        })
    }
}
105
106pub struct Compressor {
112 writer: BitWriter,
113 first_timestamp: Option<i64>,
114 prev_timestamp: i64,
115 prev_delta: i64,
116 prev_value_bits: u64,
117 prev_leading_zeros: u8,
118 prev_trailing_zeros: u8,
119 count: usize,
120}
121
122impl Compressor {
123 pub fn new() -> Self {
124 Self {
125 writer: BitWriter::new(),
126 first_timestamp: None,
127 prev_timestamp: 0,
128 prev_delta: 0,
129 prev_value_bits: 0,
130 prev_leading_zeros: 64,
131 prev_trailing_zeros: 64,
132 count: 0,
133 }
134 }
135
136 pub fn compress(&mut self, point: &DataPoint) {
138 let timestamp = point.timestamp_millis();
139 let value_bits = point.value.to_bits();
140
141 if self.first_timestamp.is_none() {
142 self.first_timestamp = Some(timestamp);
143 self.writer.write_bits(timestamp as u64, 64);
144 self.writer.write_bits(value_bits, 64);
145 self.prev_timestamp = timestamp;
146 self.prev_value_bits = value_bits;
147 self.count = 1;
148 return;
149 }
150
151 self.compress_timestamp(timestamp);
152 self.compress_value(value_bits);
153
154 self.prev_timestamp = timestamp;
155 self.prev_value_bits = value_bits;
156 self.count += 1;
157 }
158
159 fn compress_timestamp(&mut self, timestamp: i64) {
160 let delta = timestamp - self.prev_timestamp;
161 let delta_of_delta = delta - self.prev_delta;
162
163 if delta_of_delta == 0 {
164 self.writer.write_bit(false);
165 } else if (-63..=64).contains(&delta_of_delta) {
166 self.writer.write_bits(0b10, 2);
167 self.writer.write_bits((delta_of_delta + 63) as u64, 7);
168 } else if (-255..=256).contains(&delta_of_delta) {
169 self.writer.write_bits(0b110, 3);
170 self.writer.write_bits((delta_of_delta + 255) as u64, 9);
171 } else if (-2047..=2048).contains(&delta_of_delta) {
172 self.writer.write_bits(0b1110, 4);
173 self.writer.write_bits((delta_of_delta + 2047) as u64, 12);
174 } else {
175 self.writer.write_bits(0b1111, 4);
176 self.writer.write_bits(delta_of_delta as u64, 64);
177 }
178
179 self.prev_delta = delta;
180 }
181
182 fn compress_value(&mut self, value_bits: u64) {
183 let xor = self.prev_value_bits ^ value_bits;
184
185 if xor == 0 {
186 self.writer.write_bit(false);
187 return;
188 }
189
190 self.writer.write_bit(true);
191
192 let leading_zeros = xor.leading_zeros() as u8;
193 let trailing_zeros = xor.trailing_zeros() as u8;
194
195 if leading_zeros >= self.prev_leading_zeros && trailing_zeros >= self.prev_trailing_zeros {
196 self.writer.write_bit(false);
197 let meaningful_bits = 64 - self.prev_leading_zeros - self.prev_trailing_zeros;
198 let shifted = xor >> self.prev_trailing_zeros;
199 self.writer.write_bits(shifted, meaningful_bits);
200 } else {
201 self.writer.write_bit(true);
202 self.writer.write_bits(leading_zeros as u64, 6);
203
204 let meaningful_bits = 64 - leading_zeros - trailing_zeros;
205 self.writer.write_bits(meaningful_bits as u64, 6);
206
207 let shifted = xor >> trailing_zeros;
208 self.writer.write_bits(shifted, meaningful_bits);
209
210 self.prev_leading_zeros = leading_zeros;
211 self.prev_trailing_zeros = trailing_zeros;
212 }
213 }
214
215 pub fn finish(self) -> CompressedBlock {
217 let data = self.writer.finish();
218 let checksum = crc32fast::hash(&data);
219 CompressedBlock {
220 data,
221 first_timestamp: self.first_timestamp.unwrap_or(0),
222 last_timestamp: self.prev_timestamp,
223 count: self.count,
224 checksum,
225 }
226 }
227}
228
229impl Default for Compressor {
230 fn default() -> Self {
231 Self::new()
232 }
233}
234
235#[derive(Debug, Clone)]
241pub struct CompressedBlock {
242 pub data: Vec<u8>,
243 pub first_timestamp: i64,
244 pub last_timestamp: i64,
245 pub count: usize,
246 pub checksum: u32,
247}
248
249impl CompressedBlock {
250 pub fn compression_ratio(&self) -> f64 {
252 let uncompressed_size = self.count * 16;
253 if self.data.is_empty() {
254 return 1.0;
255 }
256 uncompressed_size as f64 / self.data.len() as f64
257 }
258
259 pub fn verify_checksum(&self) -> bool {
261 crc32fast::hash(&self.data) == self.checksum
262 }
263}
264
265#[allow(dead_code)]
271pub struct Decompressor<'a> {
272 reader: BitReader<'a>,
273 first_timestamp: i64,
274 prev_timestamp: i64,
275 prev_delta: i64,
276 prev_value_bits: u64,
277 prev_leading_zeros: u8,
278 prev_trailing_zeros: u8,
279 remaining: usize,
280 first_read: bool,
281}
282
283impl<'a> Decompressor<'a> {
284 pub fn new(block: &'a CompressedBlock) -> Self {
285 Self {
286 reader: BitReader::new(&block.data),
287 first_timestamp: block.first_timestamp,
288 prev_timestamp: 0,
289 prev_delta: 0,
290 prev_value_bits: 0,
291 prev_leading_zeros: 0,
292 prev_trailing_zeros: 0,
293 remaining: block.count,
294 first_read: true,
295 }
296 }
297
298 pub fn next(&mut self) -> Option<DataPoint> {
300 if self.remaining == 0 {
301 return None;
302 }
303
304 self.remaining -= 1;
305
306 if self.first_read {
307 self.first_read = false;
308 let timestamp = self.reader.read_bits(64)? as i64;
309 let value_bits = self.reader.read_bits(64)?;
310 self.prev_timestamp = timestamp;
311 self.prev_value_bits = value_bits;
312
313 return Some(DataPoint {
314 timestamp: DateTime::from_timestamp_millis(timestamp)?,
315 value: f64::from_bits(value_bits),
316 });
317 }
318
319 let timestamp = self.decompress_timestamp()?;
320 let value_bits = self.decompress_value()?;
321
322 self.prev_timestamp = timestamp;
323 self.prev_value_bits = value_bits;
324
325 Some(DataPoint {
326 timestamp: DateTime::from_timestamp_millis(timestamp)?,
327 value: f64::from_bits(value_bits),
328 })
329 }
330
331 fn decompress_timestamp(&mut self) -> Option<i64> {
332 let delta_of_delta = if !self.reader.read_bit()? {
333 0i64
334 } else if !self.reader.read_bit()? {
335 self.reader.read_bits(7)? as i64 - 63
336 } else if !self.reader.read_bit()? {
337 self.reader.read_bits(9)? as i64 - 255
338 } else if !self.reader.read_bit()? {
339 self.reader.read_bits(12)? as i64 - 2047
340 } else {
341 self.reader.read_bits(64)? as i64
342 };
343
344 self.prev_delta += delta_of_delta;
345 Some(self.prev_timestamp + self.prev_delta)
346 }
347
348 fn decompress_value(&mut self) -> Option<u64> {
349 if !self.reader.read_bit()? {
350 return Some(self.prev_value_bits);
351 }
352
353 let xor = if !self.reader.read_bit()? {
354 let meaningful_bits = 64 - self.prev_leading_zeros - self.prev_trailing_zeros;
355 let value = self.reader.read_bits(meaningful_bits)?;
356 value << self.prev_trailing_zeros
357 } else {
358 let leading_zeros = self.reader.read_bits(6)? as u8;
359 let meaningful_bits = self.reader.read_bits(6)? as u8;
360 let trailing_zeros = 64 - leading_zeros - meaningful_bits;
361
362 self.prev_leading_zeros = leading_zeros;
363 self.prev_trailing_zeros = trailing_zeros;
364
365 let value = self.reader.read_bits(meaningful_bits)?;
366 value << trailing_zeros
367 };
368
369 Some(self.prev_value_bits ^ xor)
370 }
371
372 pub fn decompress_all(&mut self) -> Vec<DataPoint> {
374 let mut points = Vec::with_capacity(self.remaining);
375 while let Some(point) = self.next() {
376 points.push(point);
377 }
378 points
379 }
380}
381
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// Round-trips 100 regularly spaced points and checks timestamps, values,
    /// block metadata and the checksum.
    #[test]
    fn test_compress_decompress() {
        let mut compressor = Compressor::new();

        let base_time = Utc::now();
        let points: Vec<DataPoint> = (0..100)
            .map(|i| DataPoint {
                timestamp: base_time + Duration::seconds(i),
                value: 42.0 + (i as f64 * 0.1),
            })
            .collect();

        for point in &points {
            compressor.compress(point);
        }

        let block = compressor.finish();
        assert!(block.verify_checksum());
        assert_eq!(block.count, points.len());
        assert_eq!(block.first_timestamp, points[0].timestamp_millis());
        assert_eq!(
            block.last_timestamp,
            points[points.len() - 1].timestamp_millis()
        );
        assert!(block.compression_ratio() > 1.0);

        let mut decompressor = Decompressor::new(&block);
        let decompressed = decompressor.decompress_all();

        assert_eq!(decompressed.len(), points.len());
        for (original, decoded) in points.iter().zip(decompressed.iter()) {
            // Timestamps are stored at millisecond precision, while
            // `Utc::now()` may carry sub-millisecond digits. Compare at the
            // stored resolution.
            assert_eq!(original.timestamp_millis(), decoded.timestamp_millis());
            assert_eq!(original.value, decoded.value);
        }
    }

    /// An empty compressor produces an empty, self-consistent block.
    #[test]
    fn test_empty_block() {
        let block = Compressor::new().finish();
        assert_eq!(block.count, 0);
        assert!(block.data.is_empty());
        assert!(block.verify_checksum());
        assert_eq!(block.compression_ratio(), 1.0);
        assert!(Decompressor::new(&block).decompress_all().is_empty());
    }

    /// Highly regular data should compress well.
    #[test]
    fn test_compression_ratio() {
        let mut compressor = Compressor::new();

        let base_time = Utc::now();
        for i in 0..1000 {
            compressor.compress(&DataPoint {
                timestamp: base_time + Duration::seconds(i),
                value: 100.0 + (i as f64 % 10.0),
            });
        }

        let block = compressor.finish();
        let ratio = block.compression_ratio();

        assert!(ratio > 2.0, "Expected compression ratio > 2, got {}", ratio);
    }
}