Skip to main content

grapsus_agent_protocol/
buffer_pool.rs

1//! Buffer pooling for message serialization/deserialization.
2//!
3//! This module provides a thread-local buffer pool to reduce allocation overhead
4//! for message processing. Buffers are reused for messages under a size threshold,
5//! while larger messages get fresh allocations.
6//!
7//! # Performance
8//!
//! - Buffers with capacity up to 256 KB: returned to the pool and reused, avoiding a fresh allocation
//! - Buffers that grew beyond 256 KB: dropped instead of pooled (rare case); fresh allocations are at least 64 KB
11//! - Thread-local: No contention between threads
12
13use bytes::BytesMut;
14use std::cell::RefCell;
15use std::collections::VecDeque;
16
/// Default buffer size (64 KB).
///
/// Lower bound on the capacity of every buffer the pool allocates from scratch.
pub const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;

/// Maximum number of buffers to keep in the pool per thread.
///
/// Buffers returned while the pool already holds this many are dropped.
pub const MAX_POOL_SIZE: usize = 16;

/// Maximum buffer size to pool (larger buffers are dropped).
///
/// Compared against a buffer's capacity, not its length, when it is returned.
pub const MAX_POOLED_BUFFER_SIZE: usize = 256 * 1024;
25
// One pool per thread. `RefCell` supplies the interior mutability needed to
// mutate the pool through the shared reference that `LocalKey::with` hands out.
thread_local! {
    static BUFFER_POOL: RefCell<BufferPool> = RefCell::new(BufferPool::new());
}
29
/// Thread-local buffer pool.
///
/// Holds the idle buffers together with the lifetime counters that
/// `pool_stats` reports.
struct BufferPool {
    /// Idle buffers available for reuse; `put` stops adding at `MAX_POOL_SIZE`.
    buffers: VecDeque<BytesMut>,
    /// Number of buffers `get` created from scratch.
    allocated: usize,
    /// Number of `get` calls served by a pooled buffer.
    reused: usize,
    /// Number of buffers `put` rejected (oversized, or pool already full).
    dropped: usize,
}
37
38impl BufferPool {
39    fn new() -> Self {
40        Self {
41            buffers: VecDeque::with_capacity(MAX_POOL_SIZE),
42            allocated: 0,
43            reused: 0,
44            dropped: 0,
45        }
46    }
47
48    fn get(&mut self, min_capacity: usize) -> BytesMut {
49        // Try to find a buffer with sufficient capacity
50        if let Some(idx) = self
51            .buffers
52            .iter()
53            .position(|b| b.capacity() >= min_capacity)
54        {
55            let mut buf = self.buffers.remove(idx).unwrap();
56            buf.clear();
57            self.reused += 1;
58            return buf;
59        }
60
61        // Try to get any buffer and resize if needed
62        if let Some(mut buf) = self.buffers.pop_front() {
63            buf.clear();
64            if min_capacity > buf.capacity() {
65                buf.reserve(min_capacity - buf.capacity());
66            }
67            self.reused += 1;
68            return buf;
69        }
70
71        // Allocate new buffer
72        self.allocated += 1;
73        BytesMut::with_capacity(min_capacity.max(DEFAULT_BUFFER_SIZE))
74    }
75
76    fn put(&mut self, buf: BytesMut) {
77        // Don't pool oversized buffers
78        if buf.capacity() > MAX_POOLED_BUFFER_SIZE {
79            self.dropped += 1;
80            return;
81        }
82
83        // Don't exceed pool size
84        if self.buffers.len() >= MAX_POOL_SIZE {
85            self.dropped += 1;
86            return;
87        }
88
89        self.buffers.push_back(buf);
90    }
91}
92
93/// A pooled buffer that returns to the pool on drop.
94pub struct PooledBuffer {
95    buffer: Option<BytesMut>,
96}
97
98impl PooledBuffer {
99    /// Create a new pooled buffer with at least the given capacity.
100    pub fn new(min_capacity: usize) -> Self {
101        let buffer = BUFFER_POOL.with(|pool| pool.borrow_mut().get(min_capacity));
102        Self {
103            buffer: Some(buffer),
104        }
105    }
106
107    /// Create a pooled buffer with the default capacity.
108    pub fn default_size() -> Self {
109        Self::new(DEFAULT_BUFFER_SIZE)
110    }
111
112    /// Get a mutable reference to the underlying buffer.
113    #[inline]
114    #[allow(clippy::should_implement_trait)]
115    pub fn as_mut(&mut self) -> &mut BytesMut {
116        self.buffer.as_mut().expect("buffer already taken")
117    }
118
119    /// Get an immutable reference to the underlying buffer.
120    #[inline]
121    #[allow(clippy::should_implement_trait)]
122    pub fn as_ref(&self) -> &BytesMut {
123        self.buffer.as_ref().expect("buffer already taken")
124    }
125
126    /// Take the buffer out of the pool wrapper.
127    ///
128    /// The buffer will NOT be returned to the pool when dropped.
129    pub fn take(mut self) -> BytesMut {
130        self.buffer.take().expect("buffer already taken")
131    }
132
133    /// Get the current length of data in the buffer.
134    #[inline]
135    pub fn len(&self) -> usize {
136        self.as_ref().len()
137    }
138
139    /// Check if the buffer is empty.
140    #[inline]
141    pub fn is_empty(&self) -> bool {
142        self.as_ref().is_empty()
143    }
144
145    /// Get the capacity of the buffer.
146    #[inline]
147    pub fn capacity(&self) -> usize {
148        self.as_ref().capacity()
149    }
150
151    /// Clear the buffer, keeping the capacity.
152    pub fn clear(&mut self) {
153        self.as_mut().clear();
154    }
155}
156
157impl Drop for PooledBuffer {
158    fn drop(&mut self) {
159        if let Some(buf) = self.buffer.take() {
160            BUFFER_POOL.with(|pool| pool.borrow_mut().put(buf));
161        }
162    }
163}
164
165impl std::ops::Deref for PooledBuffer {
166    type Target = BytesMut;
167
168    fn deref(&self) -> &Self::Target {
169        self.as_ref()
170    }
171}
172
173impl std::ops::DerefMut for PooledBuffer {
174    fn deref_mut(&mut self) -> &mut Self::Target {
175        self.as_mut()
176    }
177}
178
179impl AsRef<[u8]> for PooledBuffer {
180    fn as_ref(&self) -> &[u8] {
181        self.buffer.as_ref().expect("buffer already taken")
182    }
183}
184
185impl AsMut<[u8]> for PooledBuffer {
186    fn as_mut(&mut self) -> &mut [u8] {
187        self.buffer.as_mut().expect("buffer already taken")
188    }
189}
190
191/// Get buffer pool statistics for the current thread.
192pub fn pool_stats() -> PoolStats {
193    BUFFER_POOL.with(|pool| {
194        let pool = pool.borrow();
195        PoolStats {
196            pooled: pool.buffers.len(),
197            allocated: pool.allocated,
198            reused: pool.reused,
199            dropped: pool.dropped,
200        }
201    })
202}
203
204/// Clear the buffer pool for the current thread.
205pub fn clear_pool() {
206    BUFFER_POOL.with(|pool| {
207        pool.borrow_mut().buffers.clear();
208    });
209}
210
/// Point-in-time view of a thread's buffer pool counters.
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
    /// Idle buffers sitting in the pool at the time of the snapshot.
    pub pooled: usize,
    /// Buffers created from scratch so far.
    pub allocated: usize,
    /// Requests served by a pooled buffer so far.
    pub reused: usize,
    /// Returned buffers that were discarded (too large or pool full).
    pub dropped: usize,
}

impl PoolStats {
    /// Fraction of requests served from the pool: `reused / (allocated + reused)`.
    ///
    /// Returns `0.0` when no request has been recorded yet, which avoids a
    /// division by zero.
    pub fn hit_rate(&self) -> f64 {
        match self.allocated + self.reused {
            0 => 0.0,
            requests => self.reused as f64 / requests as f64,
        }
    }
}
235
/// Acquire a buffer from the pool with the given minimum capacity.
///
/// This is a convenience function for getting a pooled buffer: it forwards
/// directly to `PooledBuffer::new`, so the buffer is taken from the calling
/// thread's pool and offered back to it when the wrapper is dropped.
#[inline]
pub fn acquire(min_capacity: usize) -> PooledBuffer {
    PooledBuffer::new(min_capacity)
}
243
244/// Acquire a buffer with the default size.
245#[inline]
246pub fn acquire_default() -> PooledBuffer {
247    PooledBuffer::default_size()
248}
249
// The pool is thread-local and the test harness runs each test on its own
// thread, so the counters observed below start from zero in every test.
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;

    /// A freshly acquired buffer is empty, large enough, and writable.
    #[test]
    fn test_pooled_buffer_basic() {
        let mut buf = acquire(1024);
        assert!(buf.capacity() >= 1024);
        assert!(buf.is_empty());

        buf.put_slice(b"hello");
        assert_eq!(buf.len(), 5);
        assert_eq!(&buf[..], b"hello");
    }

    /// Dropping a buffer parks it in the pool and the next request reuses it.
    #[test]
    fn test_buffer_reuse() {
        // Clear pool first
        clear_pool();

        // Allocate and drop a buffer
        {
            let mut buf = acquire(1024);
            buf.put_slice(b"test data");
        }

        let stats = pool_stats();
        assert_eq!(stats.pooled, 1);

        // Get another buffer - should reuse
        {
            let buf = acquire(1024);
            assert!(buf.capacity() >= 1024);
        }

        let stats = pool_stats();
        assert!(stats.reused >= 1);
    }

    /// Buffers above `MAX_POOLED_BUFFER_SIZE` are discarded on return.
    #[test]
    fn test_large_buffer_not_pooled() {
        clear_pool();

        // Allocate a large buffer
        {
            let mut buf = acquire(MAX_POOLED_BUFFER_SIZE + 1);
            buf.put_slice(b"large data");
        }

        let stats = pool_stats();
        assert_eq!(stats.dropped, 1);
    }

    /// `take` detaches the buffer so it never goes back to the pool.
    #[test]
    fn test_buffer_take() {
        clear_pool();

        let buf = acquire(1024);
        let taken = buf.take();
        // The detached buffer is the one that was acquired: empty and large enough.
        assert!(taken.is_empty());
        assert!(taken.capacity() >= 1024);

        // Buffer should NOT be returned to pool
        let stats = pool_stats();
        assert_eq!(stats.pooled, 0);
    }

    /// Counters track allocations while buffers are out and after they return.
    #[test]
    fn test_pool_stats() {
        clear_pool();

        // Allocate some buffers
        let first = acquire(1024);
        let second = acquire(2048);

        let stats = pool_stats();
        assert_eq!(stats.allocated, 2);
        assert_eq!(stats.reused, 0);
        assert_eq!(stats.pooled, 0); // Still in use

        // Drop buffers
        drop(first);
        drop(second);

        let stats = pool_stats();
        assert_eq!(stats.pooled, 2);
    }

    /// `hit_rate` is reused / (allocated + reused).
    #[test]
    fn test_hit_rate() {
        let stats = PoolStats {
            pooled: 5,
            allocated: 10,
            reused: 90,
            dropped: 0,
        };

        assert!((stats.hit_rate() - 0.9).abs() < 0.01);
    }

    /// The pool never holds more than `MAX_POOL_SIZE` idle buffers.
    #[test]
    fn test_pool_max_size() {
        clear_pool();

        // Create more buffers than the pool can hold
        let buffers: Vec<_> = (0..MAX_POOL_SIZE + 5).map(|_| acquire(1024)).collect();

        // Drop all buffers
        drop(buffers);

        let stats = pool_stats();
        assert_eq!(stats.pooled, MAX_POOL_SIZE);
        assert!(stats.dropped >= 5);
    }
}
364}