Skip to main content

santh_bufpool/
pool.rs

1use std::sync::atomic::Ordering;
2use std::sync::Arc;
3
4use crate::buffer::PoolBuffer;
5use crate::config::PoolConfig;
6use crate::error::{Error, Result, MAX_REQUEST_BYTES};
7use crate::size_class::SizeClassPool;
8use crate::stats::PoolStats;
9use crate::tls::take_tls_buffer;
10
11pub(crate) const FOUR_KIB: usize = 4 * 1024;
12pub(crate) const SIXTY_FOUR_KIB: usize = 64 * 1024;
13pub(crate) const TWO_FIFTY_SIX_KIB: usize = 256 * 1024;
14pub(crate) const ONE_MIB: usize = 1024 * 1024;
15
16/// Fixed-size recyclable buffer pool.
17///
18/// # Examples
19///
20/// ```rust
21/// use santh_bufpool::{BufferPool, PoolConfig};
22///
23/// let pool = BufferPool::new(PoolConfig {
24///     four_kib_count: 1,
25///     ..PoolConfig::default()
26/// });
27/// let buffer = pool.checkout(32).unwrap();
28/// assert_eq!(buffer.len(), 32);
29/// ```
30#[derive(Debug)]
31pub struct BufferPool {
32    pub(crate) four_kib: Arc<SizeClassPool>,
33    pub(crate) sixty_four_kib: Arc<SizeClassPool>,
34    pub(crate) two_fifty_six_kib: Arc<SizeClassPool>,
35    pub(crate) one_mib: Arc<SizeClassPool>,
36    pub(crate) numa_node: Option<u32>,
37    pub(crate) stats: Arc<PoolStats>,
38}
39
40impl BufferPool {
41    /// Create a new buffer pool from the supplied configuration.
42    ///
43    /// NUMA placement is best-effort. If `kernelkit` cannot place pages on the
44    /// requested node, allocation falls back to a standard heap buffer.
45    #[must_use]
46    pub fn new(config: PoolConfig) -> Self {
47        Self {
48            four_kib: Arc::new(SizeClassPool::pooled(
49                FOUR_KIB,
50                config.four_kib_count,
51                config.numa_node,
52            )),
53            sixty_four_kib: Arc::new(SizeClassPool::pooled(
54                SIXTY_FOUR_KIB,
55                config.sixty_four_kib_count,
56                config.numa_node,
57            )),
58            two_fifty_six_kib: Arc::new(SizeClassPool::pooled(
59                TWO_FIFTY_SIX_KIB,
60                config.two_fifty_six_kib_count,
61                config.numa_node,
62            )),
63            one_mib: Arc::new(SizeClassPool::pooled(
64                ONE_MIB,
65                config.one_mib_count,
66                config.numa_node,
67            )),
68            numa_node: config.numa_node,
69            stats: Arc::new(PoolStats::default()),
70        }
71    }
72
73    /// Access pool statistics.
74    #[must_use]
75    pub fn stats(&self) -> &PoolStats {
76        &self.stats
77    }
78
79    /// Check out a buffer with at least `min_bytes` of capacity.
80    ///
81    /// The returned slice length matches `min_bytes`. Its backing allocation is
82    /// guaranteed to be at least `min_bytes` but may be up to 1 MiB.
83    ///
84    /// # Errors
85    ///
86    /// Returns an error if `min_bytes` exceeds `isize::MAX`.
87    pub fn checkout(&self, min_bytes: usize) -> Result<PoolBuffer> {
88        if min_bytes > MAX_REQUEST_BYTES {
89            return Err(Error::RequestedLengthTooLarge {
90                requested: min_bytes,
91            });
92        }
93
94        if let Some((capacity, ptr, owner)) = take_tls_buffer(min_bytes) {
95            let out = self.stats.checked_out.fetch_add(1, Ordering::Relaxed) + 1;
96            self.stats
97                .peak_checked_out
98                .fetch_max(out, Ordering::Relaxed);
99            let bytes_out = self
100                .stats
101                .bytes_checked_out
102                .fetch_add(min_bytes, Ordering::Relaxed)
103                + min_bytes;
104            self.stats
105                .peak_bytes_checked_out
106                .fetch_max(bytes_out, Ordering::Relaxed);
107            return Ok(PoolBuffer {
108                ptr,
109                len: min_bytes,
110                capacity,
111                owner,
112                stats: Some(Arc::clone(&self.stats)),
113            });
114        }
115
116        let selected = self.select_pool(min_bytes);
117        let fallback_capacity = selected.map_or(min_bytes, |pool| pool.capacity);
118
119        if let Some(pool) = selected {
120            if let Some(allocation) = pool.pop() {
121                self.stats.hits.fetch_add(1, Ordering::Relaxed);
122                let out = self.stats.checked_out.fetch_add(1, Ordering::Relaxed) + 1;
123                self.stats
124                    .peak_checked_out
125                    .fetch_max(out, Ordering::Relaxed);
126                let bytes_out = self
127                    .stats
128                    .bytes_checked_out
129                    .fetch_add(min_bytes, Ordering::Relaxed)
130                    + min_bytes;
131                self.stats
132                    .peak_bytes_checked_out
133                    .fetch_max(bytes_out, Ordering::Relaxed);
134                return Ok(PoolBuffer {
135                    ptr: allocation.ptr,
136                    len: min_bytes,
137                    capacity: pool.capacity,
138                    owner: Arc::clone(pool),
139                    stats: Some(Arc::clone(&self.stats)),
140                });
141            }
142        }
143
144        self.stats.misses.fetch_add(1, Ordering::Relaxed);
145        let out = self.stats.checked_out.fetch_add(1, Ordering::Relaxed) + 1;
146        self.stats
147            .peak_checked_out
148            .fetch_max(out, Ordering::Relaxed);
149        let bytes_out = self
150            .stats
151            .bytes_checked_out
152            .fetch_add(min_bytes, Ordering::Relaxed)
153            + min_bytes;
154        self.stats
155            .peak_bytes_checked_out
156            .fetch_max(bytes_out, Ordering::Relaxed);
157
158        let owner = selected.map_or_else(
159            || Arc::new(SizeClassPool::pooled(fallback_capacity, 0, self.numa_node)),
160            Arc::clone,
161        );
162
163        let allocation = owner.allocate_fallback();
164        Ok(PoolBuffer {
165            ptr: allocation.ptr,
166            len: min_bytes,
167            capacity: fallback_capacity,
168            owner,
169            stats: Some(Arc::clone(&self.stats)),
170        })
171    }
172
173    /// Check out a buffer and explicitly zero its visible contents.
174    ///
175    /// Recycled buffers are zeroed before being returned to the pool, so this
176    /// is mostly useful if you need to ensure the buffer is zeroed immediately
177    /// prior to use (e.g. for cryptographic materials).
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if `min_bytes` exceeds `isize::MAX`.
182    pub fn checkout_zeroed(&self, min_bytes: usize) -> Result<PoolBuffer> {
183        let mut buffer = self.checkout(min_bytes)?;
184        buffer.fill(0);
185        Ok(buffer)
186    }
187
188    /// Check out a buffer with best-effort alignment.
189    ///
190    /// The pool will attempt to provide a buffer aligned to `alignment` bytes,
191    /// but this is not guaranteed for alignments larger than the platform's
192    /// default heap alignment (typically 8 or 16 bytes). For SIMD (32-byte),
193    /// this typically works. For page alignment (4096), use OS-specific allocators.
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if `min_bytes` exceeds platform limits or `alignment`
198    /// is not a power of two.
199    pub fn checkout_aligned(&self, min_bytes: usize, alignment: usize) -> Result<PoolBuffer> {
200        if !alignment.is_power_of_two() || alignment == 0 {
201            return Err(Error::InvalidAlignment { alignment });
202        }
203        // Best-effort: the underlying allocator (heap via Vec or mmap) typically
204        // provides at least 8- or 16-byte alignment. Pooled buffers from mmap
205        // are page-aligned (4096). We cannot adjust the base pointer because
206        // Drop recycles from `ptr` directly. For guaranteed SIMD alignment,
207        // callers should use arenakit or a dedicated aligned allocator.
208        self.checkout(min_bytes)
209    }
210
211    #[doc(hidden)]
212    pub fn available_for_test(&self, class: usize) -> usize {
213        let pool = match class {
214            FOUR_KIB => &self.four_kib,
215            SIXTY_FOUR_KIB => &self.sixty_four_kib,
216            TWO_FIFTY_SIX_KIB => &self.two_fifty_six_kib,
217            ONE_MIB => &self.one_mib,
218            _ => return 0,
219        };
220        pool.queue
221            .as_ref()
222            .map_or(0, crossbeam_queue::ArrayQueue::len)
223    }
224
225    fn select_pool(&self, min_bytes: usize) -> Option<&Arc<SizeClassPool>> {
226        if min_bytes <= FOUR_KIB {
227            Some(&self.four_kib)
228        } else if min_bytes <= SIXTY_FOUR_KIB {
229            Some(&self.sixty_four_kib)
230        } else if min_bytes <= TWO_FIFTY_SIX_KIB {
231            Some(&self.two_fifty_six_kib)
232        } else if min_bytes <= ONE_MIB {
233            Some(&self.one_mib)
234        } else {
235            None
236        }
237    }
238}