Skip to main content

nodedb_codec/
pcodec.rs

1//! Pcodec wrapper for complex numerical sequences.
2//!
3//! For data where ALP's decimal-to-integer trick doesn't apply (scientific
4//! floats, irregular numerical sequences, CRDT operation counters), Pcodec
5//! builds a probabilistic model of the data distribution, separates
6//! high-order structure from low-order noise, and compresses each
7//! independently.
8//!
9//! Compression: 30-100% better ratio than Zstd on numerical data.
10//! Decode: 1-4 GB/s.
11//!
12//! Wire format: Pcodec's native format with a 5-byte NodeDB header:
13//! ```text
14//! [1 byte]  type tag (0=f64, 1=i64)
15//! [4 bytes] value count (LE u32)
16//! [N bytes] pco compressed data
17//! ```
18
19use crate::error::CodecError;
20
21/// Type tag for f64 data.
22const TAG_F64: u8 = 0;
23/// Type tag for i64 data.
24const TAG_I64: u8 = 1;
25
26// ---------------------------------------------------------------------------
27// f64 encode / decode
28// ---------------------------------------------------------------------------
29
30/// Compress f64 values using Pcodec.
31pub fn encode_f64(values: &[f64]) -> Result<Vec<u8>, CodecError> {
32    let count = values.len() as u32;
33    let compressed = pco::standalone::simple_compress(values, &pco::ChunkConfig::default())
34        .map_err(|e| CodecError::CompressFailed {
35            detail: format!("pcodec f64: {e}"),
36        })?;
37
38    let mut out = Vec::with_capacity(5 + compressed.len());
39    out.push(TAG_F64);
40    out.extend_from_slice(&count.to_le_bytes());
41    out.extend_from_slice(&compressed);
42    Ok(out)
43}
44
45/// Decompress Pcodec f64 data.
46pub fn decode_f64(data: &[u8]) -> Result<Vec<f64>, CodecError> {
47    if data.len() < 5 {
48        return Err(CodecError::Truncated {
49            expected: 5,
50            actual: data.len(),
51        });
52    }
53
54    let tag = data[0];
55    if tag != TAG_F64 {
56        return Err(CodecError::Corrupt {
57            detail: format!("pcodec expected f64 tag (0), got {tag}"),
58        });
59    }
60
61    let count = u32::from_le_bytes([data[1], data[2], data[3], data[4]]) as usize;
62    if count == 0 {
63        return Ok(Vec::new());
64    }
65
66    let values: Vec<f64> = pco::standalone::simple_decompress(&data[5..]).map_err(|e| {
67        CodecError::DecompressFailed {
68            detail: format!("pcodec f64: {e}"),
69        }
70    })?;
71
72    if values.len() != count {
73        return Err(CodecError::Corrupt {
74            detail: format!(
75                "pcodec f64 count mismatch: header says {count}, got {}",
76                values.len()
77            ),
78        });
79    }
80
81    Ok(values)
82}
83
84// ---------------------------------------------------------------------------
85// i64 encode / decode
86// ---------------------------------------------------------------------------
87
88/// Compress i64 values using Pcodec.
89pub fn encode_i64(values: &[i64]) -> Result<Vec<u8>, CodecError> {
90    let count = values.len() as u32;
91    let compressed = pco::standalone::simple_compress(values, &pco::ChunkConfig::default())
92        .map_err(|e| CodecError::CompressFailed {
93            detail: format!("pcodec i64: {e}"),
94        })?;
95
96    let mut out = Vec::with_capacity(5 + compressed.len());
97    out.push(TAG_I64);
98    out.extend_from_slice(&count.to_le_bytes());
99    out.extend_from_slice(&compressed);
100    Ok(out)
101}
102
103/// Decompress Pcodec i64 data.
104pub fn decode_i64(data: &[u8]) -> Result<Vec<i64>, CodecError> {
105    if data.len() < 5 {
106        return Err(CodecError::Truncated {
107            expected: 5,
108            actual: data.len(),
109        });
110    }
111
112    let tag = data[0];
113    if tag != TAG_I64 {
114        return Err(CodecError::Corrupt {
115            detail: format!("pcodec expected i64 tag (1), got {tag}"),
116        });
117    }
118
119    let count = u32::from_le_bytes([data[1], data[2], data[3], data[4]]) as usize;
120    if count == 0 {
121        return Ok(Vec::new());
122    }
123
124    let values: Vec<i64> = pco::standalone::simple_decompress(&data[5..]).map_err(|e| {
125        CodecError::DecompressFailed {
126            detail: format!("pcodec i64: {e}"),
127        }
128    })?;
129
130    if values.len() != count {
131        return Err(CodecError::Corrupt {
132            detail: format!(
133                "pcodec i64 count mismatch: header says {count}, got {}",
134                values.len()
135            ),
136        });
137    }
138
139    Ok(values)
140}
141
#[cfg(test)]
mod tests {
    //! Round-trip, compression-ratio, and malformed-input tests for the
    //! f64 / i64 Pcodec wrappers.

    use super::*;

    #[test]
    fn f64_empty() {
        let encoded = encode_f64(&[]).unwrap();
        let decoded = decode_f64(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn f64_roundtrip() {
        let values: Vec<f64> = (0..1000).map(|i| std::f64::consts::PI * i as f64).collect();
        let encoded = encode_f64(&values).unwrap();
        let decoded = decode_f64(&encoded).unwrap();
        assert_eq!(decoded.len(), values.len());
        // Compare bit patterns so the check is exact, not approximate.
        for (a, b) in values.iter().zip(decoded.iter()) {
            assert_eq!(a.to_bits(), b.to_bits(), "mismatch");
        }
    }

    #[test]
    fn f64_compression_ratio() {
        // Pcodec should compress numerical data better than raw.
        // Values come from a fixed-seed LCG so the test is deterministic.
        let mut values = Vec::with_capacity(10_000);
        let mut rng: u64 = 42;
        for _ in 0..10_000 {
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            values.push(((rng >> 33) as f64 / (u32::MAX as f64)) * 1000.0);
        }
        let encoded = encode_f64(&values).unwrap();
        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 1.1,
            "pcodec should compress random-ish floats >1.1x, got {ratio:.2}x"
        );
    }

    #[test]
    fn i64_empty() {
        let encoded = encode_i64(&[]).unwrap();
        let decoded = decode_i64(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn i64_roundtrip() {
        let values: Vec<i64> = (0..1000).map(|i| i * i * 7 - 500).collect();
        let encoded = encode_i64(&values).unwrap();
        let decoded = decode_i64(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn i64_compression_ratio() {
        let values: Vec<i64> = (0..10_000)
            .map(|i| 1_700_000_000_000 + i * 10_000)
            .collect();
        let encoded = encode_i64(&values).unwrap();
        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 2.0,
            "pcodec should compress monotonic i64 >2x, got {ratio:.2}x"
        );
    }

    #[test]
    fn f64_special_values() {
        let values = vec![0.0, -0.0, f64::INFINITY, f64::NEG_INFINITY, 1.0, -1.0];
        let encoded = encode_f64(&values).unwrap();
        let decoded = decode_f64(&encoded).unwrap();
        // `zip` stops at the shorter side, so pin the length first; otherwise
        // a short decode would make the loop below pass vacuously.
        assert_eq!(decoded.len(), values.len());
        for (a, b) in values.iter().zip(decoded.iter()) {
            assert_eq!(a.to_bits(), b.to_bits());
        }
    }

    #[test]
    fn header_layout() {
        // Byte 0 is the type tag, bytes 1..5 the little-endian value count.
        let encoded = encode_f64(&[1.0, 2.0, 3.0]).unwrap();
        assert_eq!(encoded[0], TAG_F64);
        assert_eq!(&encoded[1..5], &3u32.to_le_bytes()[..]);

        let encoded = encode_i64(&[10, 20]).unwrap();
        assert_eq!(encoded[0], TAG_I64);
        assert_eq!(&encoded[1..5], &2u32.to_le_bytes()[..]);
    }

    #[test]
    fn wrong_tag_errors() {
        // Each decoder must reject a buffer written by the other encoder.
        let as_f64 = encode_f64(&[1.5, 2.5]).unwrap();
        let as_i64 = encode_i64(&[1, 2]).unwrap();
        assert!(decode_i64(&as_f64).is_err());
        assert!(decode_f64(&as_i64).is_err());
    }

    #[test]
    fn count_mismatch_errors() {
        // Rewrite the header count so it disagrees with the payload.
        let mut encoded = encode_i64(&[1, 2, 3]).unwrap();
        encoded[1..5].copy_from_slice(&2u32.to_le_bytes());
        assert!(decode_i64(&encoded).is_err());

        let mut encoded = encode_f64(&[1.0, 2.0, 3.0]).unwrap();
        encoded[1..5].copy_from_slice(&2u32.to_le_bytes());
        assert!(decode_f64(&encoded).is_err());
    }

    #[test]
    fn truncated_errors() {
        assert!(decode_f64(&[]).is_err());
        assert!(decode_i64(&[]).is_err());
        assert!(decode_f64(&[0, 1, 0, 0, 0]).is_err()); // count=1, no data
        assert!(decode_i64(&[1, 1, 0, 0, 0]).is_err()); // count=1, no data
    }
}