Skip to main content

nodedb_codec/
pipeline.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Cascading codec pipeline: chains type-aware encoding → terminal compressor.
4//!
//! Each `ColumnCodec` variant maps to a fixed pipeline. The typed entry points
//! (`encode_i64_pipeline()` / `encode_f64_pipeline()` / `encode_bytes_pipeline()`
//! and their `decode_*_pipeline()` counterparts) dispatch to the appropriate chain.
7//!
8//! Cascading chains:
9//! - `AlpFastLanesLz4`:    f64 → ALP → FastLanes bit-pack → lz4
10//! - `DeltaFastLanesLz4`:  i64 → Delta → FastLanes bit-pack → lz4
11//! - `FastLanesLz4`:       i64 → FastLanes bit-pack → lz4
12//!
//! The caller passes the same `ColumnCodec` to the `decode_*_pipeline()`
//! functions so they know which chain to reverse; the functions in this file
//! do not write or read a codec ID header themselves.
15
16use crate::ColumnCodec;
17use crate::error::CodecError;
18
19// ---------------------------------------------------------------------------
20// Pipeline encode
21// ---------------------------------------------------------------------------
22
23/// Encode i64 values through a cascading pipeline.
24///
25/// For cascading codecs, chains the appropriate stages. For legacy codecs,
26/// delegates to the single-step encoder.
27pub fn encode_i64_pipeline(values: &[i64], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
28    match codec {
29        // Cascading chains — lz4 terminal.
30        ColumnCodec::DeltaFastLanesLz4 => encode_delta_fastlanes_lz4(values),
31        ColumnCodec::FastLanesLz4 => encode_fastlanes_lz4_i64(values),
32        ColumnCodec::PcodecLz4 => {
33            let pco_bytes = crate::pcodec::encode_i64(values)?;
34            Ok(crate::lz4::encode(&pco_bytes))
35        }
36        // Cascading chains — rANS terminal (cold tier).
37        ColumnCodec::DeltaFastLanesRans => encode_delta_fastlanes_rans(values),
38        // Single-step codecs (small partitions / non-ALP-encodable data).
39        ColumnCodec::DoubleDelta => Ok(crate::double_delta::encode(values)),
40        ColumnCodec::Delta => Ok(crate::delta::encode(values)),
41        ColumnCodec::Gorilla => Ok(crate::gorilla::encode_timestamps(values)),
42        ColumnCodec::Raw => {
43            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
44            Ok(crate::raw::encode(&raw))
45        }
46        ColumnCodec::Lz4 => {
47            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
48            Ok(crate::lz4::encode(&raw))
49        }
50        ColumnCodec::Zstd => {
51            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
52            crate::zstd_codec::encode(&raw)
53        }
54        _ => Ok(crate::delta::encode(values)),
55    }
56}
57
58/// Encode f64 values through a cascading pipeline.
59pub fn encode_f64_pipeline(values: &[f64], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
60    match codec {
61        // Cascading chains — lz4 terminal.
62        ColumnCodec::AlpFastLanesLz4 => encode_alp_fastlanes_lz4(values),
63        ColumnCodec::AlpRdLz4 => {
64            let alp_rd_bytes = crate::alp_rd::encode(values)?;
65            Ok(crate::lz4::encode(&alp_rd_bytes))
66        }
67        ColumnCodec::PcodecLz4 => {
68            let pco_bytes = crate::pcodec::encode_f64(values)?;
69            Ok(crate::lz4::encode(&pco_bytes))
70        }
71        // Cascading chains — rANS terminal (cold tier).
72        ColumnCodec::AlpFastLanesRans => encode_alp_fastlanes_rans(values),
73        // Single-step codecs (small partitions / non-ALP-encodable data).
74        ColumnCodec::Gorilla => Ok(crate::gorilla::encode_f64(values)),
75        ColumnCodec::Raw => {
76            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
77            Ok(crate::raw::encode(&raw))
78        }
79        ColumnCodec::Lz4 => {
80            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
81            Ok(crate::lz4::encode(&raw))
82        }
83        ColumnCodec::Zstd => {
84            let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
85            crate::zstd_codec::encode(&raw)
86        }
87        _ => Ok(crate::gorilla::encode_f64(values)),
88    }
89}
90
91/// Encode raw bytes (symbol columns or string data) through a pipeline.
92pub fn encode_bytes_pipeline(raw: &[u8], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
93    match codec {
94        ColumnCodec::FsstLz4 => {
95            // FSST expects an array of strings. Treat raw as a single blob.
96            let fsst_bytes = crate::fsst::encode(&[raw]);
97            Ok(crate::lz4::encode(&fsst_bytes))
98        }
99        ColumnCodec::FsstRans => {
100            let fsst_bytes = crate::fsst::encode(&[raw]);
101            Ok(crate::rans::encode(&fsst_bytes))
102        }
103        ColumnCodec::Raw => Ok(crate::raw::encode(raw)),
104        ColumnCodec::Lz4 => Ok(crate::lz4::encode(raw)),
105        ColumnCodec::Zstd => crate::zstd_codec::encode(raw),
106        ColumnCodec::FastLanesLz4 => {
107            // Symbol IDs are u32 — convert to i64, FastLanes pack, lz4.
108            if raw.len().is_multiple_of(4) {
109                let i64_vals: Vec<i64> = raw
110                    .chunks_exact(4)
111                    .map(|c| u32::from_le_bytes([c[0], c[1], c[2], c[3]]) as i64)
112                    .collect();
113                encode_fastlanes_lz4_i64(&i64_vals)
114            } else {
115                Ok(crate::raw::encode(raw))
116            }
117        }
118        _ => Ok(crate::raw::encode(raw)),
119    }
120}
121
122// ---------------------------------------------------------------------------
123// Pipeline decode
124// ---------------------------------------------------------------------------
125
126/// Decode i64 values from a cascading pipeline.
127pub fn decode_i64_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<i64>, CodecError> {
128    match codec {
129        // Cascading — lz4 terminal.
130        ColumnCodec::DeltaFastLanesLz4 => decode_delta_fastlanes_lz4(data),
131        ColumnCodec::FastLanesLz4 => decode_fastlanes_lz4_i64(data),
132        ColumnCodec::PcodecLz4 => {
133            let pco_bytes = crate::lz4::decode(data)?;
134            crate::pcodec::decode_i64(&pco_bytes)
135        }
136        // Cascading — rANS terminal.
137        ColumnCodec::DeltaFastLanesRans => decode_delta_fastlanes_rans(data),
138        // Single-step codecs (small partitions / non-ALP-encodable data).
139        ColumnCodec::DoubleDelta => crate::double_delta::decode(data),
140        ColumnCodec::Delta => crate::delta::decode(data),
141        ColumnCodec::Gorilla => crate::gorilla::decode_timestamps(data),
142        ColumnCodec::Raw => {
143            let raw = crate::raw::decode(data)?;
144            raw_to_i64(&raw)
145        }
146        ColumnCodec::Lz4 => {
147            let raw = crate::lz4::decode(data)?;
148            raw_to_i64(&raw)
149        }
150        ColumnCodec::Zstd => {
151            let raw = crate::zstd_codec::decode(data)?;
152            raw_to_i64(&raw)
153        }
154        _ => crate::delta::decode(data),
155    }
156}
157
158/// Decode f64 values from a cascading pipeline.
159pub fn decode_f64_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<f64>, CodecError> {
160    match codec {
161        // Cascading — lz4 terminal.
162        ColumnCodec::AlpFastLanesLz4 => decode_alp_fastlanes_lz4(data),
163        ColumnCodec::AlpRdLz4 => {
164            let alp_rd_bytes = crate::lz4::decode(data)?;
165            crate::alp_rd::decode(&alp_rd_bytes)
166        }
167        ColumnCodec::PcodecLz4 => {
168            let pco_bytes = crate::lz4::decode(data)?;
169            crate::pcodec::decode_f64(&pco_bytes)
170        }
171        // Cascading — rANS terminal.
172        ColumnCodec::AlpFastLanesRans => decode_alp_fastlanes_rans(data),
173        // Single-step codecs (small partitions / non-ALP-encodable data).
174        ColumnCodec::Gorilla => crate::gorilla::decode_f64(data),
175        ColumnCodec::Raw => {
176            let raw = crate::raw::decode(data)?;
177            raw_to_f64(&raw)
178        }
179        ColumnCodec::Lz4 => {
180            let raw = crate::lz4::decode(data)?;
181            raw_to_f64(&raw)
182        }
183        ColumnCodec::Zstd => {
184            let raw = crate::zstd_codec::decode(data)?;
185            raw_to_f64(&raw)
186        }
187        _ => crate::gorilla::decode_f64(data),
188    }
189}
190
191/// Decode raw bytes (symbol columns or string data) from a pipeline.
192pub fn decode_bytes_pipeline(data: &[u8], codec: ColumnCodec) -> Result<Vec<u8>, CodecError> {
193    match codec {
194        ColumnCodec::FsstLz4 => {
195            let fsst_bytes = crate::lz4::decode(data)?;
196            let strings = crate::fsst::decode(&fsst_bytes)?;
197            strings.into_iter().next().ok_or(CodecError::Corrupt {
198                detail: "FSST decode produced no output".into(),
199            })
200        }
201        ColumnCodec::FsstRans => {
202            let fsst_bytes = crate::rans::decode(data)?;
203            let strings = crate::fsst::decode(&fsst_bytes)?;
204            strings.into_iter().next().ok_or(CodecError::Corrupt {
205                detail: "FSST decode produced no output".into(),
206            })
207        }
208        ColumnCodec::Raw => crate::raw::decode(data),
209        ColumnCodec::Lz4 => crate::lz4::decode(data),
210        ColumnCodec::Zstd => crate::zstd_codec::decode(data),
211        ColumnCodec::FastLanesLz4 => {
212            let i64_vals = decode_fastlanes_lz4_i64(data)?;
213            Ok(i64_vals
214                .iter()
215                .flat_map(|&v| (v as u32).to_le_bytes())
216                .collect())
217        }
218        _ => crate::raw::decode(data).or_else(|_| Ok(data.to_vec())),
219    }
220}
221
222// ---------------------------------------------------------------------------
223// Cascading chain implementations
224// ---------------------------------------------------------------------------
225
226/// ALP → FastLanes → LZ4: f64 metrics (the big win).
227fn encode_alp_fastlanes_lz4(values: &[f64]) -> Result<Vec<u8>, CodecError> {
228    // Stage 1: ALP encodes f64 → FastLanes-packed i64 bytes.
229    let alp_encoded = crate::alp::encode(values);
230    // Stage 2: LZ4 terminal compression on the ALP output.
231    Ok(crate::lz4::encode(&alp_encoded))
232}
233
234fn decode_alp_fastlanes_lz4(data: &[u8]) -> Result<Vec<f64>, CodecError> {
235    // Reverse: LZ4 decompress → ALP decode (which internally FastLanes-decodes).
236    let alp_bytes = crate::lz4::decode(data)?;
237    crate::alp::decode(&alp_bytes)
238}
239
240/// Delta → FastLanes → LZ4: i64 timestamps and counters.
241fn encode_delta_fastlanes_lz4(values: &[i64]) -> Result<Vec<u8>, CodecError> {
242    if values.is_empty() {
243        return Ok(crate::lz4::encode(&crate::fastlanes::encode(&[])));
244    }
245
246    // Stage 1a: Compute deltas.
247    let mut deltas = Vec::with_capacity(values.len());
248    deltas.push(values[0]); // First value stored raw.
249    for i in 1..values.len() {
250        deltas.push(values[i].wrapping_sub(values[i - 1]));
251    }
252
253    // Stage 1b: FastLanes bit-pack the deltas.
254    let packed = crate::fastlanes::encode(&deltas);
255
256    // Stage 2: LZ4 terminal.
257    Ok(crate::lz4::encode(&packed))
258}
259
260fn decode_delta_fastlanes_lz4(data: &[u8]) -> Result<Vec<i64>, CodecError> {
261    // Reverse: LZ4 → FastLanes unpack → reconstruct from deltas.
262    let packed = crate::lz4::decode(data)?;
263    let deltas = crate::fastlanes::decode(&packed)?;
264
265    if deltas.is_empty() {
266        return Ok(Vec::new());
267    }
268
269    // Reconstruct values from deltas.
270    let mut values = Vec::with_capacity(deltas.len());
271    values.push(deltas[0]); // First value is raw.
272    for &d in &deltas[1..] {
273        let prev = values[values.len() - 1];
274        values.push(prev.wrapping_add(d));
275    }
276
277    Ok(values)
278}
279
280/// FastLanes → LZ4: raw integers (symbol IDs, non-delta columns).
281fn encode_fastlanes_lz4_i64(values: &[i64]) -> Result<Vec<u8>, CodecError> {
282    let packed = crate::fastlanes::encode(values);
283    Ok(crate::lz4::encode(&packed))
284}
285
286fn decode_fastlanes_lz4_i64(data: &[u8]) -> Result<Vec<i64>, CodecError> {
287    let packed = crate::lz4::decode(data)?;
288    crate::fastlanes::decode(&packed)
289}
290
291/// ALP → FastLanes → rANS: f64 metrics cold tier.
292fn encode_alp_fastlanes_rans(values: &[f64]) -> Result<Vec<u8>, CodecError> {
293    let alp_encoded = crate::alp::encode(values);
294    Ok(crate::rans::encode(&alp_encoded))
295}
296
297fn decode_alp_fastlanes_rans(data: &[u8]) -> Result<Vec<f64>, CodecError> {
298    let alp_bytes = crate::rans::decode(data)?;
299    crate::alp::decode(&alp_bytes)
300}
301
302/// Delta → FastLanes → rANS: i64 cold tier.
303fn encode_delta_fastlanes_rans(values: &[i64]) -> Result<Vec<u8>, CodecError> {
304    if values.is_empty() {
305        return Ok(crate::rans::encode(&crate::fastlanes::encode(&[])));
306    }
307
308    let mut deltas = Vec::with_capacity(values.len());
309    deltas.push(values[0]);
310    for i in 1..values.len() {
311        deltas.push(values[i].wrapping_sub(values[i - 1]));
312    }
313
314    let packed = crate::fastlanes::encode(&deltas);
315    Ok(crate::rans::encode(&packed))
316}
317
318fn decode_delta_fastlanes_rans(data: &[u8]) -> Result<Vec<i64>, CodecError> {
319    let packed = crate::rans::decode(data)?;
320    let deltas = crate::fastlanes::decode(&packed)?;
321
322    if deltas.is_empty() {
323        return Ok(Vec::new());
324    }
325
326    let mut values = Vec::with_capacity(deltas.len());
327    values.push(deltas[0]);
328    for &d in &deltas[1..] {
329        let prev = values[values.len() - 1];
330        values.push(prev.wrapping_add(d));
331    }
332
333    Ok(values)
334}
335
336// ---------------------------------------------------------------------------
337// Helpers
338// ---------------------------------------------------------------------------
339
340fn raw_to_i64(data: &[u8]) -> Result<Vec<i64>, CodecError> {
341    if !data.len().is_multiple_of(8) {
342        return Err(CodecError::Corrupt {
343            detail: "i64 data not aligned to 8 bytes".into(),
344        });
345    }
346    Ok(data
347        .chunks_exact(8)
348        .map(|c| i64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
349        .collect())
350}
351
352fn raw_to_f64(data: &[u8]) -> Result<Vec<f64>, CodecError> {
353    if !data.len().is_multiple_of(8) {
354        return Err(CodecError::Corrupt {
355            detail: "f64 data not aligned to 8 bytes".into(),
356        });
357    }
358    Ok(data
359        .chunks_exact(8)
360        .map(|c| f64::from_le_bytes([c[0], c[1], c[2], c[3], c[4], c[5], c[6], c[7]]))
361        .collect())
362}
363
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that two f64 slices are identical bit-for-bit, lengths included.
    ///
    /// `zip` stops at the shorter slice, so without the explicit length check
    /// a decoder that dropped trailing values would pass unnoticed.
    fn assert_same_bits(expected: &[f64], actual: &[f64]) {
        assert_eq!(
            expected.len(),
            actual.len(),
            "decoded length differs from input length"
        );
        for (i, (a, b)) in expected.iter().zip(actual.iter()).enumerate() {
            assert_eq!(a.to_bits(), b.to_bits(), "mismatch at {i}");
        }
    }

    /// Advance the deterministic LCG used to generate pseudo-random fixtures.
    fn lcg_step(state: &mut u64) -> u64 {
        *state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
        *state
    }

    /// Decimal-looking metrics must round-trip exactly and compress better than 3x.
    #[test]
    fn alp_fastlanes_lz4_decimal_metrics() {
        let values: Vec<f64> = (0..10_000).map(|i| i as f64 * 0.1).collect();
        let encoded = encode_f64_pipeline(&values, ColumnCodec::AlpFastLanesLz4).unwrap();
        let decoded = decode_f64_pipeline(&encoded, ColumnCodec::AlpFastLanesLz4).unwrap();

        assert_same_bits(&values, &decoded);

        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 3.0,
            "ALP+FL+LZ4 should compress decimals >3x, got {ratio:.1}x"
        );
    }

    /// On one-decimal pseudo-random values the ALP chain must be smaller than Gorilla.
    #[test]
    fn alp_beats_gorilla_on_decimals() {
        let mut values = Vec::with_capacity(10_000);
        let mut rng: u64 = 42;
        for _ in 0..10_000 {
            let sample = lcg_step(&mut rng);
            let cpu = ((sample >> 33) as f64 / (u32::MAX as f64)) * 100.0;
            values.push((cpu * 10.0).round() / 10.0);
        }

        let alp_size = encode_f64_pipeline(&values, ColumnCodec::AlpFastLanesLz4)
            .unwrap()
            .len();
        let gorilla_size = encode_f64_pipeline(&values, ColumnCodec::Gorilla)
            .unwrap()
            .len();

        assert!(
            alp_size < gorilla_size,
            "ALP ({alp_size}) should beat Gorilla ({gorilla_size}) on decimal metrics"
        );
    }

    /// Evenly spaced timestamps must round-trip and compress better than 5x.
    #[test]
    fn delta_fastlanes_lz4_timestamps() {
        let values: Vec<i64> = (0..10_000)
            .map(|i| 1_700_000_000_000 + i * 10_000)
            .collect();
        let encoded = encode_i64_pipeline(&values, ColumnCodec::DeltaFastLanesLz4).unwrap();
        let decoded = decode_i64_pipeline(&encoded, ColumnCodec::DeltaFastLanesLz4).unwrap();
        assert_eq!(decoded, values);

        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 5.0,
            "Delta+FL+LZ4 should compress timestamps >5x, got {ratio:.1}x"
        );
    }

    /// Timestamps with +/-50 jitter around a fixed step must round-trip.
    #[test]
    fn delta_fastlanes_lz4_jittered_timestamps() {
        let mut values = Vec::with_capacity(10_000);
        let mut ts = 1_700_000_000_000i64;
        let mut rng: u64 = 42;
        for _ in 0..10_000 {
            values.push(ts);
            let sample = lcg_step(&mut rng);
            let jitter = ((sample >> 33) as i64 % 101) - 50;
            ts += 10_000 + jitter;
        }
        let encoded = encode_i64_pipeline(&values, ColumnCodec::DeltaFastLanesLz4).unwrap();
        let decoded = decode_i64_pipeline(&encoded, ColumnCodec::DeltaFastLanesLz4).unwrap();
        assert_eq!(decoded, values);
    }

    /// Monotonic counters with a constant step must round-trip.
    #[test]
    fn delta_fastlanes_lz4_counters() {
        let values: Vec<i64> = (0..10_000).map(|i| i * 1000).collect();
        let encoded = encode_i64_pipeline(&values, ColumnCodec::DeltaFastLanesLz4).unwrap();
        let decoded = decode_i64_pipeline(&encoded, ColumnCodec::DeltaFastLanesLz4).unwrap();
        assert_eq!(decoded, values);
    }

    /// Small repeating symbol IDs must round-trip through FastLanes + LZ4.
    #[test]
    fn fastlanes_lz4_symbol_ids() {
        let values: Vec<i64> = (0..5000).map(|i| i % 150).collect();
        let encoded = encode_i64_pipeline(&values, ColumnCodec::FastLanesLz4).unwrap();
        let decoded = decode_i64_pipeline(&encoded, ColumnCodec::FastLanesLz4).unwrap();
        assert_eq!(decoded, values);
    }

    /// The single-step codecs must keep working when reached through the pipeline API.
    #[test]
    fn legacy_codecs_still_work() {
        let i64_vals: Vec<i64> = (0..1000).collect();
        for codec in [
            ColumnCodec::DoubleDelta,
            ColumnCodec::Delta,
            ColumnCodec::Gorilla,
        ] {
            let encoded = encode_i64_pipeline(&i64_vals, codec).unwrap();
            let decoded = decode_i64_pipeline(&encoded, codec).unwrap();
            assert_eq!(decoded, i64_vals, "legacy i64 codec {codec} failed");
        }

        let f64_vals: Vec<f64> = (0..1000).map(|i| i as f64 * 0.5).collect();
        let encoded = encode_f64_pipeline(&f64_vals, ColumnCodec::Gorilla).unwrap();
        let decoded = decode_f64_pipeline(&encoded, ColumnCodec::Gorilla).unwrap();
        assert_same_bits(&f64_vals, &decoded);
    }

    /// Empty inputs must encode and decode back to empty outputs.
    #[test]
    fn empty_values() {
        let empty_i64: Vec<i64> = vec![];
        let empty_f64: Vec<f64> = vec![];

        for codec in [ColumnCodec::DeltaFastLanesLz4, ColumnCodec::FastLanesLz4] {
            let enc = encode_i64_pipeline(&empty_i64, codec).unwrap();
            let dec = decode_i64_pipeline(&enc, codec).unwrap();
            assert!(dec.is_empty());
        }

        let enc = encode_f64_pipeline(&empty_f64, ColumnCodec::AlpFastLanesLz4).unwrap();
        let dec = decode_f64_pipeline(&enc, ColumnCodec::AlpFastLanesLz4).unwrap();
        assert!(dec.is_empty());
    }

    /// The byte pipeline must round-trip through the Raw and Lz4 codecs.
    #[test]
    fn bytes_pipeline_roundtrip() {
        let raw: Vec<u8> = (0..1000u32).flat_map(|i| i.to_le_bytes()).collect();
        for codec in [ColumnCodec::Raw, ColumnCodec::Lz4] {
            let encoded = encode_bytes_pipeline(&raw, codec).unwrap();
            let decoded = decode_bytes_pipeline(&encoded, codec).unwrap();
            assert_eq!(decoded, raw, "bytes pipeline {codec} failed");
        }
    }
}