1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
use lossyq::spsc::{Sender, channel};
use super::super::{Message, ChannelWrapper, SenderName, SenderChannelId, ReceiverChannelId, ReceiverName };
use super::wrap::scatter_wrap;

/// User-supplied processing step with one input channel and a vector of
/// output senders.
///
/// All four associated types must be `Send`.
pub trait Scatter {
    /// Payload type carried by the input channel wrapper.
    type InputValue : Send;
    /// Error type carried by the input channel wrapper.
    type InputError : Send;
    /// Payload type of the messages written to the output senders.
    type OutputValue : Send;
    /// Error type of the messages written to the output senders.
    type OutputError : Send;

    /// Invoked with mutable access to the input channel wrapper, the vector
    /// of output senders, and a `stop` flag.
    ///
    /// NOTE(review): `stop` is presumably set to `true` by the implementation
    /// to request that it is not run again -- confirm against the caller in
    /// `scatter_wrap`, which is not visible from this file.
    fn process(
        &mut self,
        input: &mut ChannelWrapper<Self::InputValue, Self::InputError>,
        output: &mut Vec<Sender<Message<Self::OutputValue, Self::OutputError>>>,
        stop: &mut bool);
}

/// Builds a scatter wrapper around `scatter` together with its output channels.
///
/// * `name` - copied into an owned `String` and used for the wrapper itself,
///   for the `SenderName` of every output channel, and for the `ReceiverName`
///   of the input side.
/// * `output_q_size` - size passed to `channel` for each output queue.
/// * `scatter` - the boxed `Scatter` implementation to wrap.
/// * `n_channels` - number of output channels to create.
///
/// Returns a tuple of:
/// * the boxed `ScatterWrap`, which receives the sending halves of all output
///   channels and an input wrapper in the `ReceiverNotConnected` state with
///   `ReceiverChannelId(0)`;
/// * a vector with one boxed `ChannelWrapper::SenderNotConnected` per output
///   channel, each holding that channel's receiving half and tagged with
///   `SenderChannelId(i)` for `i` in `0..n_channels`.
pub fn new<InputValue: Send, InputError: Send, OutputValue: Send, OutputError: Send>(
    name : &str,
    output_q_size : usize,
    scatter : Box<Scatter<InputValue=InputValue, InputError=InputError, OutputValue=OutputValue, OutputError=OutputError>+Send>,
    n_channels : usize)
        -> (Box<scatter_wrap::ScatterWrap<InputValue, InputError, OutputValue, OutputError>>,
            Vec<Box<ChannelWrapper<OutputValue, OutputError>>>)
{
    let owner = String::from(name);
    let mut senders = Vec::with_capacity(n_channels);
    let mut receivers = Vec::with_capacity(n_channels);

    for channel_idx in 0..n_channels {
        // One queue per output: the sending half goes to the wrapper, the
        // receiving half is handed back to the caller.
        let (tx, rx) = channel(output_q_size);
        let pending = ChannelWrapper::SenderNotConnected(
            SenderChannelId(channel_idx),
            rx,
            SenderName(owner.clone())
        );
        senders.push(tx);
        receivers.push(Box::new(pending));
    }

    // The input side starts out unconnected; its channel id is always 0.
    let input = ChannelWrapper::ReceiverNotConnected(
        ReceiverChannelId(0),
        ReceiverName(owner.clone())
    );
    let wrap = scatter_wrap::new(owner.clone(), scatter, input, senders);

    (Box::new(wrap), receivers)
}