1mod buffer;
28mod raw_pool_buf_io;
29
30use std::collections::VecDeque;
31use std::ops::Deref;
32use std::ops::DerefMut;
33use std::sync::atomic::AtomicUsize;
34use std::sync::atomic::Ordering;
35
36use crossbeam::queue::SegQueue;
37
38use foundations::telemetry::metrics::metrics;
39use foundations::telemetry::metrics::Gauge;
40
41pub use crate::buffer::*;
42pub use crate::raw_pool_buf_io::*;
43
// Gauges describing the pools defined in this file. Every per-pool gauge is
// keyed by the pool's `name`.
//
// NOTE(review): plain `//` comments are used here on purpose. The `#[metrics]`
// macro may turn `///` doc comments into exported help text, which would
// change what the program emits — confirm before converting them.
#[metrics]
pub mod buffer_pool {
    // Number of idle objects currently parked in the pool: incremented when a
    // dropped `Pooled` value is pushed back, decremented when `Pool::get`
    // pops one.
    pub fn pool_idle_count(name: &'static str) -> Gauge;
    // Sum of `Reuse::capacity()` over the idle objects parked in the pool.
    pub fn pool_idle_bytes(name: &'static str) -> Gauge;
    // Number of live `Pooled` values bound to the pool: incremented in
    // `Pooled::new`, decremented when the value is dropped.
    pub fn pool_active_count(name: &'static str) -> Gauge;
    // Not updated anywhere in this file; presumably maintained by the consume
    // buffer implementation elsewhere in the crate — TODO confirm.
    pub fn consume_buffer_total_bytes() -> Gauge;
}
58
/// A pool of reusable `T` objects split across `S` independent shards.
///
/// Objects are handed out wrapped in [`Pooled`], which remembers the shard it
/// was assigned to and offers the object back to that shard when dropped.
/// All accessors take `&'static self`, so a pool has to live for the whole
/// program (e.g. in a `static` or behind `Box::leak`).
#[derive(Debug)]
pub struct Pool<const S: usize, T: 'static> {
    /// One queue of idle objects per shard.
    queues: [QueueShard<T>; S],
    /// Monotonic counter used to pick a shard: `get` and `from_owned` advance
    /// it and use its previous value modulo `S`; `get_empty` only reads it.
    next_shard: AtomicUsize,
}
67
/// A single shard of a [`Pool`]: a queue of idle objects plus the limits and
/// the metrics label that apply to it.
#[derive(Debug)]
struct QueueShard<T> {
    /// Idle objects waiting to be handed out again.
    queue: SegQueue<T>,
    /// Number of objects accounted to `queue`. It is incremented *before* an
    /// object is pushed and decremented *after* one is popped, so it can
    /// briefly differ from the real queue length.
    elem_cnt: AtomicUsize,
    /// Value passed to `Reuse::reuse` when an object is dropped; the `Vec` and
    /// `VecDeque` implementations shrink their capacity towards it.
    trim: usize,
    /// Upper bound on `elem_cnt`; a dropped object is only kept while the
    /// count is below this value.
    max: usize,
    /// Pool name used as the label for the `buffer_pool` gauges.
    name: &'static str,
}
82
83impl<T> QueueShard<T> {
84 const fn new(trim: usize, max: usize, name: &'static str) -> Self {
85 QueueShard {
86 queue: SegQueue::new(),
87 elem_cnt: AtomicUsize::new(0),
88 trim,
89 max,
90 name,
91 }
92 }
93}
94
/// A `T` bound to one shard of a [`Pool`].
///
/// The wrapper dereferences to the inner `T`. When it is dropped the inner
/// value is passed through `Reuse::reuse` and, if it is reusable and the shard
/// has room, pushed back onto the shard's queue.
///
/// The bounds live on the struct because the `Drop` implementation needs
/// them, and `Drop` cannot add bounds of its own.
#[derive(Debug)]
pub struct Pooled<T: Default + Reuse + 'static> {
    /// The wrapped object.
    inner: T,
    /// The shard this object is offered back to on drop.
    pool: &'static QueueShard<T>,
}
101
102impl<T: Default + Reuse> Pooled<T> {
103 fn new(inner: T, shard: &'static QueueShard<T>) -> Self {
104 buffer_pool::pool_active_count(shard.name).inc();
105 Pooled { inner, pool: shard }
106 }
107
108 pub fn into_inner(mut self) -> T {
109 std::mem::take(&mut self.inner)
110 }
111}
112
113impl<T: Default + Reuse> Drop for Pooled<T> {
114 fn drop(&mut self) {
115 let QueueShard {
116 queue,
117 elem_cnt,
118 trim,
119 max,
120 name,
121 } = self.pool;
122 buffer_pool::pool_active_count(name).dec();
124 if self.inner.reuse(*trim) {
125 if elem_cnt.fetch_add(1, Ordering::Acquire) < *max {
126 buffer_pool::pool_idle_count(name).inc();
129 buffer_pool::pool_idle_bytes(name)
130 .inc_by(self.inner.capacity() as u64);
131 queue.push(std::mem::take(&mut self.inner));
132 return;
133 }
134 elem_cnt.fetch_sub(1, Ordering::Release);
137 }
138 }
140}
141
// Generates a `const fn new` for `Pool<N, T>` for a range of shard counts.
//
// The macro takes a shard count `$n` followed by a list of identifiers, one
// per shard. The array of shards is written out element by element (one
// `QueueShard::new` call per identifier), then the macro recurses with
// `$n - 1` and one identifier fewer until the list is empty.
macro_rules! array_impl_new_queues {
    // Recursive case: `$t` names the generic parameter of the impl, and each
    // remaining `$ts` contributes one more array element.
    {$n:expr, $t:ident $($ts:ident)*} => {
        impl<$t: Default + Reuse> Pool<{$n}, $t> {
            /// Creates a pool whose shards together retain at most `limit`
            /// idle objects.
            ///
            /// `limit` is split evenly between the shards using integer
            /// division, so the remainder is discarded and a `limit` smaller
            /// than the shard count leaves every shard with a maximum of 0.
            /// `trim` and `name` are passed to every shard unchanged.
            // NOTE(review): presumably silences warnings for the generated
            // sizes that are never instantiated — confirm it is still needed.
            #[allow(dead_code)]
            pub const fn new(limit: usize, trim: usize, name: &'static str) -> Self {
                // Per-shard limit.
                let limit = limit / $n;
                Pool {
                    queues: [QueueShard::new(trim, limit, name), $(QueueShard::<$ts>::new(trim, limit, name)),*],
                    next_shard: AtomicUsize::new(0),
                }
            }
        }

        // Emit the impl for the next smaller shard count.
        array_impl_new_queues!{($n - 1), $($ts)*}
    };
    // Base case: no identifiers left, nothing more to generate.
    {$n:expr,} => { };
}

// Instantiate `new` for every shard count from 32 down to 1. The number of
// `T`s must equal the leading count, since it determines the array length.
array_impl_new_queues! { 32, T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T }
164
165impl<const S: usize, T: Default + Reuse> Pool<S, T> {
166 pub fn get(&'static self) -> Pooled<T> {
169 let shard = self.next_shard.fetch_add(1, Ordering::Relaxed) % S;
170 let shard = &self.queues[shard];
171 let inner = match shard.queue.pop() {
172 Some(el) => {
173 shard.elem_cnt.fetch_sub(1, Ordering::Relaxed);
174 buffer_pool::pool_idle_count(shard.name).dec();
175 buffer_pool::pool_idle_bytes(shard.name)
176 .dec_by(el.capacity() as u64);
177 el
178 },
179 None => Default::default(),
180 };
181
182 Pooled::new(inner, shard)
183 }
184
185 pub fn get_empty(&'static self) -> Pooled<T> {
189 let shard = self.next_shard.load(Ordering::Relaxed) % S;
190 let shard = &self.queues[shard];
191 Pooled::new(Default::default(), shard)
192 }
193
194 pub fn get_with(&'static self, f: impl Fn(&mut T)) -> Pooled<T> {
197 let mut pooled = self.get();
198 f(&mut pooled);
199 pooled
200 }
201
202 pub fn from_owned(&'static self, inner: T) -> Pooled<T> {
203 let shard = self.next_shard.fetch_add(1, Ordering::Relaxed) % S;
204 let shard = &self.queues[shard];
205 Pooled::new(inner, shard)
206 }
207}
208
209impl<'a, const S: usize, T: Default + Extend<&'a u8> + Reuse> Pool<S, T> {
210 pub fn with_slice(&'static self, v: &'a [u8]) -> Pooled<T> {
212 let mut buf = self.get();
213 buf.deref_mut().extend(v);
214 buf
215 }
216}
217
218impl<T: Default + Reuse> Deref for Pooled<T> {
219 type Target = T;
220
221 fn deref(&self) -> &Self::Target {
222 &self.inner
223 }
224}
225
226impl<T: Default + Reuse> DerefMut for Pooled<T> {
227 fn deref_mut(&mut self) -> &mut Self::Target {
228 &mut self.inner
229 }
230}
231
/// Behaviour required from objects stored in a pool.
pub trait Reuse {
    /// Resets the object so it can be handed out again, trimming it towards
    /// `trim`. Returns `true` when the object is worth keeping and `false`
    /// when it should simply be dropped.
    fn reuse(&mut self, trim: usize) -> bool;

    /// Size of the object as reported to the pool's idle-bytes gauge.
    fn capacity(&self) -> usize;
}

impl Reuse for Vec<u8> {
    /// Empties the vector and shrinks its allocation down to at most `trim`
    /// bytes. A vector that owns no allocation is not worth pooling.
    fn reuse(&mut self, trim: usize) -> bool {
        self.clear();
        self.shrink_to(trim);
        Vec::capacity(self) != 0
    }

    fn capacity(&self) -> usize {
        // Fully qualified to make it obvious that this is the inherent
        // `Vec::capacity`, not a recursive call to the trait method.
        Vec::capacity(self)
    }
}

impl Reuse for VecDeque<u8> {
    /// Empties the deque and shrinks its allocation down to at most `trim`
    /// bytes. A deque that owns no allocation is not worth pooling.
    fn reuse(&mut self, trim: usize) -> bool {
        self.clear();
        self.shrink_to(trim);
        VecDeque::capacity(self) != 0
    }

    fn capacity(&self) -> usize {
        // Inherent `VecDeque::capacity`, not the trait method.
        VecDeque::capacity(self)
    }
}
266
267#[cfg(test)]
268mod tests {
269 use super::*;
270
    /// Walks a small pool (3 shards, at most 2 idle buffers per shard) through
    /// several get/drop cycles and checks the shard assignment, the per-shard
    /// element counters and the three per-pool gauges after every step.
    #[test]
    fn test_sharding() {
        const SHARDS: usize = 3;
        const MAX_IN_SHARD: usize = 2;
        const POOL_NAME: &'static str = "test_sharding_pool";

        // The pool API needs `&'static self`, hence the leak. The total limit
        // is divided evenly between the shards, giving MAX_IN_SHARD each.
        let pool = Box::leak(Box::new(Pool::<SHARDS, Vec<u8>>::new(
            SHARDS * MAX_IN_SHARD,
            4,
            POOL_NAME,
        )));

        // Phase 1: the pool is empty, so every `get` creates a default buffer.
        let bufs = (0..SHARDS * 4).map(|_| pool.get()).collect::<Vec<_>>();

        // Nothing is idle; everything handed out is active.
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            bufs.len() as u64
        );

        for (i, buf) in bufs.iter().enumerate() {
            assert!(buf.is_empty());
            // Shards are assigned round-robin, starting from shard 0.
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
        }

        // Inspecting the buffers must not have changed any counters.
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            bufs.len() as u64
        );

        // The buffers were never written to, so they have zero capacity and
        // `reuse` rejects them: none of them ends up in the pool.
        drop(bufs);
        assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), 0);

        // Phase 2: the same number of buffers, this time with data in them so
        // that they own an allocation.
        let bufs = (0..SHARDS * 4)
            .map(|_| pool.get_with(|b| b.extend(&[0, 1])))
            .collect::<Vec<_>>();

        for (i, buf) in bufs.iter().enumerate() {
            // The counter has advanced by a multiple of SHARDS so far, so the
            // round-robin assignment lines up with the index again.
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
            assert_eq!(&buf[..], &[0, 1]);
        }
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            bufs.len() as u64
        );

        drop(bufs);

        // Each shard was offered four buffers but keeps only MAX_IN_SHARD.
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }
        assert_eq!(
            buffer_pool::pool_idle_count(POOL_NAME).get(),
            (SHARDS * MAX_IN_SHARD) as u64
        );
        assert_ne!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 0);

        // Phase 3: drain the pool one round at a time. The first round takes
        // one idle buffer from every shard.
        let bufs = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();

        for (i, buf) in bufs.iter().enumerate() {
            // Recycled buffers come back cleared.
            assert!(buf.is_empty());
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
        }

        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 1);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), SHARDS as u64);
        assert_ne!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            bufs.len() as u64
        );

        // The second round empties every shard.
        let bufs2 = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            (bufs.len() + bufs2.len()) as u64
        );

        // The third round finds the pool empty and gets fresh default buffers.
        let bufs3 = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            (bufs.len() + bufs2.len() + bufs3.len()) as u64
        );

        // Recycled buffers still own their allocation, so dropping the first
        // round puts one buffer back into every shard.
        drop(bufs);
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 1);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), SHARDS as u64);
        assert_ne!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            (bufs2.len() + bufs3.len()) as u64
        );

        // Dropping the second round fills every shard up to its limit.
        drop(bufs2);
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }
        assert_eq!(
            buffer_pool::pool_idle_count(POOL_NAME).get(),
            (SHARDS * MAX_IN_SHARD) as u64
        );
        assert_ne!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            bufs3.len() as u64
        );

        // The third round holds untouched default buffers, which are not
        // reusable: the idle counters stay put and only the active gauge
        // drops.
        drop(bufs3);
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }
        assert_eq!(
            buffer_pool::pool_idle_count(POOL_NAME).get(),
            (SHARDS * MAX_IN_SHARD) as u64
        );
        assert_ne!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 0);
    }
438
439 #[test]
440 fn test_creation() {
441 const SHARDS: usize = 3;
442 const MAX_IN_SHARD: usize = 2;
443 const POOL_NAME: &'static str = "test_creation_pool";
444
445 let pool = Box::leak(Box::new(Pool::<SHARDS, Vec<u8>>::new(
446 SHARDS * MAX_IN_SHARD,
447 4,
448 POOL_NAME,
449 )));
450
451 assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 0);
452
453 {
454 let _buf1 = pool.get();
455 assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 1);
456
457 let _buf2 = pool.get_empty();
458 assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 2);
459
460 let _buf3 = pool.get_with(|_| ());
461 assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 3);
462
463 let _buf4 = pool.from_owned(vec![0, 1, 2, 4]);
464 assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 4);
465 }
466
467 assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 0);
468 }
469}