// reddb_server/storage/timeseries/compression.rs

/// Delta-of-delta encodes a sequence of timestamps.
///
/// The output has the same length as the input and is laid out as:
///
/// * `encoded[0]` — the first timestamp, reinterpreted as `i64`;
/// * `encoded[1]` — the first delta, `timestamps[1] - timestamps[0]`;
/// * `encoded[i]` for `i >= 2` — the current delta minus the previous delta.
///
/// Evenly spaced timestamps therefore yield zeros from index 2 onwards.
/// An empty input yields an empty vector.
///
/// All differences use two's-complement wrapping arithmetic, so inputs whose
/// values or gaps exceed `i64::MAX` do not trigger an overflow panic; the
/// original sequence is still recoverable by summing back with wrapping
/// addition.
pub fn delta_encode_timestamps(timestamps: &[u64]) -> Vec<i64> {
    let (first, rest) = match timestamps.split_first() {
        Some((&first, rest)) => (first as i64, rest),
        None => return Vec::new(),
    };

    let mut encoded = Vec::with_capacity(timestamps.len());
    encoded.push(first);

    let mut prev_value = first;
    // Seeding the previous delta with zero makes the first emitted entry the
    // raw delta, while every later entry is a delta-of-delta.
    let mut prev_delta = 0i64;
    for &ts in rest {
        let value = ts as i64;
        let delta = value.wrapping_sub(prev_value);
        encoded.push(delta.wrapping_sub(prev_delta));
        prev_value = value;
        prev_delta = delta;
    }

    encoded
}
32
/// Reverses delta-of-delta encoding back into absolute timestamps.
///
/// Expects the layout `[first value, first delta, delta-of-delta, ...]`:
/// the first entry is taken verbatim, the second is added to it as the
/// initial delta, and every later entry adjusts the running delta before it
/// is added to the running value. An empty input yields an empty vector.
///
/// Sums use two's-complement wrapping arithmetic, so extreme or corrupted
/// input produces wrapped values rather than an overflow panic.
pub fn delta_decode_timestamps(encoded: &[i64]) -> Vec<u64> {
    let (mut value, rest) = match encoded.split_first() {
        Some((&first, rest)) => (first, rest),
        None => return Vec::new(),
    };

    let mut decoded = Vec::with_capacity(encoded.len());
    decoded.push(value as u64);

    // Starting from a zero delta means the first entry of `rest` is applied
    // as the raw delta and all later entries as delta-of-delta corrections.
    let mut delta = 0i64;
    for &step in rest {
        delta = delta.wrapping_add(step);
        value = value.wrapping_add(delta);
        decoded.push(value as u64);
    }

    decoded
}
57
/// XOR-encodes a sequence of floats against their predecessors.
///
/// The first output word is the raw IEEE-754 bit pattern of `values[0]`;
/// each following word is the bit pattern of a value XORed with the bit
/// pattern of the value before it. Identical neighbours therefore encode to
/// zero. An empty input yields an empty vector.
pub fn xor_encode_values(values: &[f64]) -> Vec<u64> {
    let head = match values.first() {
        Some(v) => v.to_bits(),
        None => return Vec::new(),
    };

    let mut out = Vec::with_capacity(values.len());
    out.push(head);
    out.extend(
        values
            .windows(2)
            .map(|pair| pair[1].to_bits() ^ pair[0].to_bits()),
    );
    out
}
74
/// Rebuilds floats from an XOR-against-previous encoding.
///
/// The first word is interpreted directly as an IEEE-754 bit pattern; every
/// later word is XORed with the bit pattern of the previously decoded value
/// to obtain the next one. An empty input yields an empty vector.
pub fn xor_decode_values(encoded: &[u64]) -> Vec<f64> {
    let (head, tail) = match encoded.split_first() {
        Some((&head, tail)) => (head, tail),
        None => return Vec::new(),
    };

    let mut out = Vec::with_capacity(encoded.len());
    let mut previous = f64::from_bits(head);
    out.push(previous);

    for &word in tail {
        let current = f64::from_bits(previous.to_bits() ^ word);
        out.push(current);
        previous = current;
    }

    out
}
90
/// Bit-packs integers as fixed-width offsets from the sequence minimum.
///
/// Output layout:
///
/// * bytes `0..8`  — minimum value, little-endian `i64`;
/// * bytes `8..16` — maximum value, little-endian `i64`;
/// * byte  `16`    — bit width used per value (`0..=64`);
/// * remaining     — each `value - min` packed least-significant-bit first,
///   `bit width` bits per value, with the final partial byte zero-padded.
///
/// A constant sequence has bit width 0 and produces only the 17-byte header.
/// An empty input produces an empty buffer.
///
/// Returns the encoded bytes together with the number of input values, which
/// the decoder needs because the payload does not record it.
pub fn t64_encode(values: &[i64]) -> (Vec<u8>, usize) {
    let count = values.len();
    if count == 0 {
        return (Vec::new(), 0);
    }

    // One pass for both extremes.
    let (lo, hi) = values
        .iter()
        .fold((i64::MAX, i64::MIN), |(lo, hi), &v| (lo.min(v), hi.max(v)));

    // `hi >= lo`, and the difference of two i64 values always fits in 64 bits.
    let span = (hi as i128 - lo as i128) as u128;
    let width: u8 = match span {
        0 => 0,
        s => ((128 - s.leading_zeros()) as u8).min(64),
    };

    let mut packed: Vec<u8> = Vec::with_capacity(17 + count * 8);
    packed.extend_from_slice(&lo.to_le_bytes());
    packed.extend_from_slice(&hi.to_le_bytes());
    packed.push(width);

    if width == 0 {
        return (packed, count);
    }

    let step = u32::from(width);
    // At most 7 leftover bits plus a 64-bit offset are held at once, so a
    // 128-bit accumulator never loses high bits.
    let mut acc: u128 = 0;
    let mut pending: u32 = 0;
    for &v in values {
        let rel = (v as i128 - lo as i128) as u128;
        acc |= rel << pending;
        pending += step;
        while pending >= 8 {
            packed.push((acc & 0xFF) as u8);
            acc >>= 8;
            pending -= 8;
        }
    }
    if pending != 0 {
        // Flush the trailing partial byte; unused high bits are zero.
        packed.push(acc as u8);
    }

    (packed, count)
}
149
/// Decodes a buffer of fixed-width bit-packed offsets back into `length`
/// integers.
///
/// Expected layout: an 8-byte little-endian minimum, an 8-byte little-endian
/// maximum (read past but not used here), a 1-byte bit width, then the
/// offsets packed least-significant-bit first.
///
/// Returns:
///
/// * `Some(vec![])` when `length == 0`, regardless of `bytes`;
/// * `Some(vec![min; length])` when the stored bit width is 0 — such a block
///   carries no payload, so `length` cannot be cross-checked against it;
/// * `None` when the header is shorter than 17 bytes, the bit width exceeds
///   64, or the payload holds fewer than `length * bit_width` bits;
/// * `Some(values)` otherwise.
///
/// The payload size is validated *before* the output vector is allocated, so
/// a corrupted or hostile `length` yields `None` instead of a capacity
/// overflow panic or a huge allocation.
pub fn t64_decode(bytes: &[u8], length: usize) -> Option<Vec<i64>> {
    const HEADER_LEN: usize = 17;

    if length == 0 {
        return Some(Vec::new());
    }
    if bytes.len() < HEADER_LEN {
        return None;
    }

    let mut min_raw = [0u8; 8];
    min_raw.copy_from_slice(&bytes[..8]);
    let min = i64::from_le_bytes(min_raw);
    // bytes[8..16] hold the maximum; decoding only needs the minimum.
    let bit_width = bytes[16];

    if bit_width == 0 {
        return Some(vec![min; length]);
    }
    if bit_width > 64 {
        return None;
    }

    let width = u32::from(bit_width);
    let payload = &bytes[HEADER_LEN..];

    // Reject truncated payloads up front. Computed in u128 so that
    // `length * width` cannot overflow for any usize `length`.
    let needed_bits = (length as u128) * u128::from(width);
    let needed_bytes = (needed_bits + 7) / 8;
    if (payload.len() as u128) < needed_bytes {
        return None;
    }

    // `width <= 64`, so the shift is always in range for u128.
    let mask: u128 = (1u128 << width) - 1;

    let mut out = Vec::with_capacity(length);
    let mut source = payload.iter();
    let mut bit_buf: u128 = 0;
    let mut bits_in_buf: u32 = 0;
    for _ in 0..length {
        // Refill until one full value is buffered.
        while bits_in_buf < width {
            let byte = *source.next()?;
            bit_buf |= u128::from(byte) << bits_in_buf;
            bits_in_buf += 8;
        }
        let offset = bit_buf & mask;
        bit_buf >>= width;
        bits_in_buf -= width;
        // The i128 sum cannot overflow; the final cast truncates only when
        // the header and payload are inconsistent (min + offset > i64::MAX).
        let value = (min as i128).saturating_add(offset as i128) as i64;
        out.push(value);
    }

    Some(out)
}
195
196pub fn zstd_compress(bytes: &[u8]) -> Vec<u8> {
206 zstd_compress_at(bytes, 3)
207}
208
209pub fn zstd_compress_at(bytes: &[u8], level: i32) -> Vec<u8> {
212 if bytes.len() < 64 {
213 let mut out = Vec::with_capacity(bytes.len() + 1);
216 out.push(0u8);
217 out.extend_from_slice(bytes);
218 return out;
219 }
220 let clamped = level.clamp(1, 22);
221 match zstd::bulk::compress(bytes, clamped) {
222 Ok(compressed) => {
223 let mut out = Vec::with_capacity(compressed.len() + 1);
224 out.push(1u8);
225 out.extend_from_slice(&compressed);
226 out
227 }
228 Err(_) => {
229 let mut out = Vec::with_capacity(bytes.len() + 1);
232 out.push(0u8);
233 out.extend_from_slice(bytes);
234 out
235 }
236 }
237}
238
239pub fn zstd_decompress(bytes: &[u8]) -> Option<Vec<u8>> {
242 if bytes.is_empty() {
243 return None;
244 }
245 match bytes[0] {
246 0 => Some(bytes[1..].to_vec()),
247 1 => zstd::bulk::decompress(&bytes[1..], 1 << 28).ok(),
248 _ => None,
249 }
250}
251
/// Integer codec choices returned by [`select_int_codec`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TsIntCodec {
    /// No integer-specific codec. Selected for inputs with fewer than four
    /// values and for non-monotonic data whose range needs more than 20 bits.
    Raw,
    /// Delta-of-delta encoding. Selected when the values never decrease.
    DeltaOfDelta,
    /// Fixed-width bit packing relative to the minimum. Selected for
    /// non-monotonic data whose range fits in 20 bits or fewer.
    T64,
}
268
269pub fn select_int_codec(values: &[i64]) -> TsIntCodec {
274 if values.len() < 4 {
275 return TsIntCodec::Raw;
276 }
277 let monotonic = values.windows(2).all(|w| w[1] >= w[0]);
279 if monotonic {
280 return TsIntCodec::DeltaOfDelta;
281 }
282 let min = *values.iter().min().unwrap();
284 let max = *values.iter().max().unwrap();
285 let range = (max as i128 - min as i128).max(0) as u128;
286 let bits = if range == 0 {
287 0
288 } else {
289 128 - range.leading_zeros()
290 };
291 if bits <= 20 {
292 return TsIntCodec::T64;
293 }
294 TsIntCodec::Raw
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    // ---- delta-of-delta timestamps -------------------------------------

    #[test]
    fn test_delta_encode_decode() {
        let timestamps: Vec<u64> = vec![1000, 1060, 1120, 1180, 1240, 1300];
        let encoded = delta_encode_timestamps(&timestamps);
        let decoded = delta_decode_timestamps(&encoded);
        assert_eq!(timestamps, decoded);
    }

    #[test]
    fn test_delta_irregular() {
        let timestamps: Vec<u64> = vec![100, 200, 250, 400, 405, 500];
        let encoded = delta_encode_timestamps(&timestamps);
        let decoded = delta_decode_timestamps(&encoded);
        assert_eq!(timestamps, decoded);
    }

    #[test]
    fn test_delta_single() {
        let timestamps: Vec<u64> = vec![42];
        let encoded = delta_encode_timestamps(&timestamps);
        let decoded = delta_decode_timestamps(&encoded);
        assert_eq!(timestamps, decoded);
    }

    #[test]
    fn test_delta_empty() {
        let timestamps: Vec<u64> = vec![];
        let encoded = delta_encode_timestamps(&timestamps);
        let decoded = delta_decode_timestamps(&encoded);
        assert!(decoded.is_empty());
    }

    #[test]
    fn test_delta_compression_ratio() {
        let timestamps: Vec<u64> = (0..1000).map(|i| 1_000_000 + i * 1000).collect();
        let encoded = delta_encode_timestamps(&timestamps);

        // Everything after the first value and first delta must be zero.
        for &dod in &encoded[2..] {
            assert_eq!(dod, 0, "Regular intervals should have zero delta-of-delta");
        }
    }

    // ---- XOR float encoding --------------------------------------------

    #[test]
    fn test_xor_encode_decode() {
        let values = vec![72.5, 72.6, 72.55, 72.7, 72.65, 72.8];
        let encoded = xor_encode_values(&values);
        let decoded = xor_decode_values(&encoded);
        assert_eq!(values, decoded);
    }

    #[test]
    fn test_xor_compression_similar_values() {
        let values: Vec<f64> = (0..100).map(|i| 95.0 + (i as f64) * 0.01).collect();
        let encoded = xor_encode_values(&values);

        // All inputs lie in [64, 128), so they share sign and exponent and
        // every XOR word must have at least those 12 leading bits cleared.
        for &word in &encoded[1..] {
            assert!(
                word.leading_zeros() >= 12,
                "similar values should share sign and exponent bits"
            );
        }
        assert_eq!(xor_decode_values(&encoded), values);
    }

    #[test]
    fn test_xor_empty() {
        assert!(xor_encode_values(&[]).is_empty());
        assert!(xor_decode_values(&[]).is_empty());
    }

    // ---- T64 bit packing -----------------------------------------------

    #[test]
    fn t64_round_trips_narrow_range() {
        let values: Vec<i64> = (0..1024).map(|i| 1000 + (i % 128)).collect();
        let (bytes, len) = t64_encode(&values);
        let decoded = t64_decode(&bytes, len).unwrap();
        assert_eq!(values, decoded);
        // A 7-bit range must pack to well under a quarter of the raw size.
        assert!(bytes.len() < values.len() * 8 / 4);
    }

    #[test]
    fn t64_handles_constant_sequence_with_zero_bit_width() {
        let values = vec![42i64; 100];
        let (bytes, len) = t64_encode(&values);
        // Header only: min + max + bit width.
        assert_eq!(bytes.len(), 17);
        let decoded = t64_decode(&bytes, len).unwrap();
        assert_eq!(values, decoded);
    }

    #[test]
    fn t64_empty_returns_empty() {
        let (bytes, len) = t64_encode(&[]);
        assert!(bytes.is_empty());
        assert_eq!(len, 0);
        assert_eq!(t64_decode(&[], 0).unwrap(), Vec::<i64>::new());
    }

    #[test]
    fn t64_handles_negative_values() {
        let values = vec![-1000, -500, 0, 500, 1000, -750, 250];
        let (bytes, len) = t64_encode(&values);
        let decoded = t64_decode(&bytes, len).unwrap();
        assert_eq!(values, decoded);
    }

    #[test]
    fn t64_round_trips_full_64_bit_range() {
        let values = vec![i64::MIN, i64::MAX, 0, -1, 1];
        let (bytes, len) = t64_encode(&values);
        assert_eq!(bytes[16], 64);
        let decoded = t64_decode(&bytes, len).unwrap();
        assert_eq!(values, decoded);
    }

    #[test]
    fn t64_rejects_corrupted_payload() {
        let (bytes, _) = t64_encode(&[1i64, 2, 3, 4]);
        // One payload byte cannot hold 100 values.
        assert!(t64_decode(&bytes[..18], 100).is_none());
    }

    // ---- zstd framing --------------------------------------------------

    #[test]
    fn zstd_small_input_passes_through_uncompressed() {
        let data = b"short";
        let compressed = zstd_compress(data);
        assert_eq!(compressed[0], 0);
        assert_eq!(&compressed[1..], data);
        assert_eq!(zstd_decompress(&compressed).unwrap(), data.to_vec());
    }

    #[test]
    fn zstd_large_input_compresses_and_round_trips() {
        let data: Vec<u8> = (0..4096).map(|i| (i % 8) as u8).collect();
        let compressed = zstd_compress(&data);
        assert_eq!(compressed[0], 1);
        assert!(
            compressed.len() < data.len() / 2,
            "zstd should compress ≥2x on repetitive input"
        );
        let decompressed = zstd_decompress(&compressed).unwrap();
        assert_eq!(decompressed, data);
    }

    #[test]
    fn zstd_decompress_rejects_unknown_marker() {
        assert!(zstd_decompress(&[0xff, 0, 1, 2]).is_none());
        assert!(zstd_decompress(&[]).is_none());
    }

    // ---- codec selection -----------------------------------------------

    #[test]
    fn select_int_codec_picks_delta_for_monotonic_timestamps() {
        let ts: Vec<i64> = (0..1000).map(|i| 1_000_000 + i * 1000).collect();
        assert_eq!(select_int_codec(&ts), TsIntCodec::DeltaOfDelta);
    }

    #[test]
    fn select_int_codec_picks_t64_for_narrow_range() {
        let vals: Vec<i64> = (0..500).map(|i| ((i * 13 + 7) % 1024) as i64).collect();
        assert_eq!(select_int_codec(&vals), TsIntCodec::T64);
    }

    #[test]
    fn select_int_codec_falls_back_to_raw_on_wide_non_monotonic() {
        let vals = vec![1_000_000_000i64, -1, 500_000_000, 42, i64::MAX / 2];
        assert_eq!(select_int_codec(&vals), TsIntCodec::Raw);
    }

    #[test]
    fn select_int_codec_returns_raw_for_tiny_inputs() {
        assert_eq!(select_int_codec(&[]), TsIntCodec::Raw);
        assert_eq!(select_int_codec(&[1, 2, 3]), TsIntCodec::Raw);
    }
}