kode_bridge/
buffer_pool.rs

1use parking_lot::Mutex;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tracing::debug;
5
6/// Thread-safe buffer pool for reducing memory allocations
7#[derive(Debug, Clone)]
8pub struct BufferPool {
9    buffers: Arc<Mutex<VecDeque<Vec<u8>>>>,
10    buffer_size: usize,
11    max_pool_size: usize,
12}
13
14impl BufferPool {
15    /// Create a new buffer pool
16    pub fn new(buffer_size: usize, max_pool_size: usize) -> Self {
17        Self {
18            buffers: Arc::new(Mutex::new(VecDeque::with_capacity(max_pool_size))),
19            buffer_size,
20            max_pool_size,
21        }
22    }
23
24    /// Get a buffer from the pool or create a new one
25    pub fn get(&self) -> PooledBuffer {
26        let mut buffer = {
27            let mut buffers = self.buffers.lock();
28            buffers.pop_front().unwrap_or_else(|| {
29                debug!("Creating new buffer of size {}", self.buffer_size);
30                Vec::with_capacity(self.buffer_size)
31            })
32        };
33
34        // Ensure buffer has the right capacity and is cleared
35        buffer.clear();
36        if buffer.capacity() < self.buffer_size {
37            buffer.reserve(self.buffer_size - buffer.capacity());
38        }
39
40        PooledBuffer {
41            buffer,
42            pool: Arc::downgrade(&self.buffers),
43            max_pool_size: self.max_pool_size,
44        }
45    }
46
47    /// Get current pool size for monitoring
48    pub fn size(&self) -> usize {
49        self.buffers.lock().len()
50    }
51
52    /// Pre-warm the pool with buffers
53    pub fn warm_up(&self, count: usize) {
54        let mut buffers = self.buffers.lock();
55        let current_size = buffers.len();
56        let to_create = (count.saturating_sub(current_size)).min(self.max_pool_size - current_size);
57
58        for _ in 0..to_create {
59            buffers.push_back(Vec::with_capacity(self.buffer_size));
60        }
61
62        debug!("Buffer pool warmed up with {} buffers", to_create);
63    }
64}
65
66impl Default for BufferPool {
67    fn default() -> Self {
68        Self::new(8192, 32) // 8KB buffers, max 32 in pool
69    }
70}
71
72/// RAII wrapper for pooled buffers that returns buffer to pool on drop
73pub struct PooledBuffer {
74    buffer: Vec<u8>,
75    pool: std::sync::Weak<Mutex<VecDeque<Vec<u8>>>>,
76    max_pool_size: usize,
77}
78
79impl PooledBuffer {
80    /// Get mutable reference to the underlying buffer
81    pub fn as_mut_vec(&mut self) -> &mut Vec<u8> {
82        &mut self.buffer
83    }
84
85    /// Get reference to the underlying buffer
86    pub fn as_vec(&self) -> &Vec<u8> {
87        &self.buffer
88    }
89
90    /// Get the buffer as a byte slice
91    pub fn as_slice(&self) -> &[u8] {
92        &self.buffer
93    }
94
95    /// Get the buffer capacity
96    pub fn capacity(&self) -> usize {
97        self.buffer.capacity()
98    }
99
100    /// Get the buffer length
101    pub fn len(&self) -> usize {
102        self.buffer.len()
103    }
104
105    /// Check if buffer is empty
106    pub fn is_empty(&self) -> bool {
107        self.buffer.is_empty()
108    }
109
110    /// Clear the buffer
111    pub fn clear(&mut self) {
112        self.buffer.clear();
113    }
114
115    /// Extend buffer with slice
116    pub fn extend_from_slice(&mut self, other: &[u8]) {
117        self.buffer.extend_from_slice(other);
118    }
119
120    /// Push a single byte
121    pub fn push(&mut self, byte: u8) {
122        self.buffer.push(byte);
123    }
124
125    /// Reserve additional capacity
126    pub fn reserve(&mut self, additional: usize) {
127        self.buffer.reserve(additional);
128    }
129}
130
131impl Drop for PooledBuffer {
132    fn drop(&mut self) {
133        if let Some(pool) = self.pool.upgrade() {
134            let mut buffers = pool.lock();
135            if buffers.len() < self.max_pool_size && self.buffer.capacity() >= 1024 {
136                // Only return buffer to pool if it's reasonably sized and pool has space
137                let mut returned_buffer = std::mem::take(&mut self.buffer);
138                returned_buffer.clear();
139                buffers.push_back(returned_buffer);
140                debug!("Buffer returned to pool");
141            }
142        }
143    }
144}
145
146impl std::ops::Deref for PooledBuffer {
147    type Target = [u8];
148
149    fn deref(&self) -> &Self::Target {
150        &self.buffer
151    }
152}
153
154impl std::ops::DerefMut for PooledBuffer {
155    fn deref_mut(&mut self) -> &mut Self::Target {
156        &mut self.buffer
157    }
158}
159
160/// Global buffer pools for different use cases
161pub struct GlobalBufferPools {
162    /// Small buffers for headers and small data
163    small: BufferPool,
164    /// Medium buffers for typical HTTP requests/responses
165    medium: BufferPool,
166    /// Large buffers for big payloads
167    large: BufferPool,
168}
169
170impl GlobalBufferPools {
171    pub fn new() -> Self {
172        Self {
173            small: BufferPool::new(1024, 16),  // 1KB buffers
174            medium: BufferPool::new(8192, 32), // 8KB buffers
175            large: BufferPool::new(65536, 8),  // 64KB buffers
176        }
177    }
178
179    /// Get appropriate buffer based on expected size
180    pub fn get_buffer(&self, expected_size: usize) -> PooledBuffer {
181        if expected_size <= 1024 {
182            self.small.get()
183        } else if expected_size <= 8192 {
184            self.medium.get()
185        } else {
186            self.large.get()
187        }
188    }
189
190    /// Get small buffer (for headers, small data)
191    pub fn get_small(&self) -> PooledBuffer {
192        self.small.get()
193    }
194
195    /// Get medium buffer (for typical HTTP data)
196    pub fn get_medium(&self) -> PooledBuffer {
197        self.medium.get()
198    }
199
200    /// Get large buffer (for big payloads)
201    pub fn get_large(&self) -> PooledBuffer {
202        self.large.get()
203    }
204
205    /// Warm up all pools
206    pub fn warm_up(&self) {
207        self.small.warm_up(8);
208        self.medium.warm_up(16);
209        self.large.warm_up(4);
210    }
211
212    /// Get pool statistics
213    pub fn stats(&self) -> BufferPoolStats {
214        BufferPoolStats {
215            small_pool_size: self.small.size(),
216            medium_pool_size: self.medium.size(),
217            large_pool_size: self.large.size(),
218        }
219    }
220}
221
222impl Default for GlobalBufferPools {
223    fn default() -> Self {
224        Self::new()
225    }
226}
227
/// Idle-buffer counts for the three pool tiers, as reported by
/// `GlobalBufferPools::stats`.
#[derive(Debug, Clone)]
pub struct BufferPoolStats {
    /// Number of idle buffers in the small pool.
    pub small_pool_size: usize,
    /// Number of idle buffers in the medium pool.
    pub medium_pool_size: usize,
    /// Number of idle buffers in the large pool.
    pub large_pool_size: usize,
}
234
235// Global instance - use lazy initialization
236use std::sync::OnceLock;
237
238static GLOBAL_POOLS: OnceLock<GlobalBufferPools> = OnceLock::new();
239
240/// Get global buffer pools instance
241pub fn global_pools() -> &'static GlobalBufferPools {
242    GLOBAL_POOLS.get_or_init(|| {
243        let pools = GlobalBufferPools::new();
244        pools.warm_up();
245        pools
246    })
247}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// A dropped buffer is returned to the pool and handed out again empty.
    #[test]
    fn test_buffer_pool_basic() {
        let pool = BufferPool::new(1024, 4);

        let mut first = pool.get();
        first.extend_from_slice(b"hello");
        assert_eq!(first.len(), 5);
        assert!(first.capacity() >= 1024);
        drop(first);

        // Dropping the handle should have put the buffer back into the pool.
        assert_eq!(pool.size(), 1);

        let mut second = pool.get();
        // A recycled buffer must come back cleared.
        assert_eq!(second.len(), 0);
        second.extend_from_slice(b"world");
        assert_eq!(second.as_slice(), b"world");
    }

    /// The pool never retains more idle buffers than its configured maximum.
    #[test]
    fn test_buffer_pool_max_size() {
        let pool = BufferPool::new(1024, 2);

        let checked_out: Vec<PooledBuffer> = (0..3).map(|_| pool.get()).collect();

        // Three buffers are dropped, but only two fit into the pool.
        drop(checked_out);

        assert_eq!(pool.size(), 2);
    }

    /// Each tier of the global pools yields a usable, adequately sized buffer.
    #[test]
    fn test_global_pools() {
        let pools = global_pools();

        let mut small = pools.get_small();
        let mut medium = pools.get_medium();
        let mut large = pools.get_large();

        small.extend_from_slice(b"small");
        medium.extend_from_slice(b"medium data");
        large.extend_from_slice(b"large data payload");

        assert_eq!(small.as_slice(), b"small");
        assert_eq!(medium.as_slice(), b"medium data");
        assert_eq!(large.as_slice(), b"large data payload");

        // Every tier must provide at least its nominal capacity.
        assert!(small.capacity() >= 1024);
        assert!(medium.capacity() >= 8192);
        assert!(large.capacity() >= 65536);
    }

    /// `get_buffer` routes a request to the tier that fits the expected size.
    #[test]
    fn test_buffer_selection() {
        let pools = GlobalBufferPools::new();

        // (expected size, minimum capacity of the tier that should serve it)
        let cases = [(512, 1024), (4096, 8192), (32768, 65536)];
        for (expected_size, min_capacity) in cases {
            let buffer = pools.get_buffer(expected_size);
            assert!(buffer.capacity() >= min_capacity);
        }
    }
}