Skip to main content

tracing_cache/
object_pool.rs

//! Per-type buffer pool with try-only locking semantics.
//!
//! [`ObjectPool`] holds N shards (`Vec<Arc<Pool<T>>>`); the hot path
//! picks a shard via the same per-thread key the rest of the cache
//! uses, then attempts to pop a pre-reset `Box<T>` from that shard's
//! `Mutex<Vec<Box<T>>>`.
//!
//! The pool starts **empty** and grows under load:
//!   * acquire: `try_lock` the shard.  Success → pop the next ready
//!     box or, if the shard is empty, allocate `Box::new(T::default())`.
//!     `try_lock` failure (contended) → still allocate a fresh box.
//!   * Every `ReuseRef` is attached to its source shard regardless of
//!     whether the acquire `try_lock` succeeded.  On drop the ref tries
//!     to hand its box back; that's how the pool fills up.
//!
//! On `ReuseRef::drop`:
//!   * `reset()` the box first, before touching the lock, so the lock
//!     is held only for a length check and a `Vec::push`.
//!   * `try_lock` the source shard.  Success + room (`len < capacity`)
//!     → push the box back.  Full or contended → drop the box.
//!
//! No thread ever blocks on the pool's lock.
21
22use std::sync::{Arc, Mutex};
23
24use crate::thread_state::ensure_thread_shard_key;
25
/// A value that can be wiped back to an empty, pool-ready state.
// Exposed as `pub` only because `ReuseRef<T>` names this trait in its
// bound and `ReuseRef` is reachable from the public surface
// (`SpanRecord.events`); `#[doc(hidden)]` keeps it out of rendered docs.
#[doc(hidden)]
pub trait Resettable {
    /// Clear `self` in place so the same value can be handed out again.
    fn reset(&mut self);
}
33
34/// One shard of the pool.  Holds up to `capacity` ready-to-reuse boxed
35/// items.  Storing `Box<T>` (not `T`) means the heap allocation itself
36/// is recycled — `acquire` is `Vec::pop` returning the prior Box, and
37/// `return` is `Vec::push` putting it back; no `Box::new` on the hot
38/// path after warmup.
39pub(crate) struct Pool<T: Resettable + Default + Send + 'static> {
40    items: Mutex<Vec<Box<T>>>,
41    capacity: usize,
42}
43
44impl<T: Resettable + Default + Send + 'static> Pool<T> {
45    fn new(capacity: usize) -> Self {
46        Self {
47            // Don't pre-allocate.  The pool grows from empty as
48            // ReuseRefs drop and hand boxes back.
49            items: Mutex::new(Vec::new()),
50            capacity,
51        }
52    }
53
54    /// Try to take a ready boxed item.  Always returns a `Box<T>` —
55    /// pops from the shard if its `try_lock` succeeds and an item is
56    /// available, otherwise allocates `Box::new(T::default())`.  In
57    /// either case the resulting `ReuseRef` is attached and will
58    /// attempt to hand the box back on drop, which is how the pool
59    /// grows.
60    fn try_take(self: &Arc<Self>) -> Box<T> {
61        if let Ok(mut guard) = self.items.try_lock()
62            && let Some(b) = guard.pop()
63        {
64            return b;
65        }
66        Box::new(T::default())
67    }
68
69    /// Push a (reset) boxed value back into the shard, dropping it if
70    /// there's no room or if the lock is contended.  Drop is what frees
71    /// the Box's heap allocation.
72    fn try_return(&self, value: Box<T>) {
73        match self.items.try_lock() {
74            Ok(mut guard) => {
75                if guard.len() < self.capacity {
76                    guard.push(value);
77                }
78                // else: full — Box drops here, freeing the allocation
79            }
80            Err(_) => {
81                // Contended — Box drops here, freeing the allocation
82            }
83        }
84    }
85}
86
87/// A sharded buffer pool for `T`.  Construction allocates the shards;
88/// items are created lazily via `T::default()` when no ready item is
89/// available.  Sharding keeps `try_lock`-failure rates low under
90/// concurrent acquire / release.
91pub(crate) struct ObjectPool<T: Resettable + Default + Send + 'static> {
92    shards: Vec<Arc<Pool<T>>>,
93    shard_mask: u64,
94}
95
96impl<T: Resettable + Default + Send + 'static> ObjectPool<T> {
97    /// Build a pool with `shard_count` shards (rounded to a power of two,
98    /// minimum 1), each holding up to `per_shard_capacity` ready items.
99    pub fn new(shard_count: usize, per_shard_capacity: usize) -> Arc<Self> {
100        let n = shard_count.max(1).next_power_of_two();
101        let shards = (0..n)
102            .map(|_| Arc::new(Pool::new(per_shard_capacity)))
103            .collect();
104        Arc::new(Self {
105            shards,
106            shard_mask: (n as u64) - 1,
107        })
108    }
109
110    /// Acquire a `ReuseRef<T>` from this thread's shard.  Always
111    /// succeeds; if the shard is empty or its `try_lock` is contended,
112    /// falls back to `Box::new(T::default())`.  Either way the ref is
113    /// attached to the shard — on drop it will try to hand the box
114    /// back, which is how the pool grows.
115    pub fn acquire(&self) -> ReuseRef<T> {
116        let key = ensure_thread_shard_key();
117        let shard = &self.shards[(key & self.shard_mask) as usize];
118        let boxed = shard.try_take();
119        ReuseRef {
120            value: Some(boxed),
121            pool: Arc::clone(shard),
122        }
123    }
124}
125
126/// A pool-backed owned reference.  `Deref` / `DerefMut` give access to
127/// the inner `T`; on drop the value is reset and handed back to the
128/// source shard if room is available and `try_lock` succeeds —
129/// otherwise dropped.
130///
131/// `Clone` allocates a fresh box backed by the same shard.  External
132/// clones (e.g. `cache.get_span()`) pay one heap allocation; the clone
133/// hands its box back into the same pool on drop, just like the
134/// original.
135pub struct ReuseRef<T: Resettable + Default + Send + 'static> {
136    value: Option<Box<T>>,
137    pool: Arc<Pool<T>>,
138}
139
140impl<T: Resettable + Default + Send + 'static> std::ops::Deref for ReuseRef<T> {
141    type Target = T;
142    // `value` is a `Some(Box<T>)` for the whole life of the
143    // ReuseRef — the only `take` is inside `drop`, after the
144    // pointer to `self` is already invalid.  The `expect` here is
145    // a smart-pointer invariant (not user-facing)
146    #[inline]
147    #[allow(clippy::expect_used)]
148    fn deref(&self) -> &T {
149        self.value
150            .as_deref()
151            .expect("ReuseRef value taken before drop")
152    }
153}
154
155impl<T: Resettable + Default + Send + 'static> std::ops::DerefMut for ReuseRef<T> {
156    // Same invariant as the `Deref` impl above.
157    #[inline]
158    #[allow(clippy::expect_used)]
159    fn deref_mut(&mut self) -> &mut T {
160        self.value
161            .as_deref_mut()
162            .expect("ReuseRef value taken before drop")
163    }
164}
165
166impl<T: Resettable + Default + Send + 'static + Clone> Clone for ReuseRef<T> {
167    fn clone(&self) -> Self {
168        // Allocate a fresh box for the clone; share the source shard.
169        // On drop the clone tries to hand its box back the same way as
170        // any other acquire — that's how clones round-trip through the
171        // pool just like fresh acquires under contention.
172        ReuseRef {
173            value: Some(Box::new((**self).clone())),
174            pool: Arc::clone(&self.pool),
175        }
176    }
177}
178
179impl<T: Resettable + Default + Send + 'static> Drop for ReuseRef<T> {
180    fn drop(&mut self) {
181        let Some(mut boxed) = self.value.take() else {
182            return;
183        };
184        boxed.reset();
185        self.pool.try_return(boxed);
186    }
187}
188
189impl<T: Resettable + Default + Send + 'static + std::fmt::Debug> std::fmt::Debug for ReuseRef<T> {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        std::fmt::Debug::fmt(&**self, f)
192    }
193}
194
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test payload that bumps a shared counter when it is dropped.
    /// Each test builds its own counter so parallel tests don't
    /// interfere with one another.
    #[derive(Debug, Default, Clone)]
    struct Buf {
        bytes: Vec<u8>,
        drops: Option<Arc<AtomicUsize>>,
    }

    impl Resettable for Buf {
        // Only `bytes` is cleared; `drops` deliberately survives a
        // reset, which lets tests recognise a recycled `Buf`.
        fn reset(&mut self) {
            self.bytes.clear();
        }
    }

    impl Drop for Buf {
        fn drop(&mut self) {
            if let Some(d) = &self.drops {
                d.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Attach `counter` to the `Buf` behind `r` so its eventual drop is
    /// observable.
    fn install_counter(r: &mut ReuseRef<Buf>, counter: &Arc<AtomicUsize>) {
        r.drops = Some(Arc::clone(counter));
    }

    #[test]
    fn pool_starts_empty_and_grows() {
        // Fresh pool: items vec is empty until something gets returned.
        let pool = ObjectPool::<Buf>::new(1, 4);
        let shard = &pool.shards[0];
        assert_eq!(shard.items.lock().unwrap().len(), 0);

        {
            let mut r = pool.acquire();
            r.bytes.extend_from_slice(b"x");
        }
        // After one acquire/drop the shard has exactly one entry.
        assert_eq!(shard.items.lock().unwrap().len(), 1);
    }

    #[test]
    fn acquire_then_drop_returns_to_shard() {
        let pool = ObjectPool::<Buf>::new(1, 4);
        let counter = Arc::new(AtomicUsize::new(0));
        {
            let mut r = pool.acquire();
            install_counter(&mut r, &counter);
            r.bytes.extend_from_slice(b"hello");
            assert_eq!(r.bytes, b"hello");
        }
        // Box was returned — no Buf dropped.
        assert_eq!(counter.load(Ordering::Relaxed), 0);

        // Re-acquiring must hand back that very `Buf`, cleared.  An
        // empty `bytes` alone would also be true of a fresh
        // `Buf::default()`, so additionally check state that `reset`
        // leaves untouched: the installed counter and the buffer's
        // already-grown capacity.
        let r = pool.acquire();
        assert!(r.bytes.is_empty());
        assert!(r.drops.is_some());
        assert!(r.bytes.capacity() >= b"hello".len());
    }

    #[test]
    fn full_shard_drops_overflow() {
        let pool = ObjectPool::<Buf>::new(1, 2);
        let counter = Arc::new(AtomicUsize::new(0));
        // Acquire 4 — shard cap is 2, so two should drop on return.
        let mut refs: Vec<_> = (0..4).map(|_| pool.acquire()).collect();
        for r in &mut refs {
            install_counter(r, &counter);
        }
        drop(refs);
        assert_eq!(counter.load(Ordering::Relaxed), 2);
        // ...and the other two are the ones the shard kept.
        assert_eq!(pool.shards[0].items.lock().unwrap().len(), 2);
    }

    #[test]
    fn contended_acquire_still_returns_on_drop() {
        // Hold the shard's lock; an acquire while contended should
        // still produce a working ReuseRef (allocated on the fly),
        // and dropping it (after the lock is released) should add it
        // to the shard, growing the pool.
        let pool = ObjectPool::<Buf>::new(1, 4);
        let shard = Arc::clone(&pool.shards[0]);
        let r;
        {
            let _guard = shard.items.lock().unwrap();
            r = pool.acquire();
            // Lock guard drops here; r is still alive.
        }
        // Drop r now that lock is free — should add to pool.
        drop(r);
        assert_eq!(shard.items.lock().unwrap().len(), 1);
    }
}