1use crate::error::{TsdbError, TsdbResult};
20use serde::{Deserialize, Serialize};
21
/// One run of consecutive samples that all carry the same value.
///
/// Only the first and last timestamps and the number of samples are kept;
/// the timestamps of the samples in between are not stored.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RleRun {
    /// Timestamp of the first sample in the run.
    pub start_timestamp: i64,
    /// Timestamp of the last sample in the run.
    pub end_timestamp: i64,
    /// The value shared by every sample in the run.
    pub value: f64,
    /// Number of samples folded into the run.
    pub count: u32,
}
42
43impl RleRun {
44 #[inline]
46 fn values_equal(a: f64, b: f64) -> bool {
47 a.to_bits() == b.to_bits()
48 }
49}
50
/// Streaming run-length encoder.
///
/// Samples are fed in one at a time with `push`; `finish` consumes the
/// encoder and returns every run that was produced.
pub struct RleEncoder {
    /// Value of the run currently being built; `None` until the first sample arrives.
    current_value: Option<f64>,
    /// Number of samples accumulated in the current run.
    run_length: u32,
    /// Timestamp of the first sample of the current run.
    current_start: i64,
    /// Timestamp of the most recently accepted sample.
    current_last: i64,
    /// Runs that have already been completed.
    encoded: Vec<RleRun>,
}
66
67impl RleEncoder {
68 pub fn new() -> Self {
70 Self {
71 current_value: None,
72 run_length: 0,
73 current_start: 0,
74 current_last: 0,
75 encoded: Vec::new(),
76 }
77 }
78
79 pub fn push(&mut self, timestamp: i64, value: f64) -> TsdbResult<()> {
83 match self.current_value {
84 None => {
85 self.current_value = Some(value);
87 self.run_length = 1;
88 self.current_start = timestamp;
89 self.current_last = timestamp;
90 }
91 Some(prev) if RleRun::values_equal(prev, value) => {
92 if timestamp < self.current_last {
94 return Err(TsdbError::Compression(format!(
95 "RLE: timestamps must be non-decreasing, got {} after {}",
96 timestamp, self.current_last
97 )));
98 }
99 self.run_length += 1;
100 self.current_last = timestamp;
101 }
102 Some(prev) => {
103 if timestamp < self.current_last {
105 return Err(TsdbError::Compression(format!(
106 "RLE: timestamps must be non-decreasing, got {} after {}",
107 timestamp, self.current_last
108 )));
109 }
110 self.encoded.push(RleRun {
111 start_timestamp: self.current_start,
112 end_timestamp: self.current_last,
113 value: prev,
114 count: self.run_length,
115 });
116 self.current_value = Some(value);
117 self.run_length = 1;
118 self.current_start = timestamp;
119 self.current_last = timestamp;
120 }
121 }
122 Ok(())
123 }
124
125 pub fn finish(mut self) -> Vec<RleRun> {
127 if let Some(v) = self.current_value {
128 self.encoded.push(RleRun {
129 start_timestamp: self.current_start,
130 end_timestamp: self.current_last,
131 value: v,
132 count: self.run_length,
133 });
134 }
135 self.encoded
136 }
137}
138
139impl Default for RleEncoder {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
145pub fn rle_encode(data: &[(i64, f64)]) -> TsdbResult<Vec<RleRun>> {
154 let mut encoder = RleEncoder::new();
155 for &(ts, val) in data {
156 encoder.push(ts, val)?;
157 }
158 Ok(encoder.finish())
159}
160
161pub fn rle_decode(runs: &[RleRun]) -> Vec<(i64, f64)> {
171 let total: usize = runs.iter().map(|r| r.count as usize).sum();
172 let mut out = Vec::with_capacity(total);
173
174 for run in runs {
175 if run.count == 0 {
176 continue;
177 }
178 if run.count == 1 {
179 out.push((run.start_timestamp, run.value));
180 } else {
181 let span = run.end_timestamp - run.start_timestamp;
183 let step = span / (run.count as i64 - 1);
184 for i in 0..run.count as i64 {
185 let ts = if i == run.count as i64 - 1 {
186 run.end_timestamp
188 } else {
189 run.start_timestamp + i * step
190 };
191 out.push((ts, run.value));
192 }
193 }
194 }
195 out
196}
197
/// A run-length encoded block: the runs plus the number of samples they
/// were built from.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RleBlock {
    /// The encoded runs, in input order.
    pub runs: Vec<RleRun>,
    /// Number of original samples; `from_data` sets this to the input length.
    pub total_samples: u64,
}
210
211impl RleBlock {
212 pub fn from_data(data: &[(i64, f64)]) -> TsdbResult<Self> {
214 let runs = rle_encode(data)?;
215 let total_samples = data.len() as u64;
216 Ok(Self {
217 runs,
218 total_samples,
219 })
220 }
221
222 pub fn decode(&self) -> Vec<(i64, f64)> {
224 rle_decode(&self.runs)
225 }
226
227 pub fn compression_ratio(&self) -> f64 {
232 let original_bytes = self.total_samples as f64 * 16.0;
233 let encoded_bytes = self.runs.len() as f64 * 28.0 + 8.0; if encoded_bytes == 0.0 {
235 1.0
236 } else {
237 original_bytes / encoded_bytes
238 }
239 }
240
241 pub fn run_count(&self) -> usize {
243 self.runs.len()
244 }
245}
246
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty input encodes to no runs, and no runs decode to no samples.
    #[test]
    fn test_empty_encode_decode() {
        let encoded = rle_encode(&[]).expect("encode failed");
        assert!(encoded.is_empty());
        assert!(rle_decode(&encoded).is_empty());
    }

    /// A lone sample becomes a single run of length one and round-trips.
    #[test]
    fn test_single_sample() {
        let samples = vec![(1000i64, 42.0f64)];
        let encoded = rle_encode(&samples).expect("encode");
        assert_eq!(encoded.len(), 1);
        let only = &encoded[0];
        assert_eq!(only.count, 1);
        assert_eq!(only.value, 42.0);
        assert_eq!(rle_decode(&encoded), samples);
    }

    /// A thousand identical values collapse into one run spanning the input.
    #[test]
    fn test_constant_series_compresses_to_one_run() {
        let samples: Vec<(i64, f64)> = (0..1000i64).map(|i| (i * 1000, 7.5)).collect();
        let encoded = rle_encode(&samples).expect("encode");
        assert_eq!(
            encoded.len(),
            1,
            "constant series should produce exactly one run"
        );
        let run = &encoded[0];
        assert_eq!(run.count, 1000);
        assert_eq!(run.value, 7.5);
        assert_eq!(run.start_timestamp, 0);
        assert_eq!(run.end_timestamp, 999 * 1000);
    }

    /// Worst case for RLE: every sample differs from its neighbour.
    #[test]
    fn test_alternating_values_round_trip() {
        let samples: Vec<(i64, f64)> = (0..10i64)
            .map(|i| {
                let value = if i % 2 == 0 { 0.0 } else { 1.0 };
                (i * 100, value)
            })
            .collect();
        let encoded = rle_encode(&samples).expect("encode");
        assert_eq!(encoded.len(), 10);
        assert_eq!(rle_decode(&encoded), samples);
    }

    /// Plateaus of differing lengths produce one run each and round-trip.
    #[test]
    fn test_step_function() {
        let plateaus: [f64; 10] = [0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 2.0, 2.0];
        let samples: Vec<(i64, f64)> = plateaus
            .iter()
            .enumerate()
            .map(|(idx, &value)| (idx as i64, value))
            .collect();
        let encoded = rle_encode(&samples).expect("encode");
        assert_eq!(encoded.len(), 4);
        let counts: Vec<u32> = encoded.iter().map(|run| run.count).collect();
        assert_eq!(counts, vec![3, 3, 2, 2]);
        assert_eq!(rle_decode(&encoded), samples);
    }

    /// NaNs with different payload bits must stay in separate runs.
    #[test]
    fn test_nan_not_coalesced() {
        let first_nan = f64::from_bits(0x7FF8_0000_0000_0001);
        let second_nan = f64::from_bits(0x7FF8_0000_0000_0002);
        let samples = vec![(0i64, first_nan), (1000, second_nan)];
        let encoded = rle_encode(&samples).expect("encode");
        assert_eq!(
            encoded.len(),
            2,
            "different NaN payloads should not be merged"
        );
    }

    /// `0.0` and `-0.0` differ bitwise, so they must not share a run.
    #[test]
    fn test_positive_zero_vs_negative_zero() {
        let (plus, minus) = (0.0f64, -0.0f64);
        assert_ne!(plus.to_bits(), minus.to_bits());
        let samples = vec![(0i64, plus), (1000, minus)];
        let encoded = rle_encode(&samples).expect("encode");
        assert_eq!(encoded.len(), 2);
    }

    /// Pushing an earlier timestamp after a later one is rejected.
    #[test]
    fn test_out_of_order_timestamps_error() {
        let mut encoder = RleEncoder::new();
        encoder.push(2000, 1.0).expect("push ok");
        let backward = encoder.push(1000, 1.0);
        backward.expect_err("should fail: ts goes backward");
    }

    /// 10 000 samples in 100 plateaus compress to 100 runs with ratio > 1.
    #[test]
    fn test_rle_block_compression_ratio() {
        let samples: Vec<(i64, f64)> = (0..10_000i64)
            .map(|i| (i * 100, (i / 100) as f64))
            .collect();
        let block = RleBlock::from_data(&samples).expect("build block");
        assert_eq!(block.run_count(), 100);
        let ratio = block.compression_ratio();
        assert!(
            ratio > 1.0,
            "should have positive compression: {:.2}",
            ratio
        );
    }

    /// Evenly spaced samples survive a block encode/decode round trip.
    #[test]
    fn test_rle_block_decode_round_trip() {
        let samples: Vec<(i64, f64)> = vec![
            (0, 10.0),
            (1000, 10.0),
            (2000, 10.0),
            (3000, 20.0),
            (4000, 20.0),
            (5000, 30.0),
        ];
        let block = RleBlock::from_data(&samples).expect("build block");
        assert_eq!(block.decode(), samples);
    }

    /// A default encoder that never saw a sample finishes with no runs.
    #[test]
    fn test_encoder_default() {
        let encoded = RleEncoder::default().finish();
        assert!(encoded.is_empty());
    }

    /// A long constant series still collapses to a single run.
    #[test]
    fn test_large_constant_block() {
        const SAMPLE_COUNT: u32 = 100_000;
        let samples: Vec<(i64, f64)> = (0..SAMPLE_COUNT)
            .map(|i| (i64::from(i) * 10, std::f64::consts::PI))
            .collect();
        let encoded = rle_encode(&samples).expect("encode");
        assert_eq!(encoded.len(), 1);
        assert_eq!(encoded[0].count, SAMPLE_COUNT);
    }
}