pub mod error;
pub mod framed;
pub mod tls;
use error::Error;
use framed::BytesConnection;
use futures::prelude::*;
use libp2p_core::{
ConnectedPoint,
Transport,
multiaddr::Multiaddr,
transport::{map::{MapFuture, MapStream}, ListenerEvent, TransportError}
};
use rw_stream_sink::RwStreamSink;
use tokio_io::{AsyncRead, AsyncWrite};
#[derive(Debug, Clone)]
pub struct WsConfig<T> {
transport: framed::WsConfig<T>
}
impl<T> WsConfig<T> {
pub fn new(transport: T) -> Self {
framed::WsConfig::new(transport).into()
}
pub fn max_redirects(&self) -> u8 {
self.transport.max_redirects()
}
pub fn set_max_redirects(&mut self, max: u8) -> &mut Self {
self.transport.set_max_redirects(max);
self
}
pub fn max_data_size(&self) -> u64 {
self.transport.max_data_size()
}
pub fn set_max_data_size(&mut self, size: u64) -> &mut Self {
self.transport.set_max_data_size(size);
self
}
pub fn set_tls_config(&mut self, c: tls::Config) -> &mut Self {
self.transport.set_tls_config(c);
self
}
}
impl<T> From<framed::WsConfig<T>> for WsConfig<T> {
fn from(framed: framed::WsConfig<T>) -> Self {
WsConfig {
transport: framed
}
}
}
impl<T> Transport for WsConfig<T>
where
T: Transport + Send + Clone + 'static,
T::Error: Send + 'static,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::ListenerUpgrade: Send + 'static,
T::Output: AsyncRead + AsyncWrite + Send + 'static
{
type Output = RwStreamSink<BytesConnection<T::Output>>;
type Error = Error<T::Error>;
type Listener = MapStream<InnerStream<T::Output, T::Error>, WrapperFn<T::Output>>;
type ListenerUpgrade = MapFuture<InnerFuture<T::Output, T::Error>, WrapperFn<T::Output>>;
type Dial = MapFuture<InnerFuture<T::Output, T::Error>, WrapperFn<T::Output>>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
self.transport.map(wrap_connection as WrapperFn<T::Output>).listen_on(addr)
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.transport.map(wrap_connection as WrapperFn<T::Output>).dial(addr)
}
}
pub type InnerStream<T, E> =
Box<(dyn Stream<Error = Error<E>, Item = ListenerEvent<InnerFuture<T, E>>> + Send)>;
pub type InnerFuture<T, E> =
Box<(dyn Future<Item = BytesConnection<T>, Error = Error<E>> + Send)>;
pub type WrapperFn<T> =
fn(BytesConnection<T>, ConnectedPoint) -> RwStreamSink<BytesConnection<T>>;
fn wrap_connection<T>(c: BytesConnection<T>, _: ConnectedPoint) -> RwStreamSink<BytesConnection<T>>
where
T: AsyncRead + AsyncWrite
{
RwStreamSink::new(c)
}
#[cfg(test)]
mod tests {
use libp2p_tcp as tcp;
use tokio::runtime::current_thread::Runtime;
use futures::{Future, Stream};
use libp2p_core::{
Transport,
multiaddr::Protocol,
transport::ListenerEvent
};
use super::WsConfig;
#[test]
fn dialer_connects_to_listener_ipv4() {
let ws_config = WsConfig::new(tcp::TcpConfig::new());
let mut listener = ws_config.clone()
.listen_on("/ip4/127.0.0.1/tcp/0/ws".parse().unwrap())
.unwrap();
let addr = listener.by_ref().wait()
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");
assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2));
assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1));
let listener = listener
.filter_map(ListenerEvent::into_upgrade)
.into_future()
.map_err(|(e, _)| e)
.and_then(|(c, _)| c.unwrap().0);
let dialer = ws_config.clone().dial(addr.clone()).unwrap();
let future = listener
.select(dialer)
.map_err(|(e, _)| e)
.and_then(|(_, n)| n);
let mut rt = Runtime::new().unwrap();
let _ = rt.block_on(future).unwrap();
}
#[test]
fn dialer_connects_to_listener_ipv6() {
let ws_config = WsConfig::new(tcp::TcpConfig::new());
let mut listener = ws_config.clone()
.listen_on("/ip6/::1/tcp/0/ws".parse().unwrap())
.unwrap();
let addr = listener.by_ref().wait()
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");
assert_eq!(Some(Protocol::Ws("/".into())), addr.iter().nth(2));
assert_ne!(Some(Protocol::Tcp(0)), addr.iter().nth(1));
let listener = listener
.filter_map(ListenerEvent::into_upgrade)
.into_future()
.map_err(|(e, _)| e)
.and_then(|(c, _)| c.unwrap().0);
let dialer = ws_config.clone().dial(addr.clone()).unwrap();
let future = listener
.select(dialer)
.map_err(|(e, _)| e)
.and_then(|(_, n)| n);
let mut rt = Runtime::new().unwrap();
let _ = rt.block_on(future).unwrap();
}
}