use crate::server::upgrade::r#async::{IntoWs, Upgrade};
use crate::server::InvalidConnection;
use crate::server::{NoTlsAcceptor, OptionalTlsAcceptor, WsServer};
use bytes::BytesMut;
use futures;
use futures::{Future, Stream};
use std;
use std::io;
use std::net::SocketAddr;
use std::net::ToSocketAddrs;
pub use tokio_reactor::Handle;
use tokio_tcp::{TcpListener, TcpStream};
#[cfg(any(feature = "async-ssl"))]
use native_tls::TlsAcceptor;
#[cfg(any(feature = "async-ssl"))]
use tokio_tls::{TlsAcceptor as TlsAcceptorExt, TlsStream};
/// A `WsServer` whose listener is a tokio `TcpListener`.
///
/// `S` is the TLS-acceptor slot: the impls in this file use `NoTlsAcceptor`
/// for plain TCP and (behind the `async-ssl` feature) `TlsAcceptor` for TLS.
pub type Server<S> = WsServer<S, TcpListener>;
/// Boxed, `Send` stream returned by the `incoming` methods of the async server.
///
/// Each item is an `Upgrade<S>` over stream type `S`, paired with a
/// `SocketAddr` (the `incoming` implementations in this file fill it with the
/// connection's peer address). Failures are reported as
/// `InvalidConnection<S, BytesMut>`.
pub type Incoming<S> =
Box<dyn Stream<Item = (Upgrade<S>, SocketAddr), Error = InvalidConnection<S, BytesMut>> + Send>;
impl<S> WsServer<S, TcpListener>
where
    S: OptionalTlsAcceptor,
{
    /// Returns the socket address the underlying TCP listener reports as its
    /// local address.
    ///
    /// # Errors
    ///
    /// Forwards any `io::Error` produced by the listener's own `local_addr`.
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        let listener = &self.listener;
        listener.local_addr()
    }
}
impl WsServer<NoTlsAcceptor, TcpListener> {
    /// Creates a plain-TCP (no TLS) async WebSocket server listening on `addr`.
    ///
    /// A standard-library `TcpListener` is bound first and then converted into
    /// a tokio listener with `TcpListener::from_std`, using `handle`.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` from either the bind or the conversion step.
    pub fn bind<A: ToSocketAddrs>(addr: A, handle: &Handle) -> io::Result<Self> {
        let std_listener = std::net::TcpListener::bind(addr)?;
        let listener = TcpListener::from_std(std_listener, handle)?;
        Ok(Server {
            listener,
            ssl_acceptor: NoTlsAcceptor,
        })
    }

    /// Consumes the server and returns a stream of WebSocket upgrade results.
    ///
    /// For every accepted connection the peer address is looked up and the
    /// socket is run through `into_ws`. A successful handshake yields
    /// `(upgrade, peer_address)`.
    ///
    /// Errors are reported as `InvalidConnection`:
    /// * a failure while accepting or while reading the peer address carries
    ///   only the converted I/O error (`stream`, `parsed` and `buffer` are
    ///   `None`);
    /// * a failed `into_ws` carries the stream, the buffer, and whatever
    ///   request value `into_ws` reported.
    ///
    /// Handshakes are buffered with `buffer_unordered(std::usize::MAX)`, i.e.
    /// with no practical cap on how many are in flight at once.
    pub fn incoming(self) -> Incoming<TcpStream> {
        // Stage 1: accepted sockets paired with their peer address.
        let accepted = self
            .listener
            .incoming()
            .and_then(|socket| socket.peer_addr().map(|peer| (socket, peer)))
            .map_err(|accept_err| InvalidConnection {
                stream: None,
                parsed: None,
                buffer: None,
                error: accept_err.into(),
            });

        // Stage 2: turn each socket into a (not yet driven) handshake future.
        let handshakes = accepted.map(|(socket, peer)| {
            socket
                .into_ws()
                .map_err(|(socket, request, leftover, cause)| InvalidConnection {
                    stream: Some(socket),
                    parsed: request,
                    buffer: Some(leftover),
                    error: cause,
                })
                .map(move |upgrade| (upgrade, peer))
        });

        // Stage 3: drive the handshake futures via `buffer_unordered`.
        Box::new(handshakes.buffer_unordered(std::usize::MAX))
    }
}
#[cfg(any(feature = "async-ssl"))]
impl WsServer<TlsAcceptor, TcpListener> {
    /// Creates a TLS-enabled async WebSocket server listening on `addr`.
    ///
    /// A standard-library `TcpListener` is bound first and then converted into
    /// a tokio listener with `TcpListener::from_std`, using `handle`. The
    /// supplied `acceptor` is stored and later used by `incoming` to perform
    /// the TLS accept on each connection.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` from either the bind or the conversion step.
    pub fn bind_secure<A: ToSocketAddrs>(
        addr: A,
        acceptor: TlsAcceptor,
        handle: &Handle,
    ) -> io::Result<Self> {
        let std_listener = std::net::TcpListener::bind(addr)?;
        let listener = TcpListener::from_std(std_listener, handle)?;
        Ok(Server {
            listener,
            ssl_acceptor: acceptor,
        })
    }

    /// Consumes the server and returns a stream of WebSocket upgrade results
    /// over TLS streams.
    ///
    /// For every accepted connection the peer address is looked up, the
    /// socket is passed to the TLS acceptor, and the resulting TLS stream is
    /// run through `into_ws`. A successful handshake yields
    /// `(upgrade, peer_address)`.
    ///
    /// Errors are reported as `InvalidConnection`:
    /// * a failure while accepting or while reading the peer address carries
    ///   only the converted I/O error;
    /// * a TLS accept failure is wrapped in an `io::Error` of kind `Other`
    ///   and likewise carries no stream, request or buffer;
    /// * a failed `into_ws` carries the TLS stream, the buffer, and whatever
    ///   request value `into_ws` reported.
    ///
    /// Handshakes are buffered with `buffer_unordered(std::usize::MAX)`, i.e.
    /// with no practical cap on how many are in flight at once.
    pub fn incoming(self) -> Incoming<TlsStream<TcpStream>> {
        let tls = TlsAcceptorExt::from(self.ssl_acceptor);

        // Stage 1: accepted sockets paired with their peer address.
        let accepted = self
            .listener
            .incoming()
            .and_then(|socket| socket.peer_addr().map(|peer| (socket, peer)))
            .map_err(|accept_err| InvalidConnection {
                stream: None,
                parsed: None,
                buffer: None,
                error: accept_err.into(),
            });

        // Stage 2: TLS accept followed by the WebSocket handshake, as one
        // (not yet driven) future per connection.
        let handshakes = accepted.map(move |(socket, peer)| {
            tls.accept(socket)
                .map_err(|tls_err| InvalidConnection {
                    stream: None,
                    parsed: None,
                    buffer: None,
                    error: io::Error::new(io::ErrorKind::Other, tls_err).into(),
                })
                .and_then(move |secured| {
                    secured
                        .into_ws()
                        .map_err(|(secured, request, leftover, cause)| InvalidConnection {
                            stream: Some(secured),
                            parsed: request,
                            buffer: Some(leftover),
                            error: cause,
                        })
                        .map(move |upgrade| (upgrade, peer))
                })
        });

        // Stage 3: drive the handshake futures via `buffer_unordered`.
        Box::new(handshakes.buffer_unordered(std::usize::MAX))
    }
}