parallel_processor/execution_manager/
packet.rs

1use crate::execution_manager::async_channel::AsyncChannel;
2use crate::execution_manager::memory_tracker::MemoryTrackerManager;
3use crate::execution_manager::objects_pool::{ObjectsPool, PoolObjectTrait};
4use std::any::Any;
5use std::cell::UnsafeCell;
6use std::mem::ManuallyDrop;
7use std::ops::{Deref, DerefMut};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11pub trait PacketTrait: PoolObjectTrait + Sync + Send {
12    fn get_size(&self) -> usize;
13}
14
/// Type-erased sink that receives the payload of a packet being released.
///
/// It is stored as `Arc<dyn PacketPoolReturnerTrait>` inside packets, so the
/// packet itself does not need to know the concrete pool it came from.
trait PacketPoolReturnerTrait: Sync + Send {
    /// Takes ownership of the boxed payload of a released packet.
    fn send_any(&self, packet: Box<dyn Any>);
}
18
19impl<T: Send + Sync + PacketTrait + 'static> PacketPoolReturnerTrait
20    for (
21        Arc<(AsyncChannel<Box<T>>, AtomicU64)>,
22        Arc<MemoryTrackerManager>,
23    )
24{
25    fn send_any(&self, packet: Box<dyn Any>) {
26        self.0 .1.fetch_sub(1, Ordering::Relaxed);
27        let mut packet = packet.downcast::<T>().unwrap();
28        self.1.remove_queue_packet(packet.deref());
29        packet.reset();
30        let _ = self.0 .0.send(packet, true);
31    }
32}
33
34pub struct Packet<T: 'static> {
35    object: ManuallyDrop<Box<T>>,
36    returner: Option<Arc<dyn PacketPoolReturnerTrait>>,
37    _not_sync: std::marker::PhantomData<UnsafeCell<()>>,
38}
39
40pub struct PacketAny {
41    object: ManuallyDrop<Box<dyn Any + Send + Sync>>,
42    returner: Option<Arc<dyn PacketPoolReturnerTrait>>,
43}
44
45pub struct PacketsPool<T: Sync + Send + 'static> {
46    objects_pool: ObjectsPool<Box<T>>,
47    returner: Arc<(
48        Arc<(AsyncChannel<Box<T>>, AtomicU64)>,
49        Arc<MemoryTrackerManager>,
50    )>,
51}
52
53// Recursively implement the object trait for the pool, so it can be used recursively
54impl<T: PacketTrait> PoolObjectTrait for PacketsPool<T> {
55    type InitData = (usize, T::InitData, Arc<MemoryTrackerManager>);
56
57    fn allocate_new((cap, init_data, mem_tracker): &Self::InitData) -> Self {
58        Self::new(*cap, init_data.clone(), mem_tracker)
59    }
60
61    fn reset(&mut self) {}
62}
63
64impl<T: PacketTrait> PacketsPool<T> {
65    pub fn new(
66        cap: usize,
67        init_data: T::InitData,
68        mem_tracker: &Arc<MemoryTrackerManager>,
69    ) -> Self {
70        let objects_pool = ObjectsPool::new(cap, init_data);
71        let returner = Arc::new((objects_pool.returner.clone(), mem_tracker.clone()));
72        Self {
73            objects_pool,
74            returner,
75        }
76    }
77
78    pub async fn alloc_packet(&self) -> Packet<T> {
79        let mut object = self.objects_pool.alloc_object().await;
80
81        let packet = Packet {
82            object: ManuallyDrop::new(unsafe { ManuallyDrop::take(&mut object.value) }),
83            returner: Some(self.returner.clone()),
84            _not_sync: std::marker::PhantomData,
85        };
86
87        unsafe {
88            std::ptr::drop_in_place(&mut object.returner);
89            std::mem::forget(object);
90        }
91
92        packet
93    }
94
95    pub fn set_size(&self, new_size: usize) {
96        self.objects_pool.set_size(new_size);
97    }
98
99    pub fn alloc_packet_blocking(&self) -> Packet<T> {
100        let mut object = self.objects_pool.alloc_object_blocking();
101
102        let packet = Packet {
103            object: ManuallyDrop::new(unsafe { ManuallyDrop::take(&mut object.value) }),
104            returner: Some(self.returner.clone()),
105            _not_sync: std::marker::PhantomData,
106        };
107
108        unsafe {
109            std::ptr::drop_in_place(&mut object.returner);
110            std::mem::forget(object);
111        }
112
113        packet
114    }
115
116    pub fn get_available_items(&self) -> i64 {
117        self.objects_pool.get_available_items()
118    }
119
120    // pub fn wait_for_item_timeout(&self, timeout: Duration) {
121    //     self.objects_pool.wait_for_item_timeout(timeout)
122    // }
123}
124
125impl<T: Any + Send + Sync> Packet<T> {
126    pub fn new_simple(data: T) -> Self {
127        Packet {
128            object: ManuallyDrop::new(Box::new(data)),
129            returner: None,
130            _not_sync: std::marker::PhantomData,
131        }
132    }
133
134    pub fn upcast(mut self) -> PacketAny {
135        let packet = PacketAny {
136            object: ManuallyDrop::<Box<dyn Any + Send + Sync>>::new(unsafe {
137                ManuallyDrop::<Box<T>>::take(&mut self.object)
138            }),
139            returner: self.returner.clone(),
140        };
141
142        unsafe {
143            std::ptr::drop_in_place(&mut self.returner);
144            std::mem::forget(self);
145        }
146
147        packet
148    }
149}
150
151impl<T: Any + Send + Sync> Deref for Packet<T> {
152    type Target = T;
153    #[inline(always)]
154    fn deref(&self) -> &Self::Target {
155        self.object.as_ref()
156    }
157}
158
159impl<T: Any + Send + Sync> DerefMut for Packet<T> {
160    #[inline(always)]
161    fn deref_mut(&mut self) -> &mut Self::Target {
162        self.object.as_mut()
163    }
164}
165
166impl PacketAny {
167    pub fn downcast<T: 'static>(mut self) -> Packet<T> {
168        let packet = Packet {
169            object: ManuallyDrop::new(unsafe {
170                ManuallyDrop::take(&mut self.object).downcast().unwrap()
171            }),
172            returner: self.returner.clone(),
173            _not_sync: std::marker::PhantomData,
174        };
175
176        unsafe {
177            std::ptr::drop_in_place(&mut self.returner);
178            std::mem::forget(self);
179        }
180
181        packet
182    }
183}
184
185impl<T: 'static> Drop for Packet<T> {
186    fn drop(&mut self) {
187        if let Some(returner) = &self.returner {
188            returner.send_any(unsafe { ManuallyDrop::<Box<T>>::take(&mut self.object) });
189        } else {
190            unsafe { ManuallyDrop::drop(&mut self.object) }
191        }
192    }
193}
194
195impl Drop for PacketAny {
196    fn drop(&mut self) {
197        // panic!("Cannot drop packet any!");
198    }
199}
200
201impl PoolObjectTrait for () {
202    type InitData = ();
203    fn allocate_new(_init_data: &Self::InitData) -> Self {
204        panic!("Cannot create () type as object!");
205    }
206
207    fn reset(&mut self) {
208        panic!("Cannot reset () type as object!");
209    }
210}
211impl PacketTrait for () {
212    fn get_size(&self) -> usize {
213        0
214    }
215}