buffer_pool/
lib.rs

1// Copyright (C) 2025, Cloudflare, Inc.
2// All rights reserved.
3//
4// Redistribution and use in source and binary forms, with or without
5// modification, are permitted provided that the following conditions are
6// met:
7//
8//     * Redistributions of source code must retain the above copyright notice,
9//       this list of conditions and the following disclaimer.
10//
11//     * Redistributions in binary form must reproduce the above copyright
12//       notice, this list of conditions and the following disclaimer in the
13//       documentation and/or other materials provided with the distribution.
14//
15// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
16// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
17// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
19// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
20// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
22// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
23// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
24// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27mod buffer;
28mod raw_pool_buf_io;
29
30use std::collections::VecDeque;
31use std::ops::Deref;
32use std::ops::DerefMut;
33use std::sync::atomic::AtomicUsize;
34use std::sync::atomic::Ordering;
35
36use crossbeam::queue::SegQueue;
37
38use foundations::telemetry::metrics::metrics;
39use foundations::telemetry::metrics::Gauge;
40
41pub use crate::buffer::*;
42pub use crate::raw_pool_buf_io::*;
43
// The `#[metrics]` attribute (from `foundations`) expands these function
// signatures into lazily-registered telemetry gauges; callers invoke them
// like functions, keyed by the `name` label where present.
#[metrics]
pub mod buffer_pool {
    /// Number of objects available for reuse in the pool.
    pub fn pool_idle_count(name: &'static str) -> Gauge;
    /// Memory footprint of objects currently in the pool.
    pub fn pool_idle_bytes(name: &'static str) -> Gauge;
    /// Number of objects currently active and in-use.
    pub fn pool_active_count(name: &'static str) -> Gauge;
    /// Total number of bytes allocated across all `ConsumeBuffer` objects.
    ///
    /// We're not able to track this with better granularity because
    /// the ConsumeBuffers may be resized, and they don't know their pools.
    pub fn consume_buffer_total_bytes() -> Gauge;
}
58
/// A sharded pool of elements.
///
/// `S` is the number of shards; callers are spread across shards in
/// round-robin order to reduce contention on any single queue.
#[derive(Debug)]
pub struct Pool<const S: usize, T: 'static> {
    /// List of distinct shards to reduce contention.
    queues: [QueueShard<T>; S],
    /// The index of the next shard to use, in round-robin order.
    /// Monotonically increasing; reduced modulo `S` at the use site.
    next_shard: AtomicUsize,
}
67
/// One shard of a [`Pool`]: a lock-free queue of idle values plus the
/// bookkeeping needed to bound its size.
#[derive(Debug)]
struct QueueShard<T> {
    /// The inner stack of pooled values.
    queue: SegQueue<T>,
    /// The number of elements currently stored in this shard.
    /// Kept separately because `SegQueue::len` is O(n) and racy anyway.
    elem_cnt: AtomicUsize,
    /// The value to use when calling [`Reuse::reuse`]. Typically the capacity
    /// to keep in a reused buffer.
    trim: usize,
    /// The max number of values to keep in the shard.
    max: usize,
    /// Name of the pool, for metrics.
    name: &'static str,
}
82
83impl<T> QueueShard<T> {
84    const fn new(trim: usize, max: usize, name: &'static str) -> Self {
85        QueueShard {
86            queue: SegQueue::new(),
87            elem_cnt: AtomicUsize::new(0),
88            trim,
89            max,
90            name,
91        }
92    }
93}
94
/// A value borrowed from the [`Pool`] that can be dereferenced to `T`.
#[derive(Debug)]
pub struct Pooled<T: Default + Reuse + 'static> {
    /// The borrowed value itself; accessed through `Deref`/`DerefMut`.
    inner: T,
    /// The shard this value is offered back to on drop.
    pool: &'static QueueShard<T>,
}
101
102impl<T: Default + Reuse> Pooled<T> {
103    fn new(inner: T, shard: &'static QueueShard<T>) -> Self {
104        buffer_pool::pool_active_count(shard.name).inc();
105        Pooled { inner, pool: shard }
106    }
107
108    pub fn into_inner(mut self) -> T {
109        std::mem::take(&mut self.inner)
110    }
111}
112
impl<T: Default + Reuse> Drop for Pooled<T> {
    /// Offers the inner value back to its shard: it is kept only if
    /// [`Reuse::reuse`] approves it and the shard is not already full.
    fn drop(&mut self) {
        let QueueShard {
            queue,
            elem_cnt,
            trim,
            max,
            name,
        } = self.pool;
        // The memory associated with this object is no longer live.
        buffer_pool::pool_active_count(name).dec();
        if self.inner.reuse(*trim) {
            // Optimistically claim a slot first; under contention the count
            // may briefly exceed `max`, which is corrected below.
            if elem_cnt.fetch_add(1, Ordering::Acquire) < *max {
                // If returning the element to the queue would not exceed max
                // number of elements, return it
                buffer_pool::pool_idle_count(name).inc();
                buffer_pool::pool_idle_bytes(name)
                    .inc_by(self.inner.capacity() as u64);
                // `take` leaves a default `T` in `self.inner`; that default
                // is dropped with `self` without touching the pool again.
                queue.push(std::mem::take(&mut self.inner));
                return;
            }
            // There was no room for the buffer, return count to previous value
            // and drop
            // NOTE(review): Acquire-on-add / Release-on-sub is an unusual
            // pairing for a plain occupancy counter — presumably Relaxed
            // would suffice; confirm before changing.
            elem_cnt.fetch_sub(1, Ordering::Release);
        }
        // If item did not qualify for return, drop it
    }
}
141
// Currently there is no way to const init an array that does not implement
// Copy, so this macro generates initializers for up to 32 shards. If
// const Default is ever stabilized this will all go away.
//
// Each expansion step peels one `$t` token off the list and emits a
// `Pool::<N, T>::new` constructor whose shard array contains exactly N
// `QueueShard::new(..)` entries (one literal plus one per remaining
// token); the recursion terminates on the empty token list.
macro_rules! array_impl_new_queues {
    {$n:expr, $t:ident $($ts:ident)*} => {
        impl<$t: Default + Reuse> Pool<{$n}, $t> {
            #[allow(dead_code)]
            pub const fn new(limit: usize, trim: usize, name: &'static str) -> Self {
                // The pool-wide element limit is split evenly across shards.
                let limit = limit / $n;
                Pool {
                    queues: [QueueShard::new(trim, limit, name), $(QueueShard::<$ts>::new(trim, limit, name)),*],
                    next_shard: AtomicUsize::new(0),
                }
            }
        }

        array_impl_new_queues!{($n - 1), $($ts)*}
    };
    {$n:expr,} => {  };
}

array_impl_new_queues! { 32, T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T T }
164
165impl<const S: usize, T: Default + Reuse> Pool<S, T> {
166    /// Get a value from the pool, or create a new default value if the
167    /// assigned shard is currently empty.
168    pub fn get(&'static self) -> Pooled<T> {
169        let shard = self.next_shard.fetch_add(1, Ordering::Relaxed) % S;
170        let shard = &self.queues[shard];
171        let inner = match shard.queue.pop() {
172            Some(el) => {
173                shard.elem_cnt.fetch_sub(1, Ordering::Relaxed);
174                buffer_pool::pool_idle_count(shard.name).dec();
175                buffer_pool::pool_idle_bytes(shard.name)
176                    .dec_by(el.capacity() as u64);
177                el
178            },
179            None => Default::default(),
180        };
181
182        Pooled::new(inner, shard)
183    }
184
185    /// Create a new default value assigned for a pool, if it is ends up
186    /// being expanded and eligible for reuse it will return to the pool,
187    /// otherwise it will end up being dropped.
188    pub fn get_empty(&'static self) -> Pooled<T> {
189        let shard = self.next_shard.load(Ordering::Relaxed) % S;
190        let shard = &self.queues[shard];
191        Pooled::new(Default::default(), shard)
192    }
193
194    /// Get a value from the pool and apply the provided transformation on
195    /// it before returning.
196    pub fn get_with(&'static self, f: impl Fn(&mut T)) -> Pooled<T> {
197        let mut pooled = self.get();
198        f(&mut pooled);
199        pooled
200    }
201
202    pub fn from_owned(&'static self, inner: T) -> Pooled<T> {
203        let shard = self.next_shard.fetch_add(1, Ordering::Relaxed) % S;
204        let shard = &self.queues[shard];
205        Pooled::new(inner, shard)
206    }
207}
208
209impl<'a, const S: usize, T: Default + Extend<&'a u8> + Reuse> Pool<S, T> {
210    /// Get a value from the pool and extend it with the provided slice.
211    pub fn with_slice(&'static self, v: &'a [u8]) -> Pooled<T> {
212        let mut buf = self.get();
213        buf.deref_mut().extend(v);
214        buf
215    }
216}
217
/// Read access to the pooled value without consuming the wrapper.
impl<T: Default + Reuse> Deref for Pooled<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
225
/// Mutable access to the pooled value without consuming the wrapper.
impl<T: Default + Reuse> DerefMut for Pooled<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
231
/// A trait that prepares an item to be returned to the pool. For example
/// clearing it. `true` is returned if the item should be returned to the pool,
/// `false` if it should be dropped.
pub trait Reuse {
    /// Prepares `self` for reuse, typically clearing contents and trimming
    /// excess capacity down toward `trim`. Returns whether the value is
    /// still worth pooling.
    fn reuse(&mut self, trim: usize) -> bool;

    /// Returns the capacity of the object in bytes, to allow for more precise
    /// tracking.
    fn capacity(&self) -> usize;
}
242
243impl Reuse for Vec<u8> {
244    fn reuse(&mut self, trim: usize) -> bool {
245        self.clear();
246        self.shrink_to(trim);
247        self.capacity() > 0
248    }
249
250    fn capacity(&self) -> usize {
251        self.capacity()
252    }
253}
254
255impl Reuse for VecDeque<u8> {
256    fn reuse(&mut self, val: usize) -> bool {
257        self.clear();
258        self.shrink_to(val);
259        self.capacity() > 0
260    }
261
262    fn capacity(&self) -> usize {
263        self.capacity()
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE: metrics are process-global, so each test uses a unique pool
    // name to keep its gauges independent of the other tests.
    #[test]
    fn test_sharding() {
        const SHARDS: usize = 3;
        const MAX_IN_SHARD: usize = 2;
        const POOL_NAME: &'static str = "test_sharding_pool";

        // The pool API requires `&'static self`, so leak the allocation.
        let pool = Box::leak(Box::new(Pool::<SHARDS, Vec<u8>>::new(
            SHARDS * MAX_IN_SHARD,
            4,
            POOL_NAME,
        )));

        let bufs = (0..SHARDS * 4).map(|_| pool.get()).collect::<Vec<_>>();

        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            bufs.len() as u64
        );

        for (i, buf) in bufs.iter().enumerate() {
            assert!(buf.is_empty());
            // Check the buffer is sharded properly.
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
        }

        // Shards are still empty.
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            bufs.len() as u64
        );

        // Now drop the buffers, they will not go into the pool because they have
        // no capacity, so reuse returns false. What is the point in
        // pooling empty buffers?
        drop(bufs);
        assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), 0);

        // Get buffers with capacity next.
        let bufs = (0..SHARDS * 4)
            .map(|_| pool.get_with(|b| b.extend(&[0, 1])))
            .collect::<Vec<_>>();

        for (i, buf) in bufs.iter().enumerate() {
            // Check the buffer is sharded properly.
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
            // Check that the buffer was properly extended
            assert_eq!(&buf[..], &[0, 1]);
        }
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            bufs.len() as u64
        );

        drop(bufs);

        // These buffers had capacity, so they return to their shards —
        // but each shard caps out at MAX_IN_SHARD; the excess is dropped.
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }
        assert_eq!(
            buffer_pool::pool_idle_count(POOL_NAME).get(),
            (SHARDS * MAX_IN_SHARD) as u64
        );
        assert_ne!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 0);

        // Now get buffers again, this time they should come from the pool.
        let bufs = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();

        for (i, buf) in bufs.iter().enumerate() {
            // Check that the buffer was properly cleared.
            assert!(buf.is_empty());
            // Check the buffer is sharded properly.
            assert_eq!(
                buf.pool as *const _,
                &pool.queues[i % SHARDS] as *const _
            );
        }

        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 1);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), SHARDS as u64);
        assert_ne!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            bufs.len() as u64
        );

        // Get more buffers from the pool.
        let bufs2 = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            (bufs.len() + bufs2.len()) as u64
        );

        // Get even more buffers (pool is empty; these are fresh defaults).
        let bufs3 = (0..SHARDS).map(|_| pool.get()).collect::<Vec<_>>();
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 0);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            (bufs.len() + bufs2.len() + bufs3.len()) as u64
        );

        // Now begin dropping.
        drop(bufs);
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), 1);
        }
        assert_eq!(buffer_pool::pool_idle_count(POOL_NAME).get(), SHARDS as u64);
        assert_ne!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            (bufs2.len() + bufs3.len()) as u64
        );

        drop(bufs2);
        for shard in pool.queues.iter() {
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }
        assert_eq!(
            buffer_pool::pool_idle_count(POOL_NAME).get(),
            (SHARDS * MAX_IN_SHARD) as u64
        );
        assert_ne!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(
            buffer_pool::pool_active_count(POOL_NAME).get(),
            bufs3.len() as u64
        );

        drop(bufs3);
        for shard in pool.queues.iter() {
            // Can't get over limit.
            assert_eq!(shard.elem_cnt.load(Ordering::Relaxed), MAX_IN_SHARD);
        }
        assert_eq!(
            buffer_pool::pool_idle_count(POOL_NAME).get(),
            (SHARDS * MAX_IN_SHARD) as u64
        );
        assert_ne!(buffer_pool::pool_idle_bytes(POOL_NAME).get(), 0);
        assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 0);
    }

    // Verifies that each acquisition path (get / get_empty / get_with /
    // from_owned) increments the active gauge, and that dropping the
    // wrappers returns it to zero.
    #[test]
    fn test_creation() {
        const SHARDS: usize = 3;
        const MAX_IN_SHARD: usize = 2;
        const POOL_NAME: &'static str = "test_creation_pool";

        let pool = Box::leak(Box::new(Pool::<SHARDS, Vec<u8>>::new(
            SHARDS * MAX_IN_SHARD,
            4,
            POOL_NAME,
        )));

        assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 0);

        {
            let _buf1 = pool.get();
            assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 1);

            let _buf2 = pool.get_empty();
            assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 2);

            let _buf3 = pool.get_with(|_| ());
            assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 3);

            let _buf4 = pool.from_owned(vec![0, 1, 2, 4]);
            assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 4);
        }

        assert_eq!(buffer_pool::pool_active_count(POOL_NAME).get(), 0);
    }
}