use crate::{Transport, transport::TransportError};
use bytes::{Bytes, IntoBuf};
use futures::{future::{self, FutureResult}, prelude::*, stream, sync::mpsc};
use multiaddr::{Protocol, Multiaddr};
use parking_lot::Mutex;
use rw_stream_sink::RwStreamSink;
use std::{error, fmt, sync::Arc};
/// Builds a connected `(Dialer, Listener)` pair using the default item type (`Bytes`).
///
/// Both halves share a single unbounded mpsc channel: the `Dialer` owns the sending
/// end, and the `Listener` owns the receiving end behind an `Arc<Mutex<_>>`.
#[inline]
pub fn connector() -> (Dialer, Listener) {
    let (sender, receiver) = mpsc::unbounded();
    let dialer = Dialer(sender);
    let listener = Listener(Arc::new(Mutex::new(receiver)));
    (dialer, listener)
}
/// Same as `connector`, but lets the caller choose the item type `T` carried by the
/// channels instead of relying on the `Bytes` default.
///
/// The returned `Dialer<T>` holds the sending end of a fresh unbounded mpsc channel and
/// the returned `Listener<T>` holds the receiving end wrapped in `Arc<Mutex<_>>`.
#[inline]
pub fn connector_custom_type<T>() -> (Dialer<T>, Listener<T>) {
    let (chan_tx, chan_rx) = mpsc::unbounded();
    let shared_rx = Arc::new(Mutex::new(chan_rx));
    (Dialer(chan_tx), Listener(shared_rx))
}
/// Dialing half of the in-memory transport.
///
/// Wraps the sending end of an unbounded mpsc channel of `Chan<T>` values; the
/// `Transport::dial` implementation for this type pushes one `Chan<T>` through it per dial.
/// `T` is the item type carried by each `Chan` and defaults to `Bytes`.
pub struct Dialer<T = Bytes>(mpsc::UnboundedSender<Chan<T>>);
impl<T> Clone for Dialer<T> {
    /// Produces another `Dialer` by cloning the wrapped sender.
    ///
    /// Implemented by hand so that no `T: Clone` bound is required.
    fn clone(&self) -> Self {
        let sender = self.0.clone();
        Dialer(sender)
    }
}
impl<T: IntoBuf + Send + 'static> Transport for Dialer<T> {
type Output = Channel<T>;
type Error = MemoryTransportError;
type Listener = Box<Stream<Item=(Self::ListenerUpgrade, Multiaddr), Error=MemoryTransportError> + Send>;
type ListenerUpgrade = FutureResult<Self::Output, MemoryTransportError>;
type Dial = Box<Future<Item=Self::Output, Error=MemoryTransportError> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
if !is_memory_addr(&addr) {
return Err(TransportError::MultiaddrNotSupported(addr))
}
let (a_tx, a_rx) = mpsc::unbounded();
let (b_tx, b_rx) = mpsc::unbounded();
let a = Chan { incoming: a_rx, outgoing: b_tx };
let b = Chan { incoming: b_rx, outgoing: a_tx };
let future = self.0.send(b)
.map(move |_| a.into())
.map_err(|_| MemoryTransportError::RemoteClosed);
Ok(Box::new(future))
}
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if server == observed {
Some(server.clone())
} else {
None
}
}
}
/// Error type used by the in-memory transport and by the `Chan` stream/sink.
#[derive(Debug, Copy, Clone)]
pub enum MemoryTransportError {
    /// The other side of the memory transport has been closed.
    RemoteClosed,
}

impl fmt::Display for MemoryTransportError {
    /// Writes a fixed human-readable sentence describing the variant.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let message = match self {
            MemoryTransportError::RemoteClosed =>
                "The other side of the memory transport has been closed.",
        };
        f.write_str(message)
    }
}

impl error::Error for MemoryTransportError {}
/// Listening half of the in-memory transport.
///
/// Holds the receiving end of the unbounded mpsc channel of `Chan<T>` values behind an
/// `Arc<Mutex<_>>`, so clones of a `Listener` refer to the same receiver.
/// `T` is the item type carried by each `Chan` and defaults to `Bytes`.
pub struct Listener<T = Bytes>(Arc<Mutex<mpsc::UnboundedReceiver<Chan<T>>>>);
impl<T> Clone for Listener<T> {
fn clone(&self) -> Self {
Listener(self.0.clone())
}
}
impl<T: IntoBuf + Send + 'static> Transport for Listener<T> {
type Output = Channel<T>;
type Error = MemoryTransportError;
type Listener = Box<Stream<Item=(Self::ListenerUpgrade, Multiaddr), Error=MemoryTransportError> + Send>;
type ListenerUpgrade = FutureResult<Self::Output, MemoryTransportError>;
type Dial = Box<Future<Item=Self::Output, Error=MemoryTransportError> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
if !is_memory_addr(&addr) {
return Err(TransportError::MultiaddrNotSupported(addr));
}
let addr2 = addr.clone();
let receiver = self.0.clone();
let stream = stream::poll_fn(move || receiver.lock().poll())
.map(move |channel| {
(future::ok(channel.into()), addr.clone())
})
.map_err(|()| unreachable!());
Ok((Box::new(stream), addr2))
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}
#[inline]
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if server == observed {
Some(server.clone())
} else {
None
}
}
}
/// Returns `true` if and only if `a` consists of exactly one component and that
/// component is `Protocol::Memory`.
fn is_memory_addr(a: &Multiaddr) -> bool {
    let mut components = a.iter();
    // Short-circuits: the second component is only inspected when the first matches.
    components.next() == Some(Protocol::Memory) && components.next().is_none()
}
/// A `Chan<T>` wrapped in an `RwStreamSink`.
///
/// This is the `Output` type of the `Transport` implementations for both `Dialer<T>`
/// and `Listener<T>`.
pub type Channel<T> = RwStreamSink<Chan<T>>;
/// One end of a bidirectional in-memory connection, built from two unbounded mpsc
/// channels (one per direction).
///
/// Implements `Stream` (reading from `incoming`) and `Sink` (writing to `outgoing`).
/// `T` is the item type and defaults to `Bytes`.
pub struct Chan<T = Bytes> {
// Receiving end: items arriving here are yielded by the `Stream` implementation.
incoming: mpsc::UnboundedReceiver<T>,
// Sending end: items passed to the `Sink` implementation are forwarded here.
outgoing: mpsc::UnboundedSender<T>,
}
impl<T> Stream for Chan<T> {
    type Item = T;
    type Error = MemoryTransportError;

    /// Polls the `incoming` receiver, passing its result through unchanged except that
    /// the receiver's `()` error is translated to `MemoryTransportError::RemoteClosed`.
    #[inline]
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.incoming.poll() {
            Ok(state) => Ok(state),
            Err(()) => Err(MemoryTransportError::RemoteClosed),
        }
    }
}
impl<T> Sink for Chan<T> {
    type SinkItem = T;
    type SinkError = MemoryTransportError;

    /// Forwards `item` to the `outgoing` sender; any sender error becomes
    /// `MemoryTransportError::RemoteClosed`.
    #[inline]
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        let outcome = self.outgoing.start_send(item);
        outcome.or(Err(MemoryTransportError::RemoteClosed))
    }

    /// Delegates to the `outgoing` sender's `poll_complete`; any sender error becomes
    /// `MemoryTransportError::RemoteClosed`.
    #[inline]
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        let outcome = self.outgoing.poll_complete();
        outcome.or(Err(MemoryTransportError::RemoteClosed))
    }

    /// Delegates to the `outgoing` sender's `close`; any sender error becomes
    /// `MemoryTransportError::RemoteClosed`.
    #[inline]
    fn close(&mut self) -> Poll<(), Self::SinkError> {
        let outcome = self.outgoing.close();
        outcome.or(Err(MemoryTransportError::RemoteClosed))
    }
}
impl<T> Into<RwStreamSink<Chan<T>>> for Chan<T>
where
    T: IntoBuf,
{
    /// Consumes the channel and wraps it in an `RwStreamSink`.
    // NOTE(review): written as `Into` rather than `From`; presumably because the target
    // type is defined in another crate. Confirm before converting to `From`.
    #[inline]
    fn into(self) -> RwStreamSink<Self> {
        RwStreamSink::new(self)
    }
}