parallel_processor/execution_manager/
executor.rs

1use crate::execution_manager::async_channel::DoublePriorityAsyncChannel;
2use crate::execution_manager::execution_context::{
3    ExecutionContext, ExecutorDropper, PacketsChannel,
4};
5use crate::execution_manager::executor_address::{ExecutorAddress, WeakExecutorAddress};
6use crate::execution_manager::memory_tracker::MemoryTracker;
7use crate::execution_manager::objects_pool::PoolObject;
8use crate::execution_manager::packet::{Packet, PacketTrait, PacketsPool};
9use crate::scheduler::{PriorityScheduler, ThreadPriorityHandle};
10use std::any::Any;
11use std::future::Future;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use tokio::runtime::Handle;
17
18static EXECUTOR_GLOBAL_ID: AtomicU64 = AtomicU64::new(0);
19
20pub trait AsyncExecutor: Sized + Send + Sync + 'static {
21    type InputPacket: Send + Sync + 'static;
22    type OutputPacket: PacketTrait + Send + Sync + 'static;
23    type GlobalParams: Send + Sync + 'static;
24    type InitData: Send + Sync + Clone + 'static;
25
26    fn generate_new_address(data: Self::InitData) -> ExecutorAddress {
27        let exec = ExecutorAddress {
28            executor_keeper: Arc::new(ExecutorDropper::new()),
29            init_data: Arc::new(data),
30            executor_type_id: std::any::TypeId::of::<Self>(),
31            executor_internal_id: EXECUTOR_GLOBAL_ID.fetch_add(1, Ordering::Relaxed),
32        };
33        exec
34    }
35
36    fn new() -> Self;
37
38    fn async_executor_main<'a>(
39        &'a mut self,
40        global_params: &'a Self::GlobalParams,
41        receiver: ExecutorReceiver<Self>,
42        memory_tracker: MemoryTracker<Self>,
43    ) -> impl Future<Output = ()> + Send + 'a;
44}
45
46pub struct ExecutorReceiver<E: AsyncExecutor> {
47    pub(crate) context: Arc<ExecutionContext>,
48    pub(crate) addresses_channel: DoublePriorityAsyncChannel<(
49        WeakExecutorAddress,
50        Arc<AtomicU64>,
51        Arc<PoolObject<PacketsChannel>>,
52        Arc<dyn Any + Sync + Send + 'static>,
53    )>,
54    pub(crate) _phantom: PhantomData<E>,
55}
56
57impl<E: AsyncExecutor> ExecutorReceiver<E> {
58    // pub async fn obtain_address(
59    //     &mut self,
60    // ) -> Result<(ExecutorAddressOperations<E>, Arc<E::InitData>), ()> {
61    //     self.obtain_address_with_priority(0).await
62    // }
63
64    pub async fn obtain_address_with_priority(
65        &mut self,
66        priority: usize,
67        thread_handle: &ThreadPriorityHandle,
68    ) -> Result<(ExecutorAddressOperations<E>, Arc<E::InitData>), ()> {
69        PriorityScheduler::execute_blocking_call_async(thread_handle, async {
70            let (addr, counter, channel, init_data) =
71                self.addresses_channel.recv_offset(priority).await?;
72
73            Ok((
74                ExecutorAddressOperations {
75                    addr,
76                    counter,
77                    channel,
78                    context: self.context.clone(),
79                    is_finished: AtomicBool::new(false),
80                    _phantom: PhantomData,
81                },
82                init_data.downcast().unwrap(),
83            ))
84        })
85        .await
86    }
87}
88
89pub struct ExecutorAddressOperations<'a, E: AsyncExecutor> {
90    addr: WeakExecutorAddress,
91    counter: Arc<AtomicU64>,
92    channel: Arc<PoolObject<PacketsChannel>>,
93    context: Arc<ExecutionContext>,
94    is_finished: AtomicBool,
95    _phantom: PhantomData<&'a E>,
96}
97impl<'a, E: AsyncExecutor> ExecutorAddressOperations<'a, E> {
98    pub async fn receive_packet(
99        &self,
100        handle: &ThreadPriorityHandle,
101    ) -> Option<Packet<E::InputPacket>> {
102        if self.is_finished.load(Ordering::SeqCst) {
103            return None;
104        }
105
106        PriorityScheduler::execute_blocking_call_async(handle, async {
107            match self.channel.recv().await {
108                Ok(packet) => Some(packet.downcast()),
109                Err(()) => {
110                    self.is_finished.store(true, Ordering::SeqCst);
111                    None
112                }
113            }
114        })
115        .await
116    }
117    pub fn declare_addresses(&self, addresses: Vec<ExecutorAddress>, priority: usize) {
118        self.context.register_executors_batch(addresses, priority);
119    }
120    pub async fn pool_alloc_await(
121        &self,
122        new_size: usize,
123        handle: &ThreadPriorityHandle,
124        force: bool,
125    ) -> Arc<PoolObject<PacketsPool<E::OutputPacket>>> {
126        let pool = PriorityScheduler::execute_blocking_call_async(handle, async {
127            self.context.allocate_pool::<E>(force).await.unwrap()
128        })
129        .await;
130        pool.set_size(new_size);
131        pool
132    }
133    pub fn packet_send(
134        &self,
135        address: ExecutorAddress,
136        packet: Packet<E::OutputPacket>,
137        handle: &ThreadPriorityHandle,
138    ) {
139        PriorityScheduler::execute_blocking_call(handle, || {
140            self.context.send_packet(address, packet);
141        })
142    }
143
144    pub fn get_context(&self) -> &ExecutionContext {
145        &self.context
146    }
147
148    pub fn make_spawner(&self) -> ExecutorsSpawner<'a> {
149        ExecutorsSpawner {
150            handles: Vec::new(),
151            _phantom: PhantomData,
152        }
153    }
154
155    pub fn get_address(&self) -> WeakExecutorAddress {
156        self.addr
157    }
158}
159
160impl<'a, E: AsyncExecutor> Drop for ExecutorAddressOperations<'a, E> {
161    fn drop(&mut self) {
162        if self.counter.fetch_sub(1, Ordering::SeqCst) <= 1 {
163            self.context.wait_condvar.notify_all();
164        }
165    }
166}
167
168pub struct ExecutorsSpawner<'a> {
169    handles: Vec<tokio::task::JoinHandle<()>>,
170    _phantom: PhantomData<&'a ()>,
171}
172
173impl<'a> ExecutorsSpawner<'a> {
174    pub fn spawn_executor(&mut self, executor: impl Future<Output = ()> + 'a) {
175        let current_runtime = Handle::current();
176        let executor = unsafe {
177            std::mem::transmute::<_, Pin<Box<dyn Future<Output = ()> + Send>>>(
178                Box::pin(executor) as Pin<Box<dyn Future<Output = ()>>>
179            )
180        };
181        self.handles.push(current_runtime.spawn(executor));
182    }
183
184    pub async fn executors_await(&mut self) {
185        for handle in self.handles.drain(..) {
186            handle.await.unwrap();
187        }
188    }
189}
190
191impl<'a> Drop for ExecutorsSpawner<'a> {
192    fn drop(&mut self) {
193        if self.handles.len() > 0 {
194            panic!("Executors not awaited!");
195        }
196    }
197}