Skip to main content

nodedb_codec/
pipeline.rs

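//! Cascading codec pipeline: chains type-aware encoding → terminal compressor.
//!
//! Each `ColumnCodec` variant maps to a fixed pipeline. The
//! `encode_{i64,f64,bytes}_pipeline()` and `decode_{i64,f64,bytes}_pipeline()`
//! functions dispatch to the appropriate chain.
//!
//! Cascading chains:
//! - `AlpFastLanesLz4`:    f64 → ALP → FastLanes bit-pack → lz4
//! - `AlpFastLanesRans`:   f64 → ALP → FastLanes bit-pack → rANS
//! - `AlpRdLz4`:           f64 → ALP-RD → lz4
//! - `DeltaFastLanesLz4`:  i64 → Delta → FastLanes bit-pack → lz4
//! - `DeltaFastLanesRans`: i64 → Delta → FastLanes bit-pack → rANS
//! - `FastLanesLz4`:       i64 → FastLanes bit-pack → lz4 (bytes: u32 symbol IDs)
//! - `PcodecLz4`:          i64/f64 → pcodec → lz4
//! - `FsstLz4`/`FsstRans`: bytes → FSST → lz4 / rANS
//!
//! This module writes no codec ID header. The encoded bytes do not identify
//! their own chain, so callers must pass the same `ColumnCodec` to the decode
//! function that was used to encode.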
13
14use crate::ColumnCodec;
15use crate::error::CodecError;
16
17// ---------------------------------------------------------------------------
18// Pipeline encode
19// ---------------------------------------------------------------------------
20
21/// Encode i64 values through a cascading pipeline.
22///
23/// For cascading codecs, chains the appropriate stages. For legacy codecs,
24/// delegates to the single-step encoder.
25pub fn encode_i64_pipeline(values: &[i64], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
26    match codec {
27        // Cascading chains — lz4 terminal.
28        ColumnCodec::DeltaFastLanesLz4 => encode_delta_fastlanes_lz4(values),
29        ColumnCodec::FastLanesLz4 => encode_fastlanes_lz4_i64(values),
30        ColumnCodec::PcodecLz4 => {
31            let pco_bytes = crate::pcodec::encode_i64(values)?;
32            Ok(crate::lz4::encode(&pco_bytes))
33        }
34        // Cascading chains — rANS terminal (cold tier).
35        ColumnCodec::DeltaFastLanesRans => encode_delta_fastlanes_rans(values),
36        // Single-step codecs (small partitions / non-ALP-encodable data).
37        ColumnCodec::DoubleDelta => Ok(crate::double_delta::encode(values)),
38        ColumnCodec::Delta => Ok(crate::delta::encode(values)),
39        ColumnCodec::Gorilla => Ok(crate::gorilla::encode_timestamps(values)),
40        ColumnCodec::Raw => {
41            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
42            Ok(crate::raw::encode(&raw))
43        }
44        ColumnCodec::Lz4 => {
45            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
46            Ok(crate::lz4::encode(&raw))
47        }
48        ColumnCodec::Zstd => {
49            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
50            crate::zstd_codec::encode(&raw)
51        }
52        _ => Ok(crate::delta::encode(values)),
53    }
54}
55
56/// Encode f64 values through a cascading pipeline.
57pub fn encode_f64_pipeline(values: &[f64], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
58    match codec {
59        // Cascading chains — lz4 terminal.
60        ColumnCodec::AlpFastLanesLz4 => encode_alp_fastlanes_lz4(values),
61        ColumnCodec::AlpRdLz4 => {
62            let alp_rd_bytes = crate::alp_rd::encode(values)?;
63            Ok(crate::lz4::encode(&alp_rd_bytes))
64        }
65        ColumnCodec::PcodecLz4 => {
66            let pco_bytes = crate::pcodec::encode_f64(values)?;
67            Ok(crate::lz4::encode(&pco_bytes))
68        }
69        // Cascading chains — rANS terminal (cold tier).
70        ColumnCodec::AlpFastLanesRans => encode_alp_fastlanes_rans(values),
71        // Single-step codecs (small partitions / non-ALP-encodable data).
72        ColumnCodec::Gorilla => Ok(crate::gorilla::encode_f64(values)),
73        ColumnCodec::Raw => {
74            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
75            Ok(crate::raw::encode(&raw))
76        }
77        ColumnCodec::Lz4 => {
78            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
79            Ok(crate::lz4::encode(&raw))
80        }
81        ColumnCodec::Zstd => {
82            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
83            crate::zstd_codec::encode(&raw)
84        }
85        _ => Ok(crate::gorilla::encode_f64(values)),
86    }
87}
88
89/// Encode raw bytes (symbol columns or string data) through a pipeline.
90pub fn encode_bytes_pipeline(raw: &[u8], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
91    match codec {
92        ColumnCodec::FsstLz4 => {
93            // FSST expects an array of strings. Treat raw as a single blob.
94            let fsst_bytes = crate::fsst::encode(&[raw]);
95            Ok(crate::lz4::encode(&fsst_bytes))
96        }
97        ColumnCodec::FsstRans => {
98            let fsst_bytes = crate::fsst::encode(&[raw]);
99            Ok(crate::rans::encode(&fsst_bytes))
100        }
101        ColumnCodec::Raw => Ok(crate::raw::encode(raw)),
102        ColumnCodec::Lz4 => Ok(crate::lz4::encode(raw)),
103        ColumnCodec::Zstd => crate::zstd_codec::encode(raw),
104        ColumnCodec::FastLanesLz4 => {
105            // Symbol IDs are u32 — convert to i64, FastLanes pack, lz4.
106            if raw.len().is_multiple_of(4) {
107                let i64_vals: Vec<i64> = raw
108                    .chunks_exact(4)
109                    .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]) as i64)
110                    .collect();
111                encode_fastlanes_lz4_i64(&i64_vals)
112            } else {
113                Ok(crate::raw::encode(raw))
114            }
115        }
116        _ => Ok(crate::raw::encode(raw)),
117    }
118}
119
120// ---------------------------------------------------------------------------
121// Pipeline decode
122// ---------------------------------------------------------------------------
123
124/// Decode i64 values from a cascading pipeline.
125pub fn decode_i64_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<i64>, CodecError> {
126    match codec {
127        // Cascading — lz4 terminal.
128        ColumnCodec::DeltaFastLanesLz4 => decode_delta_fastlanes_lz4(data),
129        ColumnCodec::FastLanesLz4 => decode_fastlanes_lz4_i64(data),
130        ColumnCodec::PcodecLz4 => {
131            let pco_bytes = crate::lz4::decode(data)?;
132            crate::pcodec::decode_i64(&pco_bytes)
133        }
134        // Cascading — rANS terminal.
135        ColumnCodec::DeltaFastLanesRans => decode_delta_fastlanes_rans(data),
136        // Single-step codecs (small partitions / non-ALP-encodable data).
137        ColumnCodec::DoubleDelta => crate::double_delta::decode(data),
138        ColumnCodec::Delta => crate::delta::decode(data),
139        ColumnCodec::Gorilla => crate::gorilla::decode_timestamps(data),
140        ColumnCodec::Raw => {
141            let raw = crate::raw::decode(data)?;
142            raw_to_i64(&raw)
143        }
144        ColumnCodec::Lz4 => {
145            let raw = crate::lz4::decode(data)?;
146            raw_to_i64(&raw)
147        }
148        ColumnCodec::Zstd => {
149            let raw = crate::zstd_codec::decode(data)?;
150            raw_to_i64(&raw)
151        }
152        _ => crate::delta::decode(data),
153    }
154}
155
156/// Decode f64 values from a cascading pipeline.
157pub fn decode_f64_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<f64>, CodecError> {
158    match codec {
159        // Cascading — lz4 terminal.
160        ColumnCodec::AlpFastLanesLz4 => decode_alp_fastlanes_lz4(data),
161        ColumnCodec::AlpRdLz4 => {
162            let alp_rd_bytes = crate::lz4::decode(data)?;
163            crate::alp_rd::decode(&alp_rd_bytes)
164        }
165        ColumnCodec::PcodecLz4 => {
166            let pco_bytes = crate::lz4::decode(data)?;
167            crate::pcodec::decode_f64(&pco_bytes)
168        }
169        // Cascading — rANS terminal.
170        ColumnCodec::AlpFastLanesRans => decode_alp_fastlanes_rans(data),
171        // Single-step codecs (small partitions / non-ALP-encodable data).
172        ColumnCodec::Gorilla => crate::gorilla::decode_f64(data),
173        ColumnCodec::Raw => {
174            let raw = crate::raw::decode(data)?;
175            raw_to_f64(&raw)
176        }
177        ColumnCodec::Lz4 => {
178            let raw = crate::lz4::decode(data)?;
179            raw_to_f64(&raw)
180        }
181        ColumnCodec::Zstd => {
182            let raw = crate::zstd_codec::decode(data)?;
183            raw_to_f64(&raw)
184        }
185        _ => crate::gorilla::decode_f64(data),
186    }
187}
188
189/// Decode raw bytes (symbol columns or string data) from a pipeline.
190pub fn decode_bytes_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
191    match codec {
192        ColumnCodec::FsstLz4 => {
193            let fsst_bytes = crate::lz4::decode(data)?;
194            let strings = crate::fsst::decode(&fsst_bytes)?;
195            strings.into_iter().next().ok_or(CodecError::Corrupt {
196                detail: "FSST decode produced no output".into(),
197            })
198        }
199        ColumnCodec::FsstRans => {
200            let fsst_bytes = crate::rans::decode(data)?;
201            let strings = crate::fsst::decode(&fsst_bytes)?;
202            strings.into_iter().next().ok_or(CodecError::Corrupt {
203                detail: "FSST decode produced no output".into(),
204            })
205        }
206        ColumnCodec::Raw => crate::raw::decode(data),
207        ColumnCodec::Lz4 => crate::lz4::decode(data),
208        ColumnCodec::Zstd => crate::zstd_codec::decode(data),
209        ColumnCodec::FastLanesLz4 => {
210            let i64_vals = decode_fastlanes_lz4_i64(data)?;
211            Ok(i64_vals
212                .iter()
213                .flat_map(|&v| (v as u32).to_le_bytes())
214                .collect())
215        }
216        _ => crate::raw::decode(data).or_else(|_| Ok(data.to_vec())),
217    }
218}
219
220// ---------------------------------------------------------------------------
221// Cascading chain implementations
222// ---------------------------------------------------------------------------
223
224/// ALP → FastLanes → LZ4: f64 metrics (the big win).
225fn encode_alp_fastlanes_lz4(values: &[f64]) -> Result<Vec<u8>, CodecError> {
226    // Stage 1: ALP encodes f64 → FastLanes-packed i64 bytes.
227    let alp_encoded = crate::alp::encode(values);
228    // Stage 2: LZ4 terminal compression on the ALP output.
229    Ok(crate::lz4::encode(&alp_encoded))
230}
231
232fn decode_alp_fastlanes_lz4(data: &[u8]) -> Result<Vec<f64>, CodecError> {
233    // Reverse: LZ4 decompress → ALP decode (which internally FastLanes-decodes).
234    let alp_bytes = crate::lz4::decode(data)?;
235    crate::alp::decode(&alp_bytes)
236}
237
238/// Delta → FastLanes → LZ4: i64 timestamps and counters.
239fn encode_delta_fastlanes_lz4(values: &[i64]) -> Result<Vec<u8>, CodecError> {
240    if values.is_empty() {
241        return Ok(crate::lz4::encode(&crate::fastlanes::encode(&[])));
242    }
243
244    // Stage 1a: Compute deltas.
245    let mut deltas = Vec::with_capacity(values.len());
246    deltas.push(values[0]); // First value stored raw.
247    for i in 1..values.len() {
248        deltas.push(values[i].wrapping_sub(values[i - 1]));
249    }
250
251    // Stage 1b: FastLanes bit-pack the deltas.
252    let packed = crate::fastlanes::encode(&deltas);
253
254    // Stage 2: LZ4 terminal.
255    Ok(crate::lz4::encode(&packed))
256}
257
258fn decode_delta_fastlanes_lz4(data: &[u8]) -> Result<Vec<i64>, CodecError> {
259    // Reverse: LZ4 → FastLanes unpack → reconstruct from deltas.
260    let packed = crate::lz4::decode(data)?;
261    let deltas = crate::fastlanes::decode(&packed)?;
262
263    if deltas.is_empty() {
264        return Ok(Vec::new());
265    }
266
267    // Reconstruct values from deltas.
268    let mut values = Vec::with_capacity(deltas.len());
269    values.push(deltas[0]); // First value is raw.
270    for &d in &deltas[1..] {
271        let prev = values[values.len() - 1];
272        values.push(prev.wrapping_add(d));
273    }
274
275    Ok(values)
276}
277
278/// FastLanes → LZ4: raw integers (symbol IDs, non-delta columns).
279fn encode_fastlanes_lz4_i64(values: &[i64]) -> Result<Vec<u8>, CodecError> {
280    let packed = crate::fastlanes::encode(values);
281    Ok(crate::lz4::encode(&packed))
282}
283
284fn decode_fastlanes_lz4_i64(data: &[u8]) -> Result<Vec<i64>, CodecError> {
285    let packed = crate::lz4::decode(data)?;
286    crate::fastlanes::decode(&packed)
287}
288
289/// ALP → FastLanes → rANS: f64 metrics cold tier.
290fn encode_alp_fastlanes_rans(values: &[f64]) -> Result<Vec<u8>, CodecError> {
291    let alp_encoded = crate::alp::encode(values);
292    Ok(crate::rans::encode(&alp_encoded))
293}
294
295fn decode_alp_fastlanes_rans(data: &[u8]) -> Result<Vec<f64>, CodecError> {
296    let alp_bytes = crate::rans::decode(data)?;
297    crate::alp::decode(&alp_bytes)
298}
299
300/// Delta → FastLanes → rANS: i64 cold tier.
301fn encode_delta_fastlanes_rans(values: &[i64]) -> Result<Vec<u8>, CodecError> {
302    if values.is_empty() {
303        return Ok(crate::rans::encode(&crate::fastlanes::encode(&[])));
304    }
305
306    let mut deltas = Vec::with_capacity(values.len());
307    deltas.push(values[0]);
308    for i in 1..values.len() {
309        deltas.push(values[i].wrapping_sub(values[i - 1]));
310    }
311
312    let packed = crate::fastlanes::encode(&deltas);
313    Ok(crate::rans::encode(&packed))
314}
315
316fn decode_delta_fastlanes_rans(data: &[u8]) -> Result<Vec<i64>, CodecError> {
317    let packed = crate::rans::decode(data)?;
318    let deltas = crate::fastlanes::decode(&packed)?;
319
320    if deltas.is_empty() {
321        return Ok(Vec::new());
322    }
323
324    let mut values = Vec::with_capacity(deltas.len());
325    values.push(deltas[0]);
326    for &d in &deltas[1..] {
327        let prev = values[values.len() - 1];
328        values.push(prev.wrapping_add(d));
329    }
330
331    Ok(values)
332}
333
334// ---------------------------------------------------------------------------
335// Helpers
336// ---------------------------------------------------------------------------
337
338fn raw_to_i64(data: &[u8]) -> Result<Vec<i64>, CodecError> {
339    if !data.len().is_multiple_of(8) {
340        return Err(CodecError::Corrupt {
341            detail: "i64 data not aligned to 8 bytes".into(),
342        });
343    }
344    Ok(data
345        .chunks_exact(8)
346        .map(|c| i64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
347        .collect())
348}
349
350fn raw_to_f64(data: &[u8]) -> Result<Vec<f64>, CodecError> {
351    if !data.len().is_multiple_of(8) {
352        return Err(CodecError::Corrupt {
353            detail: "f64 data not aligned to 8 bytes".into(),
354        });
355    }
356    Ok(data
357        .chunks_exact(8)
358        .map(|c| f64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
359        .collect())
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn alp_fastlanes_lz4_decimal_metrics() {
        let values: Vec<f64> = (0..10_000).map(|i| i as f64 * 0.1).collect();
        let encoded = encode_f64_pipeline(&values, ColumnCodec::AlpFastLanesLz4).unwrap();
        let decoded = decode_f64_pipeline(&encoded, ColumnCodec::AlpFastLanesLz4).unwrap();

        // `zip` stops at the shorter side, so check lengths explicitly.
        assert_eq!(decoded.len(), values.len());
        for (i, (a, b)) in values.iter().zip(decoded.iter()).enumerate() {
            assert_eq!(a.to_bits(), b.to_bits(), "mismatch at {i}");
        }

        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 3.0,
            "ALP+FL+LZ4 should compress decimals >3x, got {ratio:.1}x"
        );
    }

    #[test]
    fn alp_beats_gorilla_on_decimals() {
        let mut values = Vec::with_capacity(10_000);
        let mut rng: u64 = 42;
        for _ in 0..10_000 {
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            let cpu = ((rng >> 33) as f64 / (u32::MAX as f64)) * 100.0;
            values.push((cpu * 10.0).round() / 10.0);
        }

        let alp_size = encode_f64_pipeline(&values, ColumnCodec::AlpFastLanesLz4)
            .unwrap()
            .len();
        let gorilla_size = encode_f64_pipeline(&values, ColumnCodec::Gorilla)
            .unwrap()
            .len();

        assert!(
            alp_size < gorilla_size,
            "ALP ({alp_size}) should beat Gorilla ({gorilla_size}) on decimal metrics"
        );
    }

    #[test]
    fn delta_fastlanes_lz4_timestamps() {
        let values: Vec<i64> = (0..10_000)
            .map(|i| 1_700_000_000_000 + i * 10_000)
            .collect();
        let encoded = encode_i64_pipeline(&values, ColumnCodec::DeltaFastLanesLz4).unwrap();
        let decoded = decode_i64_pipeline(&encoded, ColumnCodec::DeltaFastLanesLz4).unwrap();
        assert_eq!(decoded, values);

        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 5.0,
            "Delta+FL+LZ4 should compress timestamps >5x, got {ratio:.1}x"
        );
    }

    #[test]
    fn delta_fastlanes_lz4_jittered_timestamps() {
        let mut values = Vec::with_capacity(10_000);
        let mut ts = 1_700_000_000_000i64;
        let mut rng: u64 = 42;
        for _ in 0..10_000 {
            values.push(ts);
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            let jitter = ((rng >> 33) as i64 % 101) - 50;
            ts += 10_000 + jitter;
        }
        let encoded = encode_i64_pipeline(&values, ColumnCodec::DeltaFastLanesLz4).unwrap();
        let decoded = decode_i64_pipeline(&encoded, ColumnCodec::DeltaFastLanesLz4).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn delta_fastlanes_lz4_counters() {
        let values: Vec<i64> = (0..10_000).map(|i| i * 1000).collect();
        let encoded = encode_i64_pipeline(&values, ColumnCodec::DeltaFastLanesLz4).unwrap();
        let decoded = decode_i64_pipeline(&encoded, ColumnCodec::DeltaFastLanesLz4).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn fastlanes_lz4_symbol_ids() {
        let values: Vec<i64> = (0..5000).map(|i| i % 150).collect();
        let encoded = encode_i64_pipeline(&values, ColumnCodec::FastLanesLz4).unwrap();
        let decoded = decode_i64_pipeline(&encoded, ColumnCodec::FastLanesLz4).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn rans_chains_roundtrip() {
        let ts: Vec<i64> = (0..4096)
            .map(|i| 1_700_000_000_000 + i * 10_000)
            .collect();
        let encoded = encode_i64_pipeline(&ts, ColumnCodec::DeltaFastLanesRans).unwrap();
        let decoded = decode_i64_pipeline(&encoded, ColumnCodec::DeltaFastLanesRans).unwrap();
        assert_eq!(decoded, ts);

        let metrics: Vec<f64> = (0..4096).map(|i| i as f64 * 0.25).collect();
        let encoded = encode_f64_pipeline(&metrics, ColumnCodec::AlpFastLanesRans).unwrap();
        let decoded = decode_f64_pipeline(&encoded, ColumnCodec::AlpFastLanesRans).unwrap();
        assert_eq!(decoded.len(), metrics.len());
        for (i, (a, b)) in metrics.iter().zip(decoded.iter()).enumerate() {
            assert_eq!(a.to_bits(), b.to_bits(), "mismatch at {i}");
        }
    }

    #[test]
    fn legacy_codecs_still_work() {
        let i64_vals: Vec<i64> = (0..1000).collect();
        for codec in [
            ColumnCodec::DoubleDelta,
            ColumnCodec::Delta,
            ColumnCodec::Gorilla,
        ] {
            let encoded = encode_i64_pipeline(&i64_vals, codec).unwrap();
            let decoded = decode_i64_pipeline(&encoded, codec).unwrap();
            assert_eq!(decoded, i64_vals, "legacy i64 codec {codec} failed");
        }

        let f64_vals: Vec<f64> = (0..1000).map(|i| i as f64 * 0.5).collect();
        let encoded = encode_f64_pipeline(&f64_vals, ColumnCodec::Gorilla).unwrap();
        let decoded = decode_f64_pipeline(&encoded, ColumnCodec::Gorilla).unwrap();
        assert_eq!(decoded.len(), f64_vals.len());
        for (a, b) in f64_vals.iter().zip(decoded.iter()) {
            assert_eq!(a.to_bits(), b.to_bits());
        }
    }

    #[test]
    fn raw_framed_codecs_roundtrip() {
        let i64_vals: Vec<i64> = vec![i64::MIN, -1, 0, 1, 42, i64::MAX];
        let f64_vals: Vec<f64> = vec![-0.0, 0.0, 1.5, -2.25, f64::MAX, f64::MIN_POSITIVE];
        for codec in [ColumnCodec::Raw, ColumnCodec::Lz4, ColumnCodec::Zstd] {
            let encoded = encode_i64_pipeline(&i64_vals, codec).unwrap();
            let decoded = decode_i64_pipeline(&encoded, codec).unwrap();
            assert_eq!(decoded, i64_vals, "raw-framed i64 codec {codec} failed");

            let encoded = encode_f64_pipeline(&f64_vals, codec).unwrap();
            let decoded = decode_f64_pipeline(&encoded, codec).unwrap();
            let got: Vec<u64> = decoded.iter().map(|v| v.to_bits()).collect();
            let want: Vec<u64> = f64_vals.iter().map(|v| v.to_bits()).collect();
            assert_eq!(got, want, "raw-framed f64 codec {codec} failed");
        }
    }

    #[test]
    fn empty_values() {
        let empty_i64: Vec<i64> = vec![];
        let empty_f64: Vec<f64> = vec![];

        for codec in [
            ColumnCodec::DeltaFastLanesLz4,
            ColumnCodec::FastLanesLz4,
            ColumnCodec::AlpFastLanesLz4,
        ] {
            if matches!(codec, ColumnCodec::AlpFastLanesLz4) {
                let enc = encode_f64_pipeline(&empty_f64, codec).unwrap();
                let dec = decode_f64_pipeline(&enc, codec).unwrap();
                assert!(dec.is_empty());
            } else {
                let enc = encode_i64_pipeline(&empty_i64, codec).unwrap();
                let dec = decode_i64_pipeline(&enc, codec).unwrap();
                assert!(dec.is_empty());
            }
        }
    }

    #[test]
    fn bytes_pipeline_roundtrip() {
        // 4-byte aligned little-endian u32s, so the FastLanesLz4 symbol path applies.
        let raw: Vec<u8> = (0..1000u32).flat_map(|i| i.to_le_bytes()).collect();
        for codec in [
            ColumnCodec::Raw,
            ColumnCodec::Lz4,
            ColumnCodec::Zstd,
            ColumnCodec::FastLanesLz4,
        ] {
            let encoded = encode_bytes_pipeline(&raw, codec).unwrap();
            let decoded = decode_bytes_pipeline(&encoded, codec).unwrap();
            assert_eq!(decoded, raw, "bytes pipeline {codec} failed");
        }
    }
}
508}