1use crate::error::CodecError;
31
/// Append-only bit sink that packs bits MSB-first into a growable byte buffer.
///
/// Bits fill each byte starting at its most significant position; a partially filled
/// final byte keeps zeros in its unused low bits.
#[derive(Debug)]
pub(crate) struct BitWriter {
    /// Packed output bytes.
    buf: Vec<u8>,
    /// Number of bits written so far, i.e. the index of the next bit slot.
    bit_pos: usize,
}

impl BitWriter {
    /// Creates an empty writer with 1024 bytes of capacity reserved up front.
    pub(crate) fn new() -> Self {
        let buf = Vec::with_capacity(1024);
        Self { buf, bit_pos: 0 }
    }

    /// Appends a single bit.
    pub(crate) fn write_bit(&mut self, bit: bool) {
        let (byte, offset) = (self.bit_pos / 8, self.bit_pos % 8);
        // Entering a fresh byte: append it zeroed so only `1` bits need to be set.
        if byte >= self.buf.len() {
            self.buf.push(0);
        }
        // OR-ing in a 0 for a `false` bit leaves the byte unchanged.
        self.buf[byte] |= u8::from(bit) << (7 - offset);
        self.bit_pos += 1;
    }

    /// Appends the low `num_bits` bits of `value`, most significant of those bits first.
    ///
    /// `num_bits` must not exceed 64; a wider shift overflows `u64`.
    pub(crate) fn write_bits(&mut self, value: u64, num_bits: usize) {
        for shift in (0..num_bits).rev() {
            self.write_bit((value >> shift) & 1 != 0);
        }
    }

    /// Returns the packed bytes written so far.
    pub(crate) fn as_bytes(&self) -> &[u8] {
        self.buf.as_slice()
    }

    /// Returns the number of bits written; may be less than `8 * as_bytes().len()`.
    pub(crate) fn bit_len(&self) -> usize {
        self.bit_pos
    }
}
76
77pub(crate) struct BitReader<'a> {
78 buf: &'a [u8],
79 bit_pos: usize,
80}
81
82impl<'a> BitReader<'a> {
83 pub(crate) fn new(buf: &'a [u8]) -> Self {
84 Self { buf, bit_pos: 0 }
85 }
86
87 pub(crate) fn read_bit(&mut self) -> Result<bool, CodecError> {
88 let byte_idx = self.bit_pos / 8;
89 if byte_idx >= self.buf.len() {
90 return Err(CodecError::Truncated {
91 expected: byte_idx + 1,
92 actual: self.buf.len(),
93 });
94 }
95 let bit_idx = 7 - (self.bit_pos % 8);
96 let bit = (self.buf[byte_idx] >> bit_idx) & 1 == 1;
97 self.bit_pos += 1;
98 Ok(bit)
99 }
100
101 pub(crate) fn read_bits(&mut self, num_bits: usize) -> Result<u64, CodecError> {
102 let mut value = 0u64;
103 for _ in 0..num_bits {
104 value = (value << 1) | u64::from(self.read_bit()?);
105 }
106 Ok(value)
107 }
108}
109
110pub fn encode(values: &[i64]) -> Vec<u8> {
116 let count = values.len() as u32;
117 let mut out = Vec::with_capacity(20 + values.len() / 4);
118
119 out.extend_from_slice(&count.to_le_bytes());
120
121 if values.is_empty() {
122 return out;
123 }
124
125 out.extend_from_slice(&values[0].to_le_bytes());
126
127 if values.len() == 1 {
128 return out;
129 }
130
131 let first_delta = values[1].wrapping_sub(values[0]);
132 out.extend_from_slice(&first_delta.to_le_bytes());
133
134 if values.len() == 2 {
135 return out;
136 }
137
138 let mut bs = BitWriter::new();
139 let mut prev_delta = first_delta;
140
141 for i in 2..values.len() {
142 let delta = values[i].wrapping_sub(values[i - 1]);
143 let dod = delta.wrapping_sub(prev_delta);
144 encode_dod(&mut bs, dod);
145 prev_delta = delta;
146 }
147
148 out.extend_from_slice(bs.as_bytes());
149 out
150}
151
152pub fn decode(data: &[u8]) -> Result<Vec<i64>, CodecError> {
154 if data.len() < 4 {
155 return Err(CodecError::Truncated {
156 expected: 4,
157 actual: data.len(),
158 });
159 }
160
161 let count = u32::from_le_bytes(data[0..4].try_into().map_err(|_| CodecError::Corrupt {
162 detail: "invalid header".into(),
163 })?) as usize;
164
165 if count == 0 {
166 return Ok(Vec::new());
167 }
168
169 if data.len() < 12 {
170 return Err(CodecError::Truncated {
171 expected: 12,
172 actual: data.len(),
173 });
174 }
175
176 let first_value =
177 i64::from_le_bytes(data[4..12].try_into().map_err(|_| CodecError::Corrupt {
178 detail: "invalid first value".into(),
179 })?);
180
181 let mut values = Vec::with_capacity(count);
182 values.push(first_value);
183
184 if count == 1 {
185 return Ok(values);
186 }
187
188 if data.len() < 20 {
189 return Err(CodecError::Truncated {
190 expected: 20,
191 actual: data.len(),
192 });
193 }
194
195 let first_delta =
196 i64::from_le_bytes(data[12..20].try_into().map_err(|_| CodecError::Corrupt {
197 detail: "invalid first delta".into(),
198 })?);
199 values.push(first_value.wrapping_add(first_delta));
200
201 if count == 2 {
202 return Ok(values);
203 }
204
205 let mut reader = BitReader::new(&data[20..]);
206 let mut prev_delta = first_delta;
207
208 for _ in 2..count {
209 let dod = decode_dod(&mut reader)?;
210 let delta = prev_delta.wrapping_add(dod);
211 let value = values[values.len() - 1].wrapping_add(delta);
212 values.push(value);
213 prev_delta = delta;
214 }
215
216 Ok(values)
217}
218
219fn encode_dod(bs: &mut BitWriter, dod: i64) {
224 if dod == 0 {
225 bs.write_bit(false);
226 } else if (-64..=63).contains(&dod) {
227 bs.write_bits(0b10, 2);
228 bs.write_bits((dod as u64) & 0x7F, 7);
229 } else if (-256..=255).contains(&dod) {
230 bs.write_bits(0b110, 3);
231 bs.write_bits((dod as u64) & 0x1FF, 9);
232 } else if (-2048..=2047).contains(&dod) {
233 bs.write_bits(0b1110, 4);
234 bs.write_bits((dod as u64) & 0xFFF, 12);
235 } else {
236 bs.write_bits(0b1111, 4);
237 bs.write_bits(dod as u64, 64);
238 }
239}
240
241fn decode_dod(reader: &mut BitReader<'_>) -> Result<i64, CodecError> {
242 let bit = reader.read_bit()?;
243 if !bit {
244 return Ok(0);
245 }
246
247 let bit2 = reader.read_bit()?;
248 if !bit2 {
249 let raw = reader.read_bits(7)? as i64;
250 return Ok(sign_extend(raw, 7));
251 }
252
253 let bit3 = reader.read_bit()?;
254 if !bit3 {
255 let raw = reader.read_bits(9)? as i64;
256 return Ok(sign_extend(raw, 9));
257 }
258
259 let bit4 = reader.read_bit()?;
260 if !bit4 {
261 let raw = reader.read_bits(12)? as i64;
262 return Ok(sign_extend(raw, 12));
263 }
264
265 let raw = reader.read_bits(64)?;
266 Ok(raw as i64)
267}
268
/// Interprets the low `bits` bits of `value` as a two's-complement integer and widens it
/// to `i64`.
///
/// Bits of `value` above the low `bits` are discarded. `bits` must lie in `1..=64`;
/// otherwise the shift amount overflows.
fn sign_extend(value: i64, bits: u32) -> i64 {
    let unused_high_bits = i64::BITS - bits;
    // Move the field's sign bit up to bit 63, then shift back arithmetically so the
    // sign is replicated through the vacated high bits.
    let left_aligned = value << unused_high_bits;
    left_aligned >> unused_high_bits
}
273
274pub struct DoubleDeltaEncoder {
281 values: Vec<i64>,
282}
283
284impl DoubleDeltaEncoder {
285 pub fn new() -> Self {
286 Self {
287 values: Vec::with_capacity(4096),
288 }
289 }
290
291 pub fn push(&mut self, value: i64) {
293 self.values.push(value);
294 }
295
296 pub fn push_batch(&mut self, values: &[i64]) {
298 self.values.extend_from_slice(values);
299 }
300
301 pub fn count(&self) -> usize {
303 self.values.len()
304 }
305
306 pub fn finish(self) -> Vec<u8> {
308 encode(&self.values)
309 }
310}
311
312impl Default for DoubleDeltaEncoder {
313 fn default() -> Self {
314 Self::new()
315 }
316}
317
318pub struct DoubleDeltaDecoder {
320 values: Vec<i64>,
321 pos: usize,
322}
323
324impl DoubleDeltaDecoder {
325 pub fn new(data: &[u8]) -> Result<Self, CodecError> {
327 let values = decode(data)?;
328 Ok(Self { values, pos: 0 })
329 }
330
331 pub fn decode_all(data: &[u8]) -> Result<Vec<i64>, CodecError> {
333 decode(data)
334 }
335
336 pub fn next_value(&mut self) -> Option<i64> {
338 if self.pos < self.values.len() {
339 let v = self.values[self.pos];
340 self.pos += 1;
341 Some(v)
342 } else {
343 None
344 }
345 }
346
347 pub fn remaining(&self) -> usize {
349 self.values.len() - self.pos
350 }
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `[0, 0, dod]`, whose only delta-of-delta is `dod`, and returns the
    /// bit-packed payload that follows the 20-byte header.
    fn payload_for_dod(dod: i64) -> Vec<u8> {
        encode(&[0, 0, dod])[20..].to_vec()
    }

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]);
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn single_value() {
        let encoded = encode(&[1_700_000_000_000i64]);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, vec![1_700_000_000_000i64]);
        assert_eq!(encoded.len(), 12);
    }

    #[test]
    fn two_values() {
        let values = vec![1000i64, 2000];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
        assert_eq!(encoded.len(), 20);
    }

    #[test]
    fn constant_rate_timestamps() {
        let values: Vec<i64> = (0..10_000)
            .map(|i| 1_700_000_000_000 + i * 10_000)
            .collect();
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        let bits_per_sample = (encoded.len() as f64 * 8.0) / values.len() as f64;
        assert!(
            bits_per_sample < 2.0,
            "constant-rate should compress to ~1 bit/sample, got {bits_per_sample:.1}"
        );
    }

    #[test]
    fn monotonic_with_jitter() {
        let mut values = Vec::with_capacity(10_000);
        let mut ts = 1_700_000_000_000i64;
        let mut rng: u64 = 42;
        for _ in 0..10_000 {
            values.push(ts);
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            let jitter = ((rng >> 33) as i64 % 101) - 50;
            ts += 10_000 + jitter;
        }
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 2.0,
            "jittered timestamps should compress to <2 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn non_monotonic_values() {
        let values: Vec<i64> = vec![100, 50, 200, 10, 300, 5, 1000, -500, 0, 42];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn negative_values() {
        let values: Vec<i64> = vec![-1000, -999, -998, -997, -996];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn large_deltas() {
        let values: Vec<i64> = vec![0, i64::MAX / 2, i64::MIN / 2, i64::MAX / 4, 0];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn boundary_values() {
        let values: Vec<i64> = vec![i64::MIN, 0, i64::MAX, 0, i64::MIN];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn compression_better_than_raw_for_constant_rate() {
        let values: Vec<i64> = (0..100_000)
            .map(|i| 1_700_000_000_000 + i * 10_000)
            .collect();
        let encoded = encode(&values);
        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 5.0,
            "expected >5x compression for constant-rate, got {ratio:.1}x"
        );
    }

    #[test]
    fn streaming_encoder_matches_batch() {
        let values: Vec<i64> = (0..1000).map(|i| 1_000_000 + i * 100).collect();
        let batch_encoded = encode(&values);

        let mut enc = DoubleDeltaEncoder::new();
        for &v in &values {
            enc.push(v);
        }
        let stream_encoded = enc.finish();

        assert_eq!(batch_encoded, stream_encoded);
    }

    #[test]
    fn streaming_decoder() {
        let values: Vec<i64> = (0..100).map(|i| 5000 + i * 10).collect();
        let encoded = encode(&values);
        let mut dec = DoubleDeltaDecoder::new(&encoded).unwrap();

        for &expected in &values {
            assert_eq!(dec.next_value(), Some(expected));
        }
        assert_eq!(dec.next_value(), None);
        assert_eq!(dec.remaining(), 0);
    }

    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        assert!(decode(&[1, 0, 0, 0]).is_err());
        assert!(decode(&[2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).is_err());
    }

    /// Every delta-of-delta on either side of each bucket boundary must round-trip.
    #[test]
    fn dod_bucket_boundaries_roundtrip() {
        let dods = [
            -2049,
            -2048,
            2047,
            2048,
            -257,
            -256,
            255,
            256,
            -65,
            -64,
            63,
            64,
            -1,
            0,
            1,
            i64::MIN,
            i64::MAX,
        ];
        for &dod in &dods {
            let values = vec![0i64, 0, dod];
            let decoded = decode(&encode(&values)).unwrap();
            assert_eq!(decoded, values, "dod {dod} did not round-trip");
        }
    }

    /// Pins the exact prefix code chosen for values at the edge of each bucket.
    #[test]
    fn dod_bucket_selection() {
        // `0`, no payload.
        assert_eq!(payload_for_dod(0), vec![0x00]);
        // `10` + 7-bit payload.
        assert_eq!(payload_for_dod(63), vec![0x9F, 0x80]);
        assert_eq!(payload_for_dod(-64), vec![0xA0, 0x00]);
        // `110` + 9-bit payload.
        assert_eq!(payload_for_dod(64), vec![0xC4, 0x00]);
        assert_eq!(payload_for_dod(-65), vec![0xDB, 0xF0]);
        // `1110` + 12-bit payload.
        assert_eq!(payload_for_dod(256), vec![0xE1, 0x00]);
        assert_eq!(payload_for_dod(-2048), vec![0xE8, 0x00]);
        // `1111` + raw 64 bits: 68 bits, zero-padded to 9 bytes.
        assert_eq!(
            payload_for_dod(2048),
            vec![0xF0, 0, 0, 0, 0, 0, 0, 0x80, 0x00]
        );
    }

    /// Cutting the bit-packed payload short must be reported as an error.
    #[test]
    fn truncated_payload_errors() {
        let encoded = encode(&[0, 0, 2048]);
        assert!(decode(&encoded[..encoded.len() - 1]).is_err());
        assert!(decode(&encoded[..20]).is_err());
    }
}