parallel_processor/execution_manager/
executor.rs

1use crate::execution_manager::objects_pool::{PoolObject, PoolObjectTrait};
2use crate::execution_manager::packet::{Packet, PacketTrait};
3use crate::execution_manager::packets_channel::bounded::{
4    PacketsChannelReceiverBounded, PacketsChannelSenderBounded,
5};
6use crate::execution_manager::packets_channel::unbounded::PacketsChannelReceiverUnbounded;
7use crate::execution_manager::thread_pool::ScopedThreadPool;
8use std::sync::{Arc, Barrier, BarrierWaitResult};
9
10pub trait AsyncExecutor: Sized + Send + Sync + 'static {
11    type InputPacket: PoolObjectTrait;
12    type OutputPacket: PacketTrait;
13    type GlobalParams: Send + Sync + 'static;
14    type InitData: Send + Sync + Clone + 'static;
15    const ALLOW_PARALLEL_ADDRESS_EXECUTION: bool;
16
17    fn new() -> Self;
18
19    fn executor_main<'a>(
20        &'a mut self,
21        global_params: &'a Self::GlobalParams,
22        receiver: ExecutorReceiver<Self>,
23    );
24}
25
26pub struct AddressConsumer<E: AsyncExecutor> {
27    pub(crate) init_data: Arc<E::InitData>,
28    pub(crate) packets_queue:
29        Arc<PoolObject<PacketsChannelReceiverBounded<Packet<E::InputPacket>>>>,
30}
31
32impl<E: AsyncExecutor> Clone for AddressConsumer<E> {
33    fn clone(&self) -> Self {
34        AddressConsumer {
35            init_data: self.init_data.clone(),
36            packets_queue: self.packets_queue.clone(),
37        }
38    }
39}
40
41pub struct AddressProducer<P: PoolObjectTrait> {
42    pub(crate) packets_queue: PacketsChannelSenderBounded<Packet<P>>,
43}
44
45impl<P: PoolObjectTrait> AddressProducer<P> {
46    pub fn send_packet(&self, packet: Packet<P>) {
47        self.packets_queue.send(packet);
48    }
49}
50
51pub struct ExecutorReceiver<E: AsyncExecutor> {
52    pub(crate) addresses_receiver: PacketsChannelReceiverUnbounded<AddressConsumer<E>>,
53    pub(crate) thread_pool: Option<Arc<ScopedThreadPool>>,
54    pub(crate) barrier: Arc<Barrier>,
55}
56
57impl<E: AsyncExecutor> ExecutorReceiver<E> {
58    pub fn obtain_address(&mut self) -> Result<ExecutorAddressOperations<E>, ()> {
59        let address_receiver = match self.addresses_receiver.recv() {
60            Some(value) => value,
61            None => return Err(()),
62        };
63
64        if E::ALLOW_PARALLEL_ADDRESS_EXECUTION && address_receiver.packets_queue.is_active() {
65            // Reinsert the current address if it is allowed to be run on multiple threads
66            self.addresses_receiver
67                .make_sender()
68                .send(address_receiver.clone());
69        }
70
71        Ok(ExecutorAddressOperations {
72            address_data: address_receiver,
73            thread_pool: self.thread_pool.clone(),
74        })
75    }
76
77    pub fn wait_for_executors(&self) -> BarrierWaitResult {
78        self.barrier.wait()
79    }
80}
81
82pub struct ExecutorAddressOperations<E: AsyncExecutor> {
83    address_data: AddressConsumer<E>,
84    thread_pool: Option<Arc<ScopedThreadPool>>,
85}
86impl<E: AsyncExecutor> ExecutorAddressOperations<E> {
87    pub fn receive_packet(&self) -> Option<Packet<E::InputPacket>> {
88        self.address_data.packets_queue.recv()
89    }
90
91    pub fn get_init_data(&self) -> &E::InitData {
92        &self.address_data.init_data
93    }
94
95    pub fn spawn_executors<'a>(
96        &'a self,
97        count: usize,
98        executor: impl Fn(usize) + Send + Sync + 'a,
99    ) {
100        if count == 1 {
101            executor(0);
102        } else {
103            self.thread_pool
104                .as_ref()
105                .unwrap()
106                .run_scoped_optional(count, |i| executor(i));
107        }
108    }
109}