safer_ring/pool/
buffer_pool.rs

1//! Thread-safe buffer pool implementation.
2
3use std::collections::VecDeque;
4use std::sync::{Arc, Mutex};
5
6use crate::buffer::PinnedBuffer;
7use crate::error::Result;
8
9use super::{PoolInner, PoolStats, PooledBuffer};
10
11/// Thread-safe pool of pre-allocated pinned buffers.
12///
13/// Pre-allocates all buffers during construction and pins them in memory,
14/// making them immediately ready for io_uring operations. This eliminates
15/// allocation overhead and ensures predictable performance.
16///
17/// # Thread Safety
18///
19/// [`BufferPool`] is [`Send`] and [`Sync`] - internal synchronization
20/// is handled through a [`Mutex`] protecting the pool state.
21///
22/// # Memory Management
23///
24/// All buffers are pre-allocated and remain allocated for the pool's lifetime.
25/// This provides predictable memory usage but requires careful sizing.
26///
27/// # Examples
28///
29/// ```rust
30/// use safer_ring::pool::BufferPool;
31///
32/// let pool = BufferPool::new(10, 4096);
33/// if let Some(buffer) = pool.try_get().unwrap() {
34///     // Use buffer for I/O operations
35///     // Buffer automatically returns to pool on drop
36/// }
37/// ```
pub struct BufferPool {
    /// Shared pool state (free-buffer queue plus usage counters) protected
    /// by a mutex; cloned into each `PooledBuffer` so buffers can return
    /// themselves on drop.
    inner: Arc<Mutex<PoolInner>>,
    /// Immutable capacity - stored outside the mutex for lock-free access.
    capacity: usize,
    /// Immutable buffer size - stored outside the mutex for lock-free access.
    buffer_size: usize,
}
46
47impl BufferPool {
48    /// Create a new buffer pool with specified capacity and buffer size.
49    ///
50    /// All buffers are pre-allocated and pinned during construction.
51    ///
52    /// # Arguments
53    ///
54    /// * `capacity` - Number of buffers to pre-allocate (must be > 0)
55    /// * `buffer_size` - Size of each buffer in bytes (must be > 0)
56    ///
57    /// # Panics
58    ///
59    /// Panics if `capacity` or `buffer_size` is zero.
60    pub fn new(capacity: usize, buffer_size: usize) -> Self {
61        assert!(capacity > 0, "Pool capacity must be greater than zero");
62        assert!(buffer_size > 0, "Buffer size must be greater than zero");
63
64        let mut available = VecDeque::with_capacity(capacity);
65
66        // Pre-allocate all buffers - this is the key optimization
67        for _ in 0..capacity {
68            available.push_back(PinnedBuffer::with_capacity(buffer_size));
69        }
70
71        Self {
72            inner: Arc::new(Mutex::new(PoolInner {
73                available,
74                in_use: 0,
75                total_allocations: 0,
76                failed_allocations: 0,
77            })),
78            capacity,
79            buffer_size,
80        }
81    }
82
83    /// Create a buffer pool with custom buffer initialization.
84    ///
85    /// Allows more control over buffer initialization, such as pre-filling
86    /// buffers with specific patterns.
87    ///
88    /// # Arguments
89    ///
90    /// * `capacity` - Number of buffers to create
91    /// * `buffer_factory` - Closure that creates each buffer
92    ///
93    /// # Panics
94    ///
95    /// Panics if buffers have inconsistent sizes.
96    pub fn with_factory<F>(capacity: usize, mut buffer_factory: F) -> Self
97    where
98        F: FnMut() -> PinnedBuffer<[u8]>,
99    {
100        assert!(capacity > 0, "Pool capacity must be greater than zero");
101
102        let mut available = VecDeque::with_capacity(capacity);
103        let mut buffer_size = 0;
104
105        for i in 0..capacity {
106            let buffer = buffer_factory();
107            if i == 0 {
108                buffer_size = buffer.len();
109            } else {
110                // Ensure consistency - all buffers must be same size
111                assert_eq!(
112                    buffer.len(),
113                    buffer_size,
114                    "All buffers must have the same size"
115                );
116            }
117            available.push_back(buffer);
118        }
119
120        Self {
121            inner: Arc::new(Mutex::new(PoolInner {
122                available,
123                in_use: 0,
124                total_allocations: 0,
125                failed_allocations: 0,
126            })),
127            capacity,
128            buffer_size,
129        }
130    }
131
132    /// Try to get a buffer from the pool.
133    ///
134    /// Returns `Some(PooledBuffer)` if available, `None` if pool is empty.
135    /// Never blocks and never allocates new buffers.
136    pub fn try_get(&self) -> Result<Option<PooledBuffer>> {
137        let mut inner = PoolInner::lock(&self.inner)?;
138
139        if let Some(buffer) = inner.available.pop_front() {
140            inner.in_use += 1;
141            inner.total_allocations += 1;
142
143            Ok(Some(PooledBuffer::new(buffer, Arc::clone(&self.inner))))
144        } else {
145            inner.failed_allocations += 1;
146            Ok(None)
147        }
148    }
149
150    /// Fast path buffer acquisition with minimal locking.
151    ///
152    /// This is an optimized version that uses atomic operations where possible
153    /// to reduce lock contention in high-throughput scenarios.
154    pub fn try_get_fast(&self) -> Result<Option<PooledBuffer>> {
155        // Fast path: check if pool is likely empty without locking
156        {
157            let inner = PoolInner::lock(&self.inner)?;
158            if inner.available.is_empty() {
159                return Ok(None);
160            }
161        }
162
163        // Slow path: actually acquire buffer
164        self.try_get()
165    }
166
167    /// Get a buffer from the pool.
168    ///
169    /// Returns `Some(PooledBuffer)` if available, `None` if pool is empty.
170    /// This is a convenience method that unwraps the Result from try_get.
171    pub fn get(&self) -> Option<PooledBuffer> {
172        self.try_get().unwrap_or(None)
173    }
174
175    /// Get a buffer from the pool, blocking until one becomes available.
176    ///
177    /// This method blocks the current thread until a buffer is returned.
178    /// Uses exponential backoff to reduce CPU usage while waiting.
179    ///
180    /// # Performance Note
181    ///
182    /// For high-performance applications, consider using `try_get()` in a loop
183    /// with your own scheduling strategy rather than this blocking method.
184    pub fn get_blocking(&self) -> Result<PooledBuffer> {
185        let mut backoff_us = 1;
186        const MAX_BACKOFF_US: u64 = 1000; // Cap at 1ms
187
188        loop {
189            if let Some(buffer) = self.try_get()? {
190                return Ok(buffer);
191            }
192
193            // Exponential backoff to reduce CPU usage
194            std::thread::sleep(std::time::Duration::from_micros(backoff_us));
195            backoff_us = (backoff_us * 2).min(MAX_BACKOFF_US);
196        }
197    }
198
199    /// Get the total capacity of the pool.
200    ///
201    /// This is a lock-free operation since capacity is immutable.
202    pub fn capacity(&self) -> usize {
203        self.capacity
204    }
205
206    /// Get the number of available buffers.
207    pub fn available(&self) -> Result<usize> {
208        let inner = PoolInner::lock(&self.inner)?;
209        Ok(inner.available.len())
210    }
211
212    /// Get the number of buffers currently in use.
213    pub fn in_use(&self) -> Result<usize> {
214        let inner = PoolInner::lock(&self.inner)?;
215        Ok(inner.in_use)
216    }
217
218    /// Get the size of each buffer in the pool.
219    ///
220    /// This is a lock-free operation since buffer size is immutable.
221    pub fn buffer_size(&self) -> usize {
222        self.buffer_size
223    }
224
225    /// Get comprehensive statistics about the pool.
226    ///
227    /// Returns detailed information about pool usage and allocation patterns.
228    /// Takes a single lock to ensure all statistics are consistent.
229    pub fn stats(&self) -> PoolStats {
230        let inner = PoolInner::lock(&self.inner).unwrap();
231        let utilization = inner.in_use as f64 / self.capacity as f64;
232        let available = inner.available.len();
233
234        PoolStats {
235            capacity: self.capacity,
236            available,
237            in_use: inner.in_use,
238            buffer_size: self.buffer_size,
239            total_allocations: inner.total_allocations,
240            failed_allocations: inner.failed_allocations,
241            utilization,
242            total_buffers: self.capacity,
243            available_buffers: available,
244            in_use_buffers: inner.in_use,
245        }
246    }
247
248    /// Check if the pool is empty (no available buffers).
249    pub fn is_empty(&self) -> Result<bool> {
250        let inner = PoolInner::lock(&self.inner)?;
251        Ok(inner.available.is_empty())
252    }
253
254    /// Check if the pool is full (all buffers available).
255    pub fn is_full(&self) -> Result<bool> {
256        let inner = PoolInner::lock(&self.inner)?;
257        Ok(inner.available.len() == self.capacity)
258    }
259
260    /// Get a snapshot of current pool state in a single lock operation.
261    ///
262    /// More efficient than calling multiple methods separately when you need
263    /// multiple pieces of information about the pool state.
264    pub fn snapshot(&self) -> Result<(usize, usize, bool, bool)> {
265        let inner = PoolInner::lock(&self.inner)?;
266        let available = inner.available.len();
267        let in_use = inner.in_use;
268        let is_empty = inner.available.is_empty();
269        let is_full = available == self.capacity;
270
271        Ok((available, in_use, is_empty, is_full))
272    }
273}
274
// SAFETY: all of BufferPool's mutable state lives behind `Arc<Mutex<PoolInner>>`,
// so any cross-thread access to the buffer queue and counters is serialized
// by the mutex; `capacity` and `buffer_size` are immutable after construction.
// NOTE(review): if `PoolInner` is automatically `Send`, both of these impls
// are redundant (`Arc<Mutex<T>>` is already `Send + Sync` whenever `T: Send`)
// and should be deleted. If it is NOT auto-`Send` (e.g. `PinnedBuffer` holds
// raw pointers), these impls are asserting that transferring pinned-buffer
// ownership across threads is sound — confirm that against `PinnedBuffer`'s
// internals before trusting this.
unsafe impl Send for BufferPool {}

// SAFETY: see the `Send` impl above — shared (`&BufferPool`) access only
// reaches the inner state through the mutex.
unsafe impl Sync for BufferPool {}