kode_bridge/
buffer_pool.rs1use parking_lot::Mutex;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tracing::debug;
5
6#[derive(Debug, Clone)]
8pub struct BufferPool {
9 buffers: Arc<Mutex<VecDeque<Vec<u8>>>>,
10 buffer_size: usize,
11 max_pool_size: usize,
12}
13
14impl BufferPool {
15 pub fn new(buffer_size: usize, max_pool_size: usize) -> Self {
17 Self {
18 buffers: Arc::new(Mutex::new(VecDeque::with_capacity(max_pool_size))),
19 buffer_size,
20 max_pool_size,
21 }
22 }
23
24 pub fn get(&self) -> PooledBuffer {
26 let mut buffer = {
27 let mut buffers = self.buffers.lock();
28 buffers.pop_front().unwrap_or_else(|| {
29 debug!("Creating new buffer of size {}", self.buffer_size);
30 Vec::with_capacity(self.buffer_size)
31 })
32 };
33
34 buffer.clear();
36 if buffer.capacity() < self.buffer_size {
37 buffer.reserve(self.buffer_size - buffer.capacity());
38 }
39
40 PooledBuffer {
41 buffer,
42 pool: Arc::downgrade(&self.buffers),
43 max_pool_size: self.max_pool_size,
44 }
45 }
46
47 pub fn size(&self) -> usize {
49 self.buffers.lock().len()
50 }
51
52 pub fn warm_up(&self, count: usize) {
54 let mut buffers = self.buffers.lock();
55 let current_size = buffers.len();
56 let to_create = (count.saturating_sub(current_size)).min(self.max_pool_size - current_size);
57
58 for _ in 0..to_create {
59 buffers.push_back(Vec::with_capacity(self.buffer_size));
60 }
61
62 debug!("Buffer pool warmed up with {} buffers", to_create);
63 }
64}
65
66impl Default for BufferPool {
67 fn default() -> Self {
68 Self::new(16384, 64) }
70}
71
72pub struct PooledBuffer {
74 buffer: Vec<u8>,
75 pool: std::sync::Weak<Mutex<VecDeque<Vec<u8>>>>,
76 max_pool_size: usize,
77}
78
79impl PooledBuffer {
80 pub fn as_mut_vec(&mut self) -> &mut Vec<u8> {
82 &mut self.buffer
83 }
84
85 pub fn as_vec(&self) -> &Vec<u8> {
87 &self.buffer
88 }
89
90 pub fn as_slice(&self) -> &[u8] {
92 &self.buffer
93 }
94
95 pub fn capacity(&self) -> usize {
97 self.buffer.capacity()
98 }
99
100 pub fn len(&self) -> usize {
102 self.buffer.len()
103 }
104
105 pub fn is_empty(&self) -> bool {
107 self.buffer.is_empty()
108 }
109
110 pub fn clear(&mut self) {
112 self.buffer.clear();
113 }
114
115 pub fn extend_from_slice(&mut self, other: &[u8]) {
117 self.buffer.extend_from_slice(other);
118 }
119
120 pub fn push(&mut self, byte: u8) {
122 self.buffer.push(byte);
123 }
124
125 pub fn reserve(&mut self, additional: usize) {
127 self.buffer.reserve(additional);
128 }
129}
130
131impl Drop for PooledBuffer {
132 fn drop(&mut self) {
133 if let Some(pool) = self.pool.upgrade() {
134 let mut buffers = pool.lock();
135 if buffers.len() < self.max_pool_size && self.buffer.capacity() >= 1024 {
136 let mut returned_buffer = std::mem::take(&mut self.buffer);
138 returned_buffer.clear();
139 buffers.push_back(returned_buffer);
140 debug!("Buffer returned to pool");
141 }
142 }
143 }
144}
145
146impl std::ops::Deref for PooledBuffer {
147 type Target = [u8];
148
149 fn deref(&self) -> &Self::Target {
150 &self.buffer
151 }
152}
153
154impl std::ops::DerefMut for PooledBuffer {
155 fn deref_mut(&mut self) -> &mut Self::Target {
156 &mut self.buffer
157 }
158}
159
160pub struct GlobalBufferPools {
162 small: BufferPool,
164 medium: BufferPool,
166 large: BufferPool,
168 extra_large: BufferPool,
170}
171
172impl GlobalBufferPools {
173 pub fn new() -> Self {
174 Self {
175 small: BufferPool::new(2048, 32), medium: BufferPool::new(16384, 64), large: BufferPool::new(131072, 16), extra_large: BufferPool::new(1048576, 8), }
180 }
181
182 pub fn get_buffer(&self, expected_size: usize) -> PooledBuffer {
184 if expected_size <= 2048 {
185 self.small.get()
186 } else if expected_size <= 16384 {
187 self.medium.get()
188 } else if expected_size <= 131072 {
189 self.large.get()
190 } else {
191 self.extra_large.get()
192 }
193 }
194
195 pub fn get_small(&self) -> PooledBuffer {
197 self.small.get()
198 }
199
200 pub fn get_medium(&self) -> PooledBuffer {
202 self.medium.get()
203 }
204
205 pub fn get_large(&self) -> PooledBuffer {
207 self.large.get()
208 }
209
210 pub fn get_extra_large(&self) -> PooledBuffer {
212 self.extra_large.get()
213 }
214
215 pub fn warm_up(&self) {
217 self.small.warm_up(16); self.medium.warm_up(32);
219 self.large.warm_up(8);
220 self.extra_large.warm_up(4);
221 }
222
223 pub fn stats(&self) -> BufferPoolStats {
225 BufferPoolStats {
226 small_pool_size: self.small.size(),
227 medium_pool_size: self.medium.size(),
228 large_pool_size: self.large.size(),
229 extra_large_pool_size: self.extra_large.size(),
230 }
231 }
232}
233
234impl Default for GlobalBufferPools {
235 fn default() -> Self {
236 Self::new()
237 }
238}
239
240#[derive(Debug, Clone)]
241pub struct BufferPoolStats {
242 pub small_pool_size: usize,
243 pub medium_pool_size: usize,
244 pub large_pool_size: usize,
245 pub extra_large_pool_size: usize,
246}
247
248use std::sync::OnceLock;
250
251static GLOBAL_POOLS: OnceLock<GlobalBufferPools> = OnceLock::new();
252
253pub fn global_pools() -> &'static GlobalBufferPools {
255 GLOBAL_POOLS.get_or_init(|| {
256 let pools = GlobalBufferPools::new();
257 pools.warm_up();
258 pools
259 })
260}
261
262#[cfg(test)]
263mod tests {
264 use super::*;
265
266 #[test]
267 fn test_buffer_pool_basic() {
268 let pool = BufferPool::new(1024, 4);
269
270 {
271 let mut buf1 = pool.get();
272 buf1.extend_from_slice(b"hello");
273 assert_eq!(buf1.len(), 5);
274 assert!(buf1.capacity() >= 1024);
275 }
276
277 assert_eq!(pool.size(), 1);
279
280 let mut buf2 = pool.get();
281 assert_eq!(buf2.len(), 0); buf2.extend_from_slice(b"world");
283 assert_eq!(buf2.as_slice(), b"world");
284 }
285
286 #[test]
287 fn test_buffer_pool_max_size() {
288 let pool = BufferPool::new(1024, 2);
289
290 let _buf1 = pool.get();
291 let _buf2 = pool.get();
292 let _buf3 = pool.get();
293
294 drop(_buf1);
296 drop(_buf2);
297 drop(_buf3);
298
299 assert_eq!(pool.size(), 2);
300 }
301
302 #[test]
303 fn test_global_pools() {
304 let pools = global_pools();
305
306 let mut small_buf = pools.get_small();
307 small_buf.extend_from_slice(b"small");
308
309 let mut medium_buf = pools.get_medium();
310 medium_buf.extend_from_slice(b"medium data");
311
312 let mut large_buf = pools.get_large();
313 large_buf.extend_from_slice(b"large data payload");
314
315 assert_eq!(small_buf.as_slice(), b"small");
316 assert_eq!(medium_buf.as_slice(), b"medium data");
317 assert_eq!(large_buf.as_slice(), b"large data payload");
318
319 assert!(small_buf.capacity() >= 1024);
321 assert!(medium_buf.capacity() >= 8192);
322 assert!(large_buf.capacity() >= 65536);
323 }
324
325 #[test]
326 fn test_buffer_selection() {
327 let pools = GlobalBufferPools::new();
328
329 let buf1 = pools.get_buffer(512); let buf2 = pools.get_buffer(4096); let buf3 = pools.get_buffer(32768); assert!(buf1.capacity() >= 1024);
334 assert!(buf2.capacity() >= 8192);
335 assert!(buf3.capacity() >= 65536);
336 }
337}