socket_flow/
decoder.rs

1use bytes::BytesMut;
2use flate2::{Decompress, FlushDecompress, Status};
/// Chooses the size, in bytes, of the working buffers used to inflate a payload.
///
/// The result grows in three steps with `payload_size`, trading memory for
/// fewer decompression calls on larger inputs:
///
/// * up to 4096 bytes   -> 4 KiB
/// * up to 65536 bytes  -> 16 KiB
/// * anything larger    -> 64 KiB
fn calculate_buffer_size(payload_size: usize) -> usize {
    const KIB: usize = 1024;

    match payload_size {
        0..=4096 => 4 * KIB,
        4097..=65536 => 16 * KIB,
        _ => 64 * KIB,
    }
}
12
13pub(crate) struct Decoder {
14    pub decompressor: Decompress,
15    pub reset_context: bool,
16}
17
/// The four bytes `00 00 FF FF` that `Decoder::decompress` appends to every
/// payload before inflating it.
// NOTE(review): this matches the tail that permessage-deflate senders strip
// from each message (RFC 7692, section 7.2.2) — confirm that is the intent.
const DEFLATE_TRAILER: [u8; 4] = [0, 0, 255, 255];
19
20impl Decoder {
21    pub fn new(reset_context: bool, window_bits: Option<u8>) -> Self {
22        let decompressor = if let Some(window_bits) = window_bits {
23            Decompress::new_with_window_bits(false, window_bits)
24        } else {
25            Decompress::new(false)
26        };
27        Self {
28            decompressor,
29            reset_context,
30        }
31    }
32
33    pub fn decompress(&mut self, payload: &mut BytesMut) -> Result<Vec<u8>, std::io::Error> {
34        payload.extend_from_slice(&DEFLATE_TRAILER);
35        // adjust the buffer size, depending on the payload,
36        // for balancing between CPU vs. Memory usage
37        let buffer_size = calculate_buffer_size(payload.len());
38        // Create an output buffer with a reasonable initial capacity
39        let mut decompressed_data = BytesMut::with_capacity(buffer_size);
40
41        // Create a reusable buffer for intermediate decompression chunks
42        let mut buffer = Vec::with_capacity(buffer_size);
43
44        // Reset the decompressor before starting to ensure no leftover state
45        if self.reset_context {
46            self.decompressor.reset(false);
47        }
48
49        let before_in = self.decompressor.total_in();
50
51        // Here on the while loop, we need to use decompressor.total_in() method, because
52        // when we don't need to reset the context between decompression processes,
53        // the decompressor will keep the number of bytes decompressed, also the client
54        // responsible for compressing the payload, which is also keeping the context, will send
55        // smaller payloads, hopping that the receiver also is keeping the context
56        // That is why the handshake part is really important, to ensure we don't have a
57        // misalignment.
58        while self.decompressor.total_in() - before_in < payload.as_ref().len() as u64 {
59            let i = (self.decompressor.total_in() - before_in) as usize;
60            let input = &payload[i..];
61
62            // TODO - We are using decompress_vec, perhaps only decompress method should be
63            // more performant, the only issue with that,
64            // is that you need to manage the buffer manually
65            match self
66                .decompressor
67                .decompress_vec(input, &mut buffer, FlushDecompress::Sync)?
68            {
69                Status::Ok => {
70                    decompressed_data.extend_from_slice(buffer.as_ref());
71                    buffer.clear();
72                }
73                Status::StreamEnd => break,
74                _ => {}
75            }
76        }
77        decompressed_data.truncate(decompressed_data.len());
78
79        Ok(decompressed_data.to_vec())
80    }
81}