kode_bridge/
buffer_pool.rs1use parking_lot::Mutex;
2use std::collections::VecDeque;
3use std::sync::Arc;
4use tracing::debug;
5
6#[derive(Debug, Clone)]
8pub struct BufferPool {
9 buffers: Arc<Mutex<VecDeque<Vec<u8>>>>,
10 buffer_size: usize,
11 max_pool_size: usize,
12}
13
14impl BufferPool {
15 pub fn new(buffer_size: usize, max_pool_size: usize) -> Self {
17 Self {
18 buffers: Arc::new(Mutex::new(VecDeque::with_capacity(max_pool_size))),
19 buffer_size,
20 max_pool_size,
21 }
22 }
23
24 pub fn get(&self) -> PooledBuffer {
26 let mut buffer = {
27 let mut buffers = self.buffers.lock();
28 buffers.pop_front().unwrap_or_else(|| {
29 debug!("Creating new buffer of size {}", self.buffer_size);
30 Vec::with_capacity(self.buffer_size)
31 })
32 };
33
34 buffer.clear();
36 if buffer.capacity() < self.buffer_size {
37 buffer.reserve(self.buffer_size - buffer.capacity());
38 }
39
40 PooledBuffer {
41 buffer,
42 pool: Arc::downgrade(&self.buffers),
43 max_pool_size: self.max_pool_size,
44 }
45 }
46
47 pub fn size(&self) -> usize {
49 self.buffers.lock().len()
50 }
51
52 pub fn warm_up(&self, count: usize) {
54 let mut buffers = self.buffers.lock();
55 let current_size = buffers.len();
56 let to_create = (count.saturating_sub(current_size)).min(self.max_pool_size - current_size);
57
58 for _ in 0..to_create {
59 buffers.push_back(Vec::with_capacity(self.buffer_size));
60 }
61
62 debug!("Buffer pool warmed up with {} buffers", to_create);
63 }
64}
65
66impl Default for BufferPool {
67 fn default() -> Self {
68 Self::new(8192, 32) }
70}
71
72pub struct PooledBuffer {
74 buffer: Vec<u8>,
75 pool: std::sync::Weak<Mutex<VecDeque<Vec<u8>>>>,
76 max_pool_size: usize,
77}
78
79impl PooledBuffer {
80 pub fn as_mut_vec(&mut self) -> &mut Vec<u8> {
82 &mut self.buffer
83 }
84
85 pub fn as_vec(&self) -> &Vec<u8> {
87 &self.buffer
88 }
89
90 pub fn as_slice(&self) -> &[u8] {
92 &self.buffer
93 }
94
95 pub fn capacity(&self) -> usize {
97 self.buffer.capacity()
98 }
99
100 pub fn len(&self) -> usize {
102 self.buffer.len()
103 }
104
105 pub fn is_empty(&self) -> bool {
107 self.buffer.is_empty()
108 }
109
110 pub fn clear(&mut self) {
112 self.buffer.clear();
113 }
114
115 pub fn extend_from_slice(&mut self, other: &[u8]) {
117 self.buffer.extend_from_slice(other);
118 }
119
120 pub fn push(&mut self, byte: u8) {
122 self.buffer.push(byte);
123 }
124
125 pub fn reserve(&mut self, additional: usize) {
127 self.buffer.reserve(additional);
128 }
129}
130
131impl Drop for PooledBuffer {
132 fn drop(&mut self) {
133 if let Some(pool) = self.pool.upgrade() {
134 let mut buffers = pool.lock();
135 if buffers.len() < self.max_pool_size && self.buffer.capacity() >= 1024 {
136 let mut returned_buffer = std::mem::take(&mut self.buffer);
138 returned_buffer.clear();
139 buffers.push_back(returned_buffer);
140 debug!("Buffer returned to pool");
141 }
142 }
143 }
144}
145
146impl std::ops::Deref for PooledBuffer {
147 type Target = [u8];
148
149 fn deref(&self) -> &Self::Target {
150 &self.buffer
151 }
152}
153
154impl std::ops::DerefMut for PooledBuffer {
155 fn deref_mut(&mut self) -> &mut Self::Target {
156 &mut self.buffer
157 }
158}
159
160pub struct GlobalBufferPools {
162 small: BufferPool,
164 medium: BufferPool,
166 large: BufferPool,
168}
169
170impl GlobalBufferPools {
171 pub fn new() -> Self {
172 Self {
173 small: BufferPool::new(1024, 16), medium: BufferPool::new(8192, 32), large: BufferPool::new(65536, 8), }
177 }
178
179 pub fn get_buffer(&self, expected_size: usize) -> PooledBuffer {
181 if expected_size <= 1024 {
182 self.small.get()
183 } else if expected_size <= 8192 {
184 self.medium.get()
185 } else {
186 self.large.get()
187 }
188 }
189
190 pub fn get_small(&self) -> PooledBuffer {
192 self.small.get()
193 }
194
195 pub fn get_medium(&self) -> PooledBuffer {
197 self.medium.get()
198 }
199
200 pub fn get_large(&self) -> PooledBuffer {
202 self.large.get()
203 }
204
205 pub fn warm_up(&self) {
207 self.small.warm_up(8);
208 self.medium.warm_up(16);
209 self.large.warm_up(4);
210 }
211
212 pub fn stats(&self) -> BufferPoolStats {
214 BufferPoolStats {
215 small_pool_size: self.small.size(),
216 medium_pool_size: self.medium.size(),
217 large_pool_size: self.large.size(),
218 }
219 }
220}
221
222impl Default for GlobalBufferPools {
223 fn default() -> Self {
224 Self::new()
225 }
226}
227
228#[derive(Debug, Clone)]
229pub struct BufferPoolStats {
230 pub small_pool_size: usize,
231 pub medium_pool_size: usize,
232 pub large_pool_size: usize,
233}
234
235use std::sync::OnceLock;
237
238static GLOBAL_POOLS: OnceLock<GlobalBufferPools> = OnceLock::new();
239
240pub fn global_pools() -> &'static GlobalBufferPools {
242 GLOBAL_POOLS.get_or_init(|| {
243 let pools = GlobalBufferPools::new();
244 pools.warm_up();
245 pools
246 })
247}
248
249#[cfg(test)]
250mod tests {
251 use super::*;
252
253 #[test]
254 fn test_buffer_pool_basic() {
255 let pool = BufferPool::new(1024, 4);
256
257 {
258 let mut buf1 = pool.get();
259 buf1.extend_from_slice(b"hello");
260 assert_eq!(buf1.len(), 5);
261 assert!(buf1.capacity() >= 1024);
262 }
263
264 assert_eq!(pool.size(), 1);
266
267 let mut buf2 = pool.get();
268 assert_eq!(buf2.len(), 0); buf2.extend_from_slice(b"world");
270 assert_eq!(buf2.as_slice(), b"world");
271 }
272
273 #[test]
274 fn test_buffer_pool_max_size() {
275 let pool = BufferPool::new(1024, 2);
276
277 let _buf1 = pool.get();
278 let _buf2 = pool.get();
279 let _buf3 = pool.get();
280
281 drop(_buf1);
283 drop(_buf2);
284 drop(_buf3);
285
286 assert_eq!(pool.size(), 2);
287 }
288
289 #[test]
290 fn test_global_pools() {
291 let pools = global_pools();
292
293 let mut small_buf = pools.get_small();
294 small_buf.extend_from_slice(b"small");
295
296 let mut medium_buf = pools.get_medium();
297 medium_buf.extend_from_slice(b"medium data");
298
299 let mut large_buf = pools.get_large();
300 large_buf.extend_from_slice(b"large data payload");
301
302 assert_eq!(small_buf.as_slice(), b"small");
303 assert_eq!(medium_buf.as_slice(), b"medium data");
304 assert_eq!(large_buf.as_slice(), b"large data payload");
305
306 assert!(small_buf.capacity() >= 1024);
308 assert!(medium_buf.capacity() >= 8192);
309 assert!(large_buf.capacity() >= 65536);
310 }
311
312 #[test]
313 fn test_buffer_selection() {
314 let pools = GlobalBufferPools::new();
315
316 let buf1 = pools.get_buffer(512); let buf2 = pools.get_buffer(4096); let buf3 = pools.get_buffer(32768); assert!(buf1.capacity() >= 1024);
321 assert!(buf2.capacity() >= 8192);
322 assert!(buf3.capacity() >= 65536);
323 }
324}