parallel_processor/execution_manager/
executor.rs1use crate::execution_manager::async_channel::DoublePriorityAsyncChannel;
2use crate::execution_manager::execution_context::{
3 ExecutionContext, ExecutorDropper, PacketsChannel,
4};
5use crate::execution_manager::executor_address::{ExecutorAddress, WeakExecutorAddress};
6use crate::execution_manager::memory_tracker::MemoryTracker;
7use crate::execution_manager::objects_pool::PoolObject;
8use crate::execution_manager::packet::{Packet, PacketTrait, PacketsPool};
9use crate::scheduler::{PriorityScheduler, ThreadPriorityHandle};
10use std::any::Any;
11use std::future::Future;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use tokio::runtime::Handle;
17
18static EXECUTOR_GLOBAL_ID: AtomicU64 = AtomicU64::new(0);
19
20pub trait AsyncExecutor: Sized + Send + Sync + 'static {
21 type InputPacket: Send + Sync + 'static;
22 type OutputPacket: PacketTrait + Send + Sync + 'static;
23 type GlobalParams: Send + Sync + 'static;
24 type InitData: Send + Sync + Clone + 'static;
25
26 fn generate_new_address(data: Self::InitData) -> ExecutorAddress {
27 let exec = ExecutorAddress {
28 executor_keeper: Arc::new(ExecutorDropper::new()),
29 init_data: Arc::new(data),
30 executor_type_id: std::any::TypeId::of::<Self>(),
31 executor_internal_id: EXECUTOR_GLOBAL_ID.fetch_add(1, Ordering::Relaxed),
32 };
33 exec
34 }
35
36 fn new() -> Self;
37
38 fn async_executor_main<'a>(
39 &'a mut self,
40 global_params: &'a Self::GlobalParams,
41 receiver: ExecutorReceiver<Self>,
42 memory_tracker: MemoryTracker<Self>,
43 ) -> impl Future<Output = ()> + Send + 'a;
44}
45
46pub struct ExecutorReceiver<E: AsyncExecutor> {
47 pub(crate) context: Arc<ExecutionContext>,
48 pub(crate) addresses_channel: DoublePriorityAsyncChannel<(
49 WeakExecutorAddress,
50 Arc<AtomicU64>,
51 Arc<PoolObject<PacketsChannel>>,
52 Arc<dyn Any + Sync + Send + 'static>,
53 )>,
54 pub(crate) _phantom: PhantomData<E>,
55}
56
57impl<E: AsyncExecutor> ExecutorReceiver<E> {
58 pub async fn obtain_address_with_priority(
65 &mut self,
66 priority: usize,
67 thread_handle: &ThreadPriorityHandle,
68 ) -> Result<(ExecutorAddressOperations<E>, Arc<E::InitData>), ()> {
69 PriorityScheduler::execute_blocking_call_async(thread_handle, async {
70 let (addr, counter, channel, init_data) =
71 self.addresses_channel.recv_offset(priority).await?;
72
73 Ok((
74 ExecutorAddressOperations {
75 addr,
76 counter,
77 channel,
78 context: self.context.clone(),
79 is_finished: AtomicBool::new(false),
80 _phantom: PhantomData,
81 },
82 init_data.downcast().unwrap(),
83 ))
84 })
85 .await
86 }
87}
88
89pub struct ExecutorAddressOperations<'a, E: AsyncExecutor> {
90 addr: WeakExecutorAddress,
91 counter: Arc<AtomicU64>,
92 channel: Arc<PoolObject<PacketsChannel>>,
93 context: Arc<ExecutionContext>,
94 is_finished: AtomicBool,
95 _phantom: PhantomData<&'a E>,
96}
97impl<'a, E: AsyncExecutor> ExecutorAddressOperations<'a, E> {
98 pub async fn receive_packet(
99 &self,
100 handle: &ThreadPriorityHandle,
101 ) -> Option<Packet<E::InputPacket>> {
102 if self.is_finished.load(Ordering::SeqCst) {
103 return None;
104 }
105
106 PriorityScheduler::execute_blocking_call_async(handle, async {
107 match self.channel.recv().await {
108 Ok(packet) => Some(packet.downcast()),
109 Err(()) => {
110 self.is_finished.store(true, Ordering::SeqCst);
111 None
112 }
113 }
114 })
115 .await
116 }
117 pub fn declare_addresses(&self, addresses: Vec<ExecutorAddress>, priority: usize) {
118 self.context.register_executors_batch(addresses, priority);
119 }
120 pub async fn pool_alloc_await(
121 &self,
122 new_size: usize,
123 handle: &ThreadPriorityHandle,
124 force: bool,
125 ) -> Arc<PoolObject<PacketsPool<E::OutputPacket>>> {
126 let pool = PriorityScheduler::execute_blocking_call_async(handle, async {
127 self.context.allocate_pool::<E>(force).await.unwrap()
128 })
129 .await;
130 pool.set_size(new_size);
131 pool
132 }
133 pub fn packet_send(
134 &self,
135 address: ExecutorAddress,
136 packet: Packet<E::OutputPacket>,
137 handle: &ThreadPriorityHandle,
138 ) {
139 PriorityScheduler::execute_blocking_call(handle, || {
140 self.context.send_packet(address, packet);
141 })
142 }
143
144 pub fn get_context(&self) -> &ExecutionContext {
145 &self.context
146 }
147
148 pub fn make_spawner(&self) -> ExecutorsSpawner<'a> {
149 ExecutorsSpawner {
150 handles: Vec::new(),
151 _phantom: PhantomData,
152 }
153 }
154
155 pub fn get_address(&self) -> WeakExecutorAddress {
156 self.addr
157 }
158}
159
160impl<'a, E: AsyncExecutor> Drop for ExecutorAddressOperations<'a, E> {
161 fn drop(&mut self) {
162 if self.counter.fetch_sub(1, Ordering::SeqCst) <= 1 {
163 self.context.wait_condvar.notify_all();
164 }
165 }
166}
167
168pub struct ExecutorsSpawner<'a> {
169 handles: Vec<tokio::task::JoinHandle<()>>,
170 _phantom: PhantomData<&'a ()>,
171}
172
173impl<'a> ExecutorsSpawner<'a> {
174 pub fn spawn_executor(&mut self, executor: impl Future<Output = ()> + 'a) {
175 let current_runtime = Handle::current();
176 let executor = unsafe {
177 std::mem::transmute::<_, Pin<Box<dyn Future<Output = ()> + Send>>>(
178 Box::pin(executor) as Pin<Box<dyn Future<Output = ()>>>
179 )
180 };
181 self.handles.push(current_runtime.spawn(executor));
182 }
183
184 pub async fn executors_await(&mut self) {
185 for handle in self.handles.drain(..) {
186 handle.await.unwrap();
187 }
188 }
189}
190
191impl<'a> Drop for ExecutorsSpawner<'a> {
192 fn drop(&mut self) {
193 if self.handles.len() > 0 {
194 panic!("Executors not awaited!");
195 }
196 }
197}