parallel_processor/execution_manager/
objects_pool.rs1use crate::execution_manager::async_channel::AsyncChannel;
2use std::cmp::max;
3use std::mem::ManuallyDrop;
4use std::ops::{Deref, DerefMut};
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7
/// Contract for values that can be created and recycled by an [`ObjectsPool`].
///
/// Implementors must be `Send + Sync + 'static` so they can be handed between threads
/// through the pool's channel.
pub trait PoolObjectTrait: Send + Sync + 'static {
    /// Data passed (by reference) to [`allocate_new`](Self::allocate_new) every time a fresh
    /// instance has to be built.
    type InitData: Clone + Send + Sync;

    /// Builds a brand-new instance from `init_data`.
    fn allocate_new(init_data: &Self::InitData) -> Self;

    /// Brings the object back to a reusable state.
    ///
    /// The tuple-based [`PoolReturner`] in this file calls this right before the element is
    /// sent back to the pool's channel.
    fn reset(&mut self);
}
14
15impl<T: PoolObjectTrait> PoolObjectTrait for Box<T> {
16 type InitData = T::InitData;
17
18 fn allocate_new(init_data: &Self::InitData) -> Self {
19 Box::new(T::allocate_new(init_data))
20 }
21 fn reset(&mut self) {
22 T::reset(self);
23 }
24}
25
26pub struct ObjectsPool<T: Sync + Send + 'static> {
27 queue: AsyncChannel<T>,
28 pub(crate) returner: Arc<(AsyncChannel<T>, AtomicU64)>,
29 allocate_fn: Box<dyn (Fn() -> T) + Sync + Send>,
30 max_count: u64,
31 temp_max_count: AtomicU64,
32}
33
/// Something that can take back an element previously handed out by a pool.
///
/// The trait is used as `Arc<dyn PoolReturner<T>>` by [`PoolObject`], so implementors must be
/// shareable across threads.
pub trait PoolReturner<T>: Send + Sync
where
    T: Send + Sync,
{
    /// Takes ownership of `el` and gives it back to the owner of this returner.
    fn return_element(&self, el: T);
}
37
38impl<T: PoolObjectTrait> PoolReturner<T> for (AsyncChannel<T>, AtomicU64) {
39 fn return_element(&self, mut el: T) {
40 self.1.fetch_sub(1, Ordering::Relaxed);
41 el.reset();
42 self.0.send(el, true);
43 }
44}
45
46impl<T: PoolObjectTrait> ObjectsPool<T> {
47 pub fn new(cap: usize, init_data: T::InitData) -> Self {
48 let channel = AsyncChannel::new(cap);
49
50 Self {
51 queue: channel.clone(),
52 returner: Arc::new((channel, AtomicU64::new(0))),
53 allocate_fn: Box::new(move || T::allocate_new(&init_data)),
54 max_count: cap as u64,
55 temp_max_count: AtomicU64::new(0),
56 }
57 }
58
59 pub fn set_size(&self, new_size: usize) {
60 self.temp_max_count
61 .store(new_size as u64, Ordering::Relaxed);
62 }
63
64 #[inline(always)]
65 async fn alloc_wait(&self) -> Result<T, ()> {
66 self.queue.recv().await
67 }
68
69 #[inline(always)]
70 fn alloc_wait_blocking(&self) -> Result<T, ()> {
71 self.queue.recv_blocking()
72 }
73
74 pub async fn alloc_object(&self) -> PoolObject<T> {
75 let el_count = self.returner.1.fetch_add(1, Ordering::Relaxed);
76
77 if el_count >= max(self.max_count, self.temp_max_count.load(Ordering::Relaxed)) {
78 return PoolObject::from_element(self.alloc_wait().await.unwrap(), self);
79 }
80
81 match self.queue.try_recv() {
82 Some(el) => PoolObject::from_element(el, self),
83 None => PoolObject::from_element((self.allocate_fn)(), self),
84 }
85 }
86
87 pub fn alloc_object_blocking(&self) -> PoolObject<T> {
88 let el_count = self.returner.1.fetch_add(1, Ordering::Relaxed);
89
90 if el_count >= max(self.max_count, self.temp_max_count.load(Ordering::Relaxed)) {
91 return PoolObject::from_element(self.alloc_wait_blocking().unwrap(), self);
92 }
93
94 match self.queue.try_recv() {
95 Some(el) => PoolObject::from_element(el, self),
96 None => PoolObject::from_element((self.allocate_fn)(), self),
97 }
98 }
99
100 pub fn alloc_object_force(&self) -> PoolObject<T> {
101 self.returner.1.fetch_add(1, Ordering::Relaxed);
102 match self.queue.try_recv() {
103 Some(el) => PoolObject::from_element(el, self),
104 None => PoolObject::from_element((self.allocate_fn)(), self),
105 }
106 }
107
108 pub fn get_available_items(&self) -> i64 {
109 (max(self.max_count, self.temp_max_count.load(Ordering::Relaxed)) as i64)
110 - (self.returner.1.load(Ordering::Relaxed) as i64)
111 }
112
113 pub fn get_allocated_items(&self) -> i64 {
114 self.returner.1.load(Ordering::Relaxed) as i64
115 }
116
117 }
123
124pub struct PoolObject<T: Send + Sync> {
125 pub(crate) value: ManuallyDrop<T>,
126 pub(crate) returner: Option<Arc<dyn PoolReturner<T>>>,
127}
128
129impl<T: PoolObjectTrait> PoolObject<T> {
130 fn from_element(value: T, pool: &ObjectsPool<T>) -> Self {
131 Self {
132 value: ManuallyDrop::new(value),
133 returner: Some(pool.returner.clone()),
134 }
135 }
136}
137
138impl<T: Send + Sync> PoolObject<T> {
139 pub fn new_simple(value: T) -> Self {
140 Self {
141 value: ManuallyDrop::new(value),
142 returner: None,
143 }
144 }
145}
146
147impl<T: Send + Sync> Deref for PoolObject<T> {
148 type Target = T;
149 #[inline(always)]
150 fn deref(&self) -> &Self::Target {
151 self.value.deref()
152 }
153}
154
155impl<T: Send + Sync> DerefMut for PoolObject<T> {
156 #[inline(always)]
157 fn deref_mut(&mut self) -> &mut Self::Target {
158 self.value.deref_mut()
159 }
160}
161
162impl<T: Send + Sync> Drop for PoolObject<T> {
163 fn drop(&mut self) {
164 if let Some(returner) = &self.returner {
165 returner.return_element(unsafe { ManuallyDrop::take(&mut self.value) });
166 } else {
167 unsafe { ManuallyDrop::drop(&mut self.value) }
168 }
169 }
170}