1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use ControlMessage;
use bip_peer::PeerInfo;
use bip_peer::messages::ExtendedMessage;
use bip_peer::messages::builders::ExtendedMessageBuilder;
use error::UberError;
use futures::Async;
use futures::Poll;
use futures::Stream;
use futures::task;
use futures::task::Task;
use std::collections::{HashMap, VecDeque};

/// Enumeration of extended messages that can be sent to the extended module.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum IExtendedMessage {
    Control(ControlMessage),
    RecievedExtendedMessage(PeerInfo, ExtendedMessage),
}

/// Enumeration of extended messages that can be received from the extended module.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OExtendedMessage {
    SendExtendedMessage(PeerInfo, ExtendedMessage),
}

/// Trait for a module to take part in constructing the extended message for a peer.
pub trait ExtendedListener {
    /// Extend the given extended message builder for the given peer.
    fn extend(&self, _info: &PeerInfo, _builder: ExtendedMessageBuilder) -> ExtendedMessageBuilder {
        _builder
    }

    /// One or both sides of a peer connection had their extended information updated.
    ///
    /// This can be called multiple times for any given peer as extension information updates.
    fn on_update(&mut self, _info: &PeerInfo, _extended: &ExtendedPeerInfo) {}
}

/// Container for both the local and remote `ExtendedMessage`.
pub struct ExtendedPeerInfo {
    ours: Option<ExtendedMessage>,
    theirs: Option<ExtendedMessage>,
}

impl ExtendedPeerInfo {
    pub fn new(ours: Option<ExtendedMessage>, theirs: Option<ExtendedMessage>) -> ExtendedPeerInfo {
        ExtendedPeerInfo {
            ours: ours,
            theirs: theirs,
        }
    }

    pub fn update_ours(&mut self, message: ExtendedMessage) {
        self.ours = Some(message);
    }

    pub fn update_theirs(&mut self, message: ExtendedMessage) {
        self.theirs = Some(message);
    }

    pub fn our_message(&self) -> Option<&ExtendedMessage> {
        self.ours.as_ref()
    }

    pub fn their_message(&self) -> Option<&ExtendedMessage> {
        self.theirs.as_ref()
    }
}

//------------------------------------------------------------------------------//

pub struct ExtendedModule {
    builder: ExtendedMessageBuilder,
    peers: HashMap<PeerInfo, ExtendedPeerInfo>,
    out_queue: VecDeque<OExtendedMessage>,
    opt_task: Option<Task>,
}

impl ExtendedModule {
    pub fn new(builder: ExtendedMessageBuilder) -> ExtendedModule {
        ExtendedModule {
            builder: builder,
            peers: HashMap::new(),
            out_queue: VecDeque::new(),
            opt_task: None,
        }
    }

    pub fn process_message<D>(&mut self, message: IExtendedMessage, d_modules: &mut [Box<D>])
    where
        D: ExtendedListener + ?Sized,
    {
        match message {
            IExtendedMessage::Control(ControlMessage::PeerConnected(info)) => {
                let mut builder = self.builder.clone();

                for d_module in d_modules.iter() {
                    let temp_builder = builder;
                    builder = d_module.extend(&info, temp_builder);
                }

                let ext_message = builder.build();
                let ext_peer_info = ExtendedPeerInfo::new(Some(ext_message.clone()), None);

                for d_module in d_modules {
                    d_module.on_update(&info, &ext_peer_info);
                }

                self.peers.insert(info, ext_peer_info);
                self.out_queue
                    .push_back(OExtendedMessage::SendExtendedMessage(info, ext_message));
            },
            IExtendedMessage::Control(ControlMessage::PeerDisconnected(info)) => {
                self.peers.remove(&info);
            },
            IExtendedMessage::RecievedExtendedMessage(info, ext_message) => {
                let ext_peer_info = self.peers.get_mut(&info).unwrap();
                ext_peer_info.update_theirs(ext_message);

                for d_module in d_modules {
                    d_module.on_update(&info, &ext_peer_info);
                }
            },
            _ => {
                ()
            },
        }

        self.check_stream_unblock();
    }

    fn check_stream_unblock(&mut self) {
        if !self.out_queue.is_empty() {
            if let Some(task) = self.opt_task.take() {
                task.notify();
            }
        }
    }
}

impl Stream for ExtendedModule {
    type Item = OExtendedMessage;
    type Error = UberError;

    fn poll(&mut self) -> Poll<Option<OExtendedMessage>, UberError> {
        let opt_message = self.out_queue.pop_front();

        if let Some(message) = opt_message {
            Ok(Async::Ready(Some(message)))
        } else {
            self.opt_task = Some(task::current());

            Ok(Async::NotReady)
        }
    }
}