use ControlMessage;
use bip_peer::messages::builders::ExtendedMessageBuilder;
use discovery::IDiscoveryMessage;
use discovery::ODiscoveryMessage;
use discovery::error::DiscoveryError;
use error::UberError;
use extended::ExtendedListener;
use extended::ExtendedModule;
use extended::IExtendedMessage;
use extended::OExtendedMessage;
use futures::{Async, AsyncSink};
use futures::Poll;
use futures::Sink;
use futures::StartSend;
use futures::Stream;
/// Combined bound for a discovery module: it must be an `ExtendedListener`,
/// a `Sink` accepting `IDiscoveryMessage`, and a `Stream` yielding
/// `ODiscoveryMessage`, with `DiscoveryError` as the error type on both sides.
///
/// This trait has no methods of its own; it exists so that the three bounds
/// can be named as a single trait-object type for boxed discovery modules.
trait DiscoveryTrait
: ExtendedListener + Sink<SinkItem = IDiscoveryMessage, SinkError = DiscoveryError> + Stream<Item = ODiscoveryMessage, Error = DiscoveryError>
{
}
/// Blanket implementation: every type that satisfies the listener, sink and
/// stream bounds is automatically a `DiscoveryTrait`.
impl<T> DiscoveryTrait for T
where
T: ExtendedListener + Sink<SinkItem = IDiscoveryMessage, SinkError = DiscoveryError> + Stream<Item = ODiscoveryMessage, Error = DiscoveryError>,
{
}
/// Messages that can be sent into the `UberModule` sink.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum IUberMessage {
/// Control message; forwarded to the extended module (wrapped as
/// `IExtendedMessage::Control`) and to every discovery module (wrapped as
/// `IDiscoveryMessage::Control`).
Control(ControlMessage),
/// Message for the extended module only; discovery modules do not receive it.
Extended(IExtendedMessage),
/// Message for the discovery modules only; the extended module does not receive it.
Discovery(IDiscoveryMessage),
}
/// Messages yielded by the `UberModule` stream.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OUberMessage {
/// Message produced by polling the extended module.
Extended(OExtendedMessage),
/// Message produced by polling one of the discovery modules.
Discovery(ODiscoveryMessage),
}
/// Builder that collects the parts of an `UberModule` before it is constructed.
pub struct UberModuleBuilder {
/// Boxed discovery modules, in the order they were added.
discovery: Vec<Box<DiscoveryTrait<SinkItem = IDiscoveryMessage, SinkError = DiscoveryError, Item = ODiscoveryMessage, Error = DiscoveryError>>>,
/// Builder used to create the extended module; `None` means no extended module is created.
ext_builder: Option<ExtendedMessageBuilder>,
}
impl UberModuleBuilder {
    /// Creates a builder with no discovery modules and no extended message builder.
    pub fn new() -> UberModuleBuilder {
        UberModuleBuilder {
            ext_builder: None,
            discovery: Vec::new(),
        }
    }

    /// Replaces the extended message builder.
    ///
    /// Passing `None` means the resulting `UberModule` has no extended module.
    pub fn with_extended_builder(self, builder: Option<ExtendedMessageBuilder>) -> UberModuleBuilder {
        UberModuleBuilder {
            ext_builder: builder,
            ..self
        }
    }

    /// Appends a discovery module; modules are kept in insertion order.
    pub fn with_discovery_module<T>(mut self, module: T) -> UberModuleBuilder
    where
        T: ExtendedListener
            + Sink<SinkItem = IDiscoveryMessage, SinkError = DiscoveryError>
            + Stream<Item = ODiscoveryMessage, Error = DiscoveryError>
            + 'static,
    {
        // The annotated binding coerces the concrete module into the boxed trait object.
        let boxed: Box<
            DiscoveryTrait<SinkItem = IDiscoveryMessage, SinkError = DiscoveryError, Item = ODiscoveryMessage, Error = DiscoveryError>,
        > = Box::new(module);

        self.discovery.push(boxed);
        self
    }

    /// Consumes the builder and constructs the `UberModule`.
    pub fn build(self) -> UberModule {
        UberModule::from_builder(self)
    }
}
/// Uniform readiness check, implemented for both `Async` and `AsyncSink`, so
/// that `loop_states` can handle sink and stream results through one bound.
trait IsReady {
/// Returns `true` when the value is in its ready state.
fn is_ready(&self) -> bool;
}
impl<T> IsReady for AsyncSink<T> {
fn is_ready(&self) -> bool {
self.is_ready()
}
}
impl<T> IsReady for Async<T> {
fn is_ready(&self) -> bool {
self.is_ready()
}
}
/// Multiplexes one optional extended module and any number of discovery
/// modules behind a single `Sink` / `Stream` pair.
pub struct UberModule {
/// Boxed discovery modules, addressed by index via `ModuleState::Discovery`.
discovery: Vec<Box<DiscoveryTrait<SinkItem = IDiscoveryMessage, SinkError = DiscoveryError, Item = ODiscoveryMessage, Error = DiscoveryError>>>,
/// Extended module, present only when the builder supplied an `ExtendedMessageBuilder`.
extended: Option<ExtendedModule>,
/// Last module visited by sink-side iteration; `None` means the next pass starts at the first module.
last_sink_state: Option<ModuleState>,
/// Last module visited by stream-side iteration; `None` means the next pass starts at the first module.
last_stream_state: Option<ModuleState>,
}
/// Identifies one module inside an `UberModule` while iterating over them.
#[derive(Debug, Copy, Clone)]
enum ModuleState {
/// The extended module.
Extended,
/// The discovery module at the given index of the `discovery` vector.
Discovery(usize),
}
impl UberModule {
pub fn from_builder(builder: UberModuleBuilder) -> UberModule {
UberModule {
discovery: builder.discovery,
extended: builder
.ext_builder
.map(|builder| ExtendedModule::new(builder)),
last_sink_state: None,
last_stream_state: None,
}
}
fn next_state(&self, state: Option<ModuleState>) -> Option<ModuleState> {
match state {
None => {
if self.extended.is_some() {
Some(ModuleState::Extended)
} else if !self.discovery.is_empty() {
Some(ModuleState::Discovery(0))
} else {
None
}
},
Some(ModuleState::Extended) => {
if !self.discovery.is_empty() {
Some(ModuleState::Discovery(0))
} else {
None
}
},
Some(ModuleState::Discovery(index)) => {
if index + 1 < self.discovery.len() {
Some(ModuleState::Discovery(index + 1))
} else {
None
}
},
}
}
fn loop_states<G, A, L, R, E>(&mut self, is_sink: bool, init: Result<R, E>, get_next_state: G, assign_state: A, logic: L) -> Result<R, E>
where
G: Fn(&UberModule) -> Option<ModuleState>,
A: Fn(&mut UberModule, Option<ModuleState>),
L: Fn(&mut UberModule, ModuleState) -> Result<R, E>,
R: IsReady,
{
let is_stream = !is_sink;
let mut result = init;
let mut opt_next_state = get_next_state(self);
let mut should_continue = result
.as_ref()
.map(|async| (is_sink && async.is_ready()) || (is_stream && !async.is_ready()))
.unwrap_or(false);
while should_continue && opt_next_state.is_some() {
let next_state = opt_next_state.unwrap();
result = logic(self, next_state);
should_continue = result
.as_ref()
.map(|async| (is_sink && async.is_ready()) || (is_stream && !async.is_ready()))
.unwrap_or(false);
if should_continue {
assign_state(self, opt_next_state);
}
opt_next_state = get_next_state(self);
}
if opt_next_state.is_none() && should_continue {
assign_state(self, None);
}
result
}
fn start_sink_state(&mut self, message: &IUberMessage) -> StartSend<(), UberError> {
self.loop_states(
true,
Ok(AsyncSink::Ready),
|uber| uber.next_state(uber.last_sink_state),
|uber, state| {
uber.last_sink_state = state;
},
|uber, state| match (state, message) {
(ModuleState::Discovery(index), &IUberMessage::Control(ref control)) => {
uber.discovery[index]
.start_send(IDiscoveryMessage::Control(control.clone()))
.map(|async| async.map(|_| ()))
.map_err(|err| err.into())
},
(ModuleState::Discovery(index), &IUberMessage::Discovery(ref discovery)) => {
uber.discovery[index]
.start_send(discovery.clone())
.map(|async| async.map(|_| ()))
.map_err(|err| err.into())
},
(ModuleState::Extended, &IUberMessage::Control(ref control)) => {
let d_modules = &mut uber.discovery[..];
uber.extended
.as_mut()
.map(|ext_module| {
ext_module.process_message(IExtendedMessage::Control(control.clone()), d_modules);
Ok(AsyncSink::Ready)
})
.unwrap_or(Ok(AsyncSink::Ready))
},
(ModuleState::Extended, &IUberMessage::Extended(ref extended)) => {
let d_modules = &mut uber.discovery[..];
uber.extended
.as_mut()
.map(|ext_module| {
ext_module.process_message(extended.clone(), d_modules);
Ok(AsyncSink::Ready)
})
.unwrap_or(Ok(AsyncSink::Ready))
},
_ => {
Ok(AsyncSink::Ready)
},
},
)
}
fn poll_sink_state(&mut self) -> Poll<(), UberError> {
self.loop_states(
true,
Ok(Async::Ready(())),
|uber| uber.next_state(uber.last_sink_state),
|uber, state| {
uber.last_sink_state = state;
},
|uber, state| match state {
ModuleState::Discovery(index) => {
uber.discovery[index]
.poll_complete()
.map_err(|err| err.into())
},
ModuleState::Extended => {
Ok(Async::Ready(()))
},
},
)
}
fn poll_stream_state(&mut self) -> Poll<Option<OUberMessage>, UberError> {
self.loop_states(
false,
Ok(Async::NotReady),
|uber| uber.next_state(uber.last_stream_state),
|uber, state| {
uber.last_stream_state = state;
},
|uber, state| match state {
ModuleState::Extended => {
uber.extended
.as_mut()
.map(|ext_module| {
ext_module
.poll()
.map(|async_opt_message| {
async_opt_message.map(|opt_message| opt_message.map(|message| OUberMessage::Extended(message)))
})
.map_err(|err| err.into())
})
.unwrap_or(Ok(Async::Ready(None)))
},
ModuleState::Discovery(index) => {
uber.discovery[index]
.poll()
.map(|async_opt_message| {
async_opt_message.map(|opt_message| opt_message.map(|message| OUberMessage::Discovery(message)))
})
.map_err(|err| err.into())
},
},
)
}
}
impl Sink for UberModule {
type SinkItem = IUberMessage;
type SinkError = UberError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
self.start_sink_state(&item)
.map(|async| async.map(|_| item))
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.poll_sink_state()
}
}
impl Stream for UberModule {
    type Item = OUberMessage;
    type Error = UberError;

    /// Yields the next message produced by any contained module, as reported
    /// by `poll_stream_state`.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        self.poll_stream_state()
    }
}