Skip to main content

rustfs_kafka/compression/
mod.rs

1//! Compression types for Kafka messages.
2//!
3//! Kafka supports pluggable compression at the message-set level. The
4//! compression type is encoded in the lower 3 bits of the `attributes`
5//! field in each message.
6//!
7//! Actual compression and decompression is handled by the `kafka-protocol` crate.
8
9/// Compression types supported by Kafka.
10///
11/// The discriminant values correspond to the compression encoding in the
12/// `attributes` field of a Kafka message
13/// in the Kafka wire protocol (lower 3 bits).
14///
15/// | Variant | Value | Kafka constant |
16/// |---------|-------|---------------|
17/// | `NONE`  | 0     | `none`        |
18/// | `GZIP`  | 1     | `gzip`        |
19/// | `SNAPPY`| 2     | `snappy`      |
20/// | `LZ4`   | 3     | `lz4`         |
21/// | `ZSTD`  | 4     | `zstd`        |
22#[derive(Debug, Copy, Clone, Default, PartialEq, Eq)]
23pub enum Compression {
24    /// No compression.
25    #[default]
26    NONE = 0,
27    /// GZIP compression (RFC 1952). Higher compression ratio but slower.
28    GZIP = 1,
29    /// Snappy compression. Good balance of speed and ratio.
30    SNAPPY = 2,
31    /// LZ4 compression. Fastest compression and decompression speed.
32    LZ4 = 3,
33    /// ZSTD compression. Excellent balance of ratio and speed.
34    ZSTD = 4,
35}
36
37#[cfg(test)]
38mod tests {
39    use super::*;
40
41    #[test]
42    fn test_compression_discriminant_values() {
43        assert_eq!(Compression::NONE as i32, 0);
44        assert_eq!(Compression::GZIP as i32, 1);
45        assert_eq!(Compression::SNAPPY as i32, 2);
46        assert_eq!(Compression::LZ4 as i32, 3);
47        assert_eq!(Compression::ZSTD as i32, 4);
48    }
49
50    #[test]
51    fn test_compression_default() {
52        assert_eq!(Compression::default(), Compression::NONE);
53    }
54
55    #[test]
56    fn test_compression_debug() {
57        assert_eq!(format!("{:?}", Compression::NONE), "NONE");
58        assert_eq!(format!("{:?}", Compression::GZIP), "GZIP");
59        assert_eq!(format!("{:?}", Compression::SNAPPY), "SNAPPY");
60        assert_eq!(format!("{:?}", Compression::LZ4), "LZ4");
61        assert_eq!(format!("{:?}", Compression::ZSTD), "ZSTD");
62    }
63}
64
65#[cfg(test)]
66mod proptests {
67    use super::*;
68    use proptest::prelude::*;
69
70    proptest! {
71        #[test]
72        fn compression_discriminant_in_range(disc in 0i8..=4i8) {
73            assert!(matches!(disc, 0..=4));
74        }
75
76        #[test]
77        fn compression_debug_roundtrip(disc in 0u8..=4u8) {
78            let compression = match disc {
79                0 => Compression::NONE,
80                1 => Compression::GZIP,
81                2 => Compression::SNAPPY,
82                3 => Compression::LZ4,
83                4 => Compression::ZSTD,
84                _ => return Ok(()),
85            };
86            let debug_str = format!("{compression:?}");
87            assert!(!debug_str.is_empty());
88            let as_i32 = compression as i32;
89            assert!((0..=4).contains(&as_i32));
90        }
91    }
92}