architect_api/utils/
pool.rs

1#[cfg(feature = "netidx")]
2use bytes::{Buf, BufMut};
3use crossbeam::queue::ArrayQueue;
4use fxhash::FxHashMap;
5use indexmap::{IndexMap, IndexSet};
6#[cfg(feature = "netidx")]
7use netidx::pack::{Pack, PackError};
8use once_cell::sync::Lazy;
9use parking_lot::Mutex;
10use schemars::{schema::InstanceType, JsonSchema};
11use serde::{de::DeserializeOwned, Deserialize, Serialize};
12use std::{
13    any::{Any, TypeId},
14    borrow::Borrow,
15    cell::RefCell,
16    cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd},
17    collections::{HashMap, HashSet, VecDeque},
18    default::Default,
19    fmt::Debug,
20    hash::{BuildHasher, Hash, Hasher},
21    ops::{Deref, DerefMut},
22    sync::{Arc, Weak},
23};
24use triomphe::Arc as TArc;
25
/// pool!(vis, name, type, capacity, max_elt_size)
/// pool!(name, type, capacity, max_elt_size)
///
/// Define a function `name()` that returns a `&'static Pool<type>`.
/// The pool is built on first use by `Pool::new(capacity,
/// max_elt_size)` and kept in a function-local
/// `once_cell::race::OnceBox`; every call returns a reference to it.
///
/// Objects are taken from the pool, or allocated normally if it is
/// empty. When they are dropped, instead of being deallocated they
/// are cleared and inserted back into the pool. The pool keeps at
/// most `capacity` objects, and only objects whose capacity is no
/// more than `max_elt_size`.
///
/// The first form applies the visibility `vis` to the generated
/// function; the second form generates a function with no visibility
/// qualifier.
///
/// The expansion names `api::utils::pool::Pool` and
/// `once_cell::race::OnceBox` by path, so both paths must resolve
/// where the macro is invoked.
#[macro_export]
macro_rules! pool {
    ($vis:vis, $name:ident, $ty:ty, $max_capacity:expr, $max_elt_size:expr) => {
        $vis fn $name() -> &'static api::utils::pool::Pool<$ty> {
            static POOL: once_cell::race::OnceBox<api::utils::pool::Pool<$ty>> = once_cell::race::OnceBox::new();
            POOL.get_or_init(|| Box::new(api::utils::pool::Pool::new($max_capacity, $max_elt_size)))
        }
    };
    ($name:ident, $ty:ty, $max_capacity:expr, $max_elt_size:expr) => {
        fn $name() -> &'static api::utils::pool::Pool<$ty> {
            static POOL: once_cell::race::OnceBox<api::utils::pool::Pool<$ty>> = once_cell::race::OnceBox::new();
            POOL.get_or_init(|| Box::new(api::utils::pool::Pool::new($max_capacity, $max_elt_size)))
        }
    }
}
48
/// A type whose instances can be recycled through a `Pool`.
pub trait Poolable {
    /// Build a fresh, empty instance. `Pool::take` calls this when
    /// the pool has no recycled object to hand out.
    fn empty() -> Self;
    /// Clear the contents so the object can be reused. Called on an
    /// object right before it is pushed back into its pool.
    fn reset(&mut self);
    /// Size measure used to decide whether a dropped object is worth
    /// keeping: it is only returned to the pool when this is greater
    /// than zero and no larger than the pool's `max_elt_capacity`.
    fn capacity(&self) -> usize;
    /// in case you are pooling something ref counted e.g. arc
    ///
    /// Return `false` to tell a dropped `Pooled` that the object must
    /// not be recycled. The default says every drop is a real drop.
    fn really_dropped(&self) -> bool {
        true
    }
}
58
59macro_rules! impl_hashmap {
60    ($ty:ident) => {
61        impl<K, V, R> Poolable for $ty<K, V, R>
62        where
63            K: Hash + Eq,
64            R: Default + BuildHasher,
65        {
66            fn empty() -> Self {
67                $ty::default()
68            }
69
70            fn reset(&mut self) {
71                self.clear()
72            }
73
74            fn capacity(&self) -> usize {
75                $ty::capacity(self)
76            }
77        }
78    };
79}
80
81impl_hashmap!(HashMap);
82impl_hashmap!(IndexMap);
83
84macro_rules! impl_hashset {
85    ($ty:ident) => {
86        impl<K, R> Poolable for $ty<K, R>
87        where
88            K: Hash + Eq,
89            R: Default + BuildHasher,
90        {
91            fn empty() -> Self {
92                $ty::default()
93            }
94
95            fn reset(&mut self) {
96                self.clear()
97            }
98
99            fn capacity(&self) -> usize {
100                $ty::capacity(self)
101            }
102        }
103    };
104}
105
106impl_hashset!(HashSet);
107impl_hashset!(IndexSet);
108
109impl<T> Poolable for Vec<T> {
110    fn empty() -> Self {
111        Vec::new()
112    }
113
114    fn reset(&mut self) {
115        self.clear()
116    }
117
118    fn capacity(&self) -> usize {
119        Vec::capacity(self)
120    }
121}
122
123impl<T> Poolable for VecDeque<T> {
124    fn empty() -> Self {
125        VecDeque::new()
126    }
127
128    fn reset(&mut self) {
129        self.clear()
130    }
131
132    fn capacity(&self) -> usize {
133        VecDeque::capacity(self)
134    }
135}
136
137impl Poolable for String {
138    fn empty() -> Self {
139        String::new()
140    }
141
142    fn reset(&mut self) {
143        self.clear()
144    }
145
146    fn capacity(&self) -> usize {
147        self.capacity()
148    }
149}
150
151impl<T: Poolable> Poolable for TArc<T> {
152    fn empty() -> Self {
153        TArc::new(T::empty())
154    }
155
156    fn reset(&mut self) {
157        if let Some(inner) = TArc::get_mut(self) {
158            inner.reset()
159        }
160    }
161
162    fn capacity(&self) -> usize {
163        1
164    }
165
166    fn really_dropped(&self) -> bool {
167        TArc::is_unique(&self)
168    }
169}
170
/// Type-erased view of a pool, used so that pools of different
/// element types can be stored together in the global registry and
/// shrunk by the background pruning thread.
trait Prune {
    /// Discard some of the objects currently held by the pool.
    fn prune(&self);
}
174
/// Identifier of a pool, used as its key in the global pool registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct Pid(u32);

impl Pid {
    /// Hand out the next identifier from a shared atomic counter that
    /// starts at zero and is incremented on every call.
    fn new() -> Self {
        use std::sync::atomic::{self, AtomicU32};
        static NEXT_PID: AtomicU32 = AtomicU32::new(0);
        let id = NEXT_PID.fetch_add(1, atomic::Ordering::Relaxed);
        Pid(id)
    }
}
185
/// Process-wide bookkeeping shared by all pools.
struct GlobalState {
    /// One type-erased handle per registered pool, keyed by the
    /// pool's id. `Pool::new` inserts the entry and `Drop for Pool`
    /// removes it; the pruning thread iterates over the values.
    pools: FxHashMap<Pid, Box<dyn Prune + Send + 'static>>,
    /// Set once the pruning thread has been started, so that it is
    /// only spawned by the first pool that gets created.
    pool_shark_running: bool,
}
190
191static POOLS: Lazy<Mutex<GlobalState>> = Lazy::new(|| {
192    Mutex::new(GlobalState { pools: HashMap::default(), pool_shark_running: false })
193});
194
/// Shared state behind every handle to one pool.
#[derive(Debug)]
struct PoolInner<T: Poolable + Send + 'static> {
    /// Bounded queue of recycled objects waiting to be reused.
    pool: ArrayQueue<T>,
    /// Largest `Poolable::capacity` an object may have and still be
    /// put back into `pool` when it is dropped.
    max_elt_capacity: usize,
    /// Key of this pool in the global `POOLS` registry.
    id: Pid,
}
201
202fn pool_shark() {
203    use std::{
204        thread::{sleep, spawn},
205        time::Duration,
206    };
207    spawn(|| loop {
208        sleep(Duration::from_secs(300));
209        {
210            for p in POOLS.lock().pools.values() {
211                p.prune()
212            }
213        }
214    });
215}
216
/// a thread-safe, bounded object pool.
///
/// the pool starts out empty and creates new objects on request when
/// no recycled object is available. Pooled objects are returned to
/// the pool on destruction. Taking and returning objects goes through
/// a crossbeam `ArrayQueue`; creating and dropping a pool additionally
/// takes the global `POOLS` mutex.
///
/// if, during an attempted return, a pool already has
/// `maximum_capacity` objects in the pool, the pool will throw away
/// that object.
///
/// cloning a `Pool` produces another handle to the same underlying
/// storage.
#[derive(Clone, Debug)]
pub struct Pool<T: Poolable + Send + 'static>(Arc<PoolInner<T>>);
228
// Unregisters the pool from the global registry once no handle other
// than the registry's own clone would remain.
impl<T: Poolable + Send + 'static> Drop for Pool<T> {
    fn drop(&mut self) {
        // one held by us, and one held by the pool shark
        // (the clone that `Pool::new` stores in `POOLS`). A count of
        // at most 2 therefore means this is the last outside handle.
        if Arc::strong_count(&self.0) <= 2 {
            // The mutex guard is a temporary of this statement, so the
            // lock is already released when `res` is dropped below.
            let res = POOLS.lock().pools.remove(&self.0.id);
            // Dropping the registry's clone runs this same `drop` for
            // that clone, which locks `POOLS` again (and finds nothing
            // left to remove). It must therefore happen only after the
            // guard above is gone; do not merge these two statements.
            drop(res)
        }
    }
}
238
239impl<T: Poolable + Send + 'static> Prune for Pool<T> {
240    fn prune(&self) {
241        let len = self.0.pool.len();
242        let ten_percent = std::cmp::max(1, self.0.pool.capacity() / 10);
243        let one_percent = std::cmp::max(1, ten_percent / 10);
244        if len > ten_percent {
245            for _ in 0..ten_percent {
246                self.0.pool.pop();
247            }
248        } else if len > one_percent {
249            for _ in 0..one_percent {
250                self.0.pool.pop();
251            }
252        } else if len > 0 {
253            self.0.pool.pop();
254        }
255    }
256}
257
258impl<T: Poolable + Send + 'static> Pool<T> {
259    /// creates a new `Pool<T>`. this pool will retain up to
260    /// `max_capacity` objects of size less than or equal to
261    /// max_elt_capacity. Objects larger than max_elt_capacity will be
262    /// deallocated immediatly.
263    pub fn new(max_capacity: usize, max_elt_capacity: usize) -> Pool<T> {
264        let id = Pid::new();
265        let t = Pool(Arc::new(PoolInner {
266            pool: ArrayQueue::new(max_capacity),
267            max_elt_capacity,
268            id,
269        }));
270        let mut gs = POOLS.lock();
271        gs.pools.insert(id, Box::new(Pool(Arc::clone(&t.0))));
272        if !gs.pool_shark_running {
273            gs.pool_shark_running = true;
274            pool_shark()
275        }
276        t
277    }
278
279    /// takes an item from the pool, creating one if none are available.
280    pub fn take(&self) -> Pooled<T> {
281        let object = self.0.pool.pop().unwrap_or_else(Poolable::empty);
282        Pooled { pool: Arc::downgrade(&self.0), object: Some(object) }
283    }
284}
285
/// an object, checked out from a pool.
///
/// Dereferences to the wrapped `T`. When the handle is dropped the
/// object is handed back to the pool it came from, provided that pool
/// still exists and the object passes the pool's size check.
/// Cloning (available when `T: Clone`) clones the object and yields a
/// handle tied to the same pool.
#[derive(Debug, Clone)]
pub struct Pooled<T: Poolable + Send + 'static> {
    // Weak so that outstanding objects do not keep a pool alive.
    pool: Weak<PoolInner<T>>,
    // Safety invariant. This will always be Some unless the
    // pooled has been dropped
    // (the accessors `get`/`get_mut` treat `None` as unreachable).
    object: Option<T>,
}
294
295impl<T: Poolable + Send + 'static> Pooled<T> {
296    #[inline(always)]
297    fn get(&self) -> &T {
298        match &self.object {
299            Some(ref t) => t,
300            None => unreachable!(),
301        }
302    }
303
304    #[inline(always)]
305    fn get_mut(&mut self) -> &mut T {
306        match &mut self.object {
307            Some(ref mut t) => t,
308            None => unreachable!(),
309        }
310    }
311}
312
313impl<T: Poolable + Sync + Send + 'static> Borrow<T> for Pooled<T> {
314    fn borrow(&self) -> &T {
315        self.get()
316    }
317}
318
319impl Borrow<str> for Pooled<String> {
320    fn borrow(&self) -> &str {
321        self.get().borrow()
322    }
323}
324
325impl<T: Poolable + Send + 'static + PartialEq> PartialEq for Pooled<T> {
326    fn eq(&self, other: &Pooled<T>) -> bool {
327        self.get().eq(other.get())
328    }
329}
330
331impl<T: Poolable + Send + 'static + Eq> Eq for Pooled<T> {}
332
333impl<T: Poolable + Send + 'static + PartialOrd> PartialOrd for Pooled<T> {
334    fn partial_cmp(&self, other: &Pooled<T>) -> Option<Ordering> {
335        self.get().partial_cmp(other.get())
336    }
337}
338
339impl<T: Poolable + Send + 'static + Ord> Ord for Pooled<T> {
340    fn cmp(&self, other: &Pooled<T>) -> Ordering {
341        self.get().cmp(other.get())
342    }
343}
344
345impl<T: Poolable + Send + 'static + Hash> Hash for Pooled<T> {
346    fn hash<H>(&self, state: &mut H)
347    where
348        H: Hasher,
349    {
350        Hash::hash(self.get(), state)
351    }
352}
353
354impl<T: Poolable + Send + 'static> Pooled<T> {
355    /// Creates a `Pooled` that isn't connected to any pool. E.G. for
356    /// branches where you know a given `Pooled` will always be empty.
357    pub fn orphan(t: T) -> Self {
358        Pooled { pool: Weak::new(), object: Some(t) }
359    }
360
361    pub fn detach(mut self) -> T {
362        self.object.take().unwrap()
363    }
364}
365
366impl<T: Poolable + Send + 'static> AsRef<T> for Pooled<T> {
367    fn as_ref(&self) -> &T {
368        self.get()
369    }
370}
371
372impl<T: Poolable + Send + 'static> Deref for Pooled<T> {
373    type Target = T;
374
375    fn deref(&self) -> &T {
376        self.get()
377    }
378}
379
380impl<T: Poolable + Send + 'static> DerefMut for Pooled<T> {
381    fn deref_mut(&mut self) -> &mut T {
382        self.get_mut()
383    }
384}
385
386impl<T: Poolable + Send + 'static> Drop for Pooled<T> {
387    fn drop(&mut self) {
388        if self.get().really_dropped() {
389            if let Some(inner) = self.pool.upgrade() {
390                let cap = self.get().capacity();
391                if cap > 0 && cap <= inner.max_elt_capacity {
392                    let mut object = self.object.take().unwrap();
393                    object.reset();
394                    inner.pool.push(object).ok();
395                }
396            }
397        }
398    }
399}
400
401impl<T: Poolable + Send + 'static + Serialize> Serialize for Pooled<T> {
402    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
403    where
404        S: serde::Serializer,
405    {
406        self.get().serialize(serializer)
407    }
408}
409
410impl<'de, T: Poolable + Send + 'static + DeserializeOwned> Deserialize<'de>
411    for Pooled<T>
412{
413    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
414    where
415        D: serde::Deserializer<'de>,
416    {
417        let mut t = take_t::<T>(1000, 1000);
418        Self::deserialize_in_place(deserializer, &mut t)?;
419        Ok(t)
420    }
421
422    fn deserialize_in_place<D>(deserializer: D, place: &mut Self) -> Result<(), D::Error>
423    where
424        D: serde::Deserializer<'de>,
425    {
426        <T as Deserialize>::deserialize_in_place(deserializer, place.get_mut())
427    }
428}
429thread_local! {
430    static POOLS_S: RefCell<FxHashMap<TypeId, Box<dyn Any>>> =
431        RefCell::new(HashMap::default());
432}
433
434/// Take a poolable type T from the generic thread local pool set.
435/// Note it is much more efficient to construct your own pools.
436/// size and max are the pool parameters used if the pool doesn't
437/// already exist.
438pub fn take_t<T: Any + Poolable + Send + 'static>(size: usize, max: usize) -> Pooled<T> {
439    POOLS_S.with(|pools| {
440        let mut pools = pools.borrow_mut();
441        let pool: &mut Pool<T> = pools
442            .entry(TypeId::of::<T>())
443            .or_insert_with(|| Box::new(Pool::<T>::new(size, max)))
444            .downcast_mut()
445            .unwrap();
446        pool.take()
447    })
448}
449
450impl<T: Poolable + Send + 'static + JsonSchema> JsonSchema for Pooled<T> {
451    fn schema_name() -> String {
452        // Exclude the module path to make the name in generated schemas clearer.
453        "Pooled".to_owned()
454    }
455
456    fn json_schema(
457        _gen: &mut schemars::gen::SchemaGenerator,
458    ) -> schemars::schema::Schema {
459        // FIXME probably
460        schemars::schema::SchemaObject {
461            instance_type: Some(InstanceType::Array.into()),
462            ..Default::default()
463        }
464        .into()
465    }
466
467    fn is_referenceable() -> bool {
468        true
469    }
470}
471
/// A handle is packed exactly like the object it holds. Decoding
/// fills an object taken from the thread local pool set.
#[cfg(feature = "netidx")]
impl<T: Pack + Any + Send + Sync + Poolable> Pack for Pooled<T> {
    fn encoded_len(&self) -> usize {
        <T as Pack>::encoded_len(self.get())
    }

    fn encode(&self, buf: &mut impl BufMut) -> Result<(), PackError> {
        <T as Pack>::encode(self.get(), buf)
    }

    fn decode(buf: &mut impl Buf) -> Result<Self, PackError> {
        // Reuse a pooled `T` (pool created with parameters 1000/1000
        // if it does not exist yet) as the decode target.
        let mut pooled = take_t::<T>(1000, 1000);
        <T as Pack>::decode_into(pooled.get_mut(), buf)?;
        Ok(pooled)
    }

    fn decode_into(&mut self, buf: &mut impl Buf) -> Result<(), PackError> {
        <T as Pack>::decode_into(self.get_mut(), buf)
    }
}