kode_bridge/
buffer_pool.rs1use parking_lot::Mutex;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tracing::debug;
5
6#[derive(Debug, Clone)]
8pub struct BufferPool {
9 buffers: Arc<Mutex<VecDeque<Vec<u8>>>>,
10 buffer_size: usize,
11 max_pool_size: usize,
12}
13
14impl BufferPool {
15 pub fn new(buffer_size: usize, max_pool_size: usize) -> Self {
17 Self {
18 buffers: Arc::new(Mutex::new(VecDeque::with_capacity(max_pool_size))),
19 buffer_size,
20 max_pool_size,
21 }
22 }
23
24 pub fn get(&self) -> PooledBuffer {
26 let mut buffer = {
27 let mut buffers = self.buffers.lock();
28 buffers.pop_front().unwrap_or_else(|| {
29 debug!("Creating new buffer of size {}", self.buffer_size);
30 Vec::with_capacity(self.buffer_size)
31 })
32 };
33
34 buffer.clear();
36 if buffer.capacity() < self.buffer_size {
37 buffer.reserve(self.buffer_size - buffer.capacity());
38 }
39
40 PooledBuffer {
41 buffer,
42 pool: Arc::downgrade(&self.buffers),
43 max_pool_size: self.max_pool_size,
44 }
45 }
46
47 pub fn size(&self) -> usize {
49 self.buffers.lock().len()
50 }
51
52 pub fn warm_up(&self, count: usize) {
54 let to_create = {
55 let mut buffers = self.buffers.lock();
56 let current_size = buffers.len();
57 let to_create = (count.saturating_sub(current_size)).min(self.max_pool_size - current_size);
58
59 for _ in 0..to_create {
60 buffers.push_back(Vec::with_capacity(self.buffer_size));
61 }
62
63 to_create
64 };
65
66 debug!("Buffer pool warmed up with {} buffers", to_create);
67 }
68}
69
70impl Default for BufferPool {
71 fn default() -> Self {
72 Self::new(16384, 64) }
74}
75
76pub struct PooledBuffer {
78 buffer: Vec<u8>,
79 pool: std::sync::Weak<Mutex<VecDeque<Vec<u8>>>>,
80 max_pool_size: usize,
81}
82
83impl PooledBuffer {
84 pub const fn as_mut_vec(&mut self) -> &mut Vec<u8> {
86 &mut self.buffer
87 }
88
89 pub const fn as_vec(&self) -> &Vec<u8> {
91 &self.buffer
92 }
93
94 pub fn as_slice(&self) -> &[u8] {
96 &self.buffer
97 }
98
99 pub fn capacity(&self) -> usize {
101 self.buffer.capacity()
102 }
103
104 pub fn len(&self) -> usize {
106 self.buffer.len()
107 }
108
109 pub fn is_empty(&self) -> bool {
111 self.buffer.is_empty()
112 }
113
114 pub fn clear(&mut self) {
116 self.buffer.clear();
117 }
118
119 pub fn extend_from_slice(&mut self, other: &[u8]) {
121 self.buffer.extend_from_slice(other);
122 }
123
124 pub fn push(&mut self, byte: u8) {
126 self.buffer.push(byte);
127 }
128
129 pub fn reserve(&mut self, additional: usize) {
131 self.buffer.reserve(additional);
132 }
133}
134
135impl Drop for PooledBuffer {
136 fn drop(&mut self) {
137 if let Some(pool) = self.pool.upgrade() {
138 let returned = {
139 let mut buffers = pool.lock();
140 if buffers.len() < self.max_pool_size && self.buffer.capacity() >= 1024 {
141 let mut returned_buffer = std::mem::take(&mut self.buffer);
143 returned_buffer.clear();
144 buffers.push_back(returned_buffer);
145 true
146 } else {
147 false
148 }
149 };
150
151 if returned {
152 debug!("Buffer returned to pool");
153 }
154 }
155 }
156}
157
158impl std::ops::Deref for PooledBuffer {
159 type Target = [u8];
160
161 fn deref(&self) -> &Self::Target {
162 &self.buffer
163 }
164}
165
166impl std::ops::DerefMut for PooledBuffer {
167 fn deref_mut(&mut self) -> &mut Self::Target {
168 &mut self.buffer
169 }
170}
171
172pub struct GlobalBufferPools {
174 small: BufferPool,
176 medium: BufferPool,
178 large: BufferPool,
180 extra_large: BufferPool,
182}
183
184impl GlobalBufferPools {
185 pub fn new() -> Self {
186 Self {
187 small: BufferPool::new(2048, 32), medium: BufferPool::new(16384, 64), large: BufferPool::new(131072, 16), extra_large: BufferPool::new(1048576, 8), }
192 }
193
194 pub fn get_buffer(&self, expected_size: usize) -> PooledBuffer {
196 if expected_size <= 2048 {
197 self.small.get()
198 } else if expected_size <= 16384 {
199 self.medium.get()
200 } else if expected_size <= 131072 {
201 self.large.get()
202 } else {
203 self.extra_large.get()
204 }
205 }
206
207 pub fn get_small(&self) -> PooledBuffer {
209 self.small.get()
210 }
211
212 pub fn get_medium(&self) -> PooledBuffer {
214 self.medium.get()
215 }
216
217 pub fn get_large(&self) -> PooledBuffer {
219 self.large.get()
220 }
221
222 pub fn get_extra_large(&self) -> PooledBuffer {
224 self.extra_large.get()
225 }
226
227 pub fn warm_up(&self) {
229 self.small.warm_up(16); self.medium.warm_up(32);
231 self.large.warm_up(8);
232 self.extra_large.warm_up(4);
233 }
234
235 pub fn stats(&self) -> BufferPoolStats {
237 BufferPoolStats {
238 small_pool_size: self.small.size(),
239 medium_pool_size: self.medium.size(),
240 large_pool_size: self.large.size(),
241 extra_large_pool_size: self.extra_large.size(),
242 }
243 }
244}
245
246impl Default for GlobalBufferPools {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
252#[derive(Debug, Clone)]
253pub struct BufferPoolStats {
254 pub small_pool_size: usize,
255 pub medium_pool_size: usize,
256 pub large_pool_size: usize,
257 pub extra_large_pool_size: usize,
258}
259
260use std::sync::OnceLock;
262
263static GLOBAL_POOLS: OnceLock<GlobalBufferPools> = OnceLock::new();
264
265pub fn global_pools() -> &'static GlobalBufferPools {
267 GLOBAL_POOLS.get_or_init(|| {
268 let pools = GlobalBufferPools::new();
269 pools.warm_up();
270 pools
271 })
272}
273
274#[cfg(test)]
275mod tests {
276 use super::*;
277
278 #[test]
279 fn test_buffer_pool_basic() {
280 let pool = BufferPool::new(1024, 4);
281
282 {
283 let mut buf1 = pool.get();
284 buf1.extend_from_slice(b"hello");
285 assert_eq!(buf1.len(), 5);
286 assert!(buf1.capacity() >= 1024);
287 }
288
289 assert_eq!(pool.size(), 1);
291
292 let mut buf2 = pool.get();
293 assert_eq!(buf2.len(), 0); buf2.extend_from_slice(b"world");
295 assert_eq!(buf2.as_slice(), b"world");
296 }
297
298 #[test]
299 fn test_buffer_pool_max_size() {
300 let pool = BufferPool::new(1024, 2);
301
302 let _buf1 = pool.get();
303 let _buf2 = pool.get();
304 let _buf3 = pool.get();
305
306 drop(_buf1);
308 drop(_buf2);
309 drop(_buf3);
310
311 assert_eq!(pool.size(), 2);
312 }
313
314 #[test]
315 fn test_global_pools() {
316 let pools = global_pools();
317
318 let mut small_buf = pools.get_small();
319 small_buf.extend_from_slice(b"small");
320
321 let mut medium_buf = pools.get_medium();
322 medium_buf.extend_from_slice(b"medium data");
323
324 let mut large_buf = pools.get_large();
325 large_buf.extend_from_slice(b"large data payload");
326
327 assert_eq!(small_buf.as_slice(), b"small");
328 assert_eq!(medium_buf.as_slice(), b"medium data");
329 assert_eq!(large_buf.as_slice(), b"large data payload");
330
331 assert!(small_buf.capacity() >= 1024);
333 assert!(medium_buf.capacity() >= 8192);
334 assert!(large_buf.capacity() >= 65536);
335 }
336
337 #[test]
338 fn test_buffer_selection() {
339 let pools = GlobalBufferPools::new();
340
341 let buf1 = pools.get_buffer(512); let buf2 = pools.get_buffer(4096); let buf3 = pools.get_buffer(32768); assert!(buf1.capacity() >= 1024);
346 assert!(buf2.capacity() >= 8192);
347 assert!(buf3.capacity() >= 65536);
348 }
349}