fluvio_compression/
lib.rs

1use std::str::FromStr;
2
3mod error;
4
5use bytes::Bytes;
6
7#[cfg(feature = "gzip")]
8mod gzip;
9
10#[cfg(feature = "snap")]
11mod snappy;
12
13#[cfg(feature = "lz4")]
14mod lz4;
15
16#[cfg(feature = "zstd")]
17mod zstd;
18
19pub use error::CompressionError;
20use serde::{Serialize, Deserialize};
21
22/// The compression algorithm used to compress and decompress records in fluvio batches
23#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)]
24#[serde(rename_all = "lowercase")]
25#[repr(i8)]
26#[derive(Default)]
27pub enum Compression {
28    #[default]
29    None = 0,
30    #[cfg(feature = "gzip")]
31    Gzip = 1,
32    #[cfg(feature = "snap")]
33    Snappy = 2,
34    #[cfg(feature = "lz4")]
35    Lz4 = 3,
36    #[cfg(feature = "zstd")]
37    Zstd = 4,
38}
39
40impl TryFrom<i8> for Compression {
41    type Error = CompressionError;
42    fn try_from(v: i8) -> Result<Self, CompressionError> {
43        match v {
44            0 => Ok(Compression::None),
45            #[cfg(feature = "gzip")]
46            1 => Ok(Compression::Gzip),
47            #[cfg(feature = "snap")]
48            2 => Ok(Compression::Snappy),
49            #[cfg(feature = "lz4")]
50            3 => Ok(Compression::Lz4),
51            #[cfg(feature = "zstd")]
52            4 => Ok(Compression::Zstd),
53            _ => Err(CompressionError::UnknownCompressionFormat(format!(
54                "i8 representation: {v}"
55            ))),
56        }
57    }
58}
59
60impl FromStr for Compression {
61    type Err = CompressionError;
62
63    fn from_str(s: &str) -> Result<Self, Self::Err> {
64        match s.to_lowercase().as_str() {
65            "none" => Ok(Compression::None),
66            #[cfg(feature = "gzip")]
67            "gzip" => Ok(Compression::Gzip),
68            #[cfg(feature = "snap")]
69            "snappy" => Ok(Compression::Snappy),
70            #[cfg(feature = "lz4")]
71            "lz4" => Ok(Compression::Lz4),
72            #[cfg(feature = "zstd")]
73            "zstd" => Ok(Compression::Zstd),
74            _ => Err(CompressionError::UnknownCompressionFormat(s.into())),
75        }
76    }
77}
78
79impl std::fmt::Display for Compression {
80    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
81        match *self {
82            Compression::None => write!(f, "none"),
83            #[cfg(feature = "gzip")]
84            Compression::Gzip => write!(f, "gzip"),
85            #[cfg(feature = "snap")]
86            Compression::Snappy => write!(f, "snappy"),
87            #[cfg(feature = "lz4")]
88            Compression::Lz4 => write!(f, "lz4"),
89            #[cfg(feature = "zstd")]
90            Compression::Zstd => write!(f, "zstd"),
91        }
92    }
93}
94
95impl Compression {
96    /// Compress the given data, returning the compressed data
97    pub fn compress(&self, src: &[u8]) -> Result<Bytes, CompressionError> {
98        match *self {
99            Compression::None => Ok(Bytes::copy_from_slice(src)),
100            #[cfg(feature = "gzip")]
101            Compression::Gzip => gzip::compress(src),
102            #[cfg(feature = "snap")]
103            Compression::Snappy => snappy::compress(src),
104            #[cfg(feature = "lz4")]
105            Compression::Lz4 => lz4::compress(src),
106            #[cfg(feature = "zstd")]
107            Compression::Zstd => zstd::compress(src),
108        }
109    }
110
111    /// Uncompresss the given data, returning the uncompressed data if any compression was applied, otherwise returns None
112    #[allow(unused_variables)]
113    pub fn uncompress(&self, src: &[u8]) -> Result<Option<Vec<u8>>, CompressionError> {
114        match *self {
115            Compression::None => Ok(None),
116            #[cfg(feature = "gzip")]
117            Compression::Gzip => {
118                let output = gzip::uncompress(src)?;
119                Ok(Some(output))
120            }
121            #[cfg(feature = "snap")]
122            Compression::Snappy => {
123                let output = snappy::uncompress(src)?;
124                Ok(Some(output))
125            }
126            #[cfg(feature = "lz4")]
127            Compression::Lz4 => {
128                let output = lz4::uncompress(src)?;
129                Ok(Some(output))
130            }
131            #[cfg(feature = "zstd")]
132            Compression::Zstd => {
133                let output = zstd::uncompress(src)?;
134                Ok(Some(output))
135            }
136        }
137    }
138}
139
140#[cfg(any(feature = "gzip", feature = "snap", feature = "lz4", feature = "zstd"))]
141impl From<fluvio_types::compression::Compression> for Compression {
142    fn from(fcc: fluvio_types::compression::Compression) -> Self {
143        use fluvio_types::compression::Compression as CompressionType;
144
145        match fcc {
146            CompressionType::None => Compression::None,
147            #[cfg(feature = "gzip")]
148            CompressionType::Gzip => Compression::Gzip,
149            #[cfg(feature = "snap")]
150            CompressionType::Snappy => Compression::Snappy,
151            #[cfg(feature = "lz4")]
152            CompressionType::Lz4 => Compression::Lz4,
153            #[cfg(feature = "zstd")]
154            CompressionType::Zstd => Compression::Zstd,
155        }
156    }
157}
158
159#[cfg(test)]
160mod tests {
161    use super::Compression;
162
163    #[test]
164    fn converts_from_fluvio_compression() {
165        use fluvio_types::compression::Compression as CompressionType;
166
167        assert_eq!(Compression::from(CompressionType::None), Compression::None);
168
169        #[cfg(feature = "gzip")]
170        assert_eq!(Compression::from(CompressionType::Gzip), Compression::Gzip);
171        #[cfg(feature = "snap")]
172        assert_eq!(
173            Compression::from(CompressionType::Snappy),
174            Compression::Snappy
175        );
176
177        #[cfg(feature = "lz4")]
178        assert_eq!(Compression::from(CompressionType::Lz4), Compression::Lz4);
179
180        #[cfg(feature = "zstd")]
181        assert_eq!(Compression::from(CompressionType::Zstd), Compression::Zstd);
182    }
183}