parallel_processor/execution_manager/
packet.rs1use crate::execution_manager::async_channel::AsyncChannel;
2use crate::execution_manager::memory_tracker::MemoryTrackerManager;
3use crate::execution_manager::objects_pool::{ObjectsPool, PoolObjectTrait};
4use std::any::Any;
5use std::cell::UnsafeCell;
6use std::mem::ManuallyDrop;
7use std::ops::{Deref, DerefMut};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11pub trait PacketTrait: PoolObjectTrait + Sync + Send {
12 fn get_size(&self) -> usize;
13}
14
15trait PacketPoolReturnerTrait: Send + Sync {
16 fn send_any(&self, packet: Box<dyn Any>);
17}
18
19impl<T: Send + Sync + PacketTrait + 'static> PacketPoolReturnerTrait
20 for (
21 Arc<(AsyncChannel<Box<T>>, AtomicU64)>,
22 Arc<MemoryTrackerManager>,
23 )
24{
25 fn send_any(&self, packet: Box<dyn Any>) {
26 self.0 .1.fetch_sub(1, Ordering::Relaxed);
27 let mut packet = packet.downcast::<T>().unwrap();
28 self.1.remove_queue_packet(packet.deref());
29 packet.reset();
30 let _ = self.0 .0.send(packet, true);
31 }
32}
33
34pub struct Packet<T: 'static> {
35 object: ManuallyDrop<Box<T>>,
36 returner: Option<Arc<dyn PacketPoolReturnerTrait>>,
37 _not_sync: std::marker::PhantomData<UnsafeCell<()>>,
38}
39
40pub struct PacketAny {
41 object: ManuallyDrop<Box<dyn Any + Send + Sync>>,
42 returner: Option<Arc<dyn PacketPoolReturnerTrait>>,
43}
44
45pub struct PacketsPool<T: Sync + Send + 'static> {
46 objects_pool: ObjectsPool<Box<T>>,
47 returner: Arc<(
48 Arc<(AsyncChannel<Box<T>>, AtomicU64)>,
49 Arc<MemoryTrackerManager>,
50 )>,
51}
52
53impl<T: PacketTrait> PoolObjectTrait for PacketsPool<T> {
55 type InitData = (usize, T::InitData, Arc<MemoryTrackerManager>);
56
57 fn allocate_new((cap, init_data, mem_tracker): &Self::InitData) -> Self {
58 Self::new(*cap, init_data.clone(), mem_tracker)
59 }
60
61 fn reset(&mut self) {}
62}
63
64impl<T: PacketTrait> PacketsPool<T> {
65 pub fn new(
66 cap: usize,
67 init_data: T::InitData,
68 mem_tracker: &Arc<MemoryTrackerManager>,
69 ) -> Self {
70 let objects_pool = ObjectsPool::new(cap, init_data);
71 let returner = Arc::new((objects_pool.returner.clone(), mem_tracker.clone()));
72 Self {
73 objects_pool,
74 returner,
75 }
76 }
77
78 pub async fn alloc_packet(&self) -> Packet<T> {
79 let mut object = self.objects_pool.alloc_object().await;
80
81 let packet = Packet {
82 object: ManuallyDrop::new(unsafe { ManuallyDrop::take(&mut object.value) }),
83 returner: Some(self.returner.clone()),
84 _not_sync: std::marker::PhantomData,
85 };
86
87 unsafe {
88 std::ptr::drop_in_place(&mut object.returner);
89 std::mem::forget(object);
90 }
91
92 packet
93 }
94
95 pub fn set_size(&self, new_size: usize) {
96 self.objects_pool.set_size(new_size);
97 }
98
99 pub fn alloc_packet_blocking(&self) -> Packet<T> {
100 let mut object = self.objects_pool.alloc_object_blocking();
101
102 let packet = Packet {
103 object: ManuallyDrop::new(unsafe { ManuallyDrop::take(&mut object.value) }),
104 returner: Some(self.returner.clone()),
105 _not_sync: std::marker::PhantomData,
106 };
107
108 unsafe {
109 std::ptr::drop_in_place(&mut object.returner);
110 std::mem::forget(object);
111 }
112
113 packet
114 }
115
116 pub fn get_available_items(&self) -> i64 {
117 self.objects_pool.get_available_items()
118 }
119
120 }
124
125impl<T: Any + Send + Sync> Packet<T> {
126 pub fn new_simple(data: T) -> Self {
127 Packet {
128 object: ManuallyDrop::new(Box::new(data)),
129 returner: None,
130 _not_sync: std::marker::PhantomData,
131 }
132 }
133
134 pub fn upcast(mut self) -> PacketAny {
135 let packet = PacketAny {
136 object: ManuallyDrop::<Box<dyn Any + Send + Sync>>::new(unsafe {
137 ManuallyDrop::<Box<T>>::take(&mut self.object)
138 }),
139 returner: self.returner.clone(),
140 };
141
142 unsafe {
143 std::ptr::drop_in_place(&mut self.returner);
144 std::mem::forget(self);
145 }
146
147 packet
148 }
149}
150
151impl<T: Any + Send + Sync> Deref for Packet<T> {
152 type Target = T;
153 #[inline(always)]
154 fn deref(&self) -> &Self::Target {
155 self.object.as_ref()
156 }
157}
158
159impl<T: Any + Send + Sync> DerefMut for Packet<T> {
160 #[inline(always)]
161 fn deref_mut(&mut self) -> &mut Self::Target {
162 self.object.as_mut()
163 }
164}
165
166impl PacketAny {
167 pub fn downcast<T: 'static>(mut self) -> Packet<T> {
168 let packet = Packet {
169 object: ManuallyDrop::new(unsafe {
170 ManuallyDrop::take(&mut self.object).downcast().unwrap()
171 }),
172 returner: self.returner.clone(),
173 _not_sync: std::marker::PhantomData,
174 };
175
176 unsafe {
177 std::ptr::drop_in_place(&mut self.returner);
178 std::mem::forget(self);
179 }
180
181 packet
182 }
183}
184
185impl<T: 'static> Drop for Packet<T> {
186 fn drop(&mut self) {
187 if let Some(returner) = &self.returner {
188 returner.send_any(unsafe { ManuallyDrop::<Box<T>>::take(&mut self.object) });
189 } else {
190 unsafe { ManuallyDrop::drop(&mut self.object) }
191 }
192 }
193}
194
195impl Drop for PacketAny {
196 fn drop(&mut self) {
197 }
199}
200
201impl PoolObjectTrait for () {
202 type InitData = ();
203 fn allocate_new(_init_data: &Self::InitData) -> Self {
204 panic!("Cannot create () type as object!");
205 }
206
207 fn reset(&mut self) {
208 panic!("Cannot reset () type as object!");
209 }
210}
211impl PacketTrait for () {
212 fn get_size(&self) -> usize {
213 0
214 }
215}