use crate::protocol::{RemoteInfo, IdentifyProtocolConfig};
use futures::{future, prelude::*, stream, AndThen, MapErr};
use susyp2p_core::{
Multiaddr, PeerId, PublicKey, muxing, Transport,
transport::{TransportError, ListenerEvent, upgrade::TransportUpgradeError},
upgrade::{self, OutboundUpgradeApply, UpgradeError}
};
use std::io::Error as IoError;
use std::mem;
use std::sync::Arc;
/// Transport wrapper that pairs the muxer produced by an inner transport with
/// a `PeerId`.
///
/// When used as a `Transport`, dialing first dials the wrapped transport and
/// then runs the identify protocol over the resulting muxer to learn the
/// public key from which the `PeerId` is derived.
#[derive(Clone, Debug)]
pub struct IdentifyTransport<TTrans> {
    /// The wrapped transport that performs the actual dialing.
    transport: TTrans,
}
impl<TTrans> IdentifyTransport<TTrans> {
#[inline]
pub fn new(transport: TTrans) -> Self {
IdentifyTransport {
transport,
}
}
}
impl<TTrans, TMuxer> Transport for IdentifyTransport<TTrans>
where
TTrans: Transport<Output = TMuxer>,
TTrans::Error: 'static,
TMuxer: muxing::StreamMuxer + Send + Sync + 'static, TMuxer::Substream: Send + Sync + 'static, {
type Output = (PeerId, TMuxer);
type Error = TransportUpgradeError<TTrans::Error, IoError>; type Listener = stream::Empty<ListenerEvent<Self::ListenerUpgrade>, Self::Error>;
type ListenerUpgrade = future::Empty<Self::Output, Self::Error>;
type Dial = AndThen<
MapErr<TTrans::Dial, fn(TTrans::Error) -> Self::Error>,
MapErr<IdRetriever<TMuxer>, fn(UpgradeError<IoError>) -> Self::Error>,
fn(TMuxer) -> MapErr<IdRetriever<TMuxer>, fn(UpgradeError<IoError>) -> Self::Error>
>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let dial = self.transport.dial(addr)
.map_err(|err| err.map(TransportUpgradeError::Transport))?;
Ok(dial.map_err::<fn(_) -> _, _>(TransportUpgradeError::Transport).and_then(|muxer| {
IdRetriever::new(muxer, IdentifyProtocolConfig).map_err(TransportUpgradeError::Upgrade)
}))
}
}
/// Future that takes ownership of a muxer, opens an outbound substream on it,
/// negotiates the identify protocol, and resolves to `(PeerId, TMuxer)`.
pub struct IdRetriever<TMuxer>
where
    TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
    TMuxer::Substream: Send,
{
    /// Current step of the retrieval state machine.
    state: IdRetrieverState<TMuxer>,
}
/// Steps an `IdRetriever` goes through, in order, before it resolves.
enum IdRetrieverState<TMuxer>
where
    TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
    TMuxer::Substream: Send,
{
    /// Waiting for the outbound substream to open. Holds the shared muxer,
    /// the pending substream future, and the config to apply once it is open.
    OpeningSubstream(
        Arc<TMuxer>,
        muxing::OutboundSubstreamRefWrapFuture<Arc<TMuxer>>,
        IdentifyProtocolConfig,
    ),
    /// The substream is open and the identify upgrade is being applied to it.
    NegotiatingIdentify(
        Arc<TMuxer>,
        OutboundUpgradeApply<muxing::SubstreamRef<Arc<TMuxer>>, IdentifyProtocolConfig>,
    ),
    /// The upgrade completed. Holds the muxer and the public key taken from
    /// the negotiated `RemoteInfo`.
    Finishing(Arc<TMuxer>, PublicKey),
    /// Placeholder left in place while `poll` works on the real state. It is
    /// also what remains after the future has resolved or failed.
    Poisoned,
}
impl<TMuxer> IdRetriever<TMuxer>
where
    TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
    TMuxer::Substream: Send,
{
    /// Builds a retriever that starts out opening an outbound substream on
    /// `muxer`; `config` is the upgrade applied to that substream once open.
    ///
    /// The muxer is placed in an `Arc` so that the substream future can hold
    /// a second handle to it.
    fn new(muxer: TMuxer, config: IdentifyProtocolConfig) -> Self {
        let shared = Arc::new(muxer);
        let pending_substream = muxing::outbound_from_ref_and_wrap(Arc::clone(&shared));
        let state = IdRetrieverState::OpeningSubstream(shared, pending_substream, config);
        Self { state }
    }
}
impl<TMuxer> Future for IdRetriever<TMuxer>
where
    TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
    TMuxer::Substream: Send,
{
    /// The `PeerId` converted from the identified public key, plus the muxer
    /// that was handed to the retriever.
    type Item = (PeerId, TMuxer);
    type Error = UpgradeError<IoError>;

    /// Drives the state machine as far as possible.
    ///
    /// # Errors
    ///
    /// A failure to open the substream is returned as `UpgradeError::Apply`;
    /// a failure of the identify upgrade is returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has resolved, failed, or panicked, and
    /// if the muxer `Arc` is unexpectedly still shared when finishing.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            // Take the state out by value, leaving `Poisoned` behind. Every
            // path that neither stores a state back nor falls through to the
            // assignment below therefore leaves the future poisoned.
            let current = mem::replace(&mut self.state, IdRetrieverState::Poisoned);

            let next = match current {
                IdRetrieverState::OpeningSubstream(muxer, mut opening, config) => {
                    match opening.poll() {
                        Err(err) => return Err(UpgradeError::Apply(err)),
                        Ok(Async::NotReady) => {
                            self.state =
                                IdRetrieverState::OpeningSubstream(muxer, opening, config);
                            return Ok(Async::NotReady);
                        }
                        Ok(Async::Ready(substream)) => {
                            let upgrade = upgrade::apply_outbound(substream, config);
                            IdRetrieverState::NegotiatingIdentify(muxer, upgrade)
                        }
                    }
                }
                IdRetrieverState::NegotiatingIdentify(muxer, mut nego) => {
                    match nego.poll() {
                        Err(err) => return Err(err),
                        Ok(Async::NotReady) => {
                            self.state = IdRetrieverState::NegotiatingIdentify(muxer, nego);
                            return Ok(Async::NotReady);
                        }
                        Ok(Async::Ready(RemoteInfo { info, .. })) => {
                            IdRetrieverState::Finishing(muxer, info.public_key)
                        }
                    }
                }
                IdRetrieverState::Finishing(muxer, public_key) => {
                    let muxer = match Arc::try_unwrap(muxer) {
                        Ok(inner) => inner,
                        Err(_) => {
                            panic!("We clone the Arc only to put it into substreams. Once in the \
                                    Finishing state, no substream or upgrade exists anymore. \
                                    Therefore, there exists only one instance of the Arc. QED")
                        }
                    };
                    let peer_id: PeerId = public_key.into();
                    return Ok(Async::Ready((peer_id, muxer)));
                }
                IdRetrieverState::Poisoned => {
                    panic!("Future state panicked inside poll() or is finished")
                }
            };

            // A transition happened; loop to poll the new state right away.
            self.state = next;
        }
    }
}