parallel_processor/execution_manager/
executor.rs

1use crate::execution_manager::async_channel::DoublePriorityAsyncChannel;
2use crate::execution_manager::execution_context::{
3    ExecutionContext, ExecutorDropper, PacketsChannel,
4};
5use crate::execution_manager::executor_address::{ExecutorAddress, WeakExecutorAddress};
6use crate::execution_manager::memory_tracker::MemoryTracker;
7use crate::execution_manager::objects_pool::PoolObject;
8use crate::execution_manager::packet::{Packet, PacketTrait, PacketsPool};
9use crate::scheduler::{PriorityScheduler, ThreadPriorityHandle};
10use std::any::Any;
11use std::future::Future;
12use std::marker::PhantomData;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use tokio::runtime::Handle;
17
18static EXECUTOR_GLOBAL_ID: AtomicU64 = AtomicU64::new(0);
19
20pub trait AsyncExecutor: Sized + Send + Sync + 'static {
21    type InputPacket: Send + Sync + 'static;
22    type OutputPacket: PacketTrait + Send + Sync + 'static;
23    type GlobalParams: Send + Sync + 'static;
24    type InitData: Send + Sync + Clone + 'static;
25
26    fn generate_new_address(data: Self::InitData) -> ExecutorAddress {
27        let exec = ExecutorAddress {
28            executor_keeper: Arc::new(ExecutorDropper::new()),
29            init_data: Arc::new(data),
30            executor_type_id: std::any::TypeId::of::<Self>(),
31            executor_internal_id: EXECUTOR_GLOBAL_ID.fetch_add(1, Ordering::Relaxed),
32        };
33        exec
34    }
35
36    fn new() -> Self;
37
38    fn async_executor_main<'a>(
39        &'a mut self,
40        global_params: &'a Self::GlobalParams,
41        receiver: ExecutorReceiver<Self>,
42        memory_tracker: MemoryTracker<Self>,
43    ) -> impl Future<Output = ()> + Send + 'a;
44}
45
46pub struct ExecutorReceiver<E: AsyncExecutor> {
47    pub(crate) context: Arc<ExecutionContext>,
48    pub(crate) addresses_channel: DoublePriorityAsyncChannel<(
49        WeakExecutorAddress,
50        Arc<AtomicU64>,
51        Arc<PoolObject<PacketsChannel>>,
52        Arc<dyn Any + Sync + Send + 'static>,
53    )>,
54    pub(crate) _phantom: PhantomData<E>,
55}
56
57impl<E: AsyncExecutor> ExecutorReceiver<E> {
58    // pub async fn obtain_address(
59    //     &mut self,
60    // ) -> Result<(ExecutorAddressOperations<E>, Arc<E::InitData>), ()> {
61    //     self.obtain_address_with_priority(0).await
62    // }
63
64    pub async fn obtain_address_with_priority(
65        &mut self,
66        priority: usize,
67        thread_handle: &ThreadPriorityHandle,
68    ) -> Result<(ExecutorAddressOperations<E>, Arc<E::InitData>), ()> {
69        PriorityScheduler::execute_blocking_call_async(thread_handle, async {
70            let (addr, counter, channel, init_data) =
71                self.addresses_channel.recv_offset(priority).await?;
72
73            Ok((
74                ExecutorAddressOperations {
75                    addr,
76                    counter,
77                    channel,
78                    context: self.context.clone(),
79                    is_finished: AtomicBool::new(false),
80                    _phantom: PhantomData,
81                },
82                init_data.downcast().unwrap(),
83            ))
84        })
85        .await
86    }
87}
88
89pub struct ExecutorAddressOperations<'a, E: AsyncExecutor> {
90    addr: WeakExecutorAddress,
91    counter: Arc<AtomicU64>,
92    channel: Arc<PoolObject<PacketsChannel>>,
93    context: Arc<ExecutionContext>,
94    is_finished: AtomicBool,
95    _phantom: PhantomData<&'a E>,
96}
97impl<'a, E: AsyncExecutor> ExecutorAddressOperations<'a, E> {
98    pub async fn receive_packet(
99        &self,
100        handle: &ThreadPriorityHandle,
101    ) -> Option<Packet<E::InputPacket>> {
102        if self.is_finished.load(Ordering::SeqCst) {
103            return None;
104        }
105
106        PriorityScheduler::execute_blocking_call_async(handle, async {
107            match self.channel.recv().await {
108                Ok(packet) => Some(packet.downcast()),
109                Err(()) => {
110                    self.is_finished.store(true, Ordering::SeqCst);
111                    None
112                }
113            }
114        })
115        .await
116    }
117    pub fn declare_addresses(&self, addresses: Vec<ExecutorAddress>, priority: usize) {
118        self.context.register_executors_batch(addresses, priority);
119    }
120    pub async fn pool_alloc_await(
121        &self,
122        new_size: usize,
123        handle: &ThreadPriorityHandle,
124    ) -> Arc<PoolObject<PacketsPool<E::OutputPacket>>> {
125        let pool = PriorityScheduler::execute_blocking_call_async(handle, async {
126            self.context.allocate_pool::<E>(false).await.unwrap()
127        })
128        .await;
129        pool.set_size(new_size);
130        pool
131    }
132    pub fn packet_send(
133        &self,
134        address: ExecutorAddress,
135        packet: Packet<E::OutputPacket>,
136        handle: &ThreadPriorityHandle,
137    ) {
138        PriorityScheduler::execute_blocking_call(handle, || {
139            self.context.send_packet(address, packet);
140        })
141    }
142
143    pub fn get_context(&self) -> &ExecutionContext {
144        &self.context
145    }
146
147    pub fn make_spawner(&self) -> ExecutorsSpawner<'a> {
148        ExecutorsSpawner {
149            handles: Vec::new(),
150            _phantom: PhantomData,
151        }
152    }
153
154    pub fn get_address(&self) -> WeakExecutorAddress {
155        self.addr
156    }
157}
158
159impl<'a, E: AsyncExecutor> Drop for ExecutorAddressOperations<'a, E> {
160    fn drop(&mut self) {
161        if self.counter.fetch_sub(1, Ordering::SeqCst) <= 1 {
162            self.context.wait_condvar.notify_all();
163        }
164    }
165}
166
167pub struct ExecutorsSpawner<'a> {
168    handles: Vec<tokio::task::JoinHandle<()>>,
169    _phantom: PhantomData<&'a ()>,
170}
171
172impl<'a> ExecutorsSpawner<'a> {
173    pub fn spawn_executor(&mut self, executor: impl Future<Output = ()> + 'a) {
174        let current_runtime = Handle::current();
175        let executor = unsafe {
176            std::mem::transmute::<_, Pin<Box<dyn Future<Output = ()> + Send>>>(
177                Box::pin(executor) as Pin<Box<dyn Future<Output = ()>>>
178            )
179        };
180        self.handles.push(current_runtime.spawn(executor));
181    }
182
183    pub async fn executors_await(&mut self) {
184        for handle in self.handles.drain(..) {
185            handle.await.unwrap();
186        }
187    }
188}
189
190impl<'a> Drop for ExecutorsSpawner<'a> {
191    fn drop(&mut self) {
192        if self.handles.len() > 0 {
193            panic!("Executors not awaited!");
194        }
195    }
196}