tracing_cache/object_pool.rs
1//! Per-type buffer pool with try-only locking semantics.
2//!
3//! [`ObjectPool`] holds N shards (`Vec<Arc<Pool<T>>>`); the hot path
4//! picks a shard via the same per-thread key the rest of the cache
5//! uses, then attempts to pop a pre-reset `Box<T>` from that shard's
6//! `Mutex<Vec<Box<T>>>`.
7//!
8//! The pool starts **empty**. It grows under load:
9//! * acquire: `try_lock` the shard. Success → pop the next ready
10//! box; or, if the shard is empty, allocate `Box::new(T::default())`.
11//! `try_lock` fail (contended) → still allocate a fresh box.
12//! * Every `ReuseRef` is attached to its source shard regardless of
13//! whether the acquire try-lock succeeded. On drop the ref tries
14//! to hand its box back; that's how the pool fills up.
15//!
16//! On `ReuseRef::drop`:
17//! * `try_lock` the source shard. Success + room (`len < capacity`)
18//! → `reset()` and push back. Full or contended → drop the box.
19//!
20//! No thread ever blocks on the pool's lock.
21
22use std::sync::{Arc, Mutex};
23
24use crate::thread_state::ensure_thread_shard_key;
25
26/// State that can be reset to a pool-ready empty form.
27// `pub` because it appears as a bound on `ReuseRef<T>`, which is
28// part of the public surface (`SpanRecord.events`).
29#[doc(hidden)]
30pub trait Resettable {
31 fn reset(&mut self);
32}
33
34/// One shard of the pool. Holds up to `capacity` ready-to-reuse boxed
35/// items. Storing `Box<T>` (not `T`) means the heap allocation itself
36/// is recycled — `acquire` is `Vec::pop` returning the prior Box, and
37/// `return` is `Vec::push` putting it back; no `Box::new` on the hot
38/// path after warmup.
39pub(crate) struct Pool<T: Resettable + Default + Send + 'static> {
40 items: Mutex<Vec<Box<T>>>,
41 capacity: usize,
42}
43
44impl<T: Resettable + Default + Send + 'static> Pool<T> {
45 fn new(capacity: usize) -> Self {
46 Self {
47 // Don't pre-allocate. The pool grows from empty as
48 // ReuseRefs drop and hand boxes back.
49 items: Mutex::new(Vec::new()),
50 capacity,
51 }
52 }
53
54 /// Try to take a ready boxed item. Always returns a `Box<T>` —
55 /// pops from the shard if its `try_lock` succeeds and an item is
56 /// available, otherwise allocates `Box::new(T::default())`. In
57 /// either case the resulting `ReuseRef` is attached and will
58 /// attempt to hand the box back on drop, which is how the pool
59 /// grows.
60 fn try_take(self: &Arc<Self>) -> Box<T> {
61 if let Ok(mut guard) = self.items.try_lock()
62 && let Some(b) = guard.pop()
63 {
64 return b;
65 }
66 Box::new(T::default())
67 }
68
69 /// Push a (reset) boxed value back into the shard, dropping it if
70 /// there's no room or if the lock is contended. Drop is what frees
71 /// the Box's heap allocation.
72 fn try_return(&self, value: Box<T>) {
73 match self.items.try_lock() {
74 Ok(mut guard) => {
75 if guard.len() < self.capacity {
76 guard.push(value);
77 }
78 // else: full — Box drops here, freeing the allocation
79 }
80 Err(_) => {
81 // Contended — Box drops here, freeing the allocation
82 }
83 }
84 }
85}
86
87/// A sharded buffer pool for `T`. Construction allocates the shards;
88/// items are created lazily via `T::default()` when no ready item is
89/// available. Sharding keeps `try_lock`-failure rates low under
90/// concurrent acquire / release.
91pub(crate) struct ObjectPool<T: Resettable + Default + Send + 'static> {
92 shards: Vec<Arc<Pool<T>>>,
93 shard_mask: u64,
94}
95
96impl<T: Resettable + Default + Send + 'static> ObjectPool<T> {
97 /// Build a pool with `shard_count` shards (rounded to a power of two,
98 /// minimum 1), each holding up to `per_shard_capacity` ready items.
99 pub fn new(shard_count: usize, per_shard_capacity: usize) -> Arc<Self> {
100 let n = shard_count.max(1).next_power_of_two();
101 let shards = (0..n)
102 .map(|_| Arc::new(Pool::new(per_shard_capacity)))
103 .collect();
104 Arc::new(Self {
105 shards,
106 shard_mask: (n as u64) - 1,
107 })
108 }
109
110 /// Acquire a `ReuseRef<T>` from this thread's shard. Always
111 /// succeeds; if the shard is empty or its `try_lock` is contended,
112 /// falls back to `Box::new(T::default())`. Either way the ref is
113 /// attached to the shard — on drop it will try to hand the box
114 /// back, which is how the pool grows.
115 pub fn acquire(&self) -> ReuseRef<T> {
116 let key = ensure_thread_shard_key();
117 let shard = &self.shards[(key & self.shard_mask) as usize];
118 let boxed = shard.try_take();
119 ReuseRef {
120 value: Some(boxed),
121 pool: Arc::clone(shard),
122 }
123 }
124}
125
126/// A pool-backed owned reference. `Deref` / `DerefMut` give access to
127/// the inner `T`; on drop the value is reset and handed back to the
128/// source shard if room is available and `try_lock` succeeds —
129/// otherwise dropped.
130///
131/// `Clone` allocates a fresh box backed by the same shard. External
132/// clones (e.g. `cache.get_span()`) pay one heap allocation; the clone
133/// hands its box back into the same pool on drop, just like the
134/// original.
135pub struct ReuseRef<T: Resettable + Default + Send + 'static> {
136 value: Option<Box<T>>,
137 pool: Arc<Pool<T>>,
138}
139
140impl<T: Resettable + Default + Send + 'static> std::ops::Deref for ReuseRef<T> {
141 type Target = T;
142 // `value` is a `Some(Box<T>)` for the whole life of the
143 // ReuseRef — the only `take` is inside `drop`, after the
144 // pointer to `self` is already invalid. The `expect` here is
145 // a smart-pointer invariant (not user-facing)
146 #[inline]
147 #[allow(clippy::expect_used)]
148 fn deref(&self) -> &T {
149 self.value
150 .as_deref()
151 .expect("ReuseRef value taken before drop")
152 }
153}
154
155impl<T: Resettable + Default + Send + 'static> std::ops::DerefMut for ReuseRef<T> {
156 // Same invariant as the `Deref` impl above.
157 #[inline]
158 #[allow(clippy::expect_used)]
159 fn deref_mut(&mut self) -> &mut T {
160 self.value
161 .as_deref_mut()
162 .expect("ReuseRef value taken before drop")
163 }
164}
165
166impl<T: Resettable + Default + Send + 'static + Clone> Clone for ReuseRef<T> {
167 fn clone(&self) -> Self {
168 // Allocate a fresh box for the clone; share the source shard.
169 // On drop the clone tries to hand its box back the same way as
170 // any other acquire — that's how clones round-trip through the
171 // pool just like fresh acquires under contention.
172 ReuseRef {
173 value: Some(Box::new((**self).clone())),
174 pool: Arc::clone(&self.pool),
175 }
176 }
177}
178
179impl<T: Resettable + Default + Send + 'static> Drop for ReuseRef<T> {
180 fn drop(&mut self) {
181 let Some(mut boxed) = self.value.take() else {
182 return;
183 };
184 boxed.reset();
185 self.pool.try_return(boxed);
186 }
187}
188
189impl<T: Resettable + Default + Send + 'static + std::fmt::Debug> std::fmt::Debug for ReuseRef<T> {
190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191 std::fmt::Debug::fmt(&**self, f)
192 }
193}
194
195#[cfg(test)]
196mod tests {
197 use super::*;
198 use std::sync::atomic::{AtomicUsize, Ordering};
199
200 /// A `Buf` that increments a shared atomic on drop. Each test
201 /// builds its own counter so parallel tests don't interfere.
202 #[derive(Debug, Default, Clone)]
203 struct Buf {
204 bytes: Vec<u8>,
205 drops: Option<Arc<AtomicUsize>>,
206 }
207
208 impl Resettable for Buf {
209 fn reset(&mut self) {
210 self.bytes.clear();
211 }
212 }
213
214 impl Drop for Buf {
215 fn drop(&mut self) {
216 if let Some(d) = &self.drops {
217 d.fetch_add(1, Ordering::Relaxed);
218 }
219 }
220 }
221
222 fn install_counter(r: &mut ReuseRef<Buf>, counter: &Arc<AtomicUsize>) {
223 r.drops = Some(Arc::clone(counter));
224 }
225
226 #[test]
227 fn pool_starts_empty_and_grows() {
228 // Fresh pool: items vec is empty until something gets returned.
229 let pool = ObjectPool::<Buf>::new(1, 4);
230 let shard = &pool.shards[0];
231 assert_eq!(shard.items.lock().unwrap().len(), 0);
232
233 {
234 let mut r = pool.acquire();
235 r.bytes.extend_from_slice(b"x");
236 }
237 // After one acquire/drop the shard has exactly one entry.
238 assert_eq!(shard.items.lock().unwrap().len(), 1);
239 }
240
241 #[test]
242 fn acquire_then_drop_returns_to_shard() {
243 let pool = ObjectPool::<Buf>::new(1, 4);
244 let counter = Arc::new(AtomicUsize::new(0));
245 {
246 let mut r = pool.acquire();
247 install_counter(&mut r, &counter);
248 r.bytes.extend_from_slice(b"hello");
249 assert_eq!(r.bytes, b"hello");
250 }
251 // Box was returned — no Buf dropped.
252 assert_eq!(counter.load(Ordering::Relaxed), 0);
253 // Re-acquiring should hand back the same allocation (cleared).
254 let r = pool.acquire();
255 assert_eq!(r.bytes, b"");
256 }
257
258 #[test]
259 fn full_shard_drops_overflow() {
260 let pool = ObjectPool::<Buf>::new(1, 2);
261 let counter = Arc::new(AtomicUsize::new(0));
262 // Acquire 4 — shard cap is 2, so two should drop on return.
263 let mut refs: Vec<_> = (0..4).map(|_| pool.acquire()).collect();
264 for r in &mut refs {
265 install_counter(r, &counter);
266 }
267 drop(refs);
268 assert_eq!(counter.load(Ordering::Relaxed), 2);
269 }
270
271 #[test]
272 fn contended_acquire_still_returns_on_drop() {
273 // Hold the shard's lock; an acquire while contended should
274 // still produce a working ReuseRef (allocated on the fly),
275 // and dropping it (after the lock is released) should add it
276 // to the shard, growing the pool.
277 let pool = ObjectPool::<Buf>::new(1, 4);
278 let shard = Arc::clone(&pool.shards[0]);
279 let r;
280 {
281 let _guard = shard.items.lock().unwrap();
282 r = pool.acquire();
283 // Lock guard drops here; r is still alive.
284 }
285 // Drop r now that lock is free — should add to pool.
286 drop(r);
287 assert_eq!(shard.items.lock().unwrap().len(), 1);
288 }
289}