kafka_protocol/compression/
gzip.rs1use std::io::Write;
2
3use anyhow::{Context, Result};
4use bytes::buf::BufMut;
5use bytes::{Bytes, BytesMut};
6use flate2::write::{GzDecoder, GzEncoder};
7use flate2::Compression;
8
9use crate::protocol::buf::{ByteBuf, ByteBufMut};
10
11use super::{Compressor, Decompressor};
12
/// Zero-sized marker type selecting gzip as the codec.
///
/// It carries no state; all behaviour lives in the `Compressor` and
/// `Decompressor` implementations provided for it in this module.
13pub struct Gzip;
16
17impl<B: ByteBufMut> Compressor<B> for Gzip {
18 type BufMut = BytesMut;
19 fn compress<R, F>(buf: &mut B, f: F) -> Result<R>
20 where
21 F: FnOnce(&mut Self::BufMut) -> Result<R>,
22 {
23 let mut tmp = BytesMut::new();
25 let res = f(&mut tmp)?;
26
27 let mut e = GzEncoder::new(buf.writer(), Compression::default());
29 e.write_all(&tmp).context("Failed to compress gzip")?;
30 e.finish().context("Failed to compress gzip")?;
31
32 Ok(res)
33 }
34}
35
36impl<B: ByteBuf> Decompressor<B> for Gzip {
37 type Buf = Bytes;
38 fn decompress<R, F>(buf: &mut B, f: F) -> Result<R>
39 where
40 F: FnOnce(&mut Self::Buf) -> Result<R>,
41 {
42 let mut tmp = BytesMut::new();
43
44 let mut d = GzDecoder::new((&mut tmp).writer());
46 d.write_all(&buf.copy_to_bytes(buf.remaining()))
47 .context("Failed to decompress gzip")?;
48 d.finish().context("Failed to decompress gzip")?;
49
50 f(&mut tmp.into())
51 }
52}