Skip to main content

wacore_binary/
zlib_pool.rs

1use flate2::{Decompress, FlushDecompress, Status};
2use std::cell::RefCell;
3use std::io;
4
5thread_local! {
6    static DECOMPRESSOR: RefCell<(Decompress, Vec<u8>)> = RefCell::new((
7        Decompress::new(true),
8        Vec::with_capacity(4096),
9    ));
10}
11
12/// Decompress zlib data using a pooled decompressor.
13///
14/// Reuses the `flate2::Decompress` internal state (~48 KB) and the output
15/// buffer across calls on the same thread, avoiding repeated heap allocations.
16pub fn decompress_zlib_pooled(compressed: &[u8], max_size: u64) -> io::Result<Vec<u8>> {
17    DECOMPRESSOR.with(|cell| {
18        let (decompressor, scratch) = &mut *cell.borrow_mut();
19        decompressor.reset(true);
20        scratch.clear();
21
22        let estimated = (compressed.len() * 4).clamp(256, 64 * 1024);
23        if scratch.capacity() < estimated {
24            scratch.reserve(estimated - scratch.capacity());
25        }
26
27        // Cap output growth to max_size + 1 so we detect oversized payloads
28        // without allocating unbounded memory from a compressed bomb.
29        let cap = (max_size as usize).saturating_add(1);
30
31        let mut input_offset = 0;
32        loop {
33            // Enforce cap before decompress_vec can grow the buffer
34            if scratch.len() >= cap {
35                return Err(io::Error::new(
36                    io::ErrorKind::InvalidData,
37                    format!("decompressed payload exceeds {max_size} bytes"),
38                ));
39            }
40
41            let prev_in = decompressor.total_in();
42            let prev_out = decompressor.total_out();
43
44            let status = decompressor
45                .decompress_vec(
46                    &compressed[input_offset..],
47                    scratch,
48                    FlushDecompress::Finish,
49                )
50                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
51
52            input_offset = decompressor.total_in() as usize;
53
54            if scratch.len() as u64 > max_size {
55                return Err(io::Error::new(
56                    io::ErrorKind::InvalidData,
57                    format!("decompressed payload exceeds {max_size} bytes"),
58                ));
59            }
60
61            match status {
62                Status::StreamEnd => break,
63                Status::Ok => {
64                    // Grow but never past the cap
65                    let want = scratch.capacity().max(4096).min(cap - scratch.len());
66                    scratch.reserve(want);
67                }
68                Status::BufError => {
69                    if decompressor.total_in() == prev_in && decompressor.total_out() == prev_out {
70                        return Err(io::Error::new(
71                            io::ErrorKind::InvalidData,
72                            "zlib stream truncated (no progress)",
73                        ));
74                    }
75                    let want = scratch.capacity().max(4096).min(cap - scratch.len());
76                    scratch.reserve(want);
77                }
78            }
79        }
80
81        // Move the Vec out (zero-copy), then restore scratch with fresh capacity.
82        // Callers (unpack_bytes, history_sync) wrap in Bytes::from() which takes
83        // ownership of the Vec's allocation, so no extra copy occurs.
84        let result = std::mem::take(scratch);
85        // Pre-allocate for next call so the first decompress_vec doesn't start at 0
86        scratch.reserve(4096);
87        Ok(result)
88    })
89}