Skip to main content

nodedb_codec/
codec_types.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Codec identifiers, resolved codec type, column statistics, and name parsing.
4//!
5//! [`ColumnCodec`] is the user-facing codec selector (includes `Auto`).
6//! [`ResolvedColumnCodec`] is the on-disk form after auto-detection runs.
7//! [`ColumnStatistics`] stores per-column flush-time statistics.
8//! [`parse_codec_name`] is the single gate for codec names from user input.
9
10use serde::{Deserialize, Serialize};
11use zerompk::{FromMessagePack, ToMessagePack};
12
13use crate::error::CodecError;
14
15/// Codec identifier for per-column compression selection.
16///
17/// Stored in partition schema metadata so the reader knows which decoder
18/// to use for each column file.
19#[derive(
20    Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, ToMessagePack, FromMessagePack,
21)]
22#[serde(rename_all = "snake_case")]
23#[repr(u8)]
24#[msgpack(c_enum)]
25pub enum ColumnCodec {
26    /// Engine selects codec automatically based on column type and data
27    /// distribution (analyzed at flush time).
28    Auto = 0,
29
30    // -- Cascading chains: hot/warm (lz4 terminal) --
31    /// f64 metrics: ALP (decimal→int) → FastLanes → lz4.
32    AlpFastLanesLz4 = 1,
33    /// f64 true doubles: ALP-RD (front-bit dict) → lz4.
34    AlpRdLz4 = 2,
35    /// f64/i64 complex: Pcodec → lz4.
36    PcodecLz4 = 3,
37    /// i64 timestamps/counters: Delta → FastLanes → lz4.
38    DeltaFastLanesLz4 = 4,
39    /// i64/u32 raw integers: FastLanes → lz4.
40    FastLanesLz4 = 5,
41    /// Strings/logs: FSST (substring dict) → lz4.
42    FsstLz4 = 6,
43
44    // -- Cascading chains: cold/S3 (rANS terminal) --
45    /// f64 metrics cold: ALP → FastLanes → rANS.
46    AlpFastLanesRans = 7,
47    /// i64 cold: Delta → FastLanes → rANS.
48    DeltaFastLanesRans = 8,
49    /// Strings cold: FSST → rANS.
50    FsstRans = 9,
51
52    // -- Single-step codecs used by `detect.rs` auto-selection and timeseries column writers --
53    /// Gorilla XOR encoding — f64 codec selected by detect.rs for float columns.
54    Gorilla = 10,
55    /// DoubleDelta — timestamp codec selected by detect.rs for monotonic timestamp columns.
56    DoubleDelta = 11,
57    /// Delta + varint — counter codec selected by detect.rs for integer delta columns.
58    Delta = 12,
59    /// LZ4 block compression — for string/log columns.
60    Lz4 = 13,
61    /// Zstd — for cold/archived partitions.
62    Zstd = 14,
63    /// No compression — for pre-compressed or symbol columns.
64    Raw = 15,
65}
66
67impl ColumnCodec {
68    pub fn is_compressed(&self) -> bool {
69        !matches!(self, Self::Raw | Self::Auto)
70    }
71
72    /// Whether this is a cascading (multi-stage) codec.
73    pub fn is_cascading(&self) -> bool {
74        matches!(
75            self,
76            Self::AlpFastLanesLz4
77                | Self::AlpRdLz4
78                | Self::PcodecLz4
79                | Self::DeltaFastLanesLz4
80                | Self::FastLanesLz4
81                | Self::FsstLz4
82                | Self::AlpFastLanesRans
83                | Self::DeltaFastLanesRans
84                | Self::FsstRans
85        )
86    }
87
88    /// Whether this codec uses rANS as terminal (cold tier).
89    pub fn is_cold_tier(&self) -> bool {
90        matches!(
91            self,
92            Self::AlpFastLanesRans | Self::DeltaFastLanesRans | Self::FsstRans
93        )
94    }
95
96    pub fn as_str(&self) -> &'static str {
97        match self {
98            Self::Auto => "auto",
99            Self::AlpFastLanesLz4 => "alp_fastlanes_lz4",
100            Self::AlpRdLz4 => "alp_rd_lz4",
101            Self::PcodecLz4 => "pcodec_lz4",
102            Self::DeltaFastLanesLz4 => "delta_fastlanes_lz4",
103            Self::FastLanesLz4 => "fastlanes_lz4",
104            Self::FsstLz4 => "fsst_lz4",
105            Self::AlpFastLanesRans => "alp_fastlanes_rans",
106            Self::DeltaFastLanesRans => "delta_fastlanes_rans",
107            Self::FsstRans => "fsst_rans",
108            Self::Gorilla => "gorilla",
109            Self::DoubleDelta => "double_delta",
110            Self::Delta => "delta",
111            Self::Lz4 => "lz4",
112            Self::Zstd => "zstd",
113            Self::Raw => "raw",
114        }
115    }
116
117    /// Resolve `Auto` to a concrete codec using the provided detection result,
118    /// or return an error if this is called with `Auto` where a concrete value
119    /// is required (i.e. a caller forgot to run detection first).
120    ///
121    /// For callers that have already run detection and hold a non-`Auto`
122    /// codec, this is a zero-cost newtype wrap.
123    pub fn try_resolve(self) -> Result<ResolvedColumnCodec, CodecError> {
124        match self {
125            Self::Auto => Err(CodecError::UnresolvedAuto),
126            Self::AlpFastLanesLz4 => Ok(ResolvedColumnCodec::AlpFastLanesLz4),
127            Self::AlpRdLz4 => Ok(ResolvedColumnCodec::AlpRdLz4),
128            Self::PcodecLz4 => Ok(ResolvedColumnCodec::PcodecLz4),
129            Self::DeltaFastLanesLz4 => Ok(ResolvedColumnCodec::DeltaFastLanesLz4),
130            Self::FastLanesLz4 => Ok(ResolvedColumnCodec::FastLanesLz4),
131            Self::FsstLz4 => Ok(ResolvedColumnCodec::FsstLz4),
132            Self::AlpFastLanesRans => Ok(ResolvedColumnCodec::AlpFastLanesRans),
133            Self::DeltaFastLanesRans => Ok(ResolvedColumnCodec::DeltaFastLanesRans),
134            Self::FsstRans => Ok(ResolvedColumnCodec::FsstRans),
135            Self::Gorilla => Ok(ResolvedColumnCodec::Gorilla),
136            Self::DoubleDelta => Ok(ResolvedColumnCodec::DoubleDelta),
137            Self::Delta => Ok(ResolvedColumnCodec::Delta),
138            Self::Lz4 => Ok(ResolvedColumnCodec::Lz4),
139            Self::Zstd => Ok(ResolvedColumnCodec::Zstd),
140            Self::Raw => Ok(ResolvedColumnCodec::Raw),
141        }
142    }
143}
144
145impl std::fmt::Display for ColumnCodec {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        f.write_str(self.as_str())
148    }
149}
150
151/// Parse a user-supplied codec name string into a [`ColumnCodec`].
152///
153/// Accepts **only** the exact canonical lowercase snake_case forms produced by
154/// [`ColumnCodec::as_str()`]. No case folding, no hyphen variants, no aliases.
155/// This is the single gate that must be used whenever codec names enter the
156/// system from user input (DDL `WITH (codec=…)`, REST params, config files).
157///
158/// # Errors
159///
160/// Returns [`CodecError::UnknownCodec`] if `s` is not an exact match.
161pub fn parse_codec_name(s: &str) -> Result<ColumnCodec, CodecError> {
162    match s {
163        "auto" => Ok(ColumnCodec::Auto),
164        "alp_fastlanes_lz4" => Ok(ColumnCodec::AlpFastLanesLz4),
165        "alp_rd_lz4" => Ok(ColumnCodec::AlpRdLz4),
166        "pcodec_lz4" => Ok(ColumnCodec::PcodecLz4),
167        "delta_fastlanes_lz4" => Ok(ColumnCodec::DeltaFastLanesLz4),
168        "fastlanes_lz4" => Ok(ColumnCodec::FastLanesLz4),
169        "fsst_lz4" => Ok(ColumnCodec::FsstLz4),
170        "alp_fastlanes_rans" => Ok(ColumnCodec::AlpFastLanesRans),
171        "delta_fastlanes_rans" => Ok(ColumnCodec::DeltaFastLanesRans),
172        "fsst_rans" => Ok(ColumnCodec::FsstRans),
173        "gorilla" => Ok(ColumnCodec::Gorilla),
174        "double_delta" => Ok(ColumnCodec::DoubleDelta),
175        "delta" => Ok(ColumnCodec::Delta),
176        "lz4" => Ok(ColumnCodec::Lz4),
177        "zstd" => Ok(ColumnCodec::Zstd),
178        "raw" => Ok(ColumnCodec::Raw),
179        _ => Err(CodecError::UnknownCodec {
180            name: s.to_owned(),
181            valid: "auto, alp_fastlanes_lz4, alp_rd_lz4, pcodec_lz4, delta_fastlanes_lz4, \
182                    fastlanes_lz4, fsst_lz4, alp_fastlanes_rans, delta_fastlanes_rans, \
183                    fsst_rans, gorilla, double_delta, delta, lz4, zstd, raw",
184        }),
185    }
186}
187
188/// A `ColumnCodec` that has been resolved away from `Auto`.
189///
190/// Invariant: this type can never hold the `Auto` variant. All on-disk
191/// column headers (`ColumnMeta.codec`) and per-column statistics
192/// (`ColumnStatistics.codec`) use `ResolvedColumnCodec`, making it a
193/// compile-time guarantee that `Auto` never survives to disk.
194///
195/// The `#[repr(u8)]` discriminants are **identical** to the corresponding
196/// `ColumnCodec` discriminants so that on-disk byte values are unchanged.
197/// `Auto` (discriminant 0) is intentionally absent.
198#[derive(
199    Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, ToMessagePack, FromMessagePack,
200)]
201#[serde(rename_all = "snake_case")]
202#[repr(u8)]
203#[msgpack(c_enum)]
204pub enum ResolvedColumnCodec {
205    AlpFastLanesLz4 = 1,
206    AlpRdLz4 = 2,
207    PcodecLz4 = 3,
208    DeltaFastLanesLz4 = 4,
209    FastLanesLz4 = 5,
210    FsstLz4 = 6,
211    AlpFastLanesRans = 7,
212    DeltaFastLanesRans = 8,
213    FsstRans = 9,
214    Gorilla = 10,
215    DoubleDelta = 11,
216    Delta = 12,
217    Lz4 = 13,
218    Zstd = 14,
219    Raw = 15,
220}
221
222impl ResolvedColumnCodec {
223    /// Convert back to `ColumnCodec` for use with codec pipelines that
224    /// accept the full enum (e.g. `encode_i64_pipeline`, `decode_f64_pipeline`).
225    pub fn into_column_codec(self) -> ColumnCodec {
226        match self {
227            Self::AlpFastLanesLz4 => ColumnCodec::AlpFastLanesLz4,
228            Self::AlpRdLz4 => ColumnCodec::AlpRdLz4,
229            Self::PcodecLz4 => ColumnCodec::PcodecLz4,
230            Self::DeltaFastLanesLz4 => ColumnCodec::DeltaFastLanesLz4,
231            Self::FastLanesLz4 => ColumnCodec::FastLanesLz4,
232            Self::FsstLz4 => ColumnCodec::FsstLz4,
233            Self::AlpFastLanesRans => ColumnCodec::AlpFastLanesRans,
234            Self::DeltaFastLanesRans => ColumnCodec::DeltaFastLanesRans,
235            Self::FsstRans => ColumnCodec::FsstRans,
236            Self::Gorilla => ColumnCodec::Gorilla,
237            Self::DoubleDelta => ColumnCodec::DoubleDelta,
238            Self::Delta => ColumnCodec::Delta,
239            Self::Lz4 => ColumnCodec::Lz4,
240            Self::Zstd => ColumnCodec::Zstd,
241            Self::Raw => ColumnCodec::Raw,
242        }
243    }
244
245    pub fn as_str(self) -> &'static str {
246        match self {
247            Self::AlpFastLanesLz4 => "alp_fastlanes_lz4",
248            Self::AlpRdLz4 => "alp_rd_lz4",
249            Self::PcodecLz4 => "pcodec_lz4",
250            Self::DeltaFastLanesLz4 => "delta_fastlanes_lz4",
251            Self::FastLanesLz4 => "fastlanes_lz4",
252            Self::FsstLz4 => "fsst_lz4",
253            Self::AlpFastLanesRans => "alp_fastlanes_rans",
254            Self::DeltaFastLanesRans => "delta_fastlanes_rans",
255            Self::FsstRans => "fsst_rans",
256            Self::Gorilla => "gorilla",
257            Self::DoubleDelta => "double_delta",
258            Self::Delta => "delta",
259            Self::Lz4 => "lz4",
260            Self::Zstd => "zstd",
261            Self::Raw => "raw",
262        }
263    }
264}
265
266impl std::fmt::Display for ResolvedColumnCodec {
267    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268        f.write_str(self.as_str())
269    }
270}
271
/// Hint about a column's data type, consumed by codec auto-detection.
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate
/// need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ColumnTypeHint {
    /// Timestamp column.
    Timestamp,
    /// 64-bit floating-point column.
    Float64,
    /// 64-bit integer column.
    Int64,
    /// Symbol column.
    Symbol,
    /// String column.
    String,
}
282
283/// Per-column statistics computed at flush time.
284///
285/// Stored in partition metadata for predicate pushdown and approximate
286/// query answers without decompression.
287#[derive(Debug, Clone, Serialize, Deserialize, ToMessagePack, FromMessagePack)]
288pub struct ColumnStatistics {
289    /// Codec used for this column in this partition.
290    ///
291    /// Always a concrete, resolved codec — never `Auto`.
292    pub codec: ResolvedColumnCodec,
293    /// Number of non-null values.
294    pub count: u64,
295    /// Minimum value (as f64 for numeric columns, 0.0 for non-numeric).
296    #[serde(skip_serializing_if = "Option::is_none")]
297    pub min: Option<f64>,
298    /// Maximum value.
299    #[serde(skip_serializing_if = "Option::is_none")]
300    pub max: Option<f64>,
301    /// Sum of values (for numeric columns).
302    #[serde(skip_serializing_if = "Option::is_none")]
303    pub sum: Option<f64>,
304    /// Number of distinct values (for symbol/tag columns).
305    #[serde(skip_serializing_if = "Option::is_none")]
306    pub cardinality: Option<u32>,
307    /// Compressed size in bytes for this column.
308    pub compressed_bytes: u64,
309    /// Uncompressed size in bytes.
310    pub uncompressed_bytes: u64,
311}
312
313impl ColumnStatistics {
314    /// Create empty statistics with just the codec.
315    pub fn new(codec: ResolvedColumnCodec) -> Self {
316        Self {
317            codec,
318            count: 0,
319            min: None,
320            max: None,
321            sum: None,
322            cardinality: None,
323            compressed_bytes: 0,
324            uncompressed_bytes: 0,
325        }
326    }
327
328    /// Compute statistics for an i64 column.
329    pub fn from_i64(values: &[i64], codec: ResolvedColumnCodec, compressed_bytes: u64) -> Self {
330        if values.is_empty() {
331            return Self::new(codec);
332        }
333
334        let mut min = values[0];
335        let mut max = values[0];
336        let mut sum: i128 = 0;
337
338        for &v in values {
339            if v < min {
340                min = v;
341            }
342            if v > max {
343                max = v;
344            }
345            sum += v as i128;
346        }
347
348        Self {
349            codec,
350            count: values.len() as u64,
351            min: Some(min as f64),
352            max: Some(max as f64),
353            sum: Some(sum as f64),
354            cardinality: None,
355            compressed_bytes,
356            uncompressed_bytes: (values.len() * 8) as u64,
357        }
358    }
359
360    /// Compute statistics for an f64 column.
361    pub fn from_f64(values: &[f64], codec: ResolvedColumnCodec, compressed_bytes: u64) -> Self {
362        if values.is_empty() {
363            return Self::new(codec);
364        }
365
366        let mut min = values[0];
367        let mut max = values[0];
368        let mut sum: f64 = 0.0;
369
370        for &v in values {
371            if v < min {
372                min = v;
373            }
374            if v > max {
375                max = v;
376            }
377            sum += v;
378        }
379
380        Self {
381            codec,
382            count: values.len() as u64,
383            min: Some(min),
384            max: Some(max),
385            sum: Some(sum),
386            cardinality: None,
387            compressed_bytes,
388            uncompressed_bytes: (values.len() * 8) as u64,
389        }
390    }
391
392    /// Compute statistics for a symbol column.
393    pub fn from_symbols(
394        values: &[u32],
395        cardinality: u32,
396        codec: ResolvedColumnCodec,
397        compressed_bytes: u64,
398    ) -> Self {
399        Self {
400            codec,
401            count: values.len() as u64,
402            min: None,
403            max: None,
404            sum: None,
405            cardinality: Some(cardinality),
406            compressed_bytes,
407            uncompressed_bytes: (values.len() * 4) as u64,
408        }
409    }
410
411    /// Compression ratio (uncompressed / compressed). Returns 1.0 if no data.
412    pub fn compression_ratio(&self) -> f64 {
413        if self.compressed_bytes == 0 {
414            return 1.0;
415        }
416        self.uncompressed_bytes as f64 / self.compressed_bytes as f64
417    }
418}