parallel_processor/execution_manager/
units_io.rs1use crate::execution_manager::execution_context::ExecutionContext;
2use crate::execution_manager::executor::AsyncExecutor;
3use crate::execution_manager::packet::Packet;
4use std::sync::Arc;
5
/// Selects how executor addresses are assigned to the packets produced by an
/// [`ExecutorInput`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutorInputAddressMode {
    /// Every packet is sent to the same, single executor address.
    Single,
    /// Every packet is sent to its own freshly generated executor address.
    Multiple,
}
10
11pub struct ExecutorInput<T, I: Iterator<Item = T>> {
12 iterator: I,
13 addr_mode: ExecutorInputAddressMode,
14}
15
16impl<T, I: Iterator<Item = T>> ExecutorInput<T, I> {
17 pub fn from_iter(iterator: I, addr_mode: ExecutorInputAddressMode) -> Self {
18 Self {
19 iterator,
20 addr_mode,
21 }
22 }
23}
24
25impl<T: Send + Sync + 'static, I: Iterator<Item = T>> ExecutorInput<T, I> {
26 pub fn set_output_executor<E: AsyncExecutor<InputPacket = T>>(
27 &mut self,
28 context: &Arc<ExecutionContext>,
29 init_data: E::InitData,
30 priority: usize,
31 ) {
32 let mut address = E::generate_new_address(init_data.clone());
33 let mut addresses = vec![];
34
35 let mut data = vec![];
36
37 for value in &mut self.iterator {
38 data.push((address.clone(), Packet::new_simple(value).upcast()));
39 if let ExecutorInputAddressMode::Multiple = &self.addr_mode {
40 addresses.push(address.clone());
41 address = E::generate_new_address(init_data.clone());
42 }
43 }
44
45 if let ExecutorInputAddressMode::Single = &self.addr_mode {
46 addresses.push(address);
47 }
48
49 context.register_executors_batch(addresses, priority);
50
51 for (addr, packet) in data {
52 context.add_input_packet(addr, packet);
53 }
54 }
55}