use ControlMessage;
use bip_peer::PeerInfo;
use bip_peer::messages::ExtendedMessage;
use bip_peer::messages::builders::ExtendedMessageBuilder;
use error::UberError;
use futures::Async;
use futures::Poll;
use futures::Stream;
use futures::task;
use futures::task::Task;
use std::collections::{HashMap, VecDeque};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum IExtendedMessage {
Control(ControlMessage),
RecievedExtendedMessage(PeerInfo, ExtendedMessage),
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OExtendedMessage {
SendExtendedMessage(PeerInfo, ExtendedMessage),
}
pub trait ExtendedListener {
fn extend(&self, _info: &PeerInfo, _builder: ExtendedMessageBuilder) -> ExtendedMessageBuilder {
_builder
}
fn on_update(&mut self, _info: &PeerInfo, _extended: &ExtendedPeerInfo) {}
}
pub struct ExtendedPeerInfo {
ours: Option<ExtendedMessage>,
theirs: Option<ExtendedMessage>,
}
impl ExtendedPeerInfo {
pub fn new(ours: Option<ExtendedMessage>, theirs: Option<ExtendedMessage>) -> ExtendedPeerInfo {
ExtendedPeerInfo {
ours: ours,
theirs: theirs,
}
}
pub fn update_ours(&mut self, message: ExtendedMessage) {
self.ours = Some(message);
}
pub fn update_theirs(&mut self, message: ExtendedMessage) {
self.theirs = Some(message);
}
pub fn our_message(&self) -> Option<&ExtendedMessage> {
self.ours.as_ref()
}
pub fn their_message(&self) -> Option<&ExtendedMessage> {
self.theirs.as_ref()
}
}
pub struct ExtendedModule {
builder: ExtendedMessageBuilder,
peers: HashMap<PeerInfo, ExtendedPeerInfo>,
out_queue: VecDeque<OExtendedMessage>,
opt_task: Option<Task>,
}
impl ExtendedModule {
pub fn new(builder: ExtendedMessageBuilder) -> ExtendedModule {
ExtendedModule {
builder: builder,
peers: HashMap::new(),
out_queue: VecDeque::new(),
opt_task: None,
}
}
pub fn process_message<D>(&mut self, message: IExtendedMessage, d_modules: &mut [Box<D>])
where
D: ExtendedListener + ?Sized,
{
match message {
IExtendedMessage::Control(ControlMessage::PeerConnected(info)) => {
let mut builder = self.builder.clone();
for d_module in d_modules.iter() {
let temp_builder = builder;
builder = d_module.extend(&info, temp_builder);
}
let ext_message = builder.build();
let ext_peer_info = ExtendedPeerInfo::new(Some(ext_message.clone()), None);
for d_module in d_modules {
d_module.on_update(&info, &ext_peer_info);
}
self.peers.insert(info, ext_peer_info);
self.out_queue
.push_back(OExtendedMessage::SendExtendedMessage(info, ext_message));
},
IExtendedMessage::Control(ControlMessage::PeerDisconnected(info)) => {
self.peers.remove(&info);
},
IExtendedMessage::RecievedExtendedMessage(info, ext_message) => {
let ext_peer_info = self.peers.get_mut(&info).unwrap();
ext_peer_info.update_theirs(ext_message);
for d_module in d_modules {
d_module.on_update(&info, &ext_peer_info);
}
},
_ => {
()
},
}
self.check_stream_unblock();
}
fn check_stream_unblock(&mut self) {
if !self.out_queue.is_empty() {
if let Some(task) = self.opt_task.take() {
task.notify();
}
}
}
}
impl Stream for ExtendedModule {
type Item = OExtendedMessage;
type Error = UberError;
fn poll(&mut self) -> Poll<Option<OExtendedMessage>, UberError> {
let opt_message = self.out_queue.pop_front();
if let Some(message) = opt_message {
Ok(Async::Ready(Some(message)))
} else {
self.opt_task = Some(task::current());
Ok(Async::NotReady)
}
}
}