parallel_processor/execution_manager/
objects_pool.rs

1use std::mem::ManuallyDrop;
2use std::ops::{Deref, DerefMut};
3use std::sync::Arc;
4
5use crossbeam::queue::ArrayQueue;
6
/// Contract for values that can be created on demand and recycled by an object pool.
pub trait PoolObjectTrait: 'static + Sync + Send {
    /// Data passed by reference to [`PoolObjectTrait::allocate_new`] each time a fresh
    /// instance has to be built.
    type InitData: Send + Sync + Clone;

    /// Restores the object to a reusable state.
    ///
    /// The pool calls this on an element before offering it back to the cache.
    fn reset(&mut self);

    /// Builds a brand-new instance from `init_data`.
    fn allocate_new(init_data: &Self::InitData) -> Self;
}
13
14impl<T: PoolObjectTrait> PoolObjectTrait for Box<T> {
15    type InitData = T::InitData;
16
17    fn allocate_new(init_data: &Self::InitData) -> Self {
18        Box::new(T::allocate_new(init_data))
19    }
20    fn reset(&mut self) {
21        T::reset(self);
22    }
23}
24
25#[derive(Clone)]
26pub(crate) struct PoolReturner<T> {
27    returner: Arc<ArrayQueue<T>>,
28}
29
30// Non blocking pool that caches a fixed number of elements and creates new elements as requested
31pub struct ObjectsPool<T: Sync + Send + 'static> {
32    queue: Arc<ArrayQueue<T>>,
33    allocate_fn: Box<dyn (Fn() -> T) + Sync + Send>,
34}
35
36impl<T: PoolObjectTrait> PoolReturner<T> {
37    fn return_element(&self, mut el: T) {
38        el.reset();
39        // Push it into the queue if the capacity allows it, else deallocates
40        let _ = self.returner.push(el);
41    }
42}
43
44impl<T: PoolObjectTrait> ObjectsPool<T> {
45    pub fn new(cache_size: usize, init_data: T::InitData) -> Self {
46        let queue = Arc::new(ArrayQueue::new(cache_size));
47
48        Self {
49            queue,
50            allocate_fn: Box::new(move || T::allocate_new(&init_data)),
51        }
52    }
53
54    pub fn alloc_object(&self) -> PoolObject<T> {
55        match self.queue.pop() {
56            Some(el) => {
57                // Element found in queue, return it
58                PoolObject::from_element(el, self)
59            }
60            None => PoolObject::from_element((self.allocate_fn)(), self),
61        }
62    }
63}
64
65pub struct PoolObject<T: PoolObjectTrait> {
66    pub(crate) value: ManuallyDrop<T>,
67    pub(crate) returner: Option<PoolReturner<T>>,
68}
69
70impl<T: PoolObjectTrait> PoolObject<T> {
71    fn from_element(value: T, pool: &ObjectsPool<T>) -> Self {
72        Self {
73            value: ManuallyDrop::new(value),
74            returner: Some(PoolReturner {
75                returner: pool.queue.clone(),
76            }),
77        }
78    }
79}
80
81impl<T: PoolObjectTrait> PoolObject<T> {
82    pub fn new_simple(value: T) -> Self {
83        Self {
84            value: ManuallyDrop::new(value),
85            returner: None,
86        }
87    }
88}
89
90impl<T: PoolObjectTrait> Deref for PoolObject<T> {
91    type Target = T;
92    #[inline(always)]
93    fn deref(&self) -> &Self::Target {
94        self.value.deref()
95    }
96}
97
98impl<T: PoolObjectTrait> DerefMut for PoolObject<T> {
99    #[inline(always)]
100    fn deref_mut(&mut self) -> &mut Self::Target {
101        self.value.deref_mut()
102    }
103}
104
105impl<T: PoolObjectTrait> Drop for PoolObject<T> {
106    fn drop(&mut self) {
107        if let Some(returner) = &self.returner {
108            returner.return_element(unsafe { ManuallyDrop::take(&mut self.value) });
109        } else {
110            unsafe { ManuallyDrop::drop(&mut self.value) }
111        }
112    }
113}