kafka_protocol/compression/
gzip.rs

1use std::io::Write;
2
3use anyhow::{Context, Result};
4use bytes::buf::BufMut;
5use bytes::{Bytes, BytesMut};
6use flate2::write::{GzDecoder, GzEncoder};
7use flate2::Compression;
8
9use crate::protocol::buf::{ByteBuf, ByteBufMut};
10
11use super::{Compressor, Decompressor};
12
13/// Gzip compression algorithm. See [Kafka's broker configuration](https://kafka.apache.org/documentation/#brokerconfigs_compression.type)
14/// for more information.
15pub struct Gzip;
16
17impl<B: ByteBufMut> Compressor<B> for Gzip {
18    type BufMut = BytesMut;
19    fn compress<R, F>(buf: &mut B, f: F) -> Result<R>
20    where
21        F: FnOnce(&mut Self::BufMut) -> Result<R>,
22    {
23        // Write uncompressed bytes into a temporary buffer
24        let mut tmp = BytesMut::new();
25        let res = f(&mut tmp)?;
26
27        // Compress directly into the target buffer
28        let mut e = GzEncoder::new(buf.writer(), Compression::default());
29        e.write_all(&tmp).context("Failed to compress gzip")?;
30        e.finish().context("Failed to compress gzip")?;
31
32        Ok(res)
33    }
34}
35
36impl<B: ByteBuf> Decompressor<B> for Gzip {
37    type Buf = Bytes;
38    fn decompress<R, F>(buf: &mut B, f: F) -> Result<R>
39    where
40        F: FnOnce(&mut Self::Buf) -> Result<R>,
41    {
42        let mut tmp = BytesMut::new();
43
44        // Decompress directly from the input buffer
45        let mut d = GzDecoder::new((&mut tmp).writer());
46        d.write_all(&buf.copy_to_bytes(buf.remaining()))
47            .context("Failed to decompress gzip")?;
48        d.finish().context("Failed to decompress gzip")?;
49
50        f(&mut tmp.into())
51    }
52}