netidx_core/pool/
mod.rs

1/// This is shamelessly based on the dynamic-pool crate, with
2/// modifications
3use crate::utils::take_t;
4use crossbeam::queue::ArrayQueue;
5use fxhash::FxHashMap;
6use once_cell::sync::Lazy;
7use parking_lot::Mutex;
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9use std::{
10    borrow::Borrow,
11    cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd},
12    collections::HashMap,
13    default::Default,
14    fmt::Debug,
15    hash::{Hash, Hasher},
16    mem::{self, ManuallyDrop},
17    ops::{Deref, DerefMut},
18    ptr,
19    sync::{Arc, Weak},
20};
21
22pub mod pooled;
23#[cfg(test)]
24mod test;
25
26/// Implementing this trait allows full low level control over where
27/// the pool pointer is stored. For example if you are pooling an
28/// allocated data structure, you could store the pool pointer in the
29/// allocation to keep the size of the handle struct to a
30/// minimum. E.G. you're pooling a ThinArc. Or, if you have a static
31/// global pool, then you would not need to keep a pool pointer at
32/// all.
33///
34/// The object's drop implementation should return the object to the
35/// pool instead of deallocating it
36///
37/// Implementing this trait correctly is extremely tricky, and
38/// requires unsafe code in almost all cases, therefore it is marked
39/// as unsafe
40///
41/// Most of the time you should use the `Pooled` wrapper as it's
42/// required trait is much eaiser to implement and there is no
43/// practial place to put the pool pointer besides on the stack.
44pub unsafe trait RawPoolable: Send + Sized {
45    /// allocate a new empty object and set it's pool pointer to `pool`
46    fn empty(pool: WeakPool<Self>) -> Self;
47
48    /// empty the collection and reset it to it's default state so it
49    /// can be put back in the pool
50    fn reset(&mut self);
51
52    /// return the capacity of the collection
53    fn capacity(&self) -> usize;
54
55    /// Actually drop the inner object, don't put it back in the pool,
56    /// make sure you do not call both this method and the drop
57    /// implementation that puts the object back in the pool!
58    fn really_drop(self);
59}
60
61pub trait Poolable {
62    /// allocate a new empty collection
63    fn empty() -> Self;
64
65    /// empty the collection and reset it to it's default state so it
66    /// can be put back in the pool
67    fn reset(&mut self);
68
69    /// return the capacity of the collection
70    fn capacity(&self) -> usize;
71
72    /// return true if the object has really been dropped, e.g. if
73    /// you're pooling an Arc, it's strong_count became 1.
74    fn really_dropped(&self) -> bool {
75        true
76    }
77}
78
79/// A generic wrapper for pooled objects. This handles keeping track
80/// of the pool pointer for you and allows you to wrap almost any
81/// container type easily.
82///
83/// Most of the time, this is what you want to use.
84#[derive(Debug, Clone)]
85pub struct Pooled<T: Poolable + Send + 'static> {
86    pool: ManuallyDrop<WeakPool<Self>>,
87    object: ManuallyDrop<T>,
88}
89
90unsafe impl<T: Poolable + Send + 'static> RawPoolable for Pooled<T> {
91    fn empty(pool: WeakPool<Self>) -> Self {
92        Pooled {
93            pool: ManuallyDrop::new(pool),
94            object: ManuallyDrop::new(Poolable::empty()),
95        }
96    }
97
98    fn reset(&mut self) {
99        Poolable::reset(&mut *self.object)
100    }
101
102    fn capacity(&self) -> usize {
103        Poolable::capacity(&*self.object)
104    }
105
106    fn really_drop(self) {
107        drop(self.detach())
108    }
109}
110
111impl<T: Poolable + Send + 'static> Borrow<T> for Pooled<T> {
112    fn borrow(&self) -> &T {
113        &self.object
114    }
115}
116
117impl Borrow<str> for Pooled<String> {
118    fn borrow(&self) -> &str {
119        &self.object
120    }
121}
122
123impl<T: Poolable + Send + 'static + PartialEq> PartialEq for Pooled<T> {
124    fn eq(&self, other: &Pooled<T>) -> bool {
125        self.object.eq(&other.object)
126    }
127}
128
129impl<T: Poolable + Send + 'static + Eq> Eq for Pooled<T> {}
130
131impl<T: Poolable + Send + 'static + PartialOrd> PartialOrd for Pooled<T> {
132    fn partial_cmp(&self, other: &Pooled<T>) -> Option<Ordering> {
133        self.object.partial_cmp(&other.object)
134    }
135}
136
137impl<T: Poolable + Send + 'static + Ord> Ord for Pooled<T> {
138    fn cmp(&self, other: &Pooled<T>) -> Ordering {
139        self.object.cmp(&other.object)
140    }
141}
142
143impl<T: Poolable + Send + 'static + Hash> Hash for Pooled<T> {
144    fn hash<H>(&self, state: &mut H)
145    where
146        H: Hasher,
147    {
148        Hash::hash(&self.object, state)
149    }
150}
151
152impl<T: Poolable + Send + 'static> Pooled<T> {
153    /// Creates a `Pooled` that isn't connected to any pool. E.G. for
154    /// branches where you know a given `Pooled` will always be empty.
155    pub fn orphan(t: T) -> Self {
156        Pooled { pool: ManuallyDrop::new(WeakPool::new()), object: ManuallyDrop::new(t) }
157    }
158
159    /// assign the `Pooled` to the specified pool. When it is dropped
160    /// it will be placed in `pool` instead of the pool it was
161    /// originally allocated from. If an orphan is assigned a pool it
162    /// will no longer be orphaned.
163    pub fn assign(&mut self, pool: &Pool<T>) {
164        let old = mem::replace(&mut self.pool, ManuallyDrop::new(pool.downgrade()));
165        drop(ManuallyDrop::into_inner(old))
166    }
167
168    /// detach the object from the pool, returning it.
169    pub fn detach(self) -> T {
170        let mut t = ManuallyDrop::new(self);
171        unsafe {
172            ManuallyDrop::drop(&mut t.pool);
173            ManuallyDrop::take(&mut t.object)
174        }
175    }
176}
177
178impl<T: Poolable + Send + 'static> AsRef<T> for Pooled<T> {
179    fn as_ref(&self) -> &T {
180        &self.object
181    }
182}
183
184impl<T: Poolable + Send + 'static> Deref for Pooled<T> {
185    type Target = T;
186
187    fn deref(&self) -> &T {
188        &self.object
189    }
190}
191
192impl<T: Poolable + Send + 'static> DerefMut for Pooled<T> {
193    fn deref_mut(&mut self) -> &mut T {
194        &mut self.object
195    }
196}
197
198impl<T: Poolable + Send + 'static> Drop for Pooled<T> {
199    fn drop(&mut self) {
200        if self.really_dropped() {
201            match self.pool.upgrade() {
202                Some(pool) => pool.insert(unsafe { ptr::read(self) }),
203                None => unsafe {
204                    ManuallyDrop::drop(&mut self.pool);
205                    ManuallyDrop::drop(&mut self.object);
206                },
207            }
208        }
209    }
210}
211
212impl<T: Poolable + Send + 'static + Serialize> Serialize for Pooled<T> {
213    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
214    where
215        S: serde::Serializer,
216    {
217        self.object.serialize(serializer)
218    }
219}
220
221impl<'de, T: Poolable + Send + 'static + DeserializeOwned> Deserialize<'de>
222    for Pooled<T>
223{
224    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
225    where
226        D: serde::Deserializer<'de>,
227    {
228        let mut t = take_t::<T>(1000, 1000);
229        Self::deserialize_in_place(deserializer, &mut t)?;
230        Ok(t)
231    }
232
233    fn deserialize_in_place<D>(deserializer: D, place: &mut Self) -> Result<(), D::Error>
234    where
235        D: serde::Deserializer<'de>,
236    {
237        <T as Deserialize>::deserialize_in_place(deserializer, &mut place.object)
238    }
239}
240
241trait Prune {
242    fn prune(&self);
243}
244
245#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
246struct Pid(u32);
247
248impl Pid {
249    fn new() -> Self {
250        use std::sync::atomic::{AtomicU32, Ordering};
251        static PID: AtomicU32 = AtomicU32::new(0);
252        Self(PID.fetch_add(1, Ordering::Relaxed))
253    }
254}
255
256struct GlobalState {
257    pools: FxHashMap<Pid, Box<dyn Prune + Send + 'static>>,
258    pool_shark_running: bool,
259}
260
261static POOLS: Lazy<Mutex<GlobalState>> = Lazy::new(|| {
262    Mutex::new(GlobalState { pools: HashMap::default(), pool_shark_running: false })
263});
264
265#[derive(Debug)]
266struct PoolInner<T: RawPoolable> {
267    pool: ArrayQueue<T>,
268    max_elt_capacity: usize,
269    id: Pid,
270}
271
272impl<T: RawPoolable> Drop for PoolInner<T> {
273    fn drop(&mut self) {
274        while let Some(t) = self.pool.pop() {
275            RawPoolable::really_drop(t)
276        }
277    }
278}
279
280fn pool_shark() {
281    use std::{
282        thread::{sleep, spawn},
283        time::Duration,
284    };
285    spawn(|| loop {
286        sleep(Duration::from_secs(300));
287        {
288            for p in POOLS.lock().pools.values() {
289                p.prune()
290            }
291        }
292    });
293}
294
295#[derive(Clone, Debug)]
296pub struct WeakPool<T: RawPoolable>(Weak<PoolInner<T>>);
297
298impl<T: RawPoolable + Send + 'static> WeakPool<T> {
299    pub fn new() -> Self {
300        WeakPool(Weak::new())
301    }
302
303    pub fn upgrade(&self) -> Option<RawPool<T>> {
304        self.0.upgrade().map(RawPool)
305    }
306}
307
308pub type Pool<T> = RawPool<Pooled<T>>;
309
310/// a lock-free, thread-safe, dynamically-sized object pool.
311///
312/// this pool begins with an initial capacity and will continue
313/// creating new objects on request when none are available. Pooled
314/// objects are returned to the pool on destruction.
315///
316/// if, during an attempted return, a pool already has
317/// `maximum_capacity` objects in the pool, the pool will throw away
318/// that object.
319#[derive(Clone, Debug)]
320pub struct RawPool<T: RawPoolable + Send + 'static>(Arc<PoolInner<T>>);
321
322impl<T: RawPoolable + Send + 'static> Drop for RawPool<T> {
323    fn drop(&mut self) {
324        // one held by us, and one held by the pool shark
325        if Arc::strong_count(&self.0) <= 2 {
326            let res = POOLS.lock().pools.remove(&self.0.id);
327            drop(res)
328        }
329    }
330}
331
332impl<T: RawPoolable + Send + 'static> Prune for RawPool<T> {
333    fn prune(&self) {
334        let len = self.0.pool.len();
335        let ten_percent = std::cmp::max(1, self.0.pool.capacity() / 10);
336        let one_percent = std::cmp::max(1, ten_percent / 10);
337        if len > ten_percent {
338            for _ in 0..ten_percent {
339                if let Some(v) = self.0.pool.pop() {
340                    RawPoolable::really_drop(v)
341                }
342            }
343        } else if len > one_percent {
344            for _ in 0..one_percent {
345                if let Some(v) = self.0.pool.pop() {
346                    RawPoolable::really_drop(v)
347                }
348            }
349        } else if len > 0 {
350            if let Some(v) = self.0.pool.pop() {
351                RawPoolable::really_drop(v)
352            }
353        }
354    }
355}
356
357impl<T: RawPoolable + Send + 'static> RawPool<T> {
358    pub fn downgrade(&self) -> WeakPool<T> {
359        WeakPool(Arc::downgrade(&self.0))
360    }
361
362    /// creates a new `Pool<T>`. this pool will retain up to
363    /// `max_capacity` objects of size less than or equal to
364    /// max_elt_capacity. Objects larger than max_elt_capacity will be
365    /// deallocated immediatly.
366    pub fn new(max_capacity: usize, max_elt_capacity: usize) -> RawPool<T> {
367        let id = Pid::new();
368        let t = RawPool(Arc::new(PoolInner {
369            pool: ArrayQueue::new(max_capacity),
370            max_elt_capacity,
371            id,
372        }));
373        let mut gs = POOLS.lock();
374        gs.pools.insert(id, Box::new(RawPool(Arc::clone(&t.0))));
375        if !gs.pool_shark_running {
376            gs.pool_shark_running = true;
377            pool_shark()
378        }
379        t
380    }
381
382    /// try to take an element from the pool, return None if it is empty
383    pub fn try_take(&self) -> Option<T> {
384        self.0.pool.pop()
385    }
386
387    /// takes an item from the pool, creating one if none are available.
388    pub fn take(&self) -> T {
389        self.0.pool.pop().unwrap_or_else(|| RawPoolable::empty(self.downgrade()))
390    }
391
392    /// Insert an object into the pool. The object may be dropped if
393    /// the pool is at capacity, or the object has too much capacity.
394    pub fn insert(&self, mut t: T) {
395        let cap = t.capacity();
396        if cap > 0 && cap <= self.0.max_elt_capacity {
397            t.reset();
398            if let Err(t) = self.0.pool.push(t) {
399                RawPoolable::really_drop(t)
400            }
401        } else {
402            RawPoolable::really_drop(t)
403        }
404    }
405}