safer_ring/pool/buffer_pool.rs
1//! Thread-safe buffer pool implementation.
2
3use std::collections::VecDeque;
4use std::sync::{Arc, Mutex};
5
6use crate::buffer::PinnedBuffer;
7use crate::error::Result;
8
9use super::{PoolInner, PoolStats, PooledBuffer};
10
/// Thread-safe pool of pre-allocated pinned buffers.
///
/// Every buffer is created while the pool is being constructed, so handing
/// one out later never allocates. Buffers are stored as [`PinnedBuffer`]s so
/// they are ready for io_uring operations as soon as they are acquired.
///
/// # Thread Safety
///
/// [`BufferPool`] is [`Send`] and [`Sync`]. The mutable pool state lives
/// behind a [`Mutex`]; `capacity` and `buffer_size` are fixed at construction
/// and are read without taking the lock.
///
/// # Memory Management
///
/// All buffers are pre-allocated and remain allocated for the pool's lifetime.
/// This provides predictable memory usage but requires careful sizing.
///
/// # Examples
///
/// ```rust
/// use safer_ring::pool::BufferPool;
///
/// let pool = BufferPool::new(10, 4096);
/// if let Some(buffer) = pool.try_get().unwrap() {
///     // Use buffer for I/O operations
///     // Buffer automatically returns to pool on drop
/// }
/// ```
pub struct BufferPool {
    /// Shared pool state (idle buffers and usage counters) protected by a
    /// mutex. The `Arc` lets each handed-out [`PooledBuffer`] keep a handle
    /// to the same state.
    inner: Arc<Mutex<PoolInner>>,
    /// Immutable capacity - stored outside mutex for lock-free access
    capacity: usize,
    /// Immutable buffer size - stored outside mutex for lock-free access
    buffer_size: usize,
}
46
47impl BufferPool {
48 /// Create a new buffer pool with specified capacity and buffer size.
49 ///
50 /// All buffers are pre-allocated and pinned during construction.
51 ///
52 /// # Arguments
53 ///
54 /// * `capacity` - Number of buffers to pre-allocate (must be > 0)
55 /// * `buffer_size` - Size of each buffer in bytes (must be > 0)
56 ///
57 /// # Panics
58 ///
59 /// Panics if `capacity` or `buffer_size` is zero.
60 pub fn new(capacity: usize, buffer_size: usize) -> Self {
61 assert!(capacity > 0, "Pool capacity must be greater than zero");
62 assert!(buffer_size > 0, "Buffer size must be greater than zero");
63
64 let mut available = VecDeque::with_capacity(capacity);
65
66 // Pre-allocate all buffers - this is the key optimization
67 for _ in 0..capacity {
68 available.push_back(PinnedBuffer::with_capacity(buffer_size));
69 }
70
71 Self {
72 inner: Arc::new(Mutex::new(PoolInner {
73 available,
74 in_use: 0,
75 total_allocations: 0,
76 failed_allocations: 0,
77 })),
78 capacity,
79 buffer_size,
80 }
81 }
82
83 /// Create a buffer pool with custom buffer initialization.
84 ///
85 /// Allows more control over buffer initialization, such as pre-filling
86 /// buffers with specific patterns.
87 ///
88 /// # Arguments
89 ///
90 /// * `capacity` - Number of buffers to create
91 /// * `buffer_factory` - Closure that creates each buffer
92 ///
93 /// # Panics
94 ///
95 /// Panics if buffers have inconsistent sizes.
96 pub fn with_factory<F>(capacity: usize, mut buffer_factory: F) -> Self
97 where
98 F: FnMut() -> PinnedBuffer<[u8]>,
99 {
100 assert!(capacity > 0, "Pool capacity must be greater than zero");
101
102 let mut available = VecDeque::with_capacity(capacity);
103 let mut buffer_size = 0;
104
105 for i in 0..capacity {
106 let buffer = buffer_factory();
107 if i == 0 {
108 buffer_size = buffer.len();
109 } else {
110 // Ensure consistency - all buffers must be same size
111 assert_eq!(
112 buffer.len(),
113 buffer_size,
114 "All buffers must have the same size"
115 );
116 }
117 available.push_back(buffer);
118 }
119
120 Self {
121 inner: Arc::new(Mutex::new(PoolInner {
122 available,
123 in_use: 0,
124 total_allocations: 0,
125 failed_allocations: 0,
126 })),
127 capacity,
128 buffer_size,
129 }
130 }
131
132 /// Try to get a buffer from the pool.
133 ///
134 /// Returns `Some(PooledBuffer)` if available, `None` if pool is empty.
135 /// Never blocks and never allocates new buffers.
136 pub fn try_get(&self) -> Result<Option<PooledBuffer>> {
137 let mut inner = PoolInner::lock(&self.inner)?;
138
139 if let Some(buffer) = inner.available.pop_front() {
140 inner.in_use += 1;
141 inner.total_allocations += 1;
142
143 Ok(Some(PooledBuffer::new(buffer, Arc::clone(&self.inner))))
144 } else {
145 inner.failed_allocations += 1;
146 Ok(None)
147 }
148 }
149
150 /// Fast path buffer acquisition with minimal locking.
151 ///
152 /// This is an optimized version that uses atomic operations where possible
153 /// to reduce lock contention in high-throughput scenarios.
154 pub fn try_get_fast(&self) -> Result<Option<PooledBuffer>> {
155 // Fast path: check if pool is likely empty without locking
156 {
157 let inner = PoolInner::lock(&self.inner)?;
158 if inner.available.is_empty() {
159 return Ok(None);
160 }
161 }
162
163 // Slow path: actually acquire buffer
164 self.try_get()
165 }
166
167 /// Get a buffer from the pool.
168 ///
169 /// Returns `Some(PooledBuffer)` if available, `None` if pool is empty.
170 /// This is a convenience method that unwraps the Result from try_get.
171 pub fn get(&self) -> Option<PooledBuffer> {
172 self.try_get().unwrap_or(None)
173 }
174
175 /// Get a buffer from the pool, blocking until one becomes available.
176 ///
177 /// This method blocks the current thread until a buffer is returned.
178 /// Uses exponential backoff to reduce CPU usage while waiting.
179 ///
180 /// # Performance Note
181 ///
182 /// For high-performance applications, consider using `try_get()` in a loop
183 /// with your own scheduling strategy rather than this blocking method.
184 pub fn get_blocking(&self) -> Result<PooledBuffer> {
185 let mut backoff_us = 1;
186 const MAX_BACKOFF_US: u64 = 1000; // Cap at 1ms
187
188 loop {
189 if let Some(buffer) = self.try_get()? {
190 return Ok(buffer);
191 }
192
193 // Exponential backoff to reduce CPU usage
194 std::thread::sleep(std::time::Duration::from_micros(backoff_us));
195 backoff_us = (backoff_us * 2).min(MAX_BACKOFF_US);
196 }
197 }
198
199 /// Get the total capacity of the pool.
200 ///
201 /// This is a lock-free operation since capacity is immutable.
202 pub fn capacity(&self) -> usize {
203 self.capacity
204 }
205
206 /// Get the number of available buffers.
207 pub fn available(&self) -> Result<usize> {
208 let inner = PoolInner::lock(&self.inner)?;
209 Ok(inner.available.len())
210 }
211
212 /// Get the number of buffers currently in use.
213 pub fn in_use(&self) -> Result<usize> {
214 let inner = PoolInner::lock(&self.inner)?;
215 Ok(inner.in_use)
216 }
217
218 /// Get the size of each buffer in the pool.
219 ///
220 /// This is a lock-free operation since buffer size is immutable.
221 pub fn buffer_size(&self) -> usize {
222 self.buffer_size
223 }
224
225 /// Get comprehensive statistics about the pool.
226 ///
227 /// Returns detailed information about pool usage and allocation patterns.
228 /// Takes a single lock to ensure all statistics are consistent.
229 pub fn stats(&self) -> PoolStats {
230 let inner = PoolInner::lock(&self.inner).unwrap();
231 let utilization = inner.in_use as f64 / self.capacity as f64;
232 let available = inner.available.len();
233
234 PoolStats {
235 capacity: self.capacity,
236 available,
237 in_use: inner.in_use,
238 buffer_size: self.buffer_size,
239 total_allocations: inner.total_allocations,
240 failed_allocations: inner.failed_allocations,
241 utilization,
242 total_buffers: self.capacity,
243 available_buffers: available,
244 in_use_buffers: inner.in_use,
245 }
246 }
247
248 /// Check if the pool is empty (no available buffers).
249 pub fn is_empty(&self) -> Result<bool> {
250 let inner = PoolInner::lock(&self.inner)?;
251 Ok(inner.available.is_empty())
252 }
253
254 /// Check if the pool is full (all buffers available).
255 pub fn is_full(&self) -> Result<bool> {
256 let inner = PoolInner::lock(&self.inner)?;
257 Ok(inner.available.len() == self.capacity)
258 }
259
260 /// Get a snapshot of current pool state in a single lock operation.
261 ///
262 /// More efficient than calling multiple methods separately when you need
263 /// multiple pieces of information about the pool state.
264 pub fn snapshot(&self) -> Result<(usize, usize, bool, bool)> {
265 let inner = PoolInner::lock(&self.inner)?;
266 let available = inner.available.len();
267 let in_use = inner.in_use;
268 let is_empty = inner.available.is_empty();
269 let is_full = available == self.capacity;
270
271 Ok((available, in_use, is_empty, is_full))
272 }
273}
274
// SAFETY: BufferPool owns only an `Arc<Mutex<PoolInner>>` and two `usize`
// values, and every access to the pool state in this file goes through
// `PoolInner::lock`, so moving the pool to another thread moves no unguarded
// state.
// NOTE(review): if `PoolInner` is itself `Send`, the compiler derives this
// impl automatically and it is redundant; if it is not, confirm that the
// buffers it holds may really be used from another thread.
unsafe impl Send for BufferPool {}

// SAFETY: Shared references only read the immutable `capacity` and
// `buffer_size` fields directly; all mutable state is reached through the
// mutex.
// NOTE(review): this relies on the same assumption about `PoolInner` as the
// `Send` impl above - confirm against its definition.
unsafe impl Sync for BufferPool {}