parallel_processor/execution_manager/
executor.rs1use crate::execution_manager::objects_pool::{PoolObject, PoolObjectTrait};
2use crate::execution_manager::packet::{Packet, PacketTrait};
3use crate::execution_manager::packets_channel::bounded::{
4 PacketsChannelReceiverBounded, PacketsChannelSenderBounded,
5};
6use crate::execution_manager::packets_channel::unbounded::PacketsChannelReceiverUnbounded;
7use crate::execution_manager::thread_pool::ScopedThreadPool;
8use std::sync::{Arc, Barrier, BarrierWaitResult};
9
10pub trait AsyncExecutor: Sized + Send + Sync + 'static {
11 type InputPacket: PoolObjectTrait;
12 type OutputPacket: PacketTrait;
13 type GlobalParams: Send + Sync + 'static;
14 type InitData: Send + Sync + Clone + 'static;
15 const ALLOW_PARALLEL_ADDRESS_EXECUTION: bool;
16
17 fn new() -> Self;
18
19 fn executor_main<'a>(
20 &'a mut self,
21 global_params: &'a Self::GlobalParams,
22 receiver: ExecutorReceiver<Self>,
23 );
24}
25
26pub struct AddressConsumer<E: AsyncExecutor> {
27 pub(crate) init_data: Arc<E::InitData>,
28 pub(crate) packets_queue:
29 Arc<PoolObject<PacketsChannelReceiverBounded<Packet<E::InputPacket>>>>,
30}
31
32impl<E: AsyncExecutor> Clone for AddressConsumer<E> {
33 fn clone(&self) -> Self {
34 AddressConsumer {
35 init_data: self.init_data.clone(),
36 packets_queue: self.packets_queue.clone(),
37 }
38 }
39}
40
41pub struct AddressProducer<P: PoolObjectTrait> {
42 pub(crate) packets_queue: PacketsChannelSenderBounded<Packet<P>>,
43}
44
45impl<P: PoolObjectTrait> AddressProducer<P> {
46 pub fn send_packet(&self, packet: Packet<P>) {
47 self.packets_queue.send(packet);
48 }
49}
50
51pub struct ExecutorReceiver<E: AsyncExecutor> {
52 pub(crate) addresses_receiver: PacketsChannelReceiverUnbounded<AddressConsumer<E>>,
53 pub(crate) thread_pool: Option<Arc<ScopedThreadPool>>,
54 pub(crate) barrier: Arc<Barrier>,
55}
56
57impl<E: AsyncExecutor> ExecutorReceiver<E> {
58 pub fn obtain_address(&mut self) -> Result<ExecutorAddressOperations<E>, ()> {
59 let address_receiver = match self.addresses_receiver.recv() {
60 Some(value) => value,
61 None => return Err(()),
62 };
63
64 if E::ALLOW_PARALLEL_ADDRESS_EXECUTION && address_receiver.packets_queue.is_active() {
65 self.addresses_receiver
67 .make_sender()
68 .send(address_receiver.clone());
69 }
70
71 Ok(ExecutorAddressOperations {
72 address_data: address_receiver,
73 thread_pool: self.thread_pool.clone(),
74 })
75 }
76
77 pub fn wait_for_executors(&self) -> BarrierWaitResult {
78 self.barrier.wait()
79 }
80}
81
82pub struct ExecutorAddressOperations<E: AsyncExecutor> {
83 address_data: AddressConsumer<E>,
84 thread_pool: Option<Arc<ScopedThreadPool>>,
85}
86impl<E: AsyncExecutor> ExecutorAddressOperations<E> {
87 pub fn receive_packet(&self) -> Option<Packet<E::InputPacket>> {
88 self.address_data.packets_queue.recv()
89 }
90
91 pub fn get_init_data(&self) -> &E::InitData {
92 &self.address_data.init_data
93 }
94
95 pub fn spawn_executors<'a>(
96 &'a self,
97 count: usize,
98 executor: impl Fn(usize) + Send + Sync + 'a,
99 ) {
100 if count == 1 {
101 executor(0);
102 } else {
103 self.thread_pool
104 .as_ref()
105 .unwrap()
106 .run_scoped_optional(count, |i| executor(i));
107 }
108 }
109}