// kode_bridge/buffer_pool.rs

1use parking_lot::Mutex;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tracing::debug;
5
6/// Thread-safe buffer pool for reducing memory allocations
7#[derive(Debug, Clone)]
8pub struct BufferPool {
9    buffers: Arc<Mutex<VecDeque<Vec<u8>>>>,
10    buffer_size: usize,
11    max_pool_size: usize,
12}
13
14impl BufferPool {
15    /// Create a new buffer pool
16    pub fn new(buffer_size: usize, max_pool_size: usize) -> Self {
17        Self {
18            buffers: Arc::new(Mutex::new(VecDeque::with_capacity(max_pool_size))),
19            buffer_size,
20            max_pool_size,
21        }
22    }
23
24    /// Get a buffer from the pool or create a new one
25    pub fn get(&self) -> PooledBuffer {
26        let mut buffer = {
27            let mut buffers = self.buffers.lock();
28            buffers.pop_front().unwrap_or_else(|| {
29                debug!("Creating new buffer of size {}", self.buffer_size);
30                Vec::with_capacity(self.buffer_size)
31            })
32        };
33
34        // Ensure buffer has the right capacity and is cleared
35        buffer.clear();
36        if buffer.capacity() < self.buffer_size {
37            buffer.reserve(self.buffer_size - buffer.capacity());
38        }
39
40        PooledBuffer {
41            buffer,
42            pool: Arc::downgrade(&self.buffers),
43            max_pool_size: self.max_pool_size,
44        }
45    }
46
47    /// Get current pool size for monitoring
48    pub fn size(&self) -> usize {
49        self.buffers.lock().len()
50    }
51
52    /// Pre-warm the pool with buffers
53    pub fn warm_up(&self, count: usize) {
54        let mut buffers = self.buffers.lock();
55        let current_size = buffers.len();
56        let to_create = (count.saturating_sub(current_size)).min(self.max_pool_size - current_size);
57
58        for _ in 0..to_create {
59            buffers.push_back(Vec::with_capacity(self.buffer_size));
60        }
61
62        debug!("Buffer pool warmed up with {} buffers", to_create);
63    }
64}
65
66impl Default for BufferPool {
67    fn default() -> Self {
68        Self::new(16384, 64) // 16KB buffers, max 64 in pool (increased for better performance)
69    }
70}
71
72/// RAII wrapper for pooled buffers that returns buffer to pool on drop
73pub struct PooledBuffer {
74    buffer: Vec<u8>,
75    pool: std::sync::Weak<Mutex<VecDeque<Vec<u8>>>>,
76    max_pool_size: usize,
77}
78
79impl PooledBuffer {
80    /// Get mutable reference to the underlying buffer
81    pub fn as_mut_vec(&mut self) -> &mut Vec<u8> {
82        &mut self.buffer
83    }
84
85    /// Get reference to the underlying buffer
86    pub fn as_vec(&self) -> &Vec<u8> {
87        &self.buffer
88    }
89
90    /// Get the buffer as a byte slice
91    pub fn as_slice(&self) -> &[u8] {
92        &self.buffer
93    }
94
95    /// Get the buffer capacity
96    pub fn capacity(&self) -> usize {
97        self.buffer.capacity()
98    }
99
100    /// Get the buffer length
101    pub fn len(&self) -> usize {
102        self.buffer.len()
103    }
104
105    /// Check if buffer is empty
106    pub fn is_empty(&self) -> bool {
107        self.buffer.is_empty()
108    }
109
110    /// Clear the buffer
111    pub fn clear(&mut self) {
112        self.buffer.clear();
113    }
114
115    /// Extend buffer with slice
116    pub fn extend_from_slice(&mut self, other: &[u8]) {
117        self.buffer.extend_from_slice(other);
118    }
119
120    /// Push a single byte
121    pub fn push(&mut self, byte: u8) {
122        self.buffer.push(byte);
123    }
124
125    /// Reserve additional capacity
126    pub fn reserve(&mut self, additional: usize) {
127        self.buffer.reserve(additional);
128    }
129}
130
131impl Drop for PooledBuffer {
132    fn drop(&mut self) {
133        if let Some(pool) = self.pool.upgrade() {
134            let mut buffers = pool.lock();
135            if buffers.len() < self.max_pool_size && self.buffer.capacity() >= 1024 {
136                // Only return buffer to pool if it's reasonably sized and pool has space
137                let mut returned_buffer = std::mem::take(&mut self.buffer);
138                returned_buffer.clear();
139                buffers.push_back(returned_buffer);
140                debug!("Buffer returned to pool");
141            }
142        }
143    }
144}
145
146impl std::ops::Deref for PooledBuffer {
147    type Target = [u8];
148
149    fn deref(&self) -> &Self::Target {
150        &self.buffer
151    }
152}
153
154impl std::ops::DerefMut for PooledBuffer {
155    fn deref_mut(&mut self) -> &mut Self::Target {
156        &mut self.buffer
157    }
158}
159
160/// Global buffer pools for different use cases
161pub struct GlobalBufferPools {
162    /// Small buffers for headers and small data
163    small: BufferPool,
164    /// Medium buffers for typical HTTP requests/responses
165    medium: BufferPool,
166    /// Large buffers for big payloads
167    large: BufferPool,
168    /// Extra large buffers for very large PUT/POST requests
169    extra_large: BufferPool,
170}
171
172impl GlobalBufferPools {
173    pub fn new() -> Self {
174        Self {
175            small: BufferPool::new(2048, 32),   // 2KB buffers, more instances
176            medium: BufferPool::new(16384, 64), // 16KB buffers, doubled size and count
177            large: BufferPool::new(131072, 16), // 128KB buffers, doubled size, more instances
178            extra_large: BufferPool::new(1048576, 8), // 1MB buffers for very large requests
179        }
180    }
181
182    /// Get appropriate buffer based on expected size with better size selection
183    pub fn get_buffer(&self, expected_size: usize) -> PooledBuffer {
184        if expected_size <= 2048 {
185            self.small.get()
186        } else if expected_size <= 16384 {
187            self.medium.get()
188        } else if expected_size <= 131072 {
189            self.large.get()
190        } else {
191            self.extra_large.get()
192        }
193    }
194
195    /// Get small buffer (for headers, small data)
196    pub fn get_small(&self) -> PooledBuffer {
197        self.small.get()
198    }
199
200    /// Get medium buffer (for typical HTTP data)
201    pub fn get_medium(&self) -> PooledBuffer {
202        self.medium.get()
203    }
204
205    /// Get large buffer (for big payloads)
206    pub fn get_large(&self) -> PooledBuffer {
207        self.large.get()
208    }
209
210    /// Get extra large buffer (for very large PUT/POST requests)
211    pub fn get_extra_large(&self) -> PooledBuffer {
212        self.extra_large.get()
213    }
214
215    /// Warm up all pools
216    pub fn warm_up(&self) {
217        self.small.warm_up(16); // More pre-warmed buffers
218        self.medium.warm_up(32);
219        self.large.warm_up(8);
220        self.extra_large.warm_up(4);
221    }
222
223    /// Get pool statistics
224    pub fn stats(&self) -> BufferPoolStats {
225        BufferPoolStats {
226            small_pool_size: self.small.size(),
227            medium_pool_size: self.medium.size(),
228            large_pool_size: self.large.size(),
229            extra_large_pool_size: self.extra_large.size(),
230        }
231    }
232}
233
234impl Default for GlobalBufferPools {
235    fn default() -> Self {
236        Self::new()
237    }
238}
239
/// Point-in-time count of idle buffers in each size tier.
#[derive(Debug, Clone)]
pub struct BufferPoolStats {
    /// Idle buffers in the small tier.
    pub small_pool_size: usize,
    /// Idle buffers in the medium tier.
    pub medium_pool_size: usize,
    /// Idle buffers in the large tier.
    pub large_pool_size: usize,
    /// Idle buffers in the extra-large tier.
    pub extra_large_pool_size: usize,
}
247
248// Global instance - use lazy initialization
249use std::sync::OnceLock;
250
251static GLOBAL_POOLS: OnceLock<GlobalBufferPools> = OnceLock::new();
252
253/// Get global buffer pools instance
254pub fn global_pools() -> &'static GlobalBufferPools {
255    GLOBAL_POOLS.get_or_init(|| {
256        let pools = GlobalBufferPools::new();
257        pools.warm_up();
258        pools
259    })
260}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// A dropped buffer goes back to the pool and comes out cleared.
    #[test]
    fn test_buffer_pool_basic() {
        let pool = BufferPool::new(1024, 4);

        {
            let mut buf1 = pool.get();
            buf1.extend_from_slice(b"hello");
            assert_eq!(buf1.len(), 5);
            assert!(buf1.capacity() >= 1024);
        }

        // Buffer should be returned to pool
        assert_eq!(pool.size(), 1);

        let mut buf2 = pool.get();
        // Taking the buffer out again empties the pool.
        assert_eq!(pool.size(), 0);
        assert_eq!(buf2.len(), 0); // Should be cleared
        assert!(buf2.capacity() >= 1024);
        buf2.extend_from_slice(b"world");
        assert_eq!(buf2.as_slice(), b"world");
    }

    /// The pool never retains more than `max_pool_size` idle buffers.
    #[test]
    fn test_buffer_pool_max_size() {
        let pool = BufferPool::new(1024, 2);

        let first = pool.get();
        let second = pool.get();
        let third = pool.get();

        // When all buffers drop, only 2 should be retained
        drop(first);
        drop(second);
        drop(third);

        assert_eq!(pool.size(), 2);
    }

    /// The global pools hand out independent buffers of the configured sizes.
    #[test]
    fn test_global_pools() {
        let pools = global_pools();

        let mut small_buf = pools.get_small();
        small_buf.extend_from_slice(b"small");

        let mut medium_buf = pools.get_medium();
        medium_buf.extend_from_slice(b"medium data");

        let mut large_buf = pools.get_large();
        large_buf.extend_from_slice(b"large data payload");

        assert_eq!(small_buf.as_slice(), b"small");
        assert_eq!(medium_buf.as_slice(), b"medium data");
        assert_eq!(large_buf.as_slice(), b"large data payload");

        // Capacities must match the sizes configured in `GlobalBufferPools::new`.
        assert!(small_buf.capacity() >= 2048);
        assert!(medium_buf.capacity() >= 16384);
        assert!(large_buf.capacity() >= 131072);
    }

    /// `get_buffer` routes each expected size to the matching tier.
    #[test]
    fn test_buffer_selection() {
        let pools = GlobalBufferPools::new();

        let small = pools.get_buffer(512); // Should get small
        let medium = pools.get_buffer(4096); // Should get medium
        let large = pools.get_buffer(32768); // Should get large
        let extra_large = pools.get_buffer(262144); // Should get extra large

        assert!(small.capacity() >= 2048);
        assert!(medium.capacity() >= 16384);
        assert!(large.capacity() >= 131072);
        assert!(extra_large.capacity() >= 1048576);

        // The pools start empty, so after the drops each tier holds exactly
        // the one buffer it served. This proves which tier was selected.
        drop(small);
        drop(medium);
        drop(large);
        drop(extra_large);

        let stats = pools.stats();
        assert_eq!(stats.small_pool_size, 1);
        assert_eq!(stats.medium_pool_size, 1);
        assert_eq!(stats.large_pool_size, 1);
        assert_eq!(stats.extra_large_pool_size, 1);
    }
}