grapsus_agent_protocol/
buffer_pool.rs1use bytes::BytesMut;
14use std::cell::RefCell;
15use std::collections::VecDeque;
16
/// Minimum capacity, in bytes, given to a freshly allocated buffer (64 KiB).
pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;

/// Maximum number of idle buffers each thread's pool will hold on to.
pub const MAX_POOL_SIZE: usize = 16;

/// Returned buffers whose capacity exceeds this many bytes (256 KiB) are dropped instead of pooled.
pub const MAX_POOLED_BUFFER_SIZE: usize = 256 * 1024;
25
thread_local! {
    /// The calling thread's private pool. Being thread-local, it needs no locking;
    /// the `RefCell` supplies the mutability required to update it through `with`.
    static BUFFER_POOL: RefCell<BufferPool> = RefCell::new(BufferPool::new());
}
29
30struct BufferPool {
32 buffers: VecDeque<BytesMut>,
33 allocated: usize,
34 reused: usize,
35 dropped: usize,
36}
37
38impl BufferPool {
39 fn new() -> Self {
40 Self {
41 buffers: VecDeque::with_capacity(MAX_POOL_SIZE),
42 allocated: 0,
43 reused: 0,
44 dropped: 0,
45 }
46 }
47
48 fn get(&mut self, min_capacity: usize) -> BytesMut {
49 if let Some(idx) = self
51 .buffers
52 .iter()
53 .position(|b| b.capacity() >= min_capacity)
54 {
55 let mut buf = self.buffers.remove(idx).unwrap();
56 buf.clear();
57 self.reused += 1;
58 return buf;
59 }
60
61 if let Some(mut buf) = self.buffers.pop_front() {
63 buf.clear();
64 if min_capacity > buf.capacity() {
65 buf.reserve(min_capacity - buf.capacity());
66 }
67 self.reused += 1;
68 return buf;
69 }
70
71 self.allocated += 1;
73 BytesMut::with_capacity(min_capacity.max(DEFAULT_BUFFER_SIZE))
74 }
75
76 fn put(&mut self, buf: BytesMut) {
77 if buf.capacity() > MAX_POOLED_BUFFER_SIZE {
79 self.dropped += 1;
80 return;
81 }
82
83 if self.buffers.len() >= MAX_POOL_SIZE {
85 self.dropped += 1;
86 return;
87 }
88
89 self.buffers.push_back(buf);
90 }
91}
92
93pub struct PooledBuffer {
95 buffer: Option<BytesMut>,
96}
97
98impl PooledBuffer {
99 pub fn new(min_capacity: usize) -> Self {
101 let buffer = BUFFER_POOL.with(|pool| pool.borrow_mut().get(min_capacity));
102 Self {
103 buffer: Some(buffer),
104 }
105 }
106
107 pub fn default_size() -> Self {
109 Self::new(DEFAULT_BUFFER_SIZE)
110 }
111
112 #[inline]
114 #[allow(clippy::should_implement_trait)]
115 pub fn as_mut(&mut self) -> &mut BytesMut {
116 self.buffer.as_mut().expect("buffer already taken")
117 }
118
119 #[inline]
121 #[allow(clippy::should_implement_trait)]
122 pub fn as_ref(&self) -> &BytesMut {
123 self.buffer.as_ref().expect("buffer already taken")
124 }
125
126 pub fn take(mut self) -> BytesMut {
130 self.buffer.take().expect("buffer already taken")
131 }
132
133 #[inline]
135 pub fn len(&self) -> usize {
136 self.as_ref().len()
137 }
138
139 #[inline]
141 pub fn is_empty(&self) -> bool {
142 self.as_ref().is_empty()
143 }
144
145 #[inline]
147 pub fn capacity(&self) -> usize {
148 self.as_ref().capacity()
149 }
150
151 pub fn clear(&mut self) {
153 self.as_mut().clear();
154 }
155}
156
157impl Drop for PooledBuffer {
158 fn drop(&mut self) {
159 if let Some(buf) = self.buffer.take() {
160 BUFFER_POOL.with(|pool| pool.borrow_mut().put(buf));
161 }
162 }
163}
164
165impl std::ops::Deref for PooledBuffer {
166 type Target = BytesMut;
167
168 fn deref(&self) -> &Self::Target {
169 self.as_ref()
170 }
171}
172
173impl std::ops::DerefMut for PooledBuffer {
174 fn deref_mut(&mut self) -> &mut Self::Target {
175 self.as_mut()
176 }
177}
178
179impl AsRef<[u8]> for PooledBuffer {
180 fn as_ref(&self) -> &[u8] {
181 self.buffer.as_ref().expect("buffer already taken")
182 }
183}
184
185impl AsMut<[u8]> for PooledBuffer {
186 fn as_mut(&mut self) -> &mut [u8] {
187 self.buffer.as_mut().expect("buffer already taken")
188 }
189}
190
191pub fn pool_stats() -> PoolStats {
193 BUFFER_POOL.with(|pool| {
194 let pool = pool.borrow();
195 PoolStats {
196 pooled: pool.buffers.len(),
197 allocated: pool.allocated,
198 reused: pool.reused,
199 dropped: pool.dropped,
200 }
201 })
202}
203
204pub fn clear_pool() {
206 BUFFER_POOL.with(|pool| {
207 pool.borrow_mut().buffers.clear();
208 });
209}
210
/// Point-in-time counters describing one thread's buffer pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
    /// Buffers currently sitting idle in the pool.
    pub pooled: usize,
    /// Requests that required a new allocation.
    pub allocated: usize,
    /// Requests that were served by a pooled buffer.
    pub reused: usize,
    /// Returned buffers that were discarded instead of pooled.
    pub dropped: usize,
}

impl PoolStats {
    /// Fraction of requests served from the pool: `reused / (allocated + reused)`.
    ///
    /// Returns `0.0` when no request has been recorded, avoiding a division by zero.
    pub fn hit_rate(&self) -> f64 {
        match self.allocated + self.reused {
            0 => 0.0,
            requests => self.reused as f64 / requests as f64,
        }
    }
}
235
236#[inline]
240pub fn acquire(min_capacity: usize) -> PooledBuffer {
241 PooledBuffer::new(min_capacity)
242}
243
244#[inline]
246pub fn acquire_default() -> PooledBuffer {
247 PooledBuffer::default_size()
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;

    // `clear_pool` empties the pool but does not reset its counters, so the
    // tests below compare counters against a snapshot taken at the start
    // instead of assuming they begin at zero.

    /// A freshly acquired buffer is empty, large enough, and writable.
    #[test]
    fn test_pooled_buffer_basic() {
        let mut buf = acquire(1024);
        assert!(buf.capacity() >= 1024);
        assert!(buf.is_empty());

        buf.put_slice(b"hello");
        assert_eq!(buf.len(), 5);
        assert_eq!(&buf[..], b"hello");
    }

    /// A dropped buffer lands in the pool and is handed out again, emptied.
    #[test]
    fn test_buffer_reuse() {
        clear_pool();
        let before = pool_stats();

        {
            let mut buf = acquire(1024);
            buf.put_slice(b"test data");
        }

        let stats = pool_stats();
        assert_eq!(stats.pooled, 1);

        {
            let buf = acquire(1024);
            assert!(buf.capacity() >= 1024);
            assert!(buf.is_empty());
        }

        let stats = pool_stats();
        assert!(stats.reused > before.reused);
    }

    /// Buffers above `MAX_POOLED_BUFFER_SIZE` are discarded on drop.
    #[test]
    fn test_large_buffer_not_pooled() {
        clear_pool();
        let before = pool_stats();

        {
            let mut buf = acquire(MAX_POOLED_BUFFER_SIZE + 1);
            buf.put_slice(b"large data");
        }

        let stats = pool_stats();
        assert_eq!(stats.dropped - before.dropped, 1);
        assert_eq!(stats.pooled, 0);
    }

    /// `take` detaches the buffer, so nothing is returned to the pool.
    #[test]
    fn test_buffer_take() {
        clear_pool();

        let buf = acquire(1024);
        let taken = buf.take();
        assert!(taken.is_empty());
        assert!(taken.capacity() >= 1024);

        let stats = pool_stats();
        assert_eq!(stats.pooled, 0);
    }

    /// Counters track allocations while buffers are live and pooling after drop.
    #[test]
    fn test_pool_stats() {
        clear_pool();
        let before = pool_stats();

        let buf1 = acquire(1024);
        let buf2 = acquire(2048);

        let stats = pool_stats();
        assert_eq!(stats.allocated - before.allocated, 2);
        assert_eq!(stats.reused, before.reused);
        assert_eq!(stats.pooled, 0);

        drop(buf1);
        drop(buf2);

        let stats = pool_stats();
        assert_eq!(stats.pooled, 2);
    }

    /// `hit_rate` is `reused / (allocated + reused)`.
    #[test]
    fn test_hit_rate() {
        let stats = PoolStats {
            pooled: 5,
            allocated: 10,
            reused: 90,
            dropped: 0,
        };

        assert!((stats.hit_rate() - 0.9).abs() < 0.01);
    }

    /// The pool never holds more than `MAX_POOL_SIZE` buffers; extras are dropped.
    #[test]
    fn test_pool_max_size() {
        clear_pool();
        let before = pool_stats();

        let buffers: Vec<_> = (0..MAX_POOL_SIZE + 5).map(|_| acquire(1024)).collect();

        drop(buffers);

        let stats = pool_stats();
        assert_eq!(stats.pooled, MAX_POOL_SIZE);
        assert_eq!(stats.dropped - before.dropped, 5);
    }
}