kafka_protocol/compression/
snappy.rs

1use anyhow::{Context, Result};
2use bytes::{Bytes, BytesMut};
3use snap::raw::*;
4
5use crate::protocol::buf::{ByteBuf, ByteBufMut};
6
7use super::{Compressor, Decompressor};
8
/// Snappy compression algorithm. See [Kafka's broker configuration](https://kafka.apache.org/documentation/#brokerconfigs_compression.type)
/// for more information.
///
/// This is a zero-sized marker type: it carries no state and exists only so the
/// codec can be selected at the type level.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Snappy;
12
13impl<B: ByteBufMut> Compressor<B> for Snappy {
14    type BufMut = BytesMut;
15    fn compress<R, F>(buf: &mut B, f: F) -> Result<R>
16    where
17        F: FnOnce(&mut Self::BufMut) -> Result<R>,
18    {
19        // Write uncompressed bytes into a temporary buffer
20        let mut tmp = BytesMut::new();
21        let res = f(&mut tmp)?;
22
23        // Compress directly into the target buffer
24        let start_pos = buf.offset();
25        let compress_gap = buf.put_gap(max_compress_len(tmp.len()));
26        let actual_len = Encoder::new()
27            .compress(&tmp, buf.gap_buf(compress_gap))
28            .context("Failed to compress snappy")?;
29        buf.seek(start_pos + actual_len);
30
31        Ok(res)
32    }
33}
34
35impl<B: ByteBuf> Decompressor<B> for Snappy {
36    type Buf = Bytes;
37    fn decompress<R, F>(buf: &mut B, f: F) -> Result<R>
38    where
39        F: FnOnce(&mut Self::Buf) -> Result<R>,
40    {
41        // Allocate a temporary buffer to hold the uncompressed bytes
42        let buf = buf.copy_to_bytes(buf.remaining());
43        let actual_len = decompress_len(&buf).context("Failed to decompress snappy")?;
44        let mut tmp = BytesMut::new();
45        tmp.resize(actual_len, 0);
46
47        // Decompress directly from the input buffer
48        Decoder::new()
49            .decompress(&buf, &mut tmp)
50            .context("Failed to decompress snappy")?;
51
52        f(&mut tmp.into())
53    }
54}