architect_api/utils/
pool.rs

1use crossbeam::queue::ArrayQueue;
2use fxhash::FxHashMap;
3use once_cell::sync::Lazy;
4use parking_lot::Mutex;
5use schemars::{schema::InstanceType, JsonSchema};
6use serde::{de::DeserializeOwned, Deserialize, Serialize};
7use std::{
8    any::{Any, TypeId},
9    borrow::Borrow,
10    cell::RefCell,
11    cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd},
12    collections::{HashMap, HashSet, VecDeque},
13    default::Default,
14    fmt::Debug,
15    hash::{BuildHasher, Hash, Hasher},
16    ops::{Deref, DerefMut},
17    sync::{Arc, Weak},
18};
19use triomphe::Arc as TArc;
20
21/// pool(vis, name, type, capacity, max_elt_size)
22///
23/// Create a static memory pool. Objects are taken from the pool or
24/// allocated normally if it is empty, when they are dropped instead
25/// of being deallocated they are cleared and inserted into the pool,
26/// up to capacity elements no more than max_elt_size may be stored in
27/// the pool.
28#[macro_export]
29macro_rules! pool {
30    ($vis:vis, $name:ident, $ty:ty, $max_capacity:expr, $max_elt_size:expr) => {
31        $vis fn $name() -> &'static api::utils::pool::Pool<$ty> {
32            static POOL: once_cell::race::OnceBox<api::utils::pool::Pool<$ty>> = once_cell::race::OnceBox::new();
33            POOL.get_or_init(|| Box::new(api::utils::pool::Pool::new($max_capacity, $max_elt_size)))
34        }
35    };
36    ($name:ident, $ty:ty, $max_capacity:expr, $max_elt_size:expr) => {
37        fn $name() -> &'static api::utils::pool::Pool<$ty> {
38            static POOL: once_cell::race::OnceBox<api::utils::pool::Pool<$ty>> = once_cell::race::OnceBox::new();
39            POOL.get_or_init(|| Box::new(api::utils::pool::Pool::new($max_capacity, $max_elt_size)))
40        }
41    }
42}
43
44pub trait Poolable {
45    fn empty() -> Self;
46    fn reset(&mut self);
47    fn capacity(&self) -> usize;
48    /// in case you are pooling something ref counted e.g. arc
49    fn really_dropped(&self) -> bool {
50        true
51    }
52}
53
54macro_rules! impl_hashmap {
55    ($ty:ident) => {
56        impl<K, V, R> Poolable for $ty<K, V, R>
57        where
58            K: Hash + Eq,
59            R: Default + BuildHasher,
60        {
61            fn empty() -> Self {
62                $ty::default()
63            }
64
65            fn reset(&mut self) {
66                self.clear()
67            }
68
69            fn capacity(&self) -> usize {
70                $ty::capacity(self)
71            }
72        }
73    };
74}
75
76impl_hashmap!(HashMap);
77
78macro_rules! impl_hashset {
79    ($ty:ident) => {
80        impl<K, R> Poolable for $ty<K, R>
81        where
82            K: Hash + Eq,
83            R: Default + BuildHasher,
84        {
85            fn empty() -> Self {
86                $ty::default()
87            }
88
89            fn reset(&mut self) {
90                self.clear()
91            }
92
93            fn capacity(&self) -> usize {
94                $ty::capacity(self)
95            }
96        }
97    };
98}
99
100impl_hashset!(HashSet);
101
102impl<T> Poolable for Vec<T> {
103    fn empty() -> Self {
104        Vec::new()
105    }
106
107    fn reset(&mut self) {
108        self.clear()
109    }
110
111    fn capacity(&self) -> usize {
112        Vec::capacity(self)
113    }
114}
115
116impl<T> Poolable for VecDeque<T> {
117    fn empty() -> Self {
118        VecDeque::new()
119    }
120
121    fn reset(&mut self) {
122        self.clear()
123    }
124
125    fn capacity(&self) -> usize {
126        VecDeque::capacity(self)
127    }
128}
129
130impl Poolable for String {
131    fn empty() -> Self {
132        String::new()
133    }
134
135    fn reset(&mut self) {
136        self.clear()
137    }
138
139    fn capacity(&self) -> usize {
140        self.capacity()
141    }
142}
143
144impl<T: Poolable> Poolable for TArc<T> {
145    fn empty() -> Self {
146        TArc::new(T::empty())
147    }
148
149    fn reset(&mut self) {
150        if let Some(inner) = TArc::get_mut(self) {
151            inner.reset()
152        }
153    }
154
155    fn capacity(&self) -> usize {
156        1
157    }
158
159    fn really_dropped(&self) -> bool {
160        TArc::is_unique(self)
161    }
162}
163
164trait Prune {
165    fn prune(&self);
166}
167
168#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
169struct Pid(u32);
170
171impl Pid {
172    fn new() -> Self {
173        use std::sync::atomic::{AtomicU32, Ordering};
174        static PID: AtomicU32 = AtomicU32::new(0);
175        Self(PID.fetch_add(1, Ordering::Relaxed))
176    }
177}
178
179struct GlobalState {
180    pools: FxHashMap<Pid, Box<dyn Prune + Send + 'static>>,
181    pool_shark_running: bool,
182}
183
184static POOLS: Lazy<Mutex<GlobalState>> = Lazy::new(|| {
185    Mutex::new(GlobalState { pools: HashMap::default(), pool_shark_running: false })
186});
187
188#[derive(Debug)]
189struct PoolInner<T: Poolable + Send + 'static> {
190    pool: ArrayQueue<T>,
191    max_elt_capacity: usize,
192    id: Pid,
193}
194
195fn pool_shark() {
196    use std::{
197        thread::{sleep, spawn},
198        time::Duration,
199    };
200    spawn(|| loop {
201        sleep(Duration::from_secs(300));
202        {
203            for p in POOLS.lock().pools.values() {
204                p.prune()
205            }
206        }
207    });
208}
209
210/// a lock-free, thread-safe, dynamically-sized object pool.
211///
212/// this pool begins with an initial capacity and will continue
213/// creating new objects on request when none are available. Pooled
214/// objects are returned to the pool on destruction.
215///
216/// if, during an attempted return, a pool already has
217/// `maximum_capacity` objects in the pool, the pool will throw away
218/// that object.
219#[derive(Clone, Debug)]
220pub struct Pool<T: Poolable + Send + 'static>(Arc<PoolInner<T>>);
221
222impl<T: Poolable + Send + 'static> Drop for Pool<T> {
223    fn drop(&mut self) {
224        // one held by us, and one held by the pool shark
225        if Arc::strong_count(&self.0) <= 2 {
226            let res = POOLS.lock().pools.remove(&self.0.id);
227            drop(res)
228        }
229    }
230}
231
232impl<T: Poolable + Send + 'static> Prune for Pool<T> {
233    fn prune(&self) {
234        let len = self.0.pool.len();
235        let ten_percent = std::cmp::max(1, self.0.pool.capacity() / 10);
236        let one_percent = std::cmp::max(1, ten_percent / 10);
237        if len > ten_percent {
238            for _ in 0..ten_percent {
239                self.0.pool.pop();
240            }
241        } else if len > one_percent {
242            for _ in 0..one_percent {
243                self.0.pool.pop();
244            }
245        } else if len > 0 {
246            self.0.pool.pop();
247        }
248    }
249}
250
251impl<T: Poolable + Send + 'static> Pool<T> {
252    /// creates a new `Pool<T>`. this pool will retain up to
253    /// `max_capacity` objects of size less than or equal to
254    /// max_elt_capacity. Objects larger than max_elt_capacity will be
255    /// deallocated immediatly.
256    pub fn new(max_capacity: usize, max_elt_capacity: usize) -> Pool<T> {
257        let id = Pid::new();
258        let t = Pool(Arc::new(PoolInner {
259            pool: ArrayQueue::new(max_capacity),
260            max_elt_capacity,
261            id,
262        }));
263        let mut gs = POOLS.lock();
264        gs.pools.insert(id, Box::new(Pool(Arc::clone(&t.0))));
265        if !gs.pool_shark_running {
266            gs.pool_shark_running = true;
267            pool_shark()
268        }
269        t
270    }
271
272    /// takes an item from the pool, creating one if none are available.
273    pub fn take(&self) -> Pooled<T> {
274        let object = self.0.pool.pop().unwrap_or_else(Poolable::empty);
275        Pooled { pool: Arc::downgrade(&self.0), object: Some(object) }
276    }
277}
278
279/// an object, checked out from a pool.
280#[derive(Debug, Clone)]
281pub struct Pooled<T: Poolable + Send + 'static> {
282    pool: Weak<PoolInner<T>>,
283    // Safety invariant. This will always be Some unless the
284    // pooled has been dropped
285    object: Option<T>,
286}
287
288impl<T: Poolable + Send + 'static> Pooled<T> {
289    #[inline(always)]
290    fn get(&self) -> &T {
291        match &self.object {
292            Some(ref t) => t,
293            None => unreachable!(),
294        }
295    }
296
297    #[inline(always)]
298    fn get_mut(&mut self) -> &mut T {
299        match &mut self.object {
300            Some(ref mut t) => t,
301            None => unreachable!(),
302        }
303    }
304}
305
306impl<T: Poolable + Sync + Send + 'static> Borrow<T> for Pooled<T> {
307    fn borrow(&self) -> &T {
308        self.get()
309    }
310}
311
312impl Borrow<str> for Pooled<String> {
313    fn borrow(&self) -> &str {
314        self.get().borrow()
315    }
316}
317
318impl<T: Poolable + Send + 'static + PartialEq> PartialEq for Pooled<T> {
319    fn eq(&self, other: &Pooled<T>) -> bool {
320        self.get().eq(other.get())
321    }
322}
323
324impl<T: Poolable + Send + 'static + Eq> Eq for Pooled<T> {}
325
326impl<T: Poolable + Send + 'static + PartialOrd> PartialOrd for Pooled<T> {
327    fn partial_cmp(&self, other: &Pooled<T>) -> Option<Ordering> {
328        self.get().partial_cmp(other.get())
329    }
330}
331
332impl<T: Poolable + Send + 'static + Ord> Ord for Pooled<T> {
333    fn cmp(&self, other: &Pooled<T>) -> Ordering {
334        self.get().cmp(other.get())
335    }
336}
337
338impl<T: Poolable + Send + 'static + Hash> Hash for Pooled<T> {
339    fn hash<H>(&self, state: &mut H)
340    where
341        H: Hasher,
342    {
343        Hash::hash(self.get(), state)
344    }
345}
346
347impl<T: Poolable + Send + 'static> Pooled<T> {
348    /// Creates a `Pooled` that isn't connected to any pool. E.G. for
349    /// branches where you know a given `Pooled` will always be empty.
350    pub fn orphan(t: T) -> Self {
351        Pooled { pool: Weak::new(), object: Some(t) }
352    }
353
354    pub fn detach(mut self) -> T {
355        self.object.take().unwrap()
356    }
357}
358
359impl<T: Poolable + Send + 'static> AsRef<T> for Pooled<T> {
360    fn as_ref(&self) -> &T {
361        self.get()
362    }
363}
364
365impl<T: Poolable + Send + 'static> Deref for Pooled<T> {
366    type Target = T;
367
368    fn deref(&self) -> &T {
369        self.get()
370    }
371}
372
373impl<T: Poolable + Send + 'static> DerefMut for Pooled<T> {
374    fn deref_mut(&mut self) -> &mut T {
375        self.get_mut()
376    }
377}
378
379impl<T: Poolable + Send + 'static> Drop for Pooled<T> {
380    fn drop(&mut self) {
381        if self.get().really_dropped() {
382            if let Some(inner) = self.pool.upgrade() {
383                let cap = self.get().capacity();
384                if cap > 0 && cap <= inner.max_elt_capacity {
385                    let mut object = self.object.take().unwrap();
386                    object.reset();
387                    inner.pool.push(object).ok();
388                }
389            }
390        }
391    }
392}
393
394impl<T: Poolable + Send + 'static + Serialize> Serialize for Pooled<T> {
395    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
396    where
397        S: serde::Serializer,
398    {
399        self.get().serialize(serializer)
400    }
401}
402
403impl<'de, T: Poolable + Send + 'static + DeserializeOwned> Deserialize<'de>
404    for Pooled<T>
405{
406    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
407    where
408        D: serde::Deserializer<'de>,
409    {
410        let mut t = take_t::<T>(1000, 1000);
411        Self::deserialize_in_place(deserializer, &mut t)?;
412        Ok(t)
413    }
414
415    fn deserialize_in_place<D>(deserializer: D, place: &mut Self) -> Result<(), D::Error>
416    where
417        D: serde::Deserializer<'de>,
418    {
419        <T as Deserialize>::deserialize_in_place(deserializer, place.get_mut())
420    }
421}
422thread_local! {
423    static POOLS_S: RefCell<FxHashMap<TypeId, Box<dyn Any>>> =
424        RefCell::new(HashMap::default());
425}
426
427/// Take a poolable type T from the generic thread local pool set.
428/// Note it is much more efficient to construct your own pools.
429/// size and max are the pool parameters used if the pool doesn't
430/// already exist.
431pub fn take_t<T: Any + Poolable + Send + 'static>(size: usize, max: usize) -> Pooled<T> {
432    POOLS_S.with(|pools| {
433        let mut pools = pools.borrow_mut();
434        let pool: &mut Pool<T> = pools
435            .entry(TypeId::of::<T>())
436            .or_insert_with(|| Box::new(Pool::<T>::new(size, max)))
437            .downcast_mut()
438            .unwrap();
439        pool.take()
440    })
441}
442
443impl<T: Poolable + Send + 'static + JsonSchema> JsonSchema for Pooled<T> {
444    fn schema_name() -> String {
445        // Exclude the module path to make the name in generated schemas clearer.
446        "Pooled".to_owned()
447    }
448
449    fn json_schema(
450        _gen: &mut schemars::gen::SchemaGenerator,
451    ) -> schemars::schema::Schema {
452        // FIXME probably
453        schemars::schema::SchemaObject {
454            instance_type: Some(InstanceType::Array.into()),
455            ..Default::default()
456        }
457        .into()
458    }
459
460    fn is_referenceable() -> bool {
461        true
462    }
463}