websocket/server/
async.rs

1//! The asynchronous implementation of a websocket server.
2use crate::server::upgrade::r#async::{IntoWs, Upgrade};
3use crate::server::InvalidConnection;
4use crate::server::{NoTlsAcceptor, OptionalTlsAcceptor, WsServer};
5use bytes::BytesMut;
6use futures;
7use futures::{Future, Stream};
8use std;
9use std::io;
10use std::net::SocketAddr;
11use std::net::ToSocketAddrs;
12pub use tokio_reactor::Handle;
13use tokio_tcp::{TcpListener, TcpStream};
14
15#[cfg(any(feature = "async-ssl"))]
16use native_tls::TlsAcceptor;
17#[cfg(any(feature = "async-ssl"))]
18use tokio_tls::{TlsAcceptor as TlsAcceptorExt, TlsStream};
19
20/// The asynchronous specialization of a websocket server.
21/// Use this struct to create asynchronous servers.
22pub type Server<S> = WsServer<S, TcpListener>;
23
24/// A stream of websocket connections and addresses the server generates.
25///
26/// Each item of the stream is the address of the incoming connection and an `Upgrade`
27/// struct which lets the user decide whether to turn the connection into a websocket
28/// connection or reject it.
29pub type Incoming<S> =
30	Box<dyn Stream<Item = (Upgrade<S>, SocketAddr), Error = InvalidConnection<S, BytesMut>> + Send>;
31
32impl<S> WsServer<S, TcpListener>
33where
34	S: OptionalTlsAcceptor,
35{
36	/// Get the socket address of this server
37	pub fn local_addr(&self) -> io::Result<SocketAddr> {
38		self.listener.local_addr()
39	}
40}
41
42/// Asynchronous methods for creating an async server and accepting incoming connections.
43impl WsServer<NoTlsAcceptor, TcpListener> {
44	/// Bind a websocket server to an address.
45	/// Creating a websocket server can be done immediately so this does not
46	/// return a `Future` but a simple `Result`.
47	pub fn bind<A: ToSocketAddrs>(addr: A, handle: &Handle) -> io::Result<Self> {
48		let tcp = ::std::net::TcpListener::bind(addr)?;
49		Ok(Server {
50			listener: TcpListener::from_std(tcp, handle)?,
51			ssl_acceptor: NoTlsAcceptor,
52		})
53	}
54
55	/// Turns the server into a stream of connection objects.
56	///
57	/// Each item of the stream is the address of the incoming connection and an `Upgrade`
58	/// struct which lets the user decide whether to turn the connection into a websocket
59	/// connection or reject it.
60	///
61	/// See the [`examples/async-server.rs`]
62	/// (https://github.com/cyderize/rust-websocket/blob/master/examples/async-server.rs)
63	/// example for a good echo server example.
64	pub fn incoming(self) -> Incoming<TcpStream> {
65		let future = self
66			.listener
67			.incoming()
68			.and_then(|s| s.peer_addr().map(|a| (s, a)))
69			.map_err(|e| InvalidConnection {
70				stream: None,
71				parsed: None,
72				buffer: None,
73				error: e.into(),
74			})
75			.and_then(|(stream, a)| {
76				let handshake = stream
77					.into_ws()
78					.map_err(|(stream, req, buf, err)| InvalidConnection {
79						stream: Some(stream),
80						parsed: req,
81						buffer: Some(buf),
82						error: err,
83					})
84					.map(move |u| (u, a));
85				futures::future::ok(handshake)
86			})
87			.buffer_unordered(std::usize::MAX);
88		Box::new(future)
89	}
90}
91
/// Constructors and connection handling for an asynchronous SSL server.
#[cfg(feature = "async-ssl")]
impl WsServer<TlsAcceptor, TcpListener> {
	/// Bind an SSL websocket server to `addr`.
	///
	/// Binding happens synchronously through the standard library listener, which
	/// is then handed to `TcpListener::from_std` together with `handle`. Because
	/// nothing here is deferred, the result is an ordinary `io::Result` rather
	/// than a `Future`.
	///
	/// `acceptor` is the `TlsAcceptor` holding the server's SSL information; it is
	/// stored in the server and used later by `incoming`.
	///
	/// # Errors
	///
	/// Returns the I/O error from either binding the address or converting the
	/// standard listener.
	pub fn bind_secure<A: ToSocketAddrs>(
		addr: A,
		acceptor: TlsAcceptor,
		handle: &Handle,
	) -> io::Result<Self> {
		let std_listener = ::std::net::TcpListener::bind(addr)?;
		let listener = TcpListener::from_std(std_listener, handle)?;
		Ok(WsServer {
			listener,
			ssl_acceptor: acceptor,
		})
	}

	/// Consume the server and produce a stream of incoming connections.
	///
	/// Each item is an `Upgrade` paired with the peer's `SocketAddr`; the
	/// `Upgrade` lets the caller decide whether to turn the connection into a
	/// websocket connection or to reject it.
	///
	/// Errors raised while accepting a socket, reading its peer address, or
	/// performing the TLS accept carry no stream, request or buffer. Errors raised
	/// by the websocket handshake carry the stream, the parsed request (if any)
	/// and the buffer.
	///
	/// See [`examples/async-server.rs`](https://github.com/cyderize/rust-websocket/blob/master/examples/async-server.rs)
	/// for a complete echo server.
	pub fn incoming(self) -> Incoming<TlsStream<TcpStream>> {
		let tls = TlsAcceptorExt::from(self.ssl_acceptor);

		// Stage 1: accept sockets and remember who is on the other end.
		let accepted = self
			.listener
			.incoming()
			.and_then(|socket| socket.peer_addr().map(|peer| (socket, peer)))
			.map_err(|io_err| InvalidConnection {
				stream: None,
				parsed: None,
				buffer: None,
				error: io_err.into(),
			});

		// Stage 2: per socket, run the TLS accept and then the websocket handshake.
		let handshakes = accepted.map(move |(socket, peer)| {
			tls.accept(socket)
				.map_err(|tls_err| InvalidConnection {
					stream: None,
					parsed: None,
					buffer: None,
					// TODO: better error types
					error: io::Error::new(io::ErrorKind::Other, tls_err).into(),
				})
				.and_then(move |secured| {
					secured
						.into_ws()
						.map_err(|(stream, parsed, buffer, error)| InvalidConnection {
							stream: Some(stream),
							parsed,
							buffer: Some(buffer),
							error,
						})
						.map(move |upgrade| (upgrade, peer))
				})
		});

		// Stage 3: drive the handshakes concurrently, yielding them as they finish.
		Box::new(handshakes.buffer_unordered(std::usize::MAX))
	}
}