acto_rs/elem/wrap/
source_wrap.rs1use lossyq::spsc::{Sender};
2use super::super::super::{Task, Message, ChannelId, SenderName,
3 SenderChannelId, ReceiverChannelId, ChannelPosition
4};
5use super::super::counter::{OutputCounter};
6use super::super::source::{Source};
7
8pub struct SourceWrap<OutputValue: Send, OutputError: Send>
9{
10 name : String,
11 state : Box<Source<OutputValue=OutputValue, OutputError=OutputError>+Send>,
12 output_tx : Sender<Message<OutputValue, OutputError>>,
13}
14
15pub fn new<OutputValue: Send, OutputError: Send>(
16 name : String,
17 state : Box<Source<OutputValue=OutputValue, OutputError=OutputError>+Send>,
18 output_tx : Sender<Message<OutputValue, OutputError>>)
19 -> SourceWrap<OutputValue, OutputError>
20{
21 SourceWrap{ name: name, state: state, output_tx: output_tx }
22}
23
24impl<OutputValue: Send, OutputError: Send> OutputCounter
25 for SourceWrap<OutputValue, OutputError>
26{
27 fn get_tx_count(&self, ch_id: SenderChannelId) -> usize {
28 if ch_id.0 == 0 {
29 self.output_tx.seqno()
30 } else {
31 0
32 }
33 }
34}
35
36impl<OutputValue: Send, OutputError: Send> Task
37 for SourceWrap<OutputValue, OutputError>
38{
39 fn execute(&mut self, stop: &mut bool) {
40 self.state.process(&mut self.output_tx, stop);
41 }
42 fn name(&self) -> &String { &self.name }
43 fn input_count(&self) -> usize { 0 }
44 fn output_count(&self) -> usize { 1 }
45
46 fn input_id(&self, _ch_id: ReceiverChannelId) -> Option<(ChannelId, SenderName)> {
47 None
48 }
49
50 fn input_channel_pos(&self, _ch_id: ReceiverChannelId) -> ChannelPosition {
51 ChannelPosition( 0 )
52 }
53
54 fn output_channel_pos(&self, ch_id: SenderChannelId) -> ChannelPosition {
55 ChannelPosition( self.get_tx_count(ch_id) )
56 }
57}