Skip to main content

nodedb_codec/
pcodec.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Pcodec wrapper for complex numerical sequences.
4//!
5//! For data where ALP's decimal-to-integer trick doesn't apply (scientific
6//! floats, irregular numerical sequences, CRDT operation counters), Pcodec
7//! builds a probabilistic model of the data distribution, separates
8//! high-order structure from low-order noise, and compresses each
9//! independently.
10//!
11//! Compression: 30-100% better ratio than Zstd on numerical data.
12//! Decode: 1-4 GB/s.
13//!
14//! Wire format: Pcodec's native format with a 5-byte NodeDB header:
15//! ```text
16//! [1 byte]  type tag (0=f64, 1=i64)
17//! [4 bytes] value count (LE u32)
18//! [N bytes] pco compressed data
19//! ```
20
21use crate::error::CodecError;
22
/// Leading header byte that marks a frame as carrying `f64` values.
const TAG_F64: u8 = 0x00;
/// Leading header byte that marks a frame as carrying `i64` values.
const TAG_I64: u8 = 0x01;
27
28// ---------------------------------------------------------------------------
29// f64 encode / decode
30// ---------------------------------------------------------------------------
31
32/// Compress f64 values using Pcodec.
33pub fn encode_f64(values: &[f64]) -> Result<Vec<u8>, CodecError> {
34    let count = values.len() as u32;
35    let compressed = pco::standalone::simple_compress(values, &pco::ChunkConfig::default())
36        .map_err(|e| CodecError::CompressFailed {
37            detail: format!("pcodec f64: {e}"),
38        })?;
39
40    let mut out = Vec::with_capacity(5 + compressed.len());
41    out.push(TAG_F64);
42    out.extend_from_slice(&count.to_le_bytes());
43    out.extend_from_slice(&compressed);
44    Ok(out)
45}
46
47/// Decompress Pcodec f64 data.
48pub fn decode_f64(data: &[u8]) -> Result<Vec<f64>, CodecError> {
49    if data.len() < 5 {
50        return Err(CodecError::Truncated {
51            expected: 5,
52            actual: data.len(),
53        });
54    }
55
56    let tag = data[0];
57    if tag != TAG_F64 {
58        return Err(CodecError::Corrupt {
59            detail: format!("pcodec expected f64 tag (0), got {tag}"),
60        });
61    }
62
63    let count = u32::from_le_bytes([data[1], data[2], data[3], data[4]]) as usize;
64    if count == 0 {
65        return Ok(Vec::new());
66    }
67
68    let values: Vec<f64> = pco::standalone::simple_decompress(&data[5..]).map_err(|e| {
69        CodecError::DecompressFailed {
70            detail: format!("pcodec f64: {e}"),
71        }
72    })?;
73
74    if values.len() != count {
75        return Err(CodecError::Corrupt {
76            detail: format!(
77                "pcodec f64 count mismatch: header says {count}, got {}",
78                values.len()
79            ),
80        });
81    }
82
83    Ok(values)
84}
85
86// ---------------------------------------------------------------------------
87// i64 encode / decode
88// ---------------------------------------------------------------------------
89
90/// Compress i64 values using Pcodec.
91pub fn encode_i64(values: &[i64]) -> Result<Vec<u8>, CodecError> {
92    let count = values.len() as u32;
93    let compressed = pco::standalone::simple_compress(values, &pco::ChunkConfig::default())
94        .map_err(|e| CodecError::CompressFailed {
95            detail: format!("pcodec i64: {e}"),
96        })?;
97
98    let mut out = Vec::with_capacity(5 + compressed.len());
99    out.push(TAG_I64);
100    out.extend_from_slice(&count.to_le_bytes());
101    out.extend_from_slice(&compressed);
102    Ok(out)
103}
104
105/// Decompress Pcodec i64 data.
106pub fn decode_i64(data: &[u8]) -> Result<Vec<i64>, CodecError> {
107    if data.len() < 5 {
108        return Err(CodecError::Truncated {
109            expected: 5,
110            actual: data.len(),
111        });
112    }
113
114    let tag = data[0];
115    if tag != TAG_I64 {
116        return Err(CodecError::Corrupt {
117            detail: format!("pcodec expected i64 tag (1), got {tag}"),
118        });
119    }
120
121    let count = u32::from_le_bytes([data[1], data[2], data[3], data[4]]) as usize;
122    if count == 0 {
123        return Ok(Vec::new());
124    }
125
126    let values: Vec<i64> = pco::standalone::simple_decompress(&data[5..]).map_err(|e| {
127        CodecError::DecompressFailed {
128            detail: format!("pcodec i64: {e}"),
129        }
130    })?;
131
132    if values.len() != count {
133        return Err(CodecError::Corrupt {
134            detail: format!(
135                "pcodec i64 count mismatch: header says {count}, got {}",
136                values.len()
137            ),
138        });
139    }
140
141    Ok(values)
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty f64 slice survives an encode/decode round trip.
    #[test]
    fn f64_empty() {
        let frame = encode_f64(&[]).unwrap();
        assert!(decode_f64(&frame).unwrap().is_empty());
    }

    /// Multiples of pi come back bit-for-bit identical.
    #[test]
    fn f64_roundtrip() {
        let input: Vec<f64> = (0..1000).map(|k| std::f64::consts::PI * k as f64).collect();
        let frame = encode_f64(&input).unwrap();
        let output = decode_f64(&frame).unwrap();
        assert_eq!(output.len(), input.len());
        for (want, got) in input.iter().zip(output.iter()) {
            assert_eq!(want.to_bits(), got.to_bits(), "mismatch");
        }
    }

    /// Pcodec should compress numerical data better than raw.
    #[test]
    fn f64_compression_ratio() {
        // Deterministic LCG so the test input is reproducible.
        let mut state: u64 = 42;
        let samples: Vec<f64> = (0..10_000)
            .map(|_| {
                state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
                ((state >> 33) as f64 / (u32::MAX as f64)) * 1000.0
            })
            .collect();

        let frame = encode_f64(&samples).unwrap();
        let raw_size = samples.len() * std::mem::size_of::<f64>();
        let ratio = raw_size as f64 / frame.len() as f64;
        assert!(
            ratio > 1.1,
            "pcodec should compress random-ish floats >1.1x, got {ratio:.2}x"
        );
    }

    /// An empty i64 slice survives an encode/decode round trip.
    #[test]
    fn i64_empty() {
        let frame = encode_i64(&[]).unwrap();
        assert!(decode_i64(&frame).unwrap().is_empty());
    }

    /// A quadratic integer sequence comes back unchanged.
    #[test]
    fn i64_roundtrip() {
        let input: Vec<i64> = (0..1000).map(|k| k * k * 7 - 500).collect();
        let output = decode_i64(&encode_i64(&input).unwrap()).unwrap();
        assert_eq!(output, input);
    }

    /// Evenly spaced, timestamp-like integers should shrink to under half size.
    #[test]
    fn i64_compression_ratio() {
        let samples: Vec<i64> = (0..10_000)
            .map(|step| 1_700_000_000_000 + step * 10_000)
            .collect();

        let frame = encode_i64(&samples).unwrap();
        let raw_size = samples.len() * std::mem::size_of::<i64>();
        let ratio = raw_size as f64 / frame.len() as f64;
        assert!(
            ratio > 2.0,
            "pcodec should compress monotonic i64 >2x, got {ratio:.2}x"
        );
    }

    /// Signed zeros and infinities keep their exact bit patterns.
    #[test]
    fn f64_special_values() {
        let specials = [0.0, -0.0, f64::INFINITY, f64::NEG_INFINITY, 1.0, -1.0];
        let frame = encode_f64(&specials).unwrap();
        let output = decode_f64(&frame).unwrap();
        for (want, got) in specials.iter().zip(output.iter()) {
            assert_eq!(want.to_bits(), got.to_bits());
        }
    }

    /// Frames shorter than the header, or with a header but no payload, fail.
    #[test]
    fn truncated_errors() {
        assert!(decode_f64(&[]).is_err());
        assert!(decode_i64(&[]).is_err());
        // Header claims one f64 value but carries no payload bytes.
        assert!(decode_f64(&[0, 1, 0, 0, 0]).is_err());
    }
}