1use crate::compression::gorilla::{gorilla_decode, gorilla_encode};
24use crate::compression::rle::{rle_decode, rle_encode, RleRun};
25use crate::error::{TsdbError, TsdbResult};
26use serde::{Deserialize, Serialize};
27
/// Identifies the on-disk encoding of a [`CompressedBlock`] payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
    /// Uncompressed little-endian `(i64 timestamp, f64 value-bits)` pairs,
    /// 16 bytes per sample (see `encode_raw` / `decode_raw`).
    Raw,
    /// Gorilla-style encoding via `compression::gorilla`.
    Gorilla,
    /// Run-length encoding of repeated values via `compression::rle`.
    Rle,
    /// Dictionary encoding for string data; the numeric encode/decode paths
    /// in this module reject this variant with an error.
    Dictionary,
}
44
45impl std::fmt::Display for CompressionAlgorithm {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 match self {
48 Self::Raw => write!(f, "raw"),
49 Self::Gorilla => write!(f, "gorilla"),
50 Self::Rle => write!(f, "rle"),
51 Self::Dictionary => write!(f, "dictionary"),
52 }
53 }
54}
55
/// A self-describing compressed chunk of time-series samples.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedBlock {
    /// Encoding used for `data`.
    pub algorithm: CompressionAlgorithm,
    /// Encoded payload bytes.
    pub data: Vec<u8>,
    /// Number of samples encoded in `data`.
    pub sample_count: u32,
    /// Timestamp of the first buffered sample (0 for an empty block).
    /// NOTE(review): taken from the first element, so this is the true minimum
    /// only if samples were appended in time order — confirm with callers.
    pub min_timestamp: i64,
    /// Timestamp of the last buffered sample (0 for an empty block); same
    /// in-order assumption as `min_timestamp`.
    pub max_timestamp: i64,
}
74
75impl CompressedBlock {
76 pub fn compression_ratio(&self) -> f64 {
78 let original = self.sample_count as f64 * 16.0; let compressed = self.data.len() as f64;
80 if compressed == 0.0 {
81 1.0
82 } else {
83 original / compressed
84 }
85 }
86
87 pub fn decode(&self) -> TsdbResult<Vec<(i64, f64)>> {
89 match self.algorithm {
90 CompressionAlgorithm::Raw => decode_raw(&self.data),
91 CompressionAlgorithm::Gorilla => gorilla_decode(&self.data),
92 CompressionAlgorithm::Rle => {
93 let runs = decode_rle_runs(&self.data)?;
94 Ok(rle_decode(&runs))
95 }
96 CompressionAlgorithm::Dictionary => Err(TsdbError::Decompression(
97 "Dictionary blocks require string-aware decoding; use DictionaryBlock directly"
98 .to_string(),
99 )),
100 }
101 }
102}
103
/// Aggregate statistics over a batch of samples, used by `AdaptiveCompressor`
/// to choose a compression algorithm.
#[derive(Debug, Default)]
pub struct SampleStats {
    /// Total number of samples examined.
    pub total: usize,
    /// Number of distinct `f64` bit patterns among the values.
    pub unique_values: usize,
    /// Saturating sum of absolute timestamp delta-of-deltas.
    /// (Currently gathered but not consulted by `select_algorithm`.)
    pub dod_sum: i64,
    /// Number of consecutive sample pairs whose value bit patterns are
    /// identical (i.e. whose XOR would be zero).
    pub zero_xor_count: usize,
}
118
/// Buffers `(timestamp, value)` samples and, on `finish`, compresses them
/// using either a caller-forced algorithm or one chosen heuristically from
/// the data's statistics.
pub struct AdaptiveCompressor {
    // Buffered samples, in insertion order.
    samples: Vec<(i64, f64)>,
    // When set, overrides heuristic algorithm selection.
    forced: Option<CompressionAlgorithm>,
}
137
138impl AdaptiveCompressor {
139 pub fn new() -> Self {
141 Self {
142 samples: Vec::new(),
143 forced: None,
144 }
145 }
146
147 pub fn with_algorithm(mut self, algo: CompressionAlgorithm) -> Self {
149 self.forced = Some(algo);
150 self
151 }
152
153 pub fn push(&mut self, timestamp: i64, value: f64) {
155 self.samples.push((timestamp, value));
156 }
157
158 pub fn extend(&mut self, samples: &[(i64, f64)]) {
160 self.samples.extend_from_slice(samples);
161 }
162
163 fn analyse(&self) -> SampleStats {
166 let mut stats = SampleStats {
167 total: self.samples.len(),
168 ..Default::default()
169 };
170 if self.samples.is_empty() {
171 return stats;
172 }
173
174 let mut bits: Vec<u64> = self.samples.iter().map(|&(_, v)| v.to_bits()).collect();
176 bits.sort_unstable();
177 bits.dedup();
178 stats.unique_values = bits.len();
179
180 let mut prev_bits = self.samples[0].1.to_bits();
182 let mut prev_ts = self.samples[0].0;
183 let mut prev_delta = 0i64;
184 for &(ts, val) in &self.samples[1..] {
185 let cur_bits = val.to_bits();
186 if cur_bits == prev_bits {
187 stats.zero_xor_count += 1;
188 }
189 prev_bits = cur_bits;
190
191 let delta = ts - prev_ts;
192 let dod = delta - prev_delta;
193 stats.dod_sum = stats.dod_sum.saturating_add(dod.abs());
194 prev_delta = delta;
195 prev_ts = ts;
196 }
197 stats
198 }
199
200 fn select_algorithm(&self, stats: &SampleStats) -> CompressionAlgorithm {
202 if stats.total == 0 {
203 return CompressionAlgorithm::Raw;
204 }
205
206 let repeat_ratio = stats.zero_xor_count as f64 / stats.total.max(1) as f64;
208 if repeat_ratio >= 0.7 {
209 return CompressionAlgorithm::Rle;
210 }
211
212 let cardinality_ratio = stats.unique_values as f64 / stats.total as f64;
214 if cardinality_ratio < 0.05 && stats.total > 10 {
215 return CompressionAlgorithm::Rle;
216 }
217
218 CompressionAlgorithm::Gorilla
220 }
221
222 pub fn finish(self) -> TsdbResult<CompressedBlock> {
226 if self.samples.is_empty() {
227 return Ok(CompressedBlock {
228 algorithm: CompressionAlgorithm::Raw,
229 data: Vec::new(),
230 sample_count: 0,
231 min_timestamp: 0,
232 max_timestamp: 0,
233 });
234 }
235
236 let min_ts = self.samples.first().map(|&(t, _)| t).unwrap_or(0);
237 let max_ts = self.samples.last().map(|&(t, _)| t).unwrap_or(0);
238 let sample_count = self.samples.len() as u32;
239
240 let algo = self.forced.unwrap_or_else(|| {
241 let stats = self.analyse();
242 self.select_algorithm(&stats)
243 });
244
245 let data = match algo {
246 CompressionAlgorithm::Raw => encode_raw(&self.samples),
247 CompressionAlgorithm::Gorilla => gorilla_encode(&self.samples)?,
248 CompressionAlgorithm::Rle => {
249 let runs = rle_encode(&self.samples)?;
250 encode_rle_runs(&runs)?
251 }
252 CompressionAlgorithm::Dictionary => {
253 return Err(TsdbError::Compression(
254 "Dictionary compression requires string data; use DictionaryEncoder instead"
255 .to_string(),
256 ));
257 }
258 };
259
260 Ok(CompressedBlock {
261 algorithm: algo,
262 data,
263 sample_count,
264 min_timestamp: min_ts,
265 max_timestamp: max_ts,
266 })
267 }
268}
269
impl Default for AdaptiveCompressor {
    /// Equivalent to [`AdaptiveCompressor::new`].
    fn default() -> Self {
        Self::new()
    }
}
275
/// Serializes samples as fixed 16-byte records: little-endian `i64`
/// timestamp followed by the little-endian bits of the `f64` value.
fn encode_raw(data: &[(i64, f64)]) -> Vec<u8> {
    data.iter()
        .flat_map(|&(ts, val)| {
            let mut record = [0u8; 16];
            record[..8].copy_from_slice(&ts.to_le_bytes());
            record[8..].copy_from_slice(&val.to_bits().to_le_bytes());
            record
        })
        .collect()
}
288
289fn decode_raw(data: &[u8]) -> TsdbResult<Vec<(i64, f64)>> {
290 if data.len() % 16 != 0 {
291 return Err(TsdbError::Decompression(format!(
292 "Raw block length {} is not a multiple of 16",
293 data.len()
294 )));
295 }
296 let mut out = Vec::with_capacity(data.len() / 16);
297 for chunk in data.chunks_exact(16) {
298 let ts = i64::from_le_bytes([
299 chunk[0], chunk[1], chunk[2], chunk[3], chunk[4], chunk[5], chunk[6], chunk[7],
300 ]);
301 let val_bits = u64::from_le_bytes([
302 chunk[8], chunk[9], chunk[10], chunk[11], chunk[12], chunk[13], chunk[14], chunk[15],
303 ]);
304 out.push((ts, f64::from_bits(val_bits)));
305 }
306 Ok(out)
307}
308
309fn encode_rle_runs(runs: &[RleRun]) -> TsdbResult<Vec<u8>> {
319 let run_count = runs.len() as u32;
320 let mut out = Vec::with_capacity(4 + runs.len() * 28);
321 out.extend_from_slice(&run_count.to_le_bytes());
322 for run in runs {
323 out.extend_from_slice(&run.start_timestamp.to_le_bytes());
324 out.extend_from_slice(&run.end_timestamp.to_le_bytes());
325 out.extend_from_slice(&run.value.to_bits().to_le_bytes());
326 out.extend_from_slice(&run.count.to_le_bytes());
327 }
328 Ok(out)
329}
330
331fn decode_rle_runs(data: &[u8]) -> TsdbResult<Vec<RleRun>> {
332 if data.len() < 4 {
333 return Err(TsdbError::Decompression(
334 "RLE binary: data too short for run count".to_string(),
335 ));
336 }
337 let run_count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
338 let expected_len = 4 + run_count * 28;
339 if data.len() < expected_len {
340 return Err(TsdbError::Decompression(format!(
341 "RLE binary: expected {} bytes for {} runs, got {}",
342 expected_len,
343 run_count,
344 data.len()
345 )));
346 }
347 let mut runs = Vec::with_capacity(run_count);
348 for i in 0..run_count {
349 let off = 4 + i * 28;
350 let start_timestamp = i64::from_le_bytes([
351 data[off],
352 data[off + 1],
353 data[off + 2],
354 data[off + 3],
355 data[off + 4],
356 data[off + 5],
357 data[off + 6],
358 data[off + 7],
359 ]);
360 let end_timestamp = i64::from_le_bytes([
361 data[off + 8],
362 data[off + 9],
363 data[off + 10],
364 data[off + 11],
365 data[off + 12],
366 data[off + 13],
367 data[off + 14],
368 data[off + 15],
369 ]);
370 let val_bits = u64::from_le_bytes([
371 data[off + 16],
372 data[off + 17],
373 data[off + 18],
374 data[off + 19],
375 data[off + 20],
376 data[off + 21],
377 data[off + 22],
378 data[off + 23],
379 ]);
380 let count = u32::from_le_bytes([
381 data[off + 24],
382 data[off + 25],
383 data[off + 26],
384 data[off + 27],
385 ]);
386 runs.push(RleRun {
387 start_timestamp,
388 end_timestamp,
389 value: f64::from_bits(val_bits),
390 count,
391 });
392 }
393 Ok(runs)
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    // Builds `n` samples at a fixed interval whose values cycle through
    // five nearby levels (base, base+0.01, ..., base+0.04).
    fn make_regular(n: usize, interval_ms: i64, base_val: f64) -> Vec<(i64, f64)> {
        (0..n)
            .map(|i| (i as i64 * interval_ms, base_val + (i % 5) as f64 * 0.01))
            .collect()
    }

    // An empty compressor yields a zero-sample block that decodes to nothing.
    #[test]
    fn test_empty_block() {
        let block = AdaptiveCompressor::new().finish().expect("finish");
        assert_eq!(block.sample_count, 0);
        assert!(block.decode().expect("decode").is_empty());
    }

    // Constant values trip the repeat-ratio heuristic and select RLE,
    // and the round trip is lossless.
    #[test]
    fn test_constant_data_selects_rle() {
        let data: Vec<(i64, f64)> = (0..200).map(|i| (i as i64 * 1000, 5.0)).collect();
        let mut comp = AdaptiveCompressor::new();
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert_eq!(block.algorithm, CompressionAlgorithm::Rle);
        let decoded = block.decode().expect("decode");
        assert_eq!(decoded, data);
    }

    // Smoothly varying (high-cardinality) values fall through to Gorilla;
    // the round trip must be bit-exact, so compare f64 bit patterns.
    #[test]
    fn test_sensor_data_selects_gorilla() {
        let data: Vec<(i64, f64)> = (0..200)
            .map(|i| {
                let ts = i as i64 * 1000;
                let val = (i as f64 * 0.123456).cos() * 100.0 + 50.0;
                (ts, val)
            })
            .collect();
        let mut comp = AdaptiveCompressor::new();
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert_eq!(block.algorithm, CompressionAlgorithm::Gorilla);
        let decoded = block.decode().expect("decode");
        assert_eq!(decoded.len(), data.len());
        for (orig, dec) in data.iter().zip(decoded.iter()) {
            assert_eq!(orig.0, dec.0);
            assert_eq!(orig.1.to_bits(), dec.1.to_bits());
        }
    }

    // Forcing Gorilla bypasses the heuristic even on constant data.
    #[test]
    fn test_forced_algorithm_gorilla() {
        let data: Vec<(i64, f64)> = (0..50).map(|i| (i as i64 * 1000, 7.0)).collect();
        let mut comp = AdaptiveCompressor::new().with_algorithm(CompressionAlgorithm::Gorilla);
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert_eq!(block.algorithm, CompressionAlgorithm::Gorilla);
        let decoded = block.decode().expect("decode");
        assert_eq!(decoded, data);
    }

    // Forcing RLE on mildly varying data still round-trips losslessly.
    #[test]
    fn test_forced_algorithm_rle() {
        let data = make_regular(50, 500, 1.5);
        let mut comp = AdaptiveCompressor::new().with_algorithm(CompressionAlgorithm::Rle);
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert_eq!(block.algorithm, CompressionAlgorithm::Rle);
        let decoded = block.decode().expect("decode");
        assert_eq!(decoded, data);
    }

    // Forcing Raw stores samples verbatim.
    #[test]
    fn test_forced_algorithm_raw() {
        let data: Vec<(i64, f64)> = (0..10).map(|i| (i as i64, i as f64)).collect();
        let mut comp = AdaptiveCompressor::new().with_algorithm(CompressionAlgorithm::Raw);
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert_eq!(block.algorithm, CompressionAlgorithm::Raw);
        let decoded = block.decode().expect("decode");
        assert_eq!(decoded, data);
    }

    // Direct round trip of the raw 16-byte record codec, including a
    // negative float.
    #[test]
    fn test_raw_encode_decode_round_trip() {
        let data: Vec<(i64, f64)> = vec![(0, 1.0), (1000, 2.5), (2000, -std::f64::consts::PI)];
        let raw = encode_raw(&data);
        let decoded = decode_raw(&raw).expect("decode");
        assert_eq!(decoded, data);
    }

    // Round trip of the binary RLE run framing: runs survive field-for-field
    // (values compared by bit pattern) and expand back to the original samples.
    #[test]
    fn test_rle_binary_round_trip() {
        let data: Vec<(i64, f64)> = vec![(0, 1.0), (1000, 1.0), (2000, 2.0), (3000, 2.0)];
        let runs = rle_encode(&data).expect("rle encode");
        let encoded = encode_rle_runs(&runs).expect("encode runs");
        let decoded_runs = decode_rle_runs(&encoded).expect("decode runs");
        assert_eq!(runs.len(), decoded_runs.len());
        for (a, b) in runs.iter().zip(decoded_runs.iter()) {
            assert_eq!(a.start_timestamp, b.start_timestamp);
            assert_eq!(a.end_timestamp, b.end_timestamp);
            assert_eq!(a.value.to_bits(), b.value.to_bits());
            assert_eq!(a.count, b.count);
        }
        let decoded = rle_decode(&decoded_runs);
        assert_eq!(decoded, data);
    }

    // Highly compressible (constant) data must report a ratio above 1.0.
    #[test]
    fn test_compression_ratio() {
        let data: Vec<(i64, f64)> = (0..1000).map(|i| (i as i64 * 1000, 5.0)).collect();
        let mut comp = AdaptiveCompressor::new();
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert!(
            block.compression_ratio() > 1.0,
            "should have positive compression"
        );
    }

    // Block metadata reflects first/last timestamps and the sample count.
    #[test]
    fn test_metadata_fields() {
        let data: Vec<(i64, f64)> = vec![(100, 1.0), (200, 2.0), (300, 3.0)];
        let mut comp = AdaptiveCompressor::new();
        comp.extend(&data);
        let block = comp.finish().expect("finish");
        assert_eq!(block.min_timestamp, 100);
        assert_eq!(block.max_timestamp, 300);
        assert_eq!(block.sample_count, 3);
    }

    // Dictionary is string-only: forcing it on numeric samples is an error.
    #[test]
    fn test_dictionary_forced_returns_error() {
        let data = vec![(0i64, 1.0f64)];
        let mut comp = AdaptiveCompressor::new().with_algorithm(CompressionAlgorithm::Dictionary);
        comp.extend(&data);
        let result = comp.finish();
        assert!(result.is_err());
    }

    // Stats for constant data: one unique bit pattern, and every one of the
    // 99 consecutive pairs has identical value bits.
    #[test]
    fn test_analyse_stats_constant() {
        let data: Vec<(i64, f64)> = (0..100).map(|i| (i as i64 * 1000, 42.0)).collect();
        let mut comp = AdaptiveCompressor::new();
        comp.extend(&data);
        let stats = comp.analyse();
        assert_eq!(stats.total, 100);
        assert_eq!(stats.unique_values, 1);
        assert_eq!(stats.zero_xor_count, 99);
    }
}