Skip to main content

msg_wire/compression/
mod.rs

1use bytes::Bytes;
2use std::io;
3
4mod gzip;
5pub use gzip::*;
6
7mod lz4;
8pub use lz4::*;
9
10mod snappy;
11pub use snappy::*;
12
13mod zstd;
14pub use zstd::*;
15
/// Identifies which compression algorithm, if any, was applied to a message payload.
///
/// The enum is `#[repr(u8)]` with explicit discriminants, so each variant can be cast to
/// its single-byte tag with `as u8`.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionType {
    /// The payload is not compressed (tag `0`).
    None = 0,
    /// The payload is compressed with gzip (tag `1`).
    Gzip = 1,
    /// The payload is compressed with zstd (tag `2`).
    Zstd = 2,
    /// The payload is compressed with snappy (tag `3`).
    Snappy = 3,
    /// The payload is compressed with lz4 (tag `4`).
    Lz4 = 4,
}
26
27impl TryFrom<u8> for CompressionType {
28    type Error = u8;
29
30    fn try_from(value: u8) -> Result<Self, Self::Error> {
31        match value {
32            0 => Ok(CompressionType::None),
33            1 => Ok(CompressionType::Gzip),
34            2 => Ok(CompressionType::Zstd),
35            3 => Ok(CompressionType::Snappy),
36            4 => Ok(CompressionType::Lz4),
37            _ => Err(value),
38        }
39    }
40}
41
42/// This trait is used to implement message-level compression algorithms for payloads.
43/// On outgoing messages, the payload is compressed before being sent using the `compress` method.
44pub trait Compressor: Send + Sync + Unpin + 'static {
45    /// Returns the compression type assigned to this compressor.
46    fn compression_type(&self) -> CompressionType;
47
48    /// Compresses a byte slice payload into a `Bytes` object.
49    fn compress(&self, data: &[u8]) -> Result<Bytes, io::Error>;
50}
51
52/// This trait is used to implement message-level decompression algorithms for payloads.
53/// On incoming messages, the payload is decompressed using the `decompress` method.
54pub trait Decompressor: Send + Sync + Unpin + 'static {
55    /// Decompresses a compressed byte slice into a `Bytes` object.
56    fn decompress(&self, data: &[u8]) -> Result<Bytes, io::Error>;
57}
58
59/// Tries to decompress a payload using the given compression type.
60/// If the compression type is `None`, the payload is returned as-is.
61///
62/// ## Errors
63/// - If the compression type is not supported
64/// - If the payload is invalid
65/// - If the decompression fails
66pub fn try_decompress_payload(compression_type: u8, data: Bytes) -> Result<Bytes, io::Error> {
67    match CompressionType::try_from(compression_type) {
68        Ok(supported_compression_type) => match supported_compression_type {
69            CompressionType::None => Ok(data),
70            CompressionType::Gzip => GzipDecompressor.decompress(data.as_ref()),
71            CompressionType::Zstd => ZstdDecompressor.decompress(data.as_ref()),
72            CompressionType::Snappy => SnappyDecompressor.decompress(data.as_ref()),
73            CompressionType::Lz4 => Lz4Decompressor.decompress(data.as_ref()),
74        },
75        Err(unsupported_compression_type) => Err(io::Error::new(
76            io::ErrorKind::InvalidData,
77            format!("unsupported compression type: {unsupported_compression_type}"),
78        )),
79    }
80}
81
82#[cfg(test)]
83mod tests {
84    use super::*;
85
86    #[test]
87    fn test_gzip_compression() {
88        let compressor = GzipCompressor::new(6);
89        let decompressor = GzipDecompressor;
90
91        let data =
92            Bytes::from("hellooooooooooooooooo wwwwwoooooooooooooooooooooooooooooooooooooorld");
93        println!("Before: {:?}", data.len());
94        let compressed = compressor.compress(&data).unwrap();
95        println!("After: {:?}", compressed.len());
96        let decompressed = decompressor.decompress(&compressed).unwrap();
97
98        assert_eq!(data, decompressed);
99    }
100
101    #[test]
102    fn test_zstd_compression() {
103        let compressor = ZstdCompressor::new(6);
104        let decompressor = ZstdDecompressor;
105
106        let data =
107            Bytes::from("hellooooooooooooooooo wwwwwoooooooooooooooooooooooooooooooooooooorld");
108        println!("Before: {:?}", data.len());
109        let compressed = compressor.compress(&data).unwrap();
110        println!("After: {:?}", compressed.len());
111        let decompressed = decompressor.decompress(&compressed).unwrap();
112
113        assert_eq!(data, decompressed);
114    }
115
116    #[test]
117    fn test_snappy_compression() {
118        let compressor = SnappyCompressor;
119        let decompressor = SnappyDecompressor;
120
121        let data =
122            Bytes::from("hellooooooooooooooooo wwwwwoooooooooooooooooooooooooooooooooooooorld");
123        println!("Before: {:?}", data.len());
124        let compressed = compressor.compress(&data).unwrap();
125        println!("After: {:?}", compressed.len());
126        let decompressed = decompressor.decompress(&compressed).unwrap();
127
128        assert_eq!(data, decompressed);
129    }
130
131    #[test]
132    fn test_lz4_compression() {
133        let compressor = Lz4Compressor;
134        let decompressor = Lz4Decompressor;
135
136        let data =
137            Bytes::from("hellooooooooooooooooo wwwwwoooooooooooooooooooooooooooooooooooooorld");
138        println!("Before: {:?}", data.len());
139        let compressed = compressor.compress(&data).unwrap();
140        println!("After: {:?}", compressed.len());
141        let decompressed = decompressor.decompress(&compressed).unwrap();
142
143        assert_eq!(data, decompressed);
144    }
145
146    fn compression_test<C: Compressor>(data: &Bytes, comp: C) -> (std::time::Duration, f64, Bytes) {
147        let uncompressed_size = data.len() as f64;
148        let start = std::time::Instant::now();
149
150        let compressed = comp.compress(data).unwrap();
151
152        let time = std::time::Instant::now() - start;
153        let compressed_size = compressed.len() as f64;
154        let shrinkage = uncompressed_size / compressed_size * 100.0;
155
156        (time, shrinkage, compressed)
157    }
158
159    fn decompression_test<D: Decompressor>(data: &Bytes, decomp: D) -> std::time::Duration {
160        let start = std::time::Instant::now();
161        decomp.decompress(data).unwrap();
162        std::time::Instant::now() - start
163    }
164
165    #[test]
166    fn test_compare_compression_algorithms_ssz_block() {
167        let data = Bytes::from(
168            std::fs::read("../testdata/mainnetCapellaBlock7928030.ssz")
169                .expect("failed to read test file"),
170        );
171
172        println!("uncompressed data size: {} bytes", data.len());
173
174        let gzip = GzipCompressor::new(6);
175        let (gzip_time, gzip_perf, gzip_comp) = compression_test(&data, gzip);
176        println!("gzip compression shrank the data by {gzip_perf:.2}% in {gzip_time:?}");
177
178        let zstd = ZstdCompressor::new(6);
179        let (zstd_time, zstd_perf, zstd_comp) = compression_test(&data, zstd);
180        println!("zstd compression shrank the data by {zstd_perf:.2}% in {zstd_time:?}");
181
182        let snappy = SnappyCompressor;
183        let (snappy_time, snappy_perf, snappy_comp) = compression_test(&data, snappy);
184        println!("snappy compression shrank the data by {snappy_perf:.2}% in {snappy_time:?}");
185
186        let lz4 = Lz4Compressor;
187        let (lz4_time, lz4_perf, lz4_comp) = compression_test(&data, lz4);
188        println!("lz4 compression shrank the data by {lz4_perf:.2}% in {lz4_time:?}");
189
190        println!("------ SSZ BLOCK -------");
191
192        let gzip = GzipDecompressor;
193        let gzip_time = decompression_test(&gzip_comp, gzip);
194        println!("gzip decompression took {gzip_time:?}");
195
196        let zstd = ZstdDecompressor;
197        let zstd_time = decompression_test(&zstd_comp, zstd);
198        println!("zstd decompression took {zstd_time:?}");
199
200        let snappy = SnappyDecompressor;
201        let snappy_time = decompression_test(&snappy_comp, snappy);
202        println!("snappy decompression took {snappy_time:?}");
203
204        let lz4 = Lz4Decompressor;
205        let lz4_time = decompression_test(&lz4_comp, lz4);
206        println!("lz4 decompression took {lz4_time:?}");
207    }
208
209    #[test]
210    fn test_compare_compression_algorithms_blob_tx() {
211        let data = Bytes::from(
212            std::fs::read("../testdata/blobTransactionRaw").expect("failed to read test file"),
213        );
214
215        println!("uncompressed data size: {} bytes", data.len());
216
217        let gzip = GzipCompressor::new(6);
218        let (gzip_time, gzip_perf, gzip_comp) = compression_test(&data, gzip);
219        println!("gzip compression shrank the data by {gzip_perf:.2}% in {gzip_time:?}");
220
221        let zstd = ZstdCompressor::new(6);
222        let (zstd_time, zstd_perf, zstd_comp) = compression_test(&data, zstd);
223        println!("zstd compression shrank the data by {zstd_perf:.2}% in {zstd_time:?}");
224
225        let snappy = SnappyCompressor;
226        let (snappy_time, snappy_perf, snappy_comp) = compression_test(&data, snappy);
227        println!("snappy compression shrank the data by {snappy_perf:.2}% in {snappy_time:?}");
228
229        let lz4 = Lz4Compressor;
230        let (lz4_time, lz4_perf, lz4_comp) = compression_test(&data, lz4);
231        println!("lz4 compression shrank the data by {lz4_perf:.2}% in {lz4_time:?}");
232
233        println!("------ BLOB TX ------");
234
235        let gzip = GzipDecompressor;
236        let gzip_time = decompression_test(&gzip_comp, gzip);
237        println!("gzip decompression took {gzip_time:?}");
238
239        let zstd = ZstdDecompressor;
240        let zstd_time = decompression_test(&zstd_comp, zstd);
241        println!("zstd decompression took {zstd_time:?}");
242
243        let snappy = SnappyDecompressor;
244        let snappy_time = decompression_test(&snappy_comp, snappy);
245        println!("snappy decompression took {snappy_time:?}");
246
247        let lz4 = Lz4Decompressor;
248        let lz4_time = decompression_test(&lz4_comp, lz4);
249        println!("lz4 decompression took {lz4_time:?}");
250    }
251}