sentinel_agent_protocol/
buffer_pool.rs1use bytes::BytesMut;
14use std::cell::RefCell;
15use std::collections::VecDeque;
16
17pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;
19
20pub const MAX_POOL_SIZE: usize = 16;
22
23pub const MAX_POOLED_BUFFER_SIZE: usize = 256 * 1024;
25
26thread_local! {
27 static BUFFER_POOL: RefCell<BufferPool> = RefCell::new(BufferPool::new());
28}
29
30struct BufferPool {
32 buffers: VecDeque<BytesMut>,
33 allocated: usize,
34 reused: usize,
35 dropped: usize,
36}
37
38impl BufferPool {
39 fn new() -> Self {
40 Self {
41 buffers: VecDeque::with_capacity(MAX_POOL_SIZE),
42 allocated: 0,
43 reused: 0,
44 dropped: 0,
45 }
46 }
47
48 fn get(&mut self, min_capacity: usize) -> BytesMut {
49 if let Some(idx) = self.buffers.iter().position(|b| b.capacity() >= min_capacity) {
51 let mut buf = self.buffers.remove(idx).unwrap();
52 buf.clear();
53 self.reused += 1;
54 return buf;
55 }
56
57 if let Some(mut buf) = self.buffers.pop_front() {
59 buf.clear();
60 if min_capacity > buf.capacity() {
61 buf.reserve(min_capacity - buf.capacity());
62 }
63 self.reused += 1;
64 return buf;
65 }
66
67 self.allocated += 1;
69 BytesMut::with_capacity(min_capacity.max(DEFAULT_BUFFER_SIZE))
70 }
71
72 fn put(&mut self, buf: BytesMut) {
73 if buf.capacity() > MAX_POOLED_BUFFER_SIZE {
75 self.dropped += 1;
76 return;
77 }
78
79 if self.buffers.len() >= MAX_POOL_SIZE {
81 self.dropped += 1;
82 return;
83 }
84
85 self.buffers.push_back(buf);
86 }
87}
88
89pub struct PooledBuffer {
91 buffer: Option<BytesMut>,
92}
93
94impl PooledBuffer {
95 pub fn new(min_capacity: usize) -> Self {
97 let buffer = BUFFER_POOL.with(|pool| pool.borrow_mut().get(min_capacity));
98 Self {
99 buffer: Some(buffer),
100 }
101 }
102
103 pub fn default_size() -> Self {
105 Self::new(DEFAULT_BUFFER_SIZE)
106 }
107
108 #[inline]
110 pub fn as_mut(&mut self) -> &mut BytesMut {
111 self.buffer.as_mut().expect("buffer already taken")
112 }
113
114 #[inline]
116 pub fn as_ref(&self) -> &BytesMut {
117 self.buffer.as_ref().expect("buffer already taken")
118 }
119
120 pub fn take(mut self) -> BytesMut {
124 self.buffer.take().expect("buffer already taken")
125 }
126
127 #[inline]
129 pub fn len(&self) -> usize {
130 self.as_ref().len()
131 }
132
133 #[inline]
135 pub fn is_empty(&self) -> bool {
136 self.as_ref().is_empty()
137 }
138
139 #[inline]
141 pub fn capacity(&self) -> usize {
142 self.as_ref().capacity()
143 }
144
145 pub fn clear(&mut self) {
147 self.as_mut().clear();
148 }
149}
150
151impl Drop for PooledBuffer {
152 fn drop(&mut self) {
153 if let Some(buf) = self.buffer.take() {
154 BUFFER_POOL.with(|pool| pool.borrow_mut().put(buf));
155 }
156 }
157}
158
159impl std::ops::Deref for PooledBuffer {
160 type Target = BytesMut;
161
162 fn deref(&self) -> &Self::Target {
163 self.as_ref()
164 }
165}
166
167impl std::ops::DerefMut for PooledBuffer {
168 fn deref_mut(&mut self) -> &mut Self::Target {
169 self.as_mut()
170 }
171}
172
173impl AsRef<[u8]> for PooledBuffer {
174 fn as_ref(&self) -> &[u8] {
175 self.buffer.as_ref().expect("buffer already taken")
176 }
177}
178
179impl AsMut<[u8]> for PooledBuffer {
180 fn as_mut(&mut self) -> &mut [u8] {
181 self.buffer.as_mut().expect("buffer already taken")
182 }
183}
184
185pub fn pool_stats() -> PoolStats {
187 BUFFER_POOL.with(|pool| {
188 let pool = pool.borrow();
189 PoolStats {
190 pooled: pool.buffers.len(),
191 allocated: pool.allocated,
192 reused: pool.reused,
193 dropped: pool.dropped,
194 }
195 })
196}
197
198pub fn clear_pool() {
200 BUFFER_POOL.with(|pool| {
201 pool.borrow_mut().buffers.clear();
202 });
203}
204
205#[derive(Debug, Clone, Copy)]
207pub struct PoolStats {
208 pub pooled: usize,
210 pub allocated: usize,
212 pub reused: usize,
214 pub dropped: usize,
216}
217
218impl PoolStats {
219 pub fn hit_rate(&self) -> f64 {
221 let total = self.allocated + self.reused;
222 if total == 0 {
223 0.0
224 } else {
225 self.reused as f64 / total as f64
226 }
227 }
228}
229
230#[inline]
234pub fn acquire(min_capacity: usize) -> PooledBuffer {
235 PooledBuffer::new(min_capacity)
236}
237
238#[inline]
240pub fn acquire_default() -> PooledBuffer {
241 PooledBuffer::default_size()
242}
243
244#[cfg(test)]
245mod tests {
246 use super::*;
247 use bytes::BufMut;
248
249 #[test]
250 fn test_pooled_buffer_basic() {
251 let mut buf = acquire(1024);
252 assert!(buf.capacity() >= 1024);
253 assert!(buf.is_empty());
254
255 buf.put_slice(b"hello");
256 assert_eq!(buf.len(), 5);
257 assert_eq!(&buf[..], b"hello");
258 }
259
260 #[test]
261 fn test_buffer_reuse() {
262 clear_pool();
264
265 {
267 let mut buf = acquire(1024);
268 buf.put_slice(b"test data");
269 }
270
271 let stats = pool_stats();
272 assert_eq!(stats.pooled, 1);
273
274 {
276 let buf = acquire(1024);
277 assert!(buf.capacity() >= 1024);
278 }
279
280 let stats = pool_stats();
281 assert!(stats.reused >= 1);
282 }
283
284 #[test]
285 fn test_large_buffer_not_pooled() {
286 clear_pool();
287
288 {
290 let mut buf = acquire(MAX_POOLED_BUFFER_SIZE + 1);
291 buf.put_slice(b"large data");
292 }
293
294 let stats = pool_stats();
295 assert_eq!(stats.dropped, 1);
296 }
297
298 #[test]
299 fn test_buffer_take() {
300 clear_pool();
301
302 let buf = acquire(1024);
303 let taken = buf.take();
304 assert!(!taken.is_empty() || taken.is_empty()); let stats = pool_stats();
308 assert_eq!(stats.pooled, 0);
309 }
310
311 #[test]
312 fn test_pool_stats() {
313 clear_pool();
314
315 let _buf1 = acquire(1024);
317 let _buf2 = acquire(2048);
318
319 let stats = pool_stats();
320 assert_eq!(stats.allocated, 2);
321 assert_eq!(stats.reused, 0);
322 assert_eq!(stats.pooled, 0); drop(_buf1);
326 drop(_buf2);
327
328 let stats = pool_stats();
329 assert_eq!(stats.pooled, 2);
330 }
331
332 #[test]
333 fn test_hit_rate() {
334 let stats = PoolStats {
335 pooled: 5,
336 allocated: 10,
337 reused: 90,
338 dropped: 0,
339 };
340
341 assert!((stats.hit_rate() - 0.9).abs() < 0.01);
342 }
343
344 #[test]
345 fn test_pool_max_size() {
346 clear_pool();
347
348 let buffers: Vec<_> = (0..MAX_POOL_SIZE + 5)
350 .map(|_| acquire(1024))
351 .collect();
352
353 drop(buffers);
355
356 let stats = pool_stats();
357 assert_eq!(stats.pooled, MAX_POOL_SIZE);
358 assert!(stats.dropped >= 5);
359 }
360}