sentinel_agent_protocol/
buffer_pool.rs

1//! Buffer pooling for message serialization/deserialization.
2//!
3//! This module provides a thread-local buffer pool to reduce allocation overhead
4//! for message processing. Buffers are reused for messages under a size threshold,
5//! while larger messages get fresh allocations.
6//!
7//! # Performance
8//!
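//! - Buffers up to 256 KB: returned to the pool after use and reused by later
//!   requests, so a warm pool serves them without allocating
//! - Buffers above 256 KB: dropped after use instead of being pooled (rare case)
//! - Fresh allocations are at least 64 KB; each thread keeps at most 16 idle buffers
//! - Thread-local: No contention between threads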
12
13use bytes::BytesMut;
14use std::cell::RefCell;
15use std::collections::VecDeque;
16
/// Minimum capacity, in bytes, of a freshly allocated buffer (64 KiB).
pub const DEFAULT_BUFFER_SIZE: usize = 64 << 10;

/// Upper bound on the number of idle buffers a single thread's pool retains.
pub const MAX_POOL_SIZE: usize = 16;

/// Largest capacity, in bytes, that is still returned to the pool (256 KiB);
/// buffers with more capacity are dropped instead.
pub const MAX_POOLED_BUFFER_SIZE: usize = 256 << 10;
25
26thread_local! {
27    static BUFFER_POOL: RefCell<BufferPool> = RefCell::new(BufferPool::new());
28}
29
30/// Thread-local buffer pool.
31struct BufferPool {
32    buffers: VecDeque<BytesMut>,
33    allocated: usize,
34    reused: usize,
35    dropped: usize,
36}
37
38impl BufferPool {
39    fn new() -> Self {
40        Self {
41            buffers: VecDeque::with_capacity(MAX_POOL_SIZE),
42            allocated: 0,
43            reused: 0,
44            dropped: 0,
45        }
46    }
47
48    fn get(&mut self, min_capacity: usize) -> BytesMut {
49        // Try to find a buffer with sufficient capacity
50        if let Some(idx) = self.buffers.iter().position(|b| b.capacity() >= min_capacity) {
51            let mut buf = self.buffers.remove(idx).unwrap();
52            buf.clear();
53            self.reused += 1;
54            return buf;
55        }
56
57        // Try to get any buffer and resize if needed
58        if let Some(mut buf) = self.buffers.pop_front() {
59            buf.clear();
60            if min_capacity > buf.capacity() {
61                buf.reserve(min_capacity - buf.capacity());
62            }
63            self.reused += 1;
64            return buf;
65        }
66
67        // Allocate new buffer
68        self.allocated += 1;
69        BytesMut::with_capacity(min_capacity.max(DEFAULT_BUFFER_SIZE))
70    }
71
72    fn put(&mut self, buf: BytesMut) {
73        // Don't pool oversized buffers
74        if buf.capacity() > MAX_POOLED_BUFFER_SIZE {
75            self.dropped += 1;
76            return;
77        }
78
79        // Don't exceed pool size
80        if self.buffers.len() >= MAX_POOL_SIZE {
81            self.dropped += 1;
82            return;
83        }
84
85        self.buffers.push_back(buf);
86    }
87}
88
89/// A pooled buffer that returns to the pool on drop.
90pub struct PooledBuffer {
91    buffer: Option<BytesMut>,
92}
93
94impl PooledBuffer {
95    /// Create a new pooled buffer with at least the given capacity.
96    pub fn new(min_capacity: usize) -> Self {
97        let buffer = BUFFER_POOL.with(|pool| pool.borrow_mut().get(min_capacity));
98        Self {
99            buffer: Some(buffer),
100        }
101    }
102
103    /// Create a pooled buffer with the default capacity.
104    pub fn default_size() -> Self {
105        Self::new(DEFAULT_BUFFER_SIZE)
106    }
107
108    /// Get a mutable reference to the underlying buffer.
109    #[inline]
110    pub fn as_mut(&mut self) -> &mut BytesMut {
111        self.buffer.as_mut().expect("buffer already taken")
112    }
113
114    /// Get an immutable reference to the underlying buffer.
115    #[inline]
116    pub fn as_ref(&self) -> &BytesMut {
117        self.buffer.as_ref().expect("buffer already taken")
118    }
119
120    /// Take the buffer out of the pool wrapper.
121    ///
122    /// The buffer will NOT be returned to the pool when dropped.
123    pub fn take(mut self) -> BytesMut {
124        self.buffer.take().expect("buffer already taken")
125    }
126
127    /// Get the current length of data in the buffer.
128    #[inline]
129    pub fn len(&self) -> usize {
130        self.as_ref().len()
131    }
132
133    /// Check if the buffer is empty.
134    #[inline]
135    pub fn is_empty(&self) -> bool {
136        self.as_ref().is_empty()
137    }
138
139    /// Get the capacity of the buffer.
140    #[inline]
141    pub fn capacity(&self) -> usize {
142        self.as_ref().capacity()
143    }
144
145    /// Clear the buffer, keeping the capacity.
146    pub fn clear(&mut self) {
147        self.as_mut().clear();
148    }
149}
150
151impl Drop for PooledBuffer {
152    fn drop(&mut self) {
153        if let Some(buf) = self.buffer.take() {
154            BUFFER_POOL.with(|pool| pool.borrow_mut().put(buf));
155        }
156    }
157}
158
159impl std::ops::Deref for PooledBuffer {
160    type Target = BytesMut;
161
162    fn deref(&self) -> &Self::Target {
163        self.as_ref()
164    }
165}
166
167impl std::ops::DerefMut for PooledBuffer {
168    fn deref_mut(&mut self) -> &mut Self::Target {
169        self.as_mut()
170    }
171}
172
173impl AsRef<[u8]> for PooledBuffer {
174    fn as_ref(&self) -> &[u8] {
175        self.buffer.as_ref().expect("buffer already taken")
176    }
177}
178
179impl AsMut<[u8]> for PooledBuffer {
180    fn as_mut(&mut self) -> &mut [u8] {
181        self.buffer.as_mut().expect("buffer already taken")
182    }
183}
184
185/// Get buffer pool statistics for the current thread.
186pub fn pool_stats() -> PoolStats {
187    BUFFER_POOL.with(|pool| {
188        let pool = pool.borrow();
189        PoolStats {
190            pooled: pool.buffers.len(),
191            allocated: pool.allocated,
192            reused: pool.reused,
193            dropped: pool.dropped,
194        }
195    })
196}
197
198/// Clear the buffer pool for the current thread.
199pub fn clear_pool() {
200    BUFFER_POOL.with(|pool| {
201        pool.borrow_mut().buffers.clear();
202    });
203}
204
/// Point-in-time view of one thread's buffer pool.
#[derive(Debug, Clone, Copy)]
pub struct PoolStats {
    /// Buffers sitting idle in the pool at the time of the snapshot.
    pub pooled: usize,
    /// Lifetime count of requests served by a new allocation.
    pub allocated: usize,
    /// Lifetime count of requests served from the pool.
    pub reused: usize,
    /// Lifetime count of returned buffers that were discarded
    /// (too large, or the pool was full).
    pub dropped: usize,
}

impl PoolStats {
    /// Fraction of requests served from the pool:
    /// `reused / (allocated + reused)`.
    ///
    /// Returns `0.0` when no request has been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        match self.allocated + self.reused {
            0 => 0.0,
            requests => self.reused as f64 / requests as f64,
        }
    }
}
229
230/// Acquire a buffer from the pool with the given minimum capacity.
231///
232/// This is a convenience function for getting a pooled buffer.
233#[inline]
234pub fn acquire(min_capacity: usize) -> PooledBuffer {
235    PooledBuffer::new(min_capacity)
236}
237
238/// Acquire a buffer with the default size.
239#[inline]
240pub fn acquire_default() -> PooledBuffer {
241    PooledBuffer::default_size()
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BufMut;

    // The lifetime counters are never reset by `clear_pool`, so every test
    // below compares against a snapshot taken inside the test instead of
    // assuming the counters start at zero.

    #[test]
    fn test_pooled_buffer_basic() {
        let mut buf = acquire(1024);
        assert!(buf.capacity() >= 1024);
        assert!(buf.is_empty());

        buf.put_slice(b"hello");
        assert_eq!(buf.len(), 5);
        assert_eq!(&buf[..], b"hello");
    }

    #[test]
    fn test_buffer_reuse() {
        clear_pool();

        // Allocate a buffer, write to it and hand it back.
        {
            let mut buf = acquire(1024);
            buf.put_slice(b"test data");
        }

        let before = pool_stats();
        assert_eq!(before.pooled, 1);

        // The next request must be served from the pool, and arrive empty.
        {
            let buf = acquire(1024);
            assert!(buf.capacity() >= 1024);
            assert!(buf.is_empty());
        }

        let after = pool_stats();
        assert_eq!(after.reused, before.reused + 1);
        assert_eq!(after.allocated, before.allocated);
        assert_eq!(after.pooled, 1);
    }

    #[test]
    fn test_large_buffer_not_pooled() {
        clear_pool();
        let before = pool_stats();

        // A buffer above the pooling limit is discarded on drop.
        {
            let mut buf = acquire(MAX_POOLED_BUFFER_SIZE + 1);
            buf.put_slice(b"large data");
        }

        let after = pool_stats();
        assert_eq!(after.dropped, before.dropped + 1);
        assert_eq!(after.pooled, 0);
    }

    #[test]
    fn test_buffer_take() {
        clear_pool();

        let buf = acquire(1024);
        let taken = buf.take();
        assert!(taken.is_empty());
        assert!(taken.capacity() >= 1024);

        // A taken buffer is detached: dropping it must not refill the pool.
        drop(taken);
        assert_eq!(pool_stats().pooled, 0);
    }

    #[test]
    fn test_pool_stats() {
        clear_pool();
        let before = pool_stats();

        // With an empty pool both requests have to allocate.
        let buf1 = acquire(1024);
        let buf2 = acquire(2048);

        let in_use = pool_stats();
        assert_eq!(in_use.allocated, before.allocated + 2);
        assert_eq!(in_use.reused, before.reused);
        assert_eq!(in_use.pooled, 0); // Still in use

        drop(buf1);
        drop(buf2);

        let returned = pool_stats();
        assert_eq!(returned.pooled, 2);
    }

    #[test]
    fn test_hit_rate() {
        let stats = PoolStats {
            pooled: 5,
            allocated: 10,
            reused: 90,
            dropped: 0,
        };
        assert!((stats.hit_rate() - 0.9).abs() < 0.01);

        let unused = PoolStats {
            pooled: 0,
            allocated: 0,
            reused: 0,
            dropped: 0,
        };
        assert_eq!(unused.hit_rate(), 0.0);
    }

    #[test]
    fn test_pool_max_size() {
        clear_pool();
        let before = pool_stats();

        // Hold more buffers at once than the pool is allowed to keep.
        let buffers: Vec<_> = (0..MAX_POOL_SIZE + 5).map(|_| acquire(1024)).collect();

        // Returning them all fills the pool; the surplus is discarded.
        drop(buffers);

        let after = pool_stats();
        assert_eq!(after.pooled, MAX_POOL_SIZE);
        assert_eq!(after.dropped, before.dropped + 5);
    }
}
360}