1use crate::double_delta::{BitReader, BitWriter};
21use crate::error::CodecError;
22
23#[derive(Debug)]
32pub struct GorillaEncoder {
33 buf: BitWriter,
34 prev_ts: i64,
35 prev_delta: i64,
36 prev_value: u64,
37 prev_leading: u8,
38 prev_trailing: u8,
39 count: u64,
40}
41
42impl GorillaEncoder {
43 pub fn new() -> Self {
44 Self {
45 buf: BitWriter::new(),
46 prev_ts: 0,
47 prev_delta: 0,
48 prev_value: 0,
49 prev_leading: u8::MAX,
50 prev_trailing: 0,
51 count: 0,
52 }
53 }
54
55 pub fn encode(&mut self, timestamp_ms: i64, value: f64) {
57 let value_bits = value.to_bits();
58
59 if self.count == 0 {
60 self.buf.write_bits(timestamp_ms as u64, 64);
61 self.buf.write_bits(value_bits, 64);
62 self.prev_ts = timestamp_ms;
63 self.prev_value = value_bits;
64 self.count = 1;
65 return;
66 }
67
68 let delta = timestamp_ms - self.prev_ts;
70 let dod = delta - self.prev_delta;
71 self.encode_timestamp_dod(dod);
72 self.prev_ts = timestamp_ms;
73 self.prev_delta = delta;
74
75 let xor = self.prev_value ^ value_bits;
77 self.encode_value_xor(xor);
78 self.prev_value = value_bits;
79
80 self.count += 1;
81 }
82
83 fn encode_timestamp_dod(&mut self, dod: i64) {
84 if dod == 0 {
85 self.buf.write_bit(false);
86 } else if (-64..=63).contains(&dod) {
87 self.buf.write_bits(0b10, 2);
88 self.buf.write_bits((dod as u64) & 0x7F, 7);
89 } else if (-256..=255).contains(&dod) {
90 self.buf.write_bits(0b110, 3);
91 self.buf.write_bits((dod as u64) & 0x1FF, 9);
92 } else if (-2048..=2047).contains(&dod) {
93 self.buf.write_bits(0b1110, 4);
94 self.buf.write_bits((dod as u64) & 0xFFF, 12);
95 } else {
96 self.buf.write_bits(0b1111, 4);
97 self.buf.write_bits(dod as u64, 64);
98 }
99 }
100
101 fn encode_value_xor(&mut self, xor: u64) {
102 if xor == 0 {
103 self.buf.write_bit(false);
104 return;
105 }
106
107 self.buf.write_bit(true);
108
109 let leading = xor.leading_zeros() as u8;
110 let trailing = xor.trailing_zeros() as u8;
111
112 if self.prev_leading != u8::MAX
113 && leading >= self.prev_leading
114 && trailing >= self.prev_trailing
115 {
116 self.buf.write_bit(false);
118 let meaningful_bits = 64 - self.prev_leading - self.prev_trailing;
119 self.buf
120 .write_bits(xor >> self.prev_trailing, meaningful_bits as usize);
121 } else {
122 self.buf.write_bit(true);
124 self.buf.write_bits(leading as u64, 6);
125 let meaningful_bits = 64 - leading - trailing;
126 self.buf.write_bits((meaningful_bits - 1) as u64, 6);
127 self.buf
128 .write_bits(xor >> trailing, meaningful_bits as usize);
129 self.prev_leading = leading;
130 self.prev_trailing = trailing;
131 }
132 }
133
134 pub fn finish(self) -> Vec<u8> {
138 let count_bytes = (self.count as u32).to_le_bytes();
139 let bitstream = self.buf.as_bytes();
140 let mut out = Vec::with_capacity(4 + bitstream.len());
141 out.extend_from_slice(&count_bytes);
142 out.extend_from_slice(bitstream);
143 out
144 }
145
146 pub fn count(&self) -> u64 {
147 self.count
148 }
149
150 pub fn compressed_size(&self) -> usize {
151 self.buf.bit_len().div_ceil(8)
152 }
153}
154
155impl Default for GorillaEncoder {
156 fn default() -> Self {
157 Self::new()
158 }
159}
160
161pub struct GorillaDecoder<'a> {
167 reader: BitReader<'a>,
168 prev_ts: i64,
169 prev_delta: i64,
170 prev_value: u64,
171 prev_leading: u8,
172 prev_trailing: u8,
173 count: u64,
174 total: u64,
175 first: bool,
176}
177
178impl<'a> GorillaDecoder<'a> {
179 pub fn new(buf: &'a [u8]) -> Self {
183 if buf.len() < 4 {
184 return Self {
185 reader: BitReader::new(&[]),
186 prev_ts: 0,
187 prev_delta: 0,
188 prev_value: 0,
189 prev_leading: 0,
190 prev_trailing: 0,
191 count: 0,
192 total: 0,
193 first: true,
194 };
195 }
196 let total = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as u64;
197 Self {
198 reader: BitReader::new(&buf[4..]),
199 prev_ts: 0,
200 prev_delta: 0,
201 prev_value: 0,
202 prev_leading: 0,
203 prev_trailing: 0,
204 count: 0,
205 total,
206 first: true,
207 }
208 }
209
210 pub fn next_sample(&mut self) -> Option<(i64, f64)> {
212 if self.count >= self.total {
213 return None;
214 }
215
216 if self.first {
217 self.first = false;
218 let ts = self.reader.read_bits(64).ok()? as i64;
219 let val = self.reader.read_bits(64).ok()?;
220 self.prev_ts = ts;
221 self.prev_value = val;
222 self.count = 1;
223 return Some((ts, f64::from_bits(val)));
224 }
225
226 let ts = self.decode_timestamp().ok()?;
227 let val = self.decode_value().ok()?;
228 self.count += 1;
229 Some((ts, f64::from_bits(val)))
230 }
231
232 fn decode_timestamp(&mut self) -> Result<i64, CodecError> {
233 let bit = self.reader.read_bit()?;
234 let dod = if !bit {
235 0i64
236 } else {
237 let bit2 = self.reader.read_bit()?;
238 if !bit2 {
239 let raw = self.reader.read_bits(7)? as i64;
240 sign_extend(raw, 7)
241 } else {
242 let bit3 = self.reader.read_bit()?;
243 if !bit3 {
244 let raw = self.reader.read_bits(9)? as i64;
245 sign_extend(raw, 9)
246 } else {
247 let bit4 = self.reader.read_bit()?;
248 if !bit4 {
249 let raw = self.reader.read_bits(12)? as i64;
250 sign_extend(raw, 12)
251 } else {
252 self.reader.read_bits(64)? as i64
253 }
254 }
255 }
256 };
257
258 let delta = self.prev_delta + dod;
259 let ts = self.prev_ts + delta;
260 self.prev_ts = ts;
261 self.prev_delta = delta;
262 Ok(ts)
263 }
264
265 fn decode_value(&mut self) -> Result<u64, CodecError> {
266 let bit = self.reader.read_bit()?;
267 if !bit {
268 return Ok(self.prev_value);
269 }
270
271 let bit2 = self.reader.read_bit()?;
272 let xor = if !bit2 {
273 let meaningful_bits = 64 - self.prev_leading - self.prev_trailing;
274 let bits = self.reader.read_bits(meaningful_bits as usize)?;
275 bits << self.prev_trailing
276 } else {
277 let leading = self.reader.read_bits(6)? as u8;
278 let meaningful_bits = self.reader.read_bits(6)? as u8 + 1;
279 let trailing = 64 - leading - meaningful_bits;
280 let bits = self.reader.read_bits(meaningful_bits as usize)?;
281 self.prev_leading = leading;
282 self.prev_trailing = trailing;
283 bits << trailing
284 };
285
286 let val = self.prev_value ^ xor;
287 self.prev_value = val;
288 Ok(val)
289 }
290
291 pub fn decode_all(&mut self) -> Vec<(i64, f64)> {
293 let mut samples = Vec::new();
294 while let Some(s) = self.next_sample() {
295 samples.push(s);
296 }
297 samples
298 }
299}
300
/// Interprets the low `bits` bits of `value` as a two's-complement number and
/// widens it to `i64`.
///
/// The payload is moved to the top of the word and then shifted back down
/// arithmetically, which replicates its sign bit into the upper bits.
fn sign_extend(value: i64, bits: u32) -> i64 {
    let unused = i64::BITS - bits;
    let aligned = value << unused;
    aligned >> unused
}
305
306pub fn encode_f64(values: &[f64]) -> Vec<u8> {
315 let mut enc = GorillaEncoder::new();
316 for (i, &v) in values.iter().enumerate() {
317 enc.encode(i as i64, v);
318 }
319 enc.finish()
320}
321
322pub fn decode_f64(data: &[u8]) -> Result<Vec<f64>, CodecError> {
324 let mut dec = GorillaDecoder::new(data);
325 let samples = dec.decode_all();
326 if samples.len() != dec.total as usize {
327 return Err(CodecError::Truncated {
328 expected: dec.total as usize,
329 actual: samples.len(),
330 });
331 }
332 Ok(samples.into_iter().map(|(_, v)| v).collect())
333}
334
335pub fn encode_timestamps(timestamps: &[i64]) -> Vec<u8> {
340 let mut enc = GorillaEncoder::new();
341 for &ts in timestamps {
342 enc.encode(ts, 0.0);
343 }
344 enc.finish()
345}
346
347pub fn decode_timestamps(data: &[u8]) -> Result<Vec<i64>, CodecError> {
349 let mut dec = GorillaDecoder::new(data);
350 let samples = dec.decode_all();
351 if samples.len() != dec.total as usize {
352 return Err(CodecError::Truncated {
353 expected: dec.total as usize,
354 actual: samples.len(),
355 });
356 }
357 Ok(samples.into_iter().map(|(ts, _)| ts).collect())
358}
359
#[cfg(test)]
mod tests {
    use super::*;

    /// An encoder with no samples serializes to just the 4-byte zero count.
    #[test]
    fn empty_encoder() {
        let enc = GorillaEncoder::new();
        assert_eq!(enc.count(), 0);
        let data = enc.finish();
        assert_eq!(data.len(), 4);
        assert_eq!(u32::from_le_bytes(data[0..4].try_into().unwrap()), 0);
    }

    /// A single sample is stored verbatim and decodes exactly once.
    #[test]
    fn single_sample_roundtrip() {
        let mut enc = GorillaEncoder::new();
        enc.encode(1000, 42.5);
        let data = enc.finish();

        let mut dec = GorillaDecoder::new(&data);
        let (ts, val) = dec.next_sample().unwrap();
        assert_eq!(ts, 1000);
        assert!((val - 42.5).abs() < f64::EPSILON);
        assert!(dec.next_sample().is_none());
    }

    /// Evenly spaced timestamps compress well and every sample round-trips.
    #[test]
    fn monotonic_timestamps_compress_well() {
        let mut enc = GorillaEncoder::new();
        for i in 0..1000 {
            enc.encode(1_000_000 + i * 10_000, 100.0 + (i as f64) * 0.001);
        }
        let data = enc.finish();

        assert!(
            data.len() < 8000,
            "expected good compression, got {} bytes for 1000 samples",
            data.len()
        );

        let mut dec = GorillaDecoder::new(&data);
        let samples = dec.decode_all();
        assert_eq!(samples.len(), 1000);
        assert_eq!(samples[0].0, 1_000_000);
        for (i, &(ts, val)) in samples.iter().enumerate() {
            let i = i as i64;
            assert_eq!(ts, 1_000_000 + i * 10_000);
            assert_eq!(val.to_bits(), (100.0 + (i as f64) * 0.001).to_bits());
        }
    }

    /// A constant value costs one bit per sample after the first.
    #[test]
    fn identical_values_compress_minimally() {
        let mut enc = GorillaEncoder::new();
        for i in 0..100 {
            enc.encode(1000 + i * 1000, 42.0);
        }
        let data = enc.finish();

        assert!(
            data.len() < 100,
            "identical values should compress well, got {} bytes",
            data.len()
        );

        let mut dec = GorillaDecoder::new(&data);
        let samples = dec.decode_all();
        assert_eq!(samples.len(), 100);
        for s in &samples {
            assert!((s.1 - 42.0).abs() < f64::EPSILON);
        }
    }

    /// `encode_f64` / `decode_f64` preserve every value bit-for-bit.
    #[test]
    fn f64_batch_roundtrip() {
        let values: Vec<f64> = (0..500).map(|i| 42.0 + i as f64 * 0.1).collect();
        let encoded = encode_f64(&values);
        let decoded = decode_f64(&encoded).unwrap();
        assert_eq!(values.len(), decoded.len());
        for (a, b) in values.iter().zip(decoded.iter()) {
            assert_eq!(a.to_bits(), b.to_bits());
        }
    }

    /// `encode_timestamps` / `decode_timestamps` preserve every timestamp.
    #[test]
    fn timestamp_batch_roundtrip() {
        let timestamps: Vec<i64> = (0..1000).map(|i| 1_700_000_000_000 + i * 10_000).collect();
        let encoded = encode_timestamps(&timestamps);
        let decoded = decode_timestamps(&encoded).unwrap();
        assert_eq!(timestamps, decoded);
    }

    /// Irregular spacing exercises every delta-of-delta bucket, including
    /// negative payloads that rely on sign extension.
    #[test]
    fn irregular_timestamps_roundtrip() {
        let timestamps: Vec<i64> = vec![
            0, 10, 20, 25, 100, 90, 400, -600, 3_000, -5_000, 1_000_000,
        ];
        let encoded = encode_timestamps(&timestamps);
        let decoded = decode_timestamps(&encoded).unwrap();
        assert_eq!(timestamps, decoded);
    }

    /// Buffers shorter than the 4-byte header decode as an empty stream.
    #[test]
    fn short_buffer_is_empty_stream() {
        let mut dec = GorillaDecoder::new(&[0x01, 0x02]);
        assert!(dec.next_sample().is_none());
        assert!(decode_f64(&[]).unwrap().is_empty());
    }

    /// Values with very different bit patterns round-trip exactly.
    #[test]
    fn varying_values_roundtrip() {
        let mut enc = GorillaEncoder::new();
        let test_values = [
            0.0,
            1.0,
            -1.0,
            f64::MAX,
            f64::MIN,
            std::f64::consts::PI,
            1e-300,
            1e300,
        ];
        for (i, &val) in test_values.iter().enumerate() {
            enc.encode(i as i64 * 1000, val);
        }
        let data = enc.finish();

        let mut dec = GorillaDecoder::new(&data);
        let samples = dec.decode_all();
        assert_eq!(samples.len(), test_values.len());
        for (i, &expected) in test_values.iter().enumerate() {
            assert_eq!(samples[i].1.to_bits(), expected.to_bits());
        }
    }

    /// Noisy values around a fixed level still beat 2:1 against raw
    /// 16-byte `(i64, f64)` pairs.
    #[test]
    fn compression_ratio() {
        let mut enc = GorillaEncoder::new();
        // Small linear congruential generator: deterministic jitter without
        // an external RNG dependency.
        let mut rng_state: u64 = 12345;
        for i in 0..10_000 {
            rng_state = rng_state.wrapping_mul(6364136223846793005).wrapping_add(1);
            let jitter = ((rng_state >> 33) as f64) / (u32::MAX as f64) * 2.0 - 1.0;
            let value = 50.0 + jitter * 5.0;
            enc.encode(1_700_000_000_000 + i * 10_000, value);
        }
        let data = enc.finish();

        let raw_size = 10_000 * 16;
        let ratio = raw_size as f64 / data.len() as f64;
        assert!(
            ratio > 2.0,
            "compression ratio {ratio:.1}:1 too low (expected >2:1)"
        );
    }
}