// kafka_protocol/compression/snappy.rs
use anyhow::{Context, Result};
use bytes::{Bytes, BytesMut};
use snap::raw::*;

use crate::protocol::buf::{ByteBuf, ByteBufMut};

use super::{Compressor, Decompressor};

/// Marker type selecting the Snappy codec.
///
/// It carries no state; it exists so that the `Compressor` and `Decompressor`
/// implementations in this module have a type to hang off.
#[derive(Debug, Clone, Copy, Default)]
pub struct Snappy;
12
13impl<B: ByteBufMut> Compressor<B> for Snappy {
14 type BufMut = BytesMut;
15 fn compress<R, F>(buf: &mut B, f: F) -> Result<R>
16 where
17 F: FnOnce(&mut Self::BufMut) -> Result<R>,
18 {
19 let mut tmp = BytesMut::new();
21 let res = f(&mut tmp)?;
22
23 let start_pos = buf.offset();
25 let compress_gap = buf.put_gap(max_compress_len(tmp.len()));
26 let actual_len = Encoder::new()
27 .compress(&tmp, buf.gap_buf(compress_gap))
28 .context("Failed to compress snappy")?;
29 buf.seek(start_pos + actual_len);
30
31 Ok(res)
32 }
33}
34
35impl<B: ByteBuf> Decompressor<B> for Snappy {
36 type Buf = Bytes;
37 fn decompress<R, F>(buf: &mut B, f: F) -> Result<R>
38 where
39 F: FnOnce(&mut Self::Buf) -> Result<R>,
40 {
41 let buf = buf.copy_to_bytes(buf.remaining());
43 let actual_len = decompress_len(&buf).context("Failed to decompress snappy")?;
44 let mut tmp = BytesMut::new();
45 tmp.resize(actual_len, 0);
46
47 Decoder::new()
49 .decompress(&buf, &mut tmp)
50 .context("Failed to decompress snappy")?;
51
52 f(&mut tmp.into())
53 }
54}