1use std::sync::atomic::Ordering;
2use std::sync::Arc;
3
4use crate::buffer::PoolBuffer;
5use crate::config::PoolConfig;
6use crate::error::{Error, Result, MAX_REQUEST_BYTES};
7use crate::size_class::SizeClassPool;
8use crate::stats::PoolStats;
9use crate::tls::take_tls_buffer;
10
/// Capacity in bytes of the smallest size class (4 KiB).
pub(crate) const FOUR_KIB: usize = 4 << 10;
/// Capacity in bytes of the 64 KiB size class.
pub(crate) const SIXTY_FOUR_KIB: usize = 64 << 10;
/// Capacity in bytes of the 256 KiB size class.
pub(crate) const TWO_FIFTY_SIX_KIB: usize = 256 << 10;
/// Capacity in bytes of the largest size class (1 MiB).
pub(crate) const ONE_MIB: usize = 1 << 20;
15
/// A set of fixed-capacity buffer pools, one per size class, plus shared
/// checkout statistics.
///
/// Requests are routed to the smallest class whose capacity is at least the
/// requested length; requests larger than 1 MiB are not served by any class.
#[derive(Debug)]
pub struct BufferPool {
    /// Size class used for requests of at most 4 KiB.
    pub(crate) four_kib: Arc<SizeClassPool>,
    /// Size class used for requests above 4 KiB and at most 64 KiB.
    pub(crate) sixty_four_kib: Arc<SizeClassPool>,
    /// Size class used for requests above 64 KiB and at most 256 KiB.
    pub(crate) two_fifty_six_kib: Arc<SizeClassPool>,
    /// Size class used for requests above 256 KiB and at most 1 MiB.
    pub(crate) one_mib: Arc<SizeClassPool>,
    /// NUMA node taken from the configuration; passed to every size class at
    /// construction and to any ad-hoc pool created for oversized requests.
    pub(crate) numa_node: Option<u32>,
    /// Shared counters; every checked-out buffer receives a clone of this `Arc`.
    pub(crate) stats: Arc<PoolStats>,
}
39
40impl BufferPool {
41 #[must_use]
46 pub fn new(config: PoolConfig) -> Self {
47 Self {
48 four_kib: Arc::new(SizeClassPool::pooled(
49 FOUR_KIB,
50 config.four_kib_count,
51 config.numa_node,
52 )),
53 sixty_four_kib: Arc::new(SizeClassPool::pooled(
54 SIXTY_FOUR_KIB,
55 config.sixty_four_kib_count,
56 config.numa_node,
57 )),
58 two_fifty_six_kib: Arc::new(SizeClassPool::pooled(
59 TWO_FIFTY_SIX_KIB,
60 config.two_fifty_six_kib_count,
61 config.numa_node,
62 )),
63 one_mib: Arc::new(SizeClassPool::pooled(
64 ONE_MIB,
65 config.one_mib_count,
66 config.numa_node,
67 )),
68 numa_node: config.numa_node,
69 stats: Arc::new(PoolStats::default()),
70 }
71 }
72
73 #[must_use]
75 pub fn stats(&self) -> &PoolStats {
76 &self.stats
77 }
78
79 pub fn checkout(&self, min_bytes: usize) -> Result<PoolBuffer> {
88 if min_bytes > MAX_REQUEST_BYTES {
89 return Err(Error::RequestedLengthTooLarge {
90 requested: min_bytes,
91 });
92 }
93
94 if let Some((capacity, ptr, owner)) = take_tls_buffer(min_bytes) {
95 let out = self.stats.checked_out.fetch_add(1, Ordering::Relaxed) + 1;
96 self.stats
97 .peak_checked_out
98 .fetch_max(out, Ordering::Relaxed);
99 let bytes_out = self
100 .stats
101 .bytes_checked_out
102 .fetch_add(min_bytes, Ordering::Relaxed)
103 + min_bytes;
104 self.stats
105 .peak_bytes_checked_out
106 .fetch_max(bytes_out, Ordering::Relaxed);
107 return Ok(PoolBuffer {
108 ptr,
109 len: min_bytes,
110 capacity,
111 owner,
112 stats: Some(Arc::clone(&self.stats)),
113 });
114 }
115
116 let selected = self.select_pool(min_bytes);
117 let fallback_capacity = selected.map_or(min_bytes, |pool| pool.capacity);
118
119 if let Some(pool) = selected {
120 if let Some(allocation) = pool.pop() {
121 self.stats.hits.fetch_add(1, Ordering::Relaxed);
122 let out = self.stats.checked_out.fetch_add(1, Ordering::Relaxed) + 1;
123 self.stats
124 .peak_checked_out
125 .fetch_max(out, Ordering::Relaxed);
126 let bytes_out = self
127 .stats
128 .bytes_checked_out
129 .fetch_add(min_bytes, Ordering::Relaxed)
130 + min_bytes;
131 self.stats
132 .peak_bytes_checked_out
133 .fetch_max(bytes_out, Ordering::Relaxed);
134 return Ok(PoolBuffer {
135 ptr: allocation.ptr,
136 len: min_bytes,
137 capacity: pool.capacity,
138 owner: Arc::clone(pool),
139 stats: Some(Arc::clone(&self.stats)),
140 });
141 }
142 }
143
144 self.stats.misses.fetch_add(1, Ordering::Relaxed);
145 let out = self.stats.checked_out.fetch_add(1, Ordering::Relaxed) + 1;
146 self.stats
147 .peak_checked_out
148 .fetch_max(out, Ordering::Relaxed);
149 let bytes_out = self
150 .stats
151 .bytes_checked_out
152 .fetch_add(min_bytes, Ordering::Relaxed)
153 + min_bytes;
154 self.stats
155 .peak_bytes_checked_out
156 .fetch_max(bytes_out, Ordering::Relaxed);
157
158 let owner = selected.map_or_else(
159 || Arc::new(SizeClassPool::pooled(fallback_capacity, 0, self.numa_node)),
160 Arc::clone,
161 );
162
163 let allocation = owner.allocate_fallback();
164 Ok(PoolBuffer {
165 ptr: allocation.ptr,
166 len: min_bytes,
167 capacity: fallback_capacity,
168 owner,
169 stats: Some(Arc::clone(&self.stats)),
170 })
171 }
172
173 pub fn checkout_zeroed(&self, min_bytes: usize) -> Result<PoolBuffer> {
183 let mut buffer = self.checkout(min_bytes)?;
184 buffer.fill(0);
185 Ok(buffer)
186 }
187
188 pub fn checkout_aligned(&self, min_bytes: usize, alignment: usize) -> Result<PoolBuffer> {
200 if !alignment.is_power_of_two() || alignment == 0 {
201 return Err(Error::InvalidAlignment { alignment });
202 }
203 self.checkout(min_bytes)
209 }
210
211 #[doc(hidden)]
212 pub fn available_for_test(&self, class: usize) -> usize {
213 let pool = match class {
214 FOUR_KIB => &self.four_kib,
215 SIXTY_FOUR_KIB => &self.sixty_four_kib,
216 TWO_FIFTY_SIX_KIB => &self.two_fifty_six_kib,
217 ONE_MIB => &self.one_mib,
218 _ => return 0,
219 };
220 pool.queue
221 .as_ref()
222 .map_or(0, crossbeam_queue::ArrayQueue::len)
223 }
224
225 fn select_pool(&self, min_bytes: usize) -> Option<&Arc<SizeClassPool>> {
226 if min_bytes <= FOUR_KIB {
227 Some(&self.four_kib)
228 } else if min_bytes <= SIXTY_FOUR_KIB {
229 Some(&self.sixty_four_kib)
230 } else if min_bytes <= TWO_FIFTY_SIX_KIB {
231 Some(&self.two_fifty_six_kib)
232 } else if min_bytes <= ONE_MIB {
233 Some(&self.one_mib)
234 } else {
235 None
236 }
237 }
238}