parallel_processor/execution_manager/
objects_pool.rs

1use crate::execution_manager::async_channel::AsyncChannel;
2use std::cmp::max;
3use std::mem::ManuallyDrop;
4use std::ops::{Deref, DerefMut};
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7
/// Contract for values that can be created, recycled and handed out by an
/// `ObjectsPool`.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait PoolObjectTrait: Send + Sync + 'static {
    /// Data required to build a fresh instance. The pool keeps one copy of it
    /// and passes it by reference to every [`allocate_new`](Self::allocate_new)
    /// call.
    type InitData: Clone + Sync + Send;

    /// Builds a brand-new instance from `init_data`.
    fn allocate_new(init_data: &Self::InitData) -> Self;

    /// Puts the instance back into a reusable state.
    ///
    /// The pool's returner calls this on every object right before queueing
    /// it for reuse.
    fn reset(&mut self);
}
14
15impl<T: PoolObjectTrait> PoolObjectTrait for Box<T> {
16    type InitData = T::InitData;
17
18    fn allocate_new(init_data: &Self::InitData) -> Self {
19        Box::new(T::allocate_new(init_data))
20    }
21    fn reset(&mut self) {
22        T::reset(self);
23    }
24}
25
26pub struct ObjectsPool<T: Sync + Send + 'static> {
27    queue: AsyncChannel<T>,
28    pub(crate) returner: Arc<(AsyncChannel<T>, AtomicU64)>,
29    allocate_fn: Box<dyn (Fn() -> T) + Sync + Send>,
30    max_count: u64,
31    temp_max_count: AtomicU64,
32}
33
/// Sink that takes ownership of a value once its `PoolObject` wrapper is
/// dropped.
pub trait PoolReturner<T: Send + Sync>: Send + Sync {
    /// Consumes `el`, giving it back to whatever owns this returner.
    fn return_element(&self, el: T);
}
37
38impl<T: PoolObjectTrait> PoolReturner<T> for (AsyncChannel<T>, AtomicU64) {
39    fn return_element(&self, mut el: T) {
40        self.1.fetch_sub(1, Ordering::Relaxed);
41        el.reset();
42        self.0.send(el, true);
43    }
44}
45
46impl<T: PoolObjectTrait> ObjectsPool<T> {
47    pub fn new(cap: usize, init_data: T::InitData) -> Self {
48        let channel = AsyncChannel::new(cap);
49
50        Self {
51            queue: channel.clone(),
52            returner: Arc::new((channel, AtomicU64::new(0))),
53            allocate_fn: Box::new(move || T::allocate_new(&init_data)),
54            max_count: cap as u64,
55            temp_max_count: AtomicU64::new(0),
56        }
57    }
58
59    pub fn set_size(&self, new_size: usize) {
60        self.temp_max_count
61            .store(new_size as u64, Ordering::Relaxed);
62    }
63
64    #[inline(always)]
65    async fn alloc_wait(&self) -> Result<T, ()> {
66        self.queue.recv().await
67    }
68
69    #[inline(always)]
70    fn alloc_wait_blocking(&self) -> Result<T, ()> {
71        self.queue.recv_blocking()
72    }
73
74    pub async fn alloc_object(&self) -> PoolObject<T> {
75        let el_count = self.returner.1.fetch_add(1, Ordering::Relaxed);
76
77        if el_count >= max(self.max_count, self.temp_max_count.load(Ordering::Relaxed)) {
78            return PoolObject::from_element(self.alloc_wait().await.unwrap(), self);
79        }
80
81        match self.queue.try_recv() {
82            Some(el) => PoolObject::from_element(el, self),
83            None => PoolObject::from_element((self.allocate_fn)(), self),
84        }
85    }
86
87    pub fn alloc_object_blocking(&self) -> PoolObject<T> {
88        let el_count = self.returner.1.fetch_add(1, Ordering::Relaxed);
89
90        if el_count >= max(self.max_count, self.temp_max_count.load(Ordering::Relaxed)) {
91            return PoolObject::from_element(self.alloc_wait_blocking().unwrap(), self);
92        }
93
94        match self.queue.try_recv() {
95            Some(el) => PoolObject::from_element(el, self),
96            None => PoolObject::from_element((self.allocate_fn)(), self),
97        }
98    }
99
100    pub fn alloc_object_force(&self) -> PoolObject<T> {
101        self.returner.1.fetch_add(1, Ordering::Relaxed);
102        match self.queue.try_recv() {
103            Some(el) => PoolObject::from_element(el, self),
104            None => PoolObject::from_element((self.allocate_fn)(), self),
105        }
106    }
107
108    pub fn get_available_items(&self) -> i64 {
109        (max(self.max_count, self.temp_max_count.load(Ordering::Relaxed)) as i64)
110            - (self.returner.1.load(Ordering::Relaxed) as i64)
111    }
112
113    pub fn get_allocated_items(&self) -> i64 {
114        self.returner.1.load(Ordering::Relaxed) as i64
115    }
116
117    // pub fn wait_for_item_timeout(&self, timeout: Duration) {
118    //     if let Ok(recv) = self.queue.recv_timeout(timeout) {
119    //         let _ = self.returner.0.try_send(recv);
120    //     }
121    // }
122}
123
124pub struct PoolObject<T: Send + Sync> {
125    pub(crate) value: ManuallyDrop<T>,
126    pub(crate) returner: Option<Arc<dyn PoolReturner<T>>>,
127}
128
129impl<T: PoolObjectTrait> PoolObject<T> {
130    fn from_element(value: T, pool: &ObjectsPool<T>) -> Self {
131        Self {
132            value: ManuallyDrop::new(value),
133            returner: Some(pool.returner.clone()),
134        }
135    }
136}
137
138impl<T: Send + Sync> PoolObject<T> {
139    pub fn new_simple(value: T) -> Self {
140        Self {
141            value: ManuallyDrop::new(value),
142            returner: None,
143        }
144    }
145}
146
147impl<T: Send + Sync> Deref for PoolObject<T> {
148    type Target = T;
149    #[inline(always)]
150    fn deref(&self) -> &Self::Target {
151        self.value.deref()
152    }
153}
154
155impl<T: Send + Sync> DerefMut for PoolObject<T> {
156    #[inline(always)]
157    fn deref_mut(&mut self) -> &mut Self::Target {
158        self.value.deref_mut()
159    }
160}
161
162impl<T: Send + Sync> Drop for PoolObject<T> {
163    fn drop(&mut self) {
164        if let Some(returner) = &self.returner {
165            returner.return_element(unsafe { ManuallyDrop::take(&mut self.value) });
166        } else {
167            unsafe { ManuallyDrop::drop(&mut self.value) }
168        }
169    }
170}