websocket/server/
async.rs1use crate::server::upgrade::r#async::{IntoWs, Upgrade};
3use crate::server::InvalidConnection;
4use crate::server::{NoTlsAcceptor, OptionalTlsAcceptor, WsServer};
5use bytes::BytesMut;
6use futures;
7use futures::{Future, Stream};
8use std;
9use std::io;
10use std::net::SocketAddr;
11use std::net::ToSocketAddrs;
12pub use tokio_reactor::Handle;
13use tokio_tcp::{TcpListener, TcpStream};
14
15#[cfg(any(feature = "async-ssl"))]
16use native_tls::TlsAcceptor;
17#[cfg(any(feature = "async-ssl"))]
18use tokio_tls::{TlsAcceptor as TlsAcceptorExt, TlsStream};
19
20pub type Server<S> = WsServer<S, TcpListener>;
23
24pub type Incoming<S> =
30 Box<dyn Stream<Item = (Upgrade<S>, SocketAddr), Error = InvalidConnection<S, BytesMut>> + Send>;
31
32impl<S> WsServer<S, TcpListener>
33where
34 S: OptionalTlsAcceptor,
35{
36 pub fn local_addr(&self) -> io::Result<SocketAddr> {
38 self.listener.local_addr()
39 }
40}
41
42impl WsServer<NoTlsAcceptor, TcpListener> {
44 pub fn bind<A: ToSocketAddrs>(addr: A, handle: &Handle) -> io::Result<Self> {
48 let tcp = ::std::net::TcpListener::bind(addr)?;
49 Ok(Server {
50 listener: TcpListener::from_std(tcp, handle)?,
51 ssl_acceptor: NoTlsAcceptor,
52 })
53 }
54
55 pub fn incoming(self) -> Incoming<TcpStream> {
65 let future = self
66 .listener
67 .incoming()
68 .and_then(|s| s.peer_addr().map(|a| (s, a)))
69 .map_err(|e| InvalidConnection {
70 stream: None,
71 parsed: None,
72 buffer: None,
73 error: e.into(),
74 })
75 .and_then(|(stream, a)| {
76 let handshake = stream
77 .into_ws()
78 .map_err(|(stream, req, buf, err)| InvalidConnection {
79 stream: Some(stream),
80 parsed: req,
81 buffer: Some(buf),
82 error: err,
83 })
84 .map(move |u| (u, a));
85 futures::future::ok(handshake)
86 })
87 .buffer_unordered(std::usize::MAX);
88 Box::new(future)
89 }
90}
91
#[cfg(any(feature = "async-ssl"))]
impl WsServer<TlsAcceptor, TcpListener> {
    /// Creates a TLS-enabled server that listens on `addr` and stores
    /// `acceptor` for use on incoming connections.
    ///
    /// A standard-library listener is bound first and then converted with
    /// `TcpListener::from_std`, which receives `handle`.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` raised by either the bind or the conversion.
    pub fn bind_secure<A: ToSocketAddrs>(
        addr: A,
        acceptor: TlsAcceptor,
        handle: &Handle,
    ) -> io::Result<Self> {
        let std_listener = ::std::net::TcpListener::bind(addr)?;
        let listener = TcpListener::from_std(std_listener, handle)?;
        Ok(WsServer {
            listener,
            ssl_acceptor: acceptor,
        })
    }

    /// Consumes the server and returns a stream of websocket upgrades over
    /// TLS streams, each paired with the address of the connecting peer.
    ///
    /// Failures are reported as [`InvalidConnection`] values:
    ///
    /// * I/O errors from accepting a socket or reading its peer address carry
    ///   no stream, parsed request, or buffer.
    /// * Errors from the TLS `accept` call are wrapped in an `io::Error` of
    ///   kind `Other` and likewise carry no stream, request, or buffer.
    /// * Errors from `into_ws` carry the TLS stream, the parsed request as
    ///   given by `into_ws`, and the buffer.
    ///
    /// The per-connection futures are driven through
    /// `buffer_unordered(std::usize::MAX)`.
    pub fn incoming(self) -> Incoming<TlsStream<TcpStream>> {
        let tls = TlsAcceptorExt::from(self.ssl_acceptor);

        // Stage 1: accepted sockets together with their peer address.
        let accepted = self
            .listener
            .incoming()
            .and_then(|socket| socket.peer_addr().map(|peer| (socket, peer)))
            .map_err(|io_err| InvalidConnection {
                stream: None,
                parsed: None,
                buffer: None,
                error: io_err.into(),
            });

        // Stage 2: for every socket, build a future that performs the TLS
        // accept and then the websocket handshake on the resulting stream.
        let handshakes = accepted.map(move |(socket, peer)| {
            tls.accept(socket)
                .map_err(|tls_err| InvalidConnection {
                    stream: None,
                    parsed: None,
                    buffer: None,
                    error: io::Error::new(io::ErrorKind::Other, tls_err).into(),
                })
                .and_then(move |secured| {
                    secured
                        .into_ws()
                        .map_err(|(secured, request, buffer, error)| InvalidConnection {
                            stream: Some(secured),
                            parsed: request,
                            buffer: Some(buffer),
                            error,
                        })
                        .map(move |upgrade| (upgrade, peer))
                })
        });

        // Stage 3: poll the handshake futures via `buffer_unordered`.
        Box::new(handshakes.buffer_unordered(std::usize::MAX))
    }
}