Skip to main content

nodedb_codec/
lib.rs

1//! Compression codecs for NodeDB timeseries columnar storage.
2//!
3//! Provides per-column codec selection with **cascading compression**:
4//! type-aware encoding (ALP, FastLanes, FSST, Pcodec) followed by a terminal
5//! byte compressor (lz4_flex for hot/warm, rANS for cold/S3).
6//!
7//! Cascading chains (hot/warm — lz4 terminal):
8//! - `AlpFastLanesLz4`:   f64 metrics → ALP → FastLanes → lz4
9//! - `DeltaFastLanesLz4`: i64 timestamps/counters → Delta → FastLanes → lz4
10//! - `FastLanesLz4`:      i64 raw integers → FastLanes → lz4
11//! - `FsstLz4`:           strings/logs → FSST → lz4
12//! - `PcodecLz4`:         complex numerics → Pcodec → lz4
13//! - `AlpRdLz4`:          true doubles → ALP-RD → lz4
14//!
15//! Cold/S3 tier chains (rANS terminal):
16//! - `AlpFastLanesRans`, `DeltaFastLanesRans`, `FsstRans`
17//!
18//! Shared by Origin and Lite. Compiles to WASM.
19
20pub mod alp;
21pub mod alp_rd;
22pub mod crdt_compress;
23pub mod delta;
24pub mod detect;
25pub mod double_delta;
26pub mod error;
27pub mod fastlanes;
28pub mod fsst;
29pub mod gorilla;
30pub mod lz4;
31pub mod pcodec;
32pub mod pipeline;
33pub mod rans;
34pub mod raw;
35pub mod spherical;
36pub mod zstd_codec;
37
/// Number of values to sample for codec auto-detection and exponent selection.
/// Used by ALP, ALP-RD, and the codec detector.
///
/// This is a count of values (elements), not a size in bytes.
pub const CODEC_SAMPLE_SIZE: usize = 1024;
41
42pub use crdt_compress::CrdtOp;
43pub use delta::{DeltaDecoder, DeltaEncoder};
44pub use detect::detect_codec;
45pub use double_delta::{DoubleDeltaDecoder, DoubleDeltaEncoder};
46pub use error::CodecError;
47pub use gorilla::{GorillaDecoder, GorillaEncoder};
48pub use lz4::{Lz4Decoder, Lz4Encoder};
49pub use pipeline::{
50    decode_bytes_pipeline, decode_f64_pipeline, decode_i64_pipeline, encode_bytes_pipeline,
51    encode_f64_pipeline, encode_i64_pipeline,
52};
53pub use raw::{RawDecoder, RawEncoder};
54pub use zstd_codec::{ZstdDecoder, ZstdEncoder};
55
56use serde::{Deserialize, Serialize};
57use zerompk::{FromMessagePack, ToMessagePack};
58
/// Codec identifier for per-column compression selection.
///
/// Stored in partition schema metadata so the reader knows which decoder
/// to use for each column file.
///
/// The enum is `#[repr(u8)]` and every variant carries an explicit
/// discriminant; serde (de)serializes variants under snake_case names.
///
/// NOTE(review): the numeric discriminants are presumably what the
/// MessagePack `c_enum` encoding persists — confirm before renumbering,
/// reordering, or reusing a value.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, ToMessagePack, FromMessagePack,
)]
#[serde(rename_all = "snake_case")]
#[repr(u8)]
#[msgpack(c_enum)]
pub enum ColumnCodec {
    /// Engine selects codec automatically based on column type and data
    /// distribution (analyzed at flush time).
    Auto = 0,

    // -- Cascading chains: hot/warm (lz4 terminal) --
    /// f64 metrics: ALP (decimal→int) → FastLanes → lz4.
    AlpFastLanesLz4 = 1,
    /// f64 true doubles: ALP-RD (front-bit dict) → lz4.
    AlpRdLz4 = 2,
    /// f64/i64 complex: Pcodec → lz4.
    PcodecLz4 = 3,
    /// i64 timestamps/counters: Delta → FastLanes → lz4.
    DeltaFastLanesLz4 = 4,
    /// i64/u32 raw integers: FastLanes → lz4.
    FastLanesLz4 = 5,
    /// Strings/logs: FSST (substring dict) → lz4.
    FsstLz4 = 6,

    // -- Cascading chains: cold/S3 (rANS terminal) --
    /// f64 metrics cold: ALP → FastLanes → rANS.
    AlpFastLanesRans = 7,
    /// i64 cold: Delta → FastLanes → rANS.
    DeltaFastLanesRans = 8,
    /// Strings cold: FSST → rANS.
    FsstRans = 9,

    // -- Legacy single-step codecs (small partitions, backward compat) --
    /// Gorilla XOR encoding — legacy f64 codec.
    Gorilla = 10,
    /// DoubleDelta — legacy timestamp codec.
    DoubleDelta = 11,
    /// Delta + varint — legacy counter codec.
    Delta = 12,
    /// LZ4 block compression — for string/log columns.
    Lz4 = 13,
    /// Zstd — for cold/archived partitions.
    Zstd = 14,
    /// No compression — for pre-compressed or symbol columns.
    Raw = 15,
}
110
111impl ColumnCodec {
112    pub fn is_compressed(&self) -> bool {
113        !matches!(self, Self::Raw | Self::Auto)
114    }
115
116    /// Whether this is a cascading (multi-stage) codec.
117    pub fn is_cascading(&self) -> bool {
118        matches!(
119            self,
120            Self::AlpFastLanesLz4
121                | Self::AlpRdLz4
122                | Self::PcodecLz4
123                | Self::DeltaFastLanesLz4
124                | Self::FastLanesLz4
125                | Self::FsstLz4
126                | Self::AlpFastLanesRans
127                | Self::DeltaFastLanesRans
128                | Self::FsstRans
129        )
130    }
131
132    /// Whether this codec uses rANS as terminal (cold tier).
133    pub fn is_cold_tier(&self) -> bool {
134        matches!(
135            self,
136            Self::AlpFastLanesRans | Self::DeltaFastLanesRans | Self::FsstRans
137        )
138    }
139
140    pub fn as_str(&self) -> &'static str {
141        match self {
142            Self::Auto => "auto",
143            Self::AlpFastLanesLz4 => "alp_fastlanes_lz4",
144            Self::AlpRdLz4 => "alp_rd_lz4",
145            Self::PcodecLz4 => "pcodec_lz4",
146            Self::DeltaFastLanesLz4 => "delta_fastlanes_lz4",
147            Self::FastLanesLz4 => "fastlanes_lz4",
148            Self::FsstLz4 => "fsst_lz4",
149            Self::AlpFastLanesRans => "alp_fastlanes_rans",
150            Self::DeltaFastLanesRans => "delta_fastlanes_rans",
151            Self::FsstRans => "fsst_rans",
152            Self::Gorilla => "gorilla",
153            Self::DoubleDelta => "double_delta",
154            Self::Delta => "delta",
155            Self::Lz4 => "lz4",
156            Self::Zstd => "zstd",
157            Self::Raw => "raw",
158        }
159    }
160}
161
162impl std::fmt::Display for ColumnCodec {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        f.write_str(self.as_str())
165    }
166}
167
/// Column data type hint for codec auto-detection.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names and the codec docs in this file — confirm against the
/// detector implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnTypeHint {
    /// Timestamp column (presumably stored as i64).
    Timestamp,
    /// 64-bit floating-point column.
    Float64,
    /// 64-bit signed integer column.
    Int64,
    /// Symbol/tag column (presumably dictionary ids; `ColumnStatistics::from_symbols`
    /// takes them as `u32`).
    Symbol,
    /// String/log column.
    String,
}
177
/// Per-column statistics computed at flush time.
///
/// Stored in partition metadata for predicate pushdown and approximate
/// query answers without decompression.
///
/// The optional fields are omitted from the serialized form when they are
/// `None` (`skip_serializing_if`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ColumnStatistics {
    /// Codec used for this column in this partition.
    pub codec: ColumnCodec,
    /// Number of non-null values.
    pub count: u64,
    /// Minimum value, as f64, for numeric columns. `None` when not computed:
    /// the constructors in this file leave it unset for symbol columns and
    /// for empty input.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub min: Option<f64>,
    /// Maximum value, as f64, for numeric columns. `None` when not computed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max: Option<f64>,
    /// Sum of values (for numeric columns).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sum: Option<f64>,
    /// Number of distinct values (for symbol/tag columns).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cardinality: Option<u32>,
    /// Compressed size in bytes for this column.
    pub compressed_bytes: u64,
    /// Uncompressed size in bytes.
    pub uncompressed_bytes: u64,
}
205
206impl ColumnStatistics {
207    /// Create empty statistics with just the codec.
208    pub fn new(codec: ColumnCodec) -> Self {
209        Self {
210            codec,
211            count: 0,
212            min: None,
213            max: None,
214            sum: None,
215            cardinality: None,
216            compressed_bytes: 0,
217            uncompressed_bytes: 0,
218        }
219    }
220
221    /// Compute statistics for an i64 column.
222    pub fn from_i64(values: &[i64], codec: ColumnCodec, compressed_bytes: u64) -> Self {
223        if values.is_empty() {
224            return Self::new(codec);
225        }
226
227        let mut min = values[0];
228        let mut max = values[0];
229        let mut sum: i128 = 0;
230
231        for &v in values {
232            if v < min {
233                min = v;
234            }
235            if v > max {
236                max = v;
237            }
238            sum += v as i128;
239        }
240
241        Self {
242            codec,
243            count: values.len() as u64,
244            min: Some(min as f64),
245            max: Some(max as f64),
246            sum: Some(sum as f64),
247            cardinality: None,
248            compressed_bytes,
249            uncompressed_bytes: (values.len() * 8) as u64,
250        }
251    }
252
253    /// Compute statistics for an f64 column.
254    pub fn from_f64(values: &[f64], codec: ColumnCodec, compressed_bytes: u64) -> Self {
255        if values.is_empty() {
256            return Self::new(codec);
257        }
258
259        let mut min = values[0];
260        let mut max = values[0];
261        let mut sum: f64 = 0.0;
262
263        for &v in values {
264            if v < min {
265                min = v;
266            }
267            if v > max {
268                max = v;
269            }
270            sum += v;
271        }
272
273        Self {
274            codec,
275            count: values.len() as u64,
276            min: Some(min),
277            max: Some(max),
278            sum: Some(sum),
279            cardinality: None,
280            compressed_bytes,
281            uncompressed_bytes: (values.len() * 8) as u64,
282        }
283    }
284
285    /// Compute statistics for a symbol column.
286    pub fn from_symbols(
287        values: &[u32],
288        cardinality: u32,
289        codec: ColumnCodec,
290        compressed_bytes: u64,
291    ) -> Self {
292        Self {
293            codec,
294            count: values.len() as u64,
295            min: None,
296            max: None,
297            sum: None,
298            cardinality: Some(cardinality),
299            compressed_bytes,
300            uncompressed_bytes: (values.len() * 4) as u64,
301        }
302    }
303
304    /// Compression ratio (uncompressed / compressed). Returns 1.0 if no data.
305    pub fn compression_ratio(&self) -> f64 {
306        if self.compressed_bytes == 0 {
307            return 1.0;
308        }
309        self.uncompressed_bytes as f64 / self.compressed_bytes as f64
310    }
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    /// Every codec variant must come back unchanged from a JSON encode/decode cycle.
    #[test]
    fn column_codec_serde_roundtrip() {
        const ALL_CODECS: [ColumnCodec; 16] = [
            ColumnCodec::Auto,
            ColumnCodec::AlpFastLanesLz4,
            ColumnCodec::AlpRdLz4,
            ColumnCodec::PcodecLz4,
            ColumnCodec::DeltaFastLanesLz4,
            ColumnCodec::FastLanesLz4,
            ColumnCodec::FsstLz4,
            ColumnCodec::AlpFastLanesRans,
            ColumnCodec::DeltaFastLanesRans,
            ColumnCodec::FsstRans,
            ColumnCodec::Gorilla,
            ColumnCodec::DoubleDelta,
            ColumnCodec::Delta,
            ColumnCodec::Lz4,
            ColumnCodec::Zstd,
            ColumnCodec::Raw,
        ];

        for codec in ALL_CODECS {
            let encoded = sonic_rs::to_string(&codec).unwrap();
            let decoded: ColumnCodec = sonic_rs::from_str(&encoded).unwrap();
            assert_eq!(codec, decoded, "serde roundtrip failed for {codec}");
        }
    }

    /// Integer statistics: count, bounds, sum and byte sizes.
    #[test]
    fn column_statistics_i64() {
        let samples = [10i64, 20, 30, 40, 50];
        let stats = ColumnStatistics::from_i64(&samples, ColumnCodec::Delta, 12);

        assert_eq!(stats.count, 5);
        assert_eq!(stats.min, Some(10.0));
        assert_eq!(stats.max, Some(50.0));
        assert_eq!(stats.sum, Some(150.0));
        assert_eq!(stats.uncompressed_bytes, 40);
        assert_eq!(stats.compressed_bytes, 12);
    }

    /// Float statistics: count, bounds and sum.
    #[test]
    fn column_statistics_f64() {
        let samples = [1.5f64, 2.5, 3.5];
        let stats = ColumnStatistics::from_f64(&samples, ColumnCodec::Gorilla, 8);

        assert_eq!(stats.count, 3);
        assert_eq!(stats.min, Some(1.5));
        assert_eq!(stats.max, Some(3.5));
        assert_eq!(stats.sum, Some(7.5));
    }

    /// Symbol statistics carry the cardinality and no numeric bounds.
    #[test]
    fn column_statistics_symbols() {
        let symbols = [0u32, 1, 2, 0, 1];
        let stats = ColumnStatistics::from_symbols(&symbols, 3, ColumnCodec::Raw, 20);

        assert_eq!(stats.count, 5);
        assert_eq!(stats.cardinality, Some(3));
        assert!(stats.min.is_none());
    }

    /// 800 uncompressed bytes over 200 compressed bytes is a ratio of 4.
    #[test]
    fn compression_ratio_calculation() {
        let stats = ColumnStatistics {
            count: 100,
            compressed_bytes: 200,
            uncompressed_bytes: 800,
            ..ColumnStatistics::new(ColumnCodec::Delta)
        };

        let ratio = stats.compression_ratio();
        assert!((ratio - 4.0).abs() < f64::EPSILON);
    }
}