Skip to main content

poolshark/global/
mod.rs

1//! Lock-free global object pools for cross-thread pooling.
2//!
3//! Global pools ensure objects always return to their origin pool, regardless of which
4//! thread drops them. This is essential for producer-consumer patterns where one thread
5//! creates objects and other threads consume them.
6//!
7//! # When to Use
8//!
9//! Use global pools when:
10//! - One thread primarily allocates objects while others consume them
11//! - You need objects to return to a specific pool regardless of which thread drops them
12//! - You have a producer-consumer pattern across threads
13//!
14//! Otherwise, prefer [`crate::local`] pools for better performance.
15//!
16//! # Examples
17//!
18//! ## Using a static global pool
19//!
20//! ```
21//! use poolshark::global::{Pool, GPooled};
22//! use std::sync::LazyLock;
23//!
24//! static STRINGS: LazyLock<Pool<String>> = LazyLock::new(|| Pool::new(1024, 4096));
25//!
26//! fn create_message() -> GPooled<String> {
27//!     let mut s = STRINGS.take();
28//!     s.push_str("Hello, world!");
29//!     s
30//! }
31//! ```
32//!
33//! ## Using thread-local global pools
34//!
35//! ```
36//! use poolshark::global;
37//! use std::collections::HashMap;
38//!
39//! // Take from thread-local global pool
40//! let map = global::take::<HashMap<String, i32>>();
41//! ```
42use crate::{Discriminant, IsoPoolable, Opaque, Poolable, RawPoolable};
43use crossbeam_queue::ArrayQueue;
44use fxhash::FxHashMap;
45#[cfg(feature = "serde")]
46use serde::{de::DeserializeOwned, Deserialize, Serialize};
47use std::{
48    any::{Any, TypeId},
49    borrow::Borrow,
50    cell::RefCell,
51    cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd},
52    collections::HashMap,
53    default::Default,
54    fmt::{self, Debug, Display},
55    hash::{Hash, Hasher},
56    mem::{self, ManuallyDrop},
57    ops::{Deref, DerefMut},
58    ptr,
59    sync::{Arc, LazyLock, Mutex, Weak},
60};
61
62pub mod arc;
63
64thread_local! {
65    static POOLS: RefCell<FxHashMap<Discriminant, Opaque>> =
66        RefCell::new(HashMap::default());
67}
68
69const DEFAULT_SIZES: (usize, usize) = (1024, 1024);
70
71static SIZES: LazyLock<Mutex<FxHashMap<Discriminant, (usize, usize)>>> =
72    LazyLock::new(|| Mutex::new(FxHashMap::default()));
73
74// This is safe because:
75// 1. Containers are reset before being returned to pools, so they contain no values
76// 2. We only reuse pools for types with identical memory layouts (same size/alignment via Discriminant)
77// 3. The Opaque wrapper ensures proper cleanup when the thread local is destroyed
78fn with_pool<T, R, F>(sizes: Option<(usize, usize)>, f: F) -> R
79where
80    T: IsoPoolable,
81    F: FnOnce(Option<&Pool<T>>) -> R,
82{
83    let mut f = Some(f);
84    // if the user implements Drop on the pooled item and tries to put it back
85    // in the pool then we will end up calling ourselves recursively from the
86    // pool destructor. This is why we must use try_with on the thread local
87    let res = POOLS.try_with(|pools| match pools.try_borrow_mut() {
88        Err(_) => (f.take().unwrap())(None),
89        Ok(mut pools) => match T::DISCRIMINANT {
90            Some(d) => {
91                let pool = pools.entry(d).or_insert_with(|| {
92                    let (size, cap) = sizes.unwrap_or_else(|| {
93                        SIZES
94                            .lock()
95                            .unwrap()
96                            .get(&d)
97                            .map(|(s, c)| (*s, *c))
98                            .unwrap_or(DEFAULT_SIZES)
99                    });
100                    let b = Box::new(Pool::<T>::new(size, cap));
101                    let t = Box::into_raw(b) as *mut ();
102                    let drop = Some(Box::new(|t: *mut ()| unsafe {
103                        drop(Box::from_raw(t as *mut Pool<T>))
104                    }) as Box<dyn FnOnce(*mut ())>);
105                    Opaque { t, drop }
106                });
107                (f.take().unwrap())(unsafe { Some(&*(pool.t as *mut Pool<T>)) })
108            }
109            None => (f.take().unwrap())(None),
110        },
111    });
112    match res {
113        Err(_) => (f.take().unwrap())(None),
114        Ok(r) => r,
115    }
116}
117
118/// Clear all thread local global pools on this thread.
119///
120/// Note this will happen automatically when the thread dies.
121pub fn clear() {
122    POOLS.with_borrow_mut(|pools| pools.clear())
123}
124
125/// Delete the thread local pool for the specified `T`.
126///
127/// Note this will happen automatically when the current thread dies.
128pub fn clear_type<T: IsoPoolable>() {
129    POOLS.with_borrow_mut(|pools| {
130        if let Some(d) = T::DISCRIMINANT {
131            pools.remove(&d);
132        }
133    })
134}
135
136/// Set the pool size for the global pools of `T`.
137///
138/// Pools that have already been created will not be resized, but new pools (on new threads)
139/// will use the specified size as their max size. If you wish to resize an existing pool you
140/// can first clear_type (or clear) and then set_size.
141pub fn set_size<T: IsoPoolable>(max_pool_size: usize, max_element_capacity: usize) {
142    if let Some(d) = T::DISCRIMINANT {
143        SIZES.lock().unwrap().insert(d, (max_pool_size, max_element_capacity));
144    }
145}
146
147/// Get the max pool size and max element capacity for a given type.
148///
149/// If get_size returns None then the type will not be pooled.
150pub fn get_size<T: IsoPoolable>() -> Option<(usize, usize)> {
151    T::DISCRIMINANT.map(|d| {
152        SIZES.lock().unwrap().get(&d).map(|(s, c)| (*s, *c)).unwrap_or(DEFAULT_SIZES)
153    })
154}
155
156fn take_inner<T: IsoPoolable>(sizes: Option<(usize, usize)>) -> GPooled<T> {
157    with_pool(sizes, |pool| {
158        pool.map(|p| p.take()).unwrap_or_else(|| GPooled::orphan(T::empty()))
159    })
160}
161
162/// Take a `T` from the thread local global pool.
163///
164/// If there is no pool for `T` or there are no `T`s pooled then create a new empty `T`.
165/// If `T` has no discriminant return an orphan.
166pub fn take<T: IsoPoolable>() -> GPooled<T> {
167    take_inner(None)
168}
169
170/// Take a `T` from the thread local global pool with custom pool sizes.
171///
172/// If there is no pool for `T` or there are no `T`s pooled then create a new empty `T`.
173/// If `T` has no discriminant return an orphan. Also set the pool sizes for this type
174/// if they have not already been set.
175pub fn take_sz<T: IsoPoolable>(max: usize, max_elements: usize) -> GPooled<T> {
176    take_inner(Some((max, max_elements)))
177}
178
179/// Get a reference to the thread local global pool of `T`s.
180///
181/// Returns `None` if `T` has no discriminant. You can use [get_size], [set_size],
182/// [clear] and [clear_type] to control these global pools on the current thread.
183/// This function unlike [pool_any] does not require `T` to implement [Any], so you
184/// could use it to pool a type like `HashMap<&str, &str>`.
185pub fn pool<T: IsoPoolable>() -> Option<Pool<T>> {
186    with_pool(None, |pool| pool.cloned())
187}
188
189/// Get a reference to the thread local global pool of `T`s with custom sizes.
190///
191/// Returns `None` if `T` has no discriminant. You can use [get_size], [set_size],
192/// [clear] and [clear_type] to control these global pools on the current thread.
193/// This function unlike [pool_any] does not require `T` to implement [Any], so you
194/// could use it to pool a type like `HashMap<&str, &str>`. Also sets the pool sizes
195/// for this type if they have not already been set.
196pub fn pool_sz<T: IsoPoolable>(max: usize, max_elements: usize) -> Option<Pool<T>> {
197    with_pool(Some((max, max_elements)), |pool| pool.cloned())
198}
199
200thread_local! {
201    static ANY_POOLS: RefCell<FxHashMap<TypeId, Box<dyn Any>>> =
202        RefCell::new(HashMap::default());
203}
204
205/// Get a reference to a pool from the generic thread local pool set.
206///
207/// This works for any type that implements [Any] + [Poolable]. Note this is a different
208/// set of pools vs ones returned by [pool]. If your container type implements both
209/// [IsoPoolable] and [Any] then you can choose either of these two pool sets, it
210/// doesn't really matter for performance which one you choose as long as your
211/// choice is consistent.
212pub fn pool_any<T: Any + Poolable>(size: usize, max: usize) -> Pool<T> {
213    ANY_POOLS.with_borrow_mut(|pools| {
214        pools
215            .entry(TypeId::of::<T>())
216            .or_insert_with(|| Box::new(Pool::<T>::new(size, max)))
217            .downcast_ref::<Pool<T>>()
218            .unwrap()
219            .clone()
220    })
221}
222
223/// Take a poolable type `T` from the generic thread local pool set.
224///
225/// This works for types that implement [Any] + [Poolable]. It is much more efficient
226/// to use [take] if your container type implements [IsoPoolable], and even more efficient
227/// to use [pool] or [pool_any] and store the pool somewhere.
228pub fn take_any<T: Any + Poolable>(size: usize, max: usize) -> GPooled<T> {
229    ANY_POOLS.with_borrow_mut(|pools| {
230        pools
231            .entry(TypeId::of::<T>())
232            .or_insert_with(|| Box::new(Pool::<T>::new(size, max)))
233            .downcast_ref::<Pool<T>>()
234            .unwrap()
235            .take()
236    })
237}
238
239/// A wrapper for globally pooled objects with cross-thread pool affinity.
240///
241/// `GPooled<T>` ensures objects always return to their origin pool, regardless of which
242/// thread drops them. This is essential for producer-consumer patterns where one thread
243/// creates objects and other threads consume them.
244///
245/// # When to Use
246///
247/// Use `GPooled` when:
248/// - One thread primarily creates objects, other threads consume them
249/// - You need objects to return to a specific pool
250/// - You have a producer-consumer pattern across threads
251///
252/// Otherwise, prefer [`LPooled`](crate::local::LPooled) for better performance.
253///
254/// # Example
255///
256/// ```
257/// use poolshark::global::{Pool, GPooled};
258/// use std::sync::LazyLock;
259///
260/// // Shared pool for cross-thread usage
261/// static MESSAGES: LazyLock<Pool<String>> = LazyLock::new(|| Pool::new(1024, 4096));
262///
263/// fn producer() -> GPooled<String> {
264///     let mut msg = MESSAGES.take();
265///     msg.push_str("Hello from producer");
266///     msg  // Can be sent to consumer thread
267/// }
268///
269/// fn consumer(msg: GPooled<String>) {
270///     println!("{}", msg);
271///     // Dropped here, returns to MESSAGES pool (not consumer's thread-local pool)
272/// }
273/// ```
274///
275/// # Behavior
276///
277/// - **Pool affinity**: Always returns to the pool it was created from
278/// - **Thread-safe**: Can be sent between threads
279/// - **Overhead**: One word (8 bytes on 64-bit) to store pool pointer
280/// - **Lock-free**: Uses `crossbeam` lock-free queues
281#[derive(Clone)]
282pub struct GPooled<T: Poolable> {
283    pool: ManuallyDrop<WeakPool<Self>>,
284    object: ManuallyDrop<T>,
285}
286
287impl<T: Poolable + Debug> fmt::Debug for GPooled<T> {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        write!(f, "{:?}", &self.object)
290    }
291}
292
293impl<T: Poolable + Display> fmt::Display for GPooled<T> {
294    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
295        write!(f, "{}", &*self.object)
296    }
297}
298
299impl<T: IsoPoolable> Default for GPooled<T> {
300    fn default() -> Self {
301        take()
302    }
303}
304
305impl<T: IsoPoolable> GPooled<T> {
306    pub fn take() -> Self {
307        take()
308    }
309
310    pub fn take_sz(max: usize, max_elements: usize) -> Self {
311        take_sz(max, max_elements)
312    }
313}
314
315impl<T: IsoPoolable + Extend<E>, E> Extend<E> for GPooled<T> {
316    fn extend<I: IntoIterator<Item = E>>(&mut self, iter: I) {
317        self.object.extend(iter)
318    }
319}
320
321unsafe impl<T: Poolable> RawPoolable for GPooled<T> {
322    fn empty(pool: WeakPool<Self>) -> Self {
323        Self {
324            pool: ManuallyDrop::new(pool),
325            object: ManuallyDrop::new(Poolable::empty()),
326        }
327    }
328
329    fn reset(&mut self) {
330        Poolable::reset(&mut *self.object)
331    }
332
333    fn capacity(&self) -> usize {
334        Poolable::capacity(&*self.object)
335    }
336
337    fn really_drop(self) {
338        drop(self.detach())
339    }
340}
341
342impl<T: Poolable> Borrow<T> for GPooled<T> {
343    fn borrow(&self) -> &T {
344        &self.object
345    }
346}
347
348impl Borrow<str> for GPooled<String> {
349    fn borrow(&self) -> &str {
350        &self.object
351    }
352}
353
354impl<T: Poolable + PartialEq> PartialEq for GPooled<T> {
355    fn eq(&self, other: &GPooled<T>) -> bool {
356        self.object.eq(&other.object)
357    }
358}
359
360impl<T: Poolable + Eq> Eq for GPooled<T> {}
361
362impl<T: Poolable + PartialOrd> PartialOrd for GPooled<T> {
363    fn partial_cmp(&self, other: &GPooled<T>) -> Option<Ordering> {
364        self.object.partial_cmp(&other.object)
365    }
366}
367
368impl<T: Poolable + Ord> Ord for GPooled<T> {
369    fn cmp(&self, other: &GPooled<T>) -> Ordering {
370        self.object.cmp(&other.object)
371    }
372}
373
374impl<T: Poolable + Hash> Hash for GPooled<T> {
375    fn hash<H>(&self, state: &mut H)
376    where
377        H: Hasher,
378    {
379        Hash::hash(&self.object, state)
380    }
381}
382
383impl<T: Poolable> GPooled<T> {
384    /// Creates a `GPooled` that isn't connected to any pool.
385    ///
386    /// Useful for branches where you know a given `Pooled` will always be empty.
387    pub fn orphan(t: T) -> Self {
388        Self { pool: ManuallyDrop::new(WeakPool::new()), object: ManuallyDrop::new(t) }
389    }
390
391    /// Assign the `GPooled` to the specified pool.
392    ///
393    /// When dropped, it will be placed in `pool` instead of the pool it was originally
394    /// allocated from. If an orphan is assigned a pool it will no longer be orphaned.
395    pub fn assign(&mut self, pool: &Pool<T>) {
396        let old = mem::replace(&mut self.pool, ManuallyDrop::new(pool.downgrade()));
397        drop(ManuallyDrop::into_inner(old))
398    }
399
400    /// Detach the object from the pool, returning the inner value.
401    ///
402    /// The detached object will not be returned to any pool when dropped.
403    pub fn detach(self) -> T {
404        let mut t = ManuallyDrop::new(self);
405        unsafe {
406            ManuallyDrop::drop(&mut t.pool);
407            ManuallyDrop::take(&mut t.object)
408        }
409    }
410}
411
412impl<T: Poolable> AsRef<T> for GPooled<T> {
413    fn as_ref(&self) -> &T {
414        &self.object
415    }
416}
417
418impl<T: Poolable> Deref for GPooled<T> {
419    type Target = T;
420
421    fn deref(&self) -> &T {
422        &self.object
423    }
424}
425
426impl<T: Poolable> DerefMut for GPooled<T> {
427    fn deref_mut(&mut self) -> &mut T {
428        &mut self.object
429    }
430}
431
432impl<T: Poolable> Drop for GPooled<T> {
433    fn drop(&mut self) {
434        if self.really_dropped() {
435            match self.pool.upgrade() {
436                Some(pool) => pool.insert(unsafe { ptr::read(self) }),
437                None => unsafe {
438                    ManuallyDrop::drop(&mut self.pool);
439                    ManuallyDrop::drop(&mut self.object);
440                },
441            }
442        }
443    }
444}
445
/// Serializes exactly like the wrapped `T`; the pool association is not encoded.
#[cfg(feature = "serde")]
impl<T: Poolable + Serialize> Serialize for GPooled<T> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        T::serialize(&self.object, serializer)
    }
}
455
/// Deserializes into an object taken from the generic thread local pool set
/// (see `take_any`), so repeated deserialization can reuse pooled allocations.
#[cfg(feature = "serde")]
impl<'de, T: Poolable + DeserializeOwned + 'static> Deserialize<'de> for GPooled<T> {
    /// Take a pooled `T` and deserialize into it in place.
    ///
    /// If the pool for `T` does not exist yet on this thread it is created with
    /// the module's default sizes.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Use the shared defaults instead of repeating the magic numbers here.
        let (max_pool_size, max_elt_capacity) = DEFAULT_SIZES;
        let mut t = take_any::<T>(max_pool_size, max_elt_capacity);
        Self::deserialize_in_place(deserializer, &mut t)?;
        Ok(t)
    }

    /// Deserialize directly into the object already held by `place`.
    fn deserialize_in_place<D>(deserializer: D, place: &mut Self) -> Result<(), D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        <T as Deserialize>::deserialize_in_place(deserializer, &mut place.object)
    }
}
474
475#[derive(Debug)]
476struct PoolInner<T: RawPoolable> {
477    max_elt_capacity: usize,
478    pool: ArrayQueue<T>,
479}
480
481impl<T: RawPoolable> Drop for PoolInner<T> {
482    fn drop(&mut self) {
483        while let Some(t) = self.pool.pop() {
484            RawPoolable::really_drop(t)
485        }
486    }
487}
488
489/// A weak reference to a global Pool
490pub struct WeakPool<T: RawPoolable>(Weak<PoolInner<T>>);
491
492impl<T: RawPoolable> Debug for WeakPool<T> {
493    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
494        write!(f, "<weak pool>")
495    }
496}
497
498impl<T: RawPoolable> Clone for WeakPool<T> {
499    fn clone(&self) -> Self {
500        Self(Weak::clone(&self.0))
501    }
502}
503
504impl<T: RawPoolable> WeakPool<T> {
505    pub fn new() -> Self {
506        WeakPool(Weak::new())
507    }
508
509    pub fn upgrade(&self) -> Option<RawPool<T>> {
510        self.0.upgrade().map(RawPool)
511    }
512}
513
514/// A global pool
515pub type Pool<T> = RawPool<GPooled<T>>;
516
517/// a lock-free, thread-safe, dynamically-sized object pool.
518///
519/// this pool begins with an initial capacity and will continue
520/// creating new objects on request when none are available. Pooled
521/// objects are returned to the pool on destruction.
522///
523/// if, during an attempted return, a pool already has
524/// `maximum_capacity` objects in the pool, the pool will throw away
525/// that object.
526#[derive(Debug)]
527pub struct RawPool<T: RawPoolable>(Arc<PoolInner<T>>);
528
529impl<T: RawPoolable> Clone for RawPool<T> {
530    fn clone(&self) -> Self {
531        Self(Arc::clone(&self.0))
532    }
533}
534
535impl<T: RawPoolable> RawPool<T> {
536    pub fn downgrade(&self) -> WeakPool<T> {
537        WeakPool(Arc::downgrade(&self.0))
538    }
539
540    /// Creates a new `RawPool<T>`.
541    ///
542    /// This pool will retain up to `max_capacity` objects of size less than or equal to
543    /// `max_elt_capacity`. Objects larger than `max_elt_capacity` will be deallocated immediately.
544    pub fn new(max_capacity: usize, max_elt_capacity: usize) -> RawPool<T> {
545        RawPool(Arc::new(PoolInner {
546            pool: ArrayQueue::new(max_capacity),
547            max_elt_capacity,
548        }))
549    }
550
551    /// Try to take an element from the pool.
552    ///
553    /// Returns `None` if the pool is empty.
554    pub fn try_take(&self) -> Option<T> {
555        self.0.pool.pop()
556    }
557
558    /// Takes an item from the pool.
559    ///
560    /// Creates a new item if none are available.
561    pub fn take(&self) -> T {
562        self.0.pool.pop().unwrap_or_else(|| RawPoolable::empty(self.downgrade()))
563    }
564
565    /// Insert an object into the pool.
566    ///
567    /// The object may be dropped if the pool is at capacity or if the object
568    /// has too much capacity.
569    pub fn insert(&self, mut t: T) {
570        let cap = t.capacity();
571        if cap > 0 && cap <= self.0.max_elt_capacity {
572            t.reset();
573            if let Err(t) = self.0.pool.push(t) {
574                RawPoolable::really_drop(t)
575            }
576        } else {
577            RawPoolable::really_drop(t)
578        }
579    }
580
581    /// Throw away some pooled objects to reduce memory usage.
582    ///
583    /// If the number of pooled objects is > 10% of the capacity then throw away 10%
584    /// of the capacity. Otherwise throw away 1% of the capacity. Always throw away
585    /// at least 1 object until the pool is empty.
586    pub fn prune(&self) {
587        let len = self.0.pool.len();
588        let ten_percent = std::cmp::max(1, self.0.pool.capacity() / 10);
589        let one_percent = std::cmp::max(1, ten_percent / 10);
590        if len > ten_percent {
591            for _ in 0..ten_percent {
592                if let Some(v) = self.0.pool.pop() {
593                    RawPoolable::really_drop(v)
594                }
595            }
596        } else if len > one_percent {
597            for _ in 0..one_percent {
598                if let Some(v) = self.0.pool.pop() {
599                    RawPoolable::really_drop(v)
600                }
601            }
602        } else if len > 0 {
603            if let Some(v) = self.0.pool.pop() {
604                RawPoolable::really_drop(v)
605            }
606        }
607    }
608}