parallel_processor/execution_manager/
units_io.rs

1use crate::execution_manager::execution_context::ExecutionContext;
2use crate::execution_manager::executor::AsyncExecutor;
3use crate::execution_manager::packet::Packet;
4use std::sync::Arc;
5
6pub enum ExecutorInputAddressMode {
7    Single,
8    Multiple,
9}
10
11pub struct ExecutorInput<T, I: Iterator<Item = T>> {
12    iterator: I,
13    addr_mode: ExecutorInputAddressMode,
14}
15
16impl<T, I: Iterator<Item = T>> ExecutorInput<T, I> {
17    pub fn from_iter(iterator: I, addr_mode: ExecutorInputAddressMode) -> Self {
18        Self {
19            iterator,
20            addr_mode,
21        }
22    }
23}
24
25impl<T: Send + Sync + 'static, I: Iterator<Item = T>> ExecutorInput<T, I> {
26    pub fn set_output_executor<E: AsyncExecutor<InputPacket = T>>(
27        &mut self,
28        context: &Arc<ExecutionContext>,
29        init_data: E::InitData,
30        priority: usize,
31    ) {
32        let mut address = E::generate_new_address(init_data.clone());
33        let mut addresses = vec![];
34
35        let mut data = vec![];
36
37        for value in &mut self.iterator {
38            data.push((address.clone(), Packet::new_simple(value).upcast()));
39            if let ExecutorInputAddressMode::Multiple = &self.addr_mode {
40                addresses.push(address.clone());
41                address = E::generate_new_address(init_data.clone());
42            }
43        }
44
45        if let ExecutorInputAddressMode::Single = &self.addr_mode {
46            addresses.push(address);
47        }
48
49        context.register_executors_batch(addresses, priority);
50
51        for (addr, packet) in data {
52            context.add_input_packet(addr, packet);
53        }
54    }
55}