// kode_bridge/buffer_pool.rs

1use parking_lot::Mutex;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tracing::debug;
5
6/// Thread-safe buffer pool for reducing memory allocations
7#[derive(Debug, Clone)]
8pub struct BufferPool {
9    buffers: Arc<Mutex<VecDeque<Vec<u8>>>>,
10    buffer_size: usize,
11    max_pool_size: usize,
12}
13
14impl BufferPool {
15    /// Create a new buffer pool
16    pub fn new(buffer_size: usize, max_pool_size: usize) -> Self {
17        Self {
18            buffers: Arc::new(Mutex::new(VecDeque::with_capacity(max_pool_size))),
19            buffer_size,
20            max_pool_size,
21        }
22    }
23
24    /// Get a buffer from the pool or create a new one
25    pub fn get(&self) -> PooledBuffer {
26        let mut buffer = {
27            let mut buffers = self.buffers.lock();
28            buffers.pop_front().unwrap_or_else(|| {
29                debug!("Creating new buffer of size {}", self.buffer_size);
30                Vec::with_capacity(self.buffer_size)
31            })
32        };
33
34        // Ensure buffer has the right capacity and is cleared
35        buffer.clear();
36        if buffer.capacity() < self.buffer_size {
37            buffer.reserve(self.buffer_size - buffer.capacity());
38        }
39
40        PooledBuffer {
41            buffer,
42            pool: Arc::downgrade(&self.buffers),
43            max_pool_size: self.max_pool_size,
44        }
45    }
46
47    /// Get current pool size for monitoring
48    pub fn size(&self) -> usize {
49        self.buffers.lock().len()
50    }
51
52    /// Pre-warm the pool with buffers
53    pub fn warm_up(&self, count: usize) {
54        let to_create = {
55            let mut buffers = self.buffers.lock();
56            let current_size = buffers.len();
57            let to_create = (count.saturating_sub(current_size)).min(self.max_pool_size - current_size);
58
59            for _ in 0..to_create {
60                buffers.push_back(Vec::with_capacity(self.buffer_size));
61            }
62
63            to_create
64        };
65
66        debug!("Buffer pool warmed up with {} buffers", to_create);
67    }
68}
69
70impl Default for BufferPool {
71    fn default() -> Self {
72        Self::new(16384, 64) // 16KB buffers, max 64 in pool (increased for better performance)
73    }
74}
75
76/// RAII wrapper for pooled buffers that returns buffer to pool on drop
77pub struct PooledBuffer {
78    buffer: Vec<u8>,
79    pool: std::sync::Weak<Mutex<VecDeque<Vec<u8>>>>,
80    max_pool_size: usize,
81}
82
83impl PooledBuffer {
84    /// Get mutable reference to the underlying buffer
85    pub const fn as_mut_vec(&mut self) -> &mut Vec<u8> {
86        &mut self.buffer
87    }
88
89    /// Get reference to the underlying buffer
90    pub const fn as_vec(&self) -> &Vec<u8> {
91        &self.buffer
92    }
93
94    /// Get the buffer as a byte slice
95    pub fn as_slice(&self) -> &[u8] {
96        &self.buffer
97    }
98
99    /// Get the buffer capacity
100    pub fn capacity(&self) -> usize {
101        self.buffer.capacity()
102    }
103
104    /// Get the buffer length
105    pub fn len(&self) -> usize {
106        self.buffer.len()
107    }
108
109    /// Check if buffer is empty
110    pub fn is_empty(&self) -> bool {
111        self.buffer.is_empty()
112    }
113
114    /// Clear the buffer
115    pub fn clear(&mut self) {
116        self.buffer.clear();
117    }
118
119    /// Extend buffer with slice
120    pub fn extend_from_slice(&mut self, other: &[u8]) {
121        self.buffer.extend_from_slice(other);
122    }
123
124    /// Push a single byte
125    pub fn push(&mut self, byte: u8) {
126        self.buffer.push(byte);
127    }
128
129    /// Reserve additional capacity
130    pub fn reserve(&mut self, additional: usize) {
131        self.buffer.reserve(additional);
132    }
133}
134
135impl Drop for PooledBuffer {
136    fn drop(&mut self) {
137        if let Some(pool) = self.pool.upgrade() {
138            let returned = {
139                let mut buffers = pool.lock();
140                if buffers.len() < self.max_pool_size && self.buffer.capacity() >= 1024 {
141                    // Only return buffer to pool if it's reasonably sized and pool has space
142                    let mut returned_buffer = std::mem::take(&mut self.buffer);
143                    returned_buffer.clear();
144                    buffers.push_back(returned_buffer);
145                    true
146                } else {
147                    false
148                }
149            };
150
151            if returned {
152                debug!("Buffer returned to pool");
153            }
154        }
155    }
156}
157
158impl std::ops::Deref for PooledBuffer {
159    type Target = [u8];
160
161    fn deref(&self) -> &Self::Target {
162        &self.buffer
163    }
164}
165
166impl std::ops::DerefMut for PooledBuffer {
167    fn deref_mut(&mut self) -> &mut Self::Target {
168        &mut self.buffer
169    }
170}
171
172/// Global buffer pools for different use cases
173pub struct GlobalBufferPools {
174    /// Small buffers for headers and small data
175    small: BufferPool,
176    /// Medium buffers for typical HTTP requests/responses
177    medium: BufferPool,
178    /// Large buffers for big payloads
179    large: BufferPool,
180    /// Extra large buffers for very large PUT/POST requests
181    extra_large: BufferPool,
182}
183
184impl GlobalBufferPools {
185    pub fn new() -> Self {
186        Self {
187            small: BufferPool::new(2048, 32),         // 2KB buffers, more instances
188            medium: BufferPool::new(16384, 64),       // 16KB buffers, doubled size and count
189            large: BufferPool::new(131072, 16),       // 128KB buffers, doubled size, more instances
190            extra_large: BufferPool::new(1048576, 8), // 1MB buffers for very large requests
191        }
192    }
193
194    /// Get appropriate buffer based on expected size with better size selection
195    pub fn get_buffer(&self, expected_size: usize) -> PooledBuffer {
196        if expected_size <= 2048 {
197            self.small.get()
198        } else if expected_size <= 16384 {
199            self.medium.get()
200        } else if expected_size <= 131072 {
201            self.large.get()
202        } else {
203            self.extra_large.get()
204        }
205    }
206
207    /// Get small buffer (for headers, small data)
208    pub fn get_small(&self) -> PooledBuffer {
209        self.small.get()
210    }
211
212    /// Get medium buffer (for typical HTTP data)
213    pub fn get_medium(&self) -> PooledBuffer {
214        self.medium.get()
215    }
216
217    /// Get large buffer (for big payloads)
218    pub fn get_large(&self) -> PooledBuffer {
219        self.large.get()
220    }
221
222    /// Get extra large buffer (for very large PUT/POST requests)
223    pub fn get_extra_large(&self) -> PooledBuffer {
224        self.extra_large.get()
225    }
226
227    /// Warm up all pools
228    pub fn warm_up(&self) {
229        self.small.warm_up(16); // More pre-warmed buffers
230        self.medium.warm_up(32);
231        self.large.warm_up(8);
232        self.extra_large.warm_up(4);
233    }
234
235    /// Get pool statistics
236    pub fn stats(&self) -> BufferPoolStats {
237        BufferPoolStats {
238            small_pool_size: self.small.size(),
239            medium_pool_size: self.medium.size(),
240            large_pool_size: self.large.size(),
241            extra_large_pool_size: self.extra_large.size(),
242        }
243    }
244}
245
246impl Default for GlobalBufferPools {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
/// Snapshot of the number of idle buffers held by each size-class pool.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BufferPoolStats {
    /// Idle buffers in the small pool.
    pub small_pool_size: usize,
    /// Idle buffers in the medium pool.
    pub medium_pool_size: usize,
    /// Idle buffers in the large pool.
    pub large_pool_size: usize,
    /// Idle buffers in the extra-large pool.
    pub extra_large_pool_size: usize,
}
259
260// Global instance - use lazy initialization
261use std::sync::OnceLock;
262
263static GLOBAL_POOLS: OnceLock<GlobalBufferPools> = OnceLock::new();
264
265/// Get global buffer pools instance
266pub fn global_pools() -> &'static GlobalBufferPools {
267    GLOBAL_POOLS.get_or_init(|| {
268        let pools = GlobalBufferPools::new();
269        pools.warm_up();
270        pools
271    })
272}
273
#[cfg(test)]
mod tests {
    use super::*;

    /// Idle-buffer counts of all four pools, ordered small to extra-large.
    fn idle_counts(pools: &GlobalBufferPools) -> [usize; 4] {
        let stats = pools.stats();
        [
            stats.small_pool_size,
            stats.medium_pool_size,
            stats.large_pool_size,
            stats.extra_large_pool_size,
        ]
    }

    #[test]
    fn test_buffer_pool_basic() {
        let pool = BufferPool::new(1024, 4);

        {
            let mut buf1 = pool.get();
            buf1.extend_from_slice(b"hello");
            assert_eq!(buf1.len(), 5);
            assert!(buf1.capacity() >= 1024);
        }

        // Buffer should be returned to pool
        assert_eq!(pool.size(), 1);

        let mut buf2 = pool.get();
        // The idle buffer was reused rather than a new one allocated alongside it
        assert_eq!(pool.size(), 0);
        assert!(buf2.is_empty()); // Should be cleared
        buf2.extend_from_slice(b"world");
        assert_eq!(buf2.as_slice(), b"world");
    }

    #[test]
    fn test_buffer_pool_max_size() {
        let pool = BufferPool::new(1024, 2);

        let buf1 = pool.get();
        let buf2 = pool.get();
        let buf3 = pool.get();

        // When all buffers drop, only 2 should be retained
        drop(buf1);
        drop(buf2);
        drop(buf3);

        assert_eq!(pool.size(), 2);
    }

    #[test]
    fn test_global_pools() {
        let pools = global_pools();

        let mut small_buf = pools.get_small();
        small_buf.extend_from_slice(b"small");

        let mut medium_buf = pools.get_medium();
        medium_buf.extend_from_slice(b"medium data");

        let mut large_buf = pools.get_large();
        large_buf.extend_from_slice(b"large data payload");

        assert_eq!(small_buf.as_slice(), b"small");
        assert_eq!(medium_buf.as_slice(), b"medium data");
        assert_eq!(large_buf.as_slice(), b"large data payload");

        // Capacities must match the configured size of each pool
        assert!(small_buf.capacity() >= 2048);
        assert!(medium_buf.capacity() >= 16384);
        assert!(large_buf.capacity() >= 131072);
    }

    #[test]
    fn test_buffer_selection() {
        let pools = GlobalBufferPools::new();

        {
            let buf1 = pools.get_buffer(512); // Should get small
            let buf2 = pools.get_buffer(4096); // Should get medium
            let buf3 = pools.get_buffer(32768); // Should get large
            let buf4 = pools.get_buffer(200_000); // Should get extra large

            assert!(buf1.capacity() >= 2048);
            assert!(buf2.capacity() >= 16384);
            assert!(buf3.capacity() >= 131072);
            assert!(buf4.capacity() >= 1_048_576);
        }
    }

    #[test]
    fn test_buffer_selection_routes_to_expected_pool() {
        let pools = GlobalBufferPools::new();
        assert_eq!(idle_counts(&pools), [0, 0, 0, 0]);

        // Each dropped buffer returns to the pool it came from, so the idle
        // counts reveal which pool served the request. A repeat request to an
        // already-used pool reuses its idle buffer and leaves the counts as is.
        let cases: [(usize, [usize; 4]); 10] = [
            (0, [1, 0, 0, 0]),
            (512, [1, 0, 0, 0]),
            (2048, [1, 0, 0, 0]),
            (2049, [1, 1, 0, 0]),
            (4096, [1, 1, 0, 0]),
            (16384, [1, 1, 0, 0]),
            (16385, [1, 1, 1, 0]),
            (32768, [1, 1, 1, 0]),
            (131072, [1, 1, 1, 0]),
            (131073, [1, 1, 1, 1]),
        ];

        for (expected_size, expected_idle) in cases {
            drop(pools.get_buffer(expected_size));
            assert_eq!(
                idle_counts(&pools),
                expected_idle,
                "unexpected pool used for expected_size = {expected_size}"
            );
        }
    }
}