Skip to main content

hyperi_rustlib/tiered_sink/
codec.rs

1// Project:   hyperi-rustlib
2// File:      src/tiered_sink/codec.rs
3// Purpose:   Compression codec selection for spool storage
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Compression codec selection for spool storage.
10
11use serde::{Deserialize, Serialize};
12use std::io;
13
14/// Compression codec for spool storage.
15///
16/// Different codecs offer different CPU/compression tradeoffs:
17/// - `Zstd`: Best compression ratio, configurable CPU (default, level 1)
18/// - `Lz4`: Fast compression, low CPU
19/// - `Snappy`: Very fast, Kafka-native - avoids transcode if sink uses Snappy
20/// - `None`: No compression - maximum speed when CPU is bottleneck
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
22#[serde(rename_all = "lowercase")]
23pub enum CompressionCodec {
24    /// No compression - fastest, no CPU overhead
25    None,
26    /// LZ4 - fast compression, low CPU
27    ///
28    /// LZ4 has very low CPU overhead with meaningful compression.
29    /// Pure Rust implementation (lz4_flex).
30    Lz4,
31    /// Snappy - very fast, Kafka-native format
32    Snappy,
33    /// Zstd with configurable level (1-22, default 1)
34    ///
35    /// Zstd at level 1 offers excellent compression with speed
36    /// comparable to LZ4, making it the best default.
37    Zstd { level: i32 },
38}
39
40impl Default for CompressionCodec {
41    fn default() -> Self {
42        Self::Zstd { level: 1 }
43    }
44}
45
46impl CompressionCodec {
47    /// Create Zstd codec with default level (3).
48    #[must_use]
49    pub fn zstd() -> Self {
50        Self::Zstd { level: 3 }
51    }
52
53    /// Create Zstd codec with specified level.
54    #[must_use]
55    pub fn zstd_level(level: i32) -> Self {
56        Self::Zstd {
57            level: level.clamp(1, 22),
58        }
59    }
60
61    /// Compress data using this codec.
62    ///
63    /// # Errors
64    ///
65    /// Returns an error if compression fails.
66    pub fn compress(&self, data: &[u8]) -> io::Result<Vec<u8>> {
67        match self {
68            Self::None => Ok(data.to_vec()),
69            Self::Lz4 => Ok(lz4_flex::compress_prepend_size(data)),
70            Self::Snappy => {
71                let mut encoder = snap::raw::Encoder::new();
72                encoder.compress_vec(data).map_err(io::Error::other)
73            }
74            Self::Zstd { level } => zstd::encode_all(data, *level).map_err(io::Error::other),
75        }
76    }
77
78    /// Decompress data using this codec.
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if decompression fails.
83    pub fn decompress(&self, data: &[u8]) -> io::Result<Vec<u8>> {
84        match self {
85            Self::None => Ok(data.to_vec()),
86            Self::Lz4 => lz4_flex::decompress_size_prepended(data)
87                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string())),
88            Self::Snappy => {
89                let mut decoder = snap::raw::Decoder::new();
90                decoder
91                    .decompress_vec(data)
92                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
93            }
94            Self::Zstd { .. } => {
95                zstd::decode_all(data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
96            }
97        }
98    }
99
100    /// Returns true if this codec applies compression.
101    #[must_use]
102    pub fn is_compressed(&self) -> bool {
103        !matches!(self, Self::None)
104    }
105}
106
107impl std::fmt::Display for CompressionCodec {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        match self {
110            Self::None => write!(f, "none"),
111            Self::Lz4 => write!(f, "lz4"),
112            Self::Snappy => write!(f, "snappy"),
113            Self::Zstd { level } => write!(f, "zstd(level={level})"),
114        }
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// Repetitive payload so that the real codecs have something to shrink.
    const REPETITIVE: &[u8] = b"hello world hello world hello world";

    /// Compresses then decompresses `input`, returning `(compressed, restored)`.
    fn roundtrip(codec: CompressionCodec, input: &[u8]) -> (Vec<u8>, Vec<u8>) {
        let packed = codec.compress(input).unwrap();
        let restored = codec.decompress(&packed).unwrap();
        (packed, restored)
    }

    #[test]
    fn test_default_is_zstd_level_1() {
        let expected = CompressionCodec::Zstd { level: 1 };
        assert_eq!(CompressionCodec::default(), expected);
    }

    #[test]
    fn test_none_roundtrip() {
        let input = b"hello world";
        let (packed, restored) = roundtrip(CompressionCodec::None, input);
        assert_eq!(input.as_slice(), restored.as_slice());
        // The pass-through codec must leave the bytes untouched.
        assert_eq!(packed.as_slice(), input.as_slice());
    }

    #[test]
    fn test_lz4_roundtrip() {
        let (packed, restored) = roundtrip(CompressionCodec::Lz4, REPETITIVE);
        assert_eq!(REPETITIVE, restored.as_slice());
        // The output must actually be smaller than the input.
        assert!(packed.len() < REPETITIVE.len());
    }

    #[test]
    fn test_snappy_roundtrip() {
        let (_packed, restored) = roundtrip(CompressionCodec::Snappy, REPETITIVE);
        assert_eq!(REPETITIVE, restored.as_slice());
    }

    #[test]
    fn test_zstd_roundtrip() {
        let (packed, restored) = roundtrip(CompressionCodec::zstd(), REPETITIVE);
        assert_eq!(REPETITIVE, restored.as_slice());
        assert!(packed.len() < REPETITIVE.len());
    }

    #[test]
    fn test_zstd_level_clamped() {
        // Above the maximum: pulled down to 22.
        let too_high = CompressionCodec::zstd_level(100);
        assert_eq!(too_high, CompressionCodec::Zstd { level: 22 });

        // Below the minimum: pulled up to 1.
        let too_low = CompressionCodec::zstd_level(-5);
        assert_eq!(too_low, CompressionCodec::Zstd { level: 1 });
    }

    #[test]
    fn test_is_compressed() {
        assert!(!CompressionCodec::None.is_compressed());

        let compressing = [
            CompressionCodec::Lz4,
            CompressionCodec::Snappy,
            CompressionCodec::zstd(),
        ];
        for codec in compressing {
            assert!(codec.is_compressed());
        }
    }

    #[test]
    fn test_display() {
        assert_eq!(CompressionCodec::None.to_string(), "none");
        assert_eq!(CompressionCodec::Lz4.to_string(), "lz4");
        assert_eq!(CompressionCodec::Snappy.to_string(), "snappy");
        assert_eq!(CompressionCodec::zstd().to_string(), "zstd(level=3)");
    }
}