rocket_ws_community/
lib.rs

1//! WebSocket support for Rocket.
2//!
3//! This crate implements support for WebSockets via Rocket's [connection
4//! upgrade API](rocket::Response#upgrading) and
5//! [tungstenite](tokio_tungstenite).
6//!
7//! # Usage
8//!
9//! Depend on the crate. Here, we rename the dependency to `ws` for convenience:
10//!
11//! ```toml
12//! [dependencies]
//! ws = { package = "rocket_ws_community", version = "0.1.3" }
14//! ```
15//!
16//! Then, use [`WebSocket`] as a request guard in any route and either call
17//! [`WebSocket::channel()`] or return a stream via [`Stream!`] or
18//! [`WebSocket::stream()`] in the handler. The examples below are equivalent:
19//!
20//! ```rust
21//! # extern crate rocket_ws_community as rocket_ws;
22//! # use rocket::get;
23//! # use rocket_ws as ws;
24//! #
25//! #[get("/echo?channel")]
26//! fn echo_channel(ws: ws::WebSocket) -> ws::Channel<'static> {
27//!     use rocket::futures::{SinkExt, StreamExt};
28//!
29//!     ws.channel(move |mut stream| Box::pin(async move {
30//!         while let Some(message) = stream.next().await {
31//!             let _ = stream.send(message?).await;
32//!         }
33//!
34//!         Ok(())
35//!     }))
36//! }
37//!
38//! #[get("/echo?stream")]
39//! fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
40//!     ws::Stream! { ws =>
41//!         for await message in ws {
42//!             yield message?;
43//!         }
44//!     }
45//! }
46//!
47//! #[get("/echo?compose")]
48//! fn echo_compose(ws: ws::WebSocket) -> ws::Stream!['static] {
49//!     ws.stream(|io| io)
50//! }
51//! ```
52//!
53//! WebSocket connections are configurable via [`WebSocket::config()`]:
54//!
55//! ```rust
56//! # extern crate rocket_ws_community as rocket_ws;
57//! # use rocket::get;
58//! # use rocket_ws as ws;
59//! #
60//! #[get("/echo")]
61//! fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
62//!     let ws = ws.config(ws::Config {
63//!         max_send_queue: Some(5),
64//!         ..Default::default()
65//!     });
66//!
67//!     ws::Stream! { ws =>
68//!         for await message in ws {
69//!             yield message?;
70//!         }
71//!     }
72//! }
73//! ```
74
75#![doc(html_root_url = "https://api.rocket.rs/master/rocket_ws")]
76#![doc(html_favicon_url = "https://rocket.rs/images/favicon.ico")]
77#![doc(html_logo_url = "https://rocket.rs/images/logo-boxed.png")]
78
79mod tungstenite {
80    #[doc(inline)]
81    pub use tokio_tungstenite::tungstenite::*;
82}
83
84mod duplex;
85mod websocket;
86
87pub use self::websocket::{Channel, WebSocket};
88
89/// A WebSocket message.
90///
91/// A value of this type is typically constructed by calling `.into()` on a
92/// supported message type. This includes strings via `&str` and `String` and
93/// bytes via `&[u8]` and `Vec<u8>`:
94///
95/// ```rust
96/// # extern crate rocket_ws_community as rocket_ws;
97/// # use rocket::get;
98/// # use rocket_ws as ws;
99/// #
100/// #[get("/echo")]
101/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
102///     ws::Stream! { ws =>
103///         yield "Hello".into();
104///         yield String::from("Hello").into();
105///         yield (&[1u8, 2, 3][..]).into();
106///         yield vec![1u8, 2, 3].into();
107///     }
108/// }
109/// ```
110///
111/// Other kinds of messages can be constructed directly:
112///
113/// ```rust
114/// # extern crate rocket_ws_community as rocket_ws;
115/// # use rocket::get;
116/// # use rocket_ws as ws;
117/// #
118/// #[get("/echo")]
119/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
120///     ws::Stream! { ws =>
121///         yield ws::Message::Ping(vec![b'h', b'i'])
122///     }
123/// }
124/// ```
125pub use self::tungstenite::Message;
126
127/// WebSocket connection configuration.
128///
129/// The default configuration for a [`WebSocket`] can be changed by calling
130/// [`WebSocket::config()`] with a value of this type. The defaults are obtained
131/// via [`Default::default()`]. You don't generally need to reconfigure a
132/// `WebSocket` unless you're certain you need different values. In other words,
133/// this structure should rarely be used.
134///
135/// # Example
136///
137/// ```rust
138/// # extern crate rocket_ws_community as rocket_ws;
139/// # use rocket::get;
140/// # use rocket_ws as ws;
141/// use rocket::data::ToByteUnit;
142///
143/// #[get("/echo")]
144/// fn echo_stream(ws: ws::WebSocket) -> ws::Stream!['static] {
145///     let ws = ws.config(ws::Config {
146///         // Enable backpressure with a max send queue size of `5`.
147///         max_send_queue: Some(5),
148///         // Decrease the maximum (complete) message size to 4MiB.
149///         max_message_size: Some(4.mebibytes().as_u64() as usize),
150///         // Decrease the maximum size of _one_ frame (not message) to 1MiB.
151///         max_frame_size: Some(1.mebibytes().as_u64() as usize),
152///         // Use the default values for the rest.
153///         ..Default::default()
154///     });
155///
156///     ws::Stream! { ws =>
157///         for await message in ws {
158///             yield message?;
159///         }
160///     }
161/// }
162/// ```
163///
164/// **Original `tungstenite` Documentation Follows**
165///
166pub use self::tungstenite::protocol::WebSocketConfig as Config;
167
168/// Structures for constructing raw WebSocket frames.
169pub mod frame {
170    pub use crate::tungstenite::protocol::frame::coding::CloseCode;
171    pub use crate::tungstenite::protocol::frame::{CloseFrame, Frame};
172    #[doc(hidden)]
173    pub use crate::Message;
174}
175
176/// Types representing incoming and/or outgoing `async` [`Message`] streams.
177pub mod stream {
178    pub use crate::duplex::DuplexStream;
179    pub use crate::websocket::MessageStream;
180}
181
182/// Library [`Error`](crate::result::Error) and
183/// [`Result`](crate::result::Result) types.
184pub mod result {
185    pub use crate::tungstenite::error::{Error, Result};
186}
187
188/// Type and expression macro for `async` WebSocket [`Message`] streams.
189///
190/// This macro can be used both where types are expected or
191/// where expressions are expected.
192///
193/// # Type Position
194///
/// When used in a type position, the macro invoked as `Stream!['r]` expands to:
///
/// - [`MessageStream`]`<'r, impl `[`Stream`]`<Item = `[`Result`]`<`[`Message`]`>> + 'r>`
198///
/// The lifetime need not be specified as `'r`. For instance, `Stream!['request]`
/// is valid and expands as expected:
///
/// - [`MessageStream`]`<'request, impl `[`Stream`]`<Item = `[`Result`]`<`[`Message`]`>> + 'request>`
203///
204/// As a convenience, when the macro is invoked as `Stream![]`, the lifetime
205/// defaults to `'static`. That is, `Stream![]` is equivalent to
206/// `Stream!['static]`.
207///
208/// [`MessageStream`]: crate::stream::MessageStream
209/// [`Stream`]: rocket::futures::stream::Stream
210/// [`Result`]: crate::result::Result
211/// [`Message`]: crate::Message
212///
213/// # Expression Position
214///
215/// When invoked as an expression, the macro behaves similarly to Rocket's
216/// [`stream!`](rocket::response::stream::stream) macro. Specifically, it
217/// supports `yield` and `for await` syntax. It is invoked as follows:
218///
219/// ```rust
220/// # extern crate rocket_ws_community as rocket_ws;
221/// # use rocket::get;
222/// use rocket_ws as ws;
223///
224/// #[get("/")]
225/// fn echo(ws: ws::WebSocket) -> ws::Stream![] {
226///     ws::Stream! { ws =>
227///         for await message in ws {
228///             yield message?;
229///             yield "foo".into();
230///             yield vec![1, 2, 3, 4].into();
231///         }
232///     }
233/// }
234/// ```
235///
236/// It enjoins the following type requirements:
237///
238///   * The type of `ws` _must_ be [`WebSocket`]. `ws` can be any ident.
239///   * The type of yielded expressions (`expr` in `yield expr`) _must_ be [`Message`].
240///   * The `Err` type of expressions short-circuited with `?` _must_ be [`Error`].
241///
242/// [`Error`]: crate::result::Error
243///
244/// The macro takes any series of statements and expands them into an expression
245/// of type `impl Stream<Item = `[`Result`]`<T>>`, a stream that `yield`s elements of
246/// type [`Result`]`<T>`. It automatically converts yielded items of type `T` into
247/// `Ok(T)`. It supports any Rust statement syntax with the following
248/// extensions:
249///
250///   * `?` short-circuits stream termination on `Err`
251///
252///     The type of the error value must be [`Error`].
253///     <br /> <br />
254///
255///   * `yield expr`
256///
257///     Yields the result of evaluating `expr` to the caller (the stream
258///     consumer) wrapped in `Ok`.
259///
260///     `expr` must be of type `T`.
261///     <br /> <br />
262///
263///   * `for await x in stream { .. }`
264///
265///     `await`s the next element in `stream`, binds it to `x`, and executes the
266///     block with the binding.
267///
268///     `stream` must implement `Stream<Item = T>`; the type of `x` is `T`.
269///
270/// ### Examples
271///
272/// Borrow from the request. Send a single message and close:
273///
274/// ```rust
275/// # extern crate rocket_ws_community as rocket_ws;
276/// # use rocket::get;
277/// use rocket_ws as ws;
278///
279/// #[get("/hello/<user>")]
280/// fn ws_hello(ws: ws::WebSocket, user: &str) -> ws::Stream!['_] {
281///     ws::Stream! { ws =>
282///         yield user.into();
283///     }
284/// }
285/// ```
286///
287/// Borrow from the request with explicit lifetime:
288///
289/// ```rust
290/// # extern crate rocket_ws_community as rocket_ws;
291/// # use rocket::get;
292/// use rocket_ws as ws;
293///
294/// #[get("/hello/<user>")]
295/// fn ws_hello<'r>(ws: ws::WebSocket, user: &'r str) -> ws::Stream!['r] {
296///     ws::Stream! { ws =>
297///         yield user.into();
298///     }
299/// }
300/// ```
301///
302/// Emit several messages and short-circuit if the client sends a bad message:
303///
304/// ```rust
305/// # extern crate rocket_ws_community as rocket_ws;
306/// # use rocket::get;
307/// use rocket_ws as ws;
308///
309/// #[get("/")]
310/// fn echo(ws: ws::WebSocket) -> ws::Stream![] {
311///     ws::Stream! { ws =>
312///         for await message in ws {
313///             for i in 0..5u8 {
314///                 yield i.to_string().into();
315///             }
316///
317///             yield message?;
318///         }
319///     }
320/// }
321/// ```
322///
#[macro_export]
macro_rules! Stream {
    // Type position, no lifetime given: `Stream![]` is shorthand for
    // `Stream!['static]`.
    () => ($crate::Stream!['static]);
    // Type position: a `MessageStream` wrapping an opaque stream of
    // `Result<Message>` items, both bounded by the lifetime `$l`.
    ($l:lifetime) => (
        $crate::stream::MessageStream<$l, impl rocket::futures::Stream<
            Item = $crate::result::Result<$crate::Message>
        > + $l>
    );
    // Expression position: `Stream! { ws => .. }`.
    //
    // The expansion is wrapped in a block so that it forms a single expression.
    // A bare `let ..; expr` sequence only parses where statements are allowed,
    // which restricted the macro to the tail of a block; as a block it can also
    // be used in `let` initializers, `return`, match arms, and so on.
    ($channel:ident => $($token:tt)*) => ({
        // Ascribing the type makes a non-`WebSocket` `$channel` a compile error.
        // The macro-local `ws` is hygienic, so it cannot clash with the caller's
        // identifier, which is rebound below as the parameter of the closure
        // passed to `stream()`.
        let ws: $crate::WebSocket = $channel;
        ws.stream(move |$channel| rocket::async_stream::try_stream! {
            $($token)*
        })
    });
}
334            $($token)*
335        })
336    );
337}