//! bitfold_core/packet_pool.rs — reusable packet and compression buffer pools.

use std::sync::Arc;

/// A pooled packet buffer that can be reused to reduce allocations.
///
/// Holds a shared, reference-counted byte buffer plus a `[start, start + len)`
/// window into it, so many packets can alias one underlying allocation
/// without copying. Cloning a packet only bumps the refcount.
#[derive(Clone)]
pub struct PooledPacket {
    /// Shared backing storage.
    data: Arc<Vec<u8>>,
    /// Offset of this packet's first byte within `data`.
    start: usize,
    /// Number of bytes this packet spans.
    len: usize,
}

impl PooledPacket {
    /// Creates a new pooled packet that covers `data` in full.
    pub fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        Self { data: Arc::new(data), start: 0, len }
    }

    /// Creates a pooled packet as a slice of shared data.
    ///
    /// The window `[start, start + len)` must lie within `data`; an
    /// out-of-range window panics on the first `as_slice` access.
    pub fn from_arc(data: Arc<Vec<u8>>, start: usize, len: usize) -> Self {
        // Catch out-of-range windows at construction time in debug builds
        // rather than at the first slice access.
        debug_assert!(
            start.checked_add(len).map_or(false, |end| end <= data.len()),
            "packet window out of range"
        );
        Self { data, start, len }
    }

    /// Returns the packet data as a slice.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[self.start..self.start + self.len]
    }

    /// Returns the length of the packet.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns true if the packet is empty.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the reference count of the underlying buffer.
    pub fn ref_count(&self) -> usize {
        Arc::strong_count(&self.data)
    }

    /// Converts the pooled packet into owned bytes if this is the only reference.
    /// Otherwise, clones the data.
    pub fn into_owned(self) -> Vec<u8> {
        match Arc::try_unwrap(self.data) {
            Ok(mut vec) => {
                // Sole owner: reuse the existing allocation in place. Shift
                // the window to the front and cut the tail, instead of
                // draining into a freshly allocated Vec (which would copy
                // and defeat the point of reclaiming the buffer).
                if self.start != 0 {
                    vec.drain(..self.start);
                }
                vec.truncate(self.len);
                vec
            }
            // Other references exist: we must copy the window out.
            Err(arc) => arc[self.start..self.start + self.len].to_vec(),
        }
    }
}
58
59impl AsRef<[u8]> for PooledPacket {
60    fn as_ref(&self) -> &[u8] {
61        self.as_slice()
62    }
63}
64
65impl std::fmt::Debug for PooledPacket {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct("PooledPacket")
68            .field("len", &self.len)
69            .field("ref_count", &self.ref_count())
70            .finish()
71    }
72}
73
/// A simple packet allocator that reuses buffers.
pub struct PacketAllocator {
    /// Pool of reusable buffers (always cleared before pooling)
    pool: Vec<Vec<u8>>,
    /// Minimum capacity of buffers handed out by `allocate`
    buffer_size: usize,
    /// Maximum number of buffers retained in the pool
    max_pool_size: usize,
}

impl PacketAllocator {
    /// Creates a new packet allocator.
    ///
    /// `buffer_size` is the minimum capacity of buffers returned by
    /// [`allocate`](Self::allocate); `max_pool_size` caps how many
    /// returned buffers are retained for reuse.
    pub fn new(buffer_size: usize, max_pool_size: usize) -> Self {
        Self { pool: Vec::with_capacity(max_pool_size), buffer_size, max_pool_size }
    }

    /// Allocates a buffer from the pool or creates a new one.
    ///
    /// The returned buffer is empty and guaranteed to have at least
    /// `buffer_size` capacity, even if a smaller buffer was previously
    /// handed to [`deallocate`](Self::deallocate).
    pub fn allocate(&mut self) -> Vec<u8> {
        let mut buffer = self.pool.pop().unwrap_or_default();
        // `deallocate` accepts buffers of any capacity, so a pooled buffer
        // may be undersized; top it up to honor the capacity contract.
        // (No-op when the buffer is already large enough.)
        buffer.reserve(self.buffer_size);
        buffer
    }

    /// Returns a buffer to the pool for reuse.
    ///
    /// The buffer's contents are cleared; it is dropped instead of pooled
    /// once the pool already holds `max_pool_size` buffers.
    pub fn deallocate(&mut self, mut buffer: Vec<u8>) {
        if self.pool.len() < self.max_pool_size {
            buffer.clear();
            self.pool.push(buffer);
        }
    }

    /// Returns the number of buffers currently in the pool.
    pub fn available(&self) -> usize {
        self.pool.len()
    }

    /// Clears all pooled buffers, releasing their memory.
    pub fn clear(&mut self) {
        self.pool.clear();
    }
}

impl Default for PacketAllocator {
    /// Defaults sized for Ethernet-MTU packets: 1500-byte buffers,
    /// up to 256 pooled.
    fn default() -> Self {
        Self::new(1500, 256)
    }
}
119
/// A pool for compression output buffers to reduce allocations in hot paths.
/// Compression can be expensive, so reusing buffers improves performance.
pub struct CompressionBufferPool {
    /// Pool of reusable buffers for compression output
    pool: Vec<Vec<u8>>,
    /// Maximum buffer size to pool (larger buffers are not pooled)
    max_buffer_size: usize,
    /// Maximum number of buffers to keep in pool
    max_pool_size: usize,
}

impl CompressionBufferPool {
    /// Creates a new compression buffer pool.
    pub fn new(max_buffer_size: usize, max_pool_size: usize) -> Self {
        CompressionBufferPool {
            pool: Vec::with_capacity(max_pool_size),
            max_buffer_size,
            max_pool_size,
        }
    }

    /// Acquires a buffer from the pool or creates a new one.
    /// The buffer is cleared and ready to use.
    pub fn acquire(&mut self) -> Vec<u8> {
        match self.pool.pop() {
            Some(buffer) => buffer,
            None => Vec::new(),
        }
    }

    /// Returns a buffer to the pool for reuse.
    /// Buffers larger than max_buffer_size are not pooled.
    pub fn release(&mut self, mut buffer: Vec<u8>) {
        // Pool only buffers that fit the size cap while there is room;
        // everything else is simply dropped here.
        let fits = buffer.capacity() <= self.max_buffer_size;
        let has_room = self.pool.len() < self.max_pool_size;
        if fits && has_room {
            buffer.clear();
            self.pool.push(buffer);
        }
    }

    /// Returns the number of buffers currently available in the pool.
    pub fn available(&self) -> usize {
        self.pool.len()
    }

    /// Clears all pooled buffers.
    pub fn clear(&mut self) {
        self.pool.clear();
    }
}

impl Default for CompressionBufferPool {
    fn default() -> Self {
        // Default: pool buffers up to 8KB, keep up to 32 buffers
        Self::new(8192, 32)
    }
}
169
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pooled_packet_basic() {
        let packet = PooledPacket::new(vec![1, 2, 3, 4, 5]);

        assert_eq!(packet.as_slice(), &[1, 2, 3, 4, 5]);
        assert_eq!(packet.len(), 5);
        assert!(!packet.is_empty());
    }

    #[test]
    fn test_pooled_packet_slice() {
        let shared = Arc::new(vec![1, 2, 3, 4, 5]);
        let packet = PooledPacket::from_arc(shared, 1, 3);

        assert_eq!(packet.as_slice(), &[2, 3, 4]);
        assert_eq!(packet.len(), 3);
    }

    #[test]
    fn test_pooled_packet_ref_count() {
        let first = PooledPacket::new(vec![1, 2, 3]);
        assert_eq!(first.ref_count(), 1);

        // A clone shares the same backing buffer, bumping the refcount.
        let second = first.clone();
        assert_eq!(first.ref_count(), 2);
        assert_eq!(second.ref_count(), 2);
    }

    #[test]
    fn test_allocator_basic() {
        let mut allocator = PacketAllocator::new(100, 10);

        let first = allocator.allocate();
        assert_eq!(allocator.available(), 0);
        assert!(first.capacity() >= 100);

        allocator.deallocate(first);
        assert_eq!(allocator.available(), 1);

        let second = allocator.allocate();
        assert_eq!(allocator.available(), 0);
        drop(second);
    }

    #[test]
    fn test_allocator_max_pool_size() {
        let mut allocator = PacketAllocator::new(100, 2);

        for _ in 0..5 {
            allocator.deallocate(Vec::new());
        }

        // Only `max_pool_size` buffers are retained.
        assert_eq!(allocator.available(), 2);
    }

    #[test]
    fn test_compression_pool_basic() {
        let mut pool = CompressionBufferPool::new(1024, 10);

        // Acquire empties the pool; release refills it.
        let buf = pool.acquire();
        assert_eq!(pool.available(), 0);

        pool.release(buf);
        assert_eq!(pool.available(), 1);

        // Acquiring again hands the pooled buffer back out.
        let reused = pool.acquire();
        assert_eq!(pool.available(), 0);
        drop(reused);
    }

    #[test]
    fn test_compression_pool_size_limit() {
        let mut pool = CompressionBufferPool::new(1024, 5);

        // Oversized buffers are dropped rather than pooled.
        pool.release(vec![0u8; 2048]);
        assert_eq!(pool.available(), 0);

        // Buffers within the size limit are retained.
        pool.release(vec![0u8; 256]);
        assert_eq!(pool.available(), 1);
    }

    #[test]
    fn test_compression_pool_max_pool_size() {
        let mut pool = CompressionBufferPool::new(1024, 3);

        for _ in 0..5 {
            pool.release(Vec::new());
        }

        // Only `max_pool_size` buffers are retained.
        assert_eq!(pool.available(), 3);
    }

    #[test]
    fn test_compression_pool_clear() {
        let mut pool = CompressionBufferPool::default();

        for _ in 0..5 {
            pool.release(Vec::new());
        }
        assert_eq!(pool.available(), 5);

        pool.clear();
        assert_eq!(pool.available(), 0);
    }
}