fastwebsockets_stream/lib.rs
1//! # fastwebsockets-stream
2//!
3//! `fastwebsockets-stream` provides an adapter that exposes a `fastwebsockets::WebSocket`
4//! as an `AsyncRead` / `AsyncWrite` byte stream compatible with `tokio` and
5//! utilities such as `tokio_util::codec::Framed`.
6//!
7//! ## Overview
8//!
9//! The adapter type is [`WebSocketStream`], which wraps a `fastwebsockets::WebSocket<S>`
10//! and presents websocket application payloads as a continuous byte stream.
11//! This is useful when you want to reuse existing codecs (length-delimited,
12//! line-based, protobuf, etc.) or any code that operates on `AsyncRead` /
13//! `AsyncWrite` without reimplementing websocket framing logic.
14//!
15//! The adapter supports both text and binary payloads (controlled by
16//! [`PayloadType`]) and will validate that incoming data frames match the
17//! configured payload type. Control frames (Ping/Pong) are handled automatically
18//! by the underlying `fastwebsockets::WebSocket` (auto-pong) and `Close` frames
19//! are translated to EOF for `AsyncRead` consumers.
20//!
21//! ## Key types
22//!
23//! * [`WebSocketStream<S>`] — the main adapter implementing `tokio::io::AsyncRead`
24//! and `tokio::io::AsyncWrite`.
25//! * [`PayloadType`] — selects whether the stream works with Text or Binary
26//! application frames.
27//!
28//! ## Examples
29//!
30//! The examples below demonstrate a minimal server and a client that speak a
31//! simple binary protocol. They create an actual TCP listener, upgrade the
32//! connection to WebSocket using `fastwebsockets` + `hyper` and then use
33//! `WebSocketStream` as an `AsyncRead`/`AsyncWrite` stream.
34//!
35//! > Note: these examples are integration-style and spawn network tasks. They
36//! > are intended to be runnable in a test/runtime environment (they use
37//! > `tokio` and `hyper` helpers).
38//!
39//! ### Server example
40//!
41//! ```rust
42//! use fastwebsockets::{Frame, OpCode, WebSocketError, upgrade};
43//! use fastwebsockets_stream::{PayloadType, WebSocketStream};
44//! use http_body_util::Empty;
45//! use hyper::Request;
46//! use hyper::Response;
47//! use hyper::body::Bytes;
48//! use hyper::body::Incoming;
49//! use hyper::header::CONNECTION;
50//! use hyper::header::UPGRADE;
51//! use hyper::server::conn::http1;
52//! use hyper::service::service_fn;
53//! use hyper_util::rt::TokioIo;
54//! use std::future::Future;
55//! use std::net::Ipv4Addr;
56//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
57//! use tokio::net::TcpListener;
58//!
59//! struct SpawnExecutor;
60//!
61//! impl<F> hyper::rt::Executor<F> for SpawnExecutor
62//! where
63//! F: Future + Send + 'static,
64//! F::Output: Send + 'static,
65//! {
66//! fn execute(&self, fut: F) {
67//! // Spawn the provided future onto tokio's executor.
68//! tokio::task::spawn(fut);
69//! }
70//! }
71//!
72//! #[tokio::test]
73//! async fn server_example() {
74//! // Bind a local ephemeral port.
75//! let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0u16))
76//! .await
77//! .unwrap();
78//! let addr = listener.local_addr().unwrap();
79//!
80//! // Spawn a task that accepts incoming TCP connections and upgrades them.
81//! tokio::spawn(async move {
82//! loop {
83//! let (stream, _) = listener.accept().await.unwrap();
84//! let io = TokioIo::new(stream);
85//!
86//! // Serve a single HTTP/1.1 connection that supports upgrades.
87//! tokio::spawn(async move {
88//! if let Err(err) = http1::Builder::new()
89//! .serve_connection(io, service_fn(handle))
90//! .with_upgrades()
91//! .await
92//! {
93//! eprintln!("Error serving connection: {:?}", err);
94//! }
95//! });
96//! }
97//! });
98//!
99//! // The server runs in the background; we just ensure it starts successfully.
100//! println!("Server listening on {}", addr);
101//! }
102//!
103//! async fn handle(mut request: Request<Incoming>) -> Result<Response<Empty<Bytes>>, WebSocketError> {
104//! // Ensure this is an upgrade request for WebSocket.
105//! assert!(upgrade::is_upgrade_request(&request));
106//!
107//! // Convert the request into a WebSocket upgrade response and a future
108//! // that resolves to the established WebSocket once the upgrade completes.
109//! let (response, ws_fut) = upgrade::upgrade(&mut request)?;
110//!
111//! // Spawn the application logic that talks over the websocket.
112//! tokio::spawn(async move {
113//! // Allocate a small buffer for reads.
114//! let mut buf = [0u8; 6];
115//!
116//! // Await the completed websocket.
117//! let mut websocket = ws_fut.await.unwrap();
118//!
119//! let message = websocket.read_frame().await.unwrap();
120//! assert_eq!(message.opcode, OpCode::Binary);
121//!
122//! // Write the echoed message back.
123//! let _ = websocket.write_frame(Frame::binary(message.payload))
124//! .await
125//! .unwrap();
126//!
127//! let message = websocket.read_frame().await.unwrap();
128//! assert_eq!(message.opcode, OpCode::Close);
129//! });
130//!
131//! Ok(response)
132//! }
133//! ```
134//!
135//! ### Client example
136//!
137//! ```rust
138//! use fastwebsockets::handshake::client;
139//! use fastwebsockets::handshake;
140//! use fastwebsockets::{Frame, OpCode};
141//! use fastwebsockets_stream::{PayloadType, WebSocketStream};
142//! use http_body_util::Empty;
143//! use hyper::header::CONNECTION;
144//! use hyper::header::UPGRADE;
145//! use hyper::Request;
146//! use hyper::body::Bytes;
147//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
148//! use tokio::net::TcpStream;
149//! use std::future::Future;
150//!
151//! struct SpawnExecutor;
152//!
153//! impl<F> hyper::rt::Executor<F> for SpawnExecutor
154//! where
155//! F: Future + Send + 'static,
156//! F::Output: Send + 'static,
157//! {
158//! fn execute(&self, fut: F) {
159//! tokio::task::spawn(fut);
160//! }
161//! }
162//!
163//! #[tokio::test]
164//! async fn client_example() {
165//! // Connect to a running server (update address to your server).
166//! // This example assumes a server like the one in the server example is running.
167//! let addr = "127.0.0.1:9000";
168//! let stream = TcpStream::connect(addr).await.unwrap();
169//!
170//! // Build a WebSocket client handshake request.
171//! let request = Request::builder()
172//! .method("GET")
173//! .uri("ws://127.0.0.1:9000")
174//! .header("Host", "127.0.0.1:9000")
175//! .header(UPGRADE, "websocket")
176//! .header(CONNECTION, "upgrade")
177//! .header("Sec-WebSocket-Key", handshake::generate_key())
178//! .header("Sec-WebSocket-Version", "13")
179//! .body(Empty::<Bytes>::new())
180//! .unwrap();
181//!
182//! // Perform the client handshake. `SpawnExecutor` is used by `hyper` to run
183//! // internal tasks that may be required during the handshake.
184//! let (stream, _response) = client(&SpawnExecutor, request, stream)
185//! .await
186//! .unwrap();
187//!
188//! // Wrap websocket with the adapter and perform a small exchange.
189//! let mut ws_stream = WebSocketStream::new(stream, PayloadType::Binary);
190//! let mut buf = [0u8; 6];
191//!
192//! let mut bytes = ws_stream.write(b"Hello!").await.unwrap();
193//! assert_eq!(bytes, 6);
194//!
195//! bytes = ws_stream.read(&mut buf).await.unwrap();
196//! assert_eq!(bytes, 6);
197//! assert_eq!(&buf, b"Hello!");
198//! }
199//! ```
200//!
201//! ## Notes and caveats
202//!
202//! * Each call to `AsyncWrite::poll_write` produces a single WebSocket data
203//! frame containing exactly the bytes provided in that call. If a logical
204//! message may span multiple WebSocket frames, delimit it with a codec
205//! layered above the `WebSocketStream` (e.g. `tokio_util::codec::Framed`).
207//! * If the peer sends a data frame with an opcode that doesn't match the
208//! configured `PayloadType` (e.g. the stream is configured `Binary` but the
209//! peer sends `Text`), reads will return an error.
210//! * `into_inner` on the adapter will return the inner `WebSocket` only if no
211//! read/write future currently owns it (i.e. there is no in-flight read or
212//! write). If an operation is in-progress, `into_inner` will return `None`.
213//!
214//! ## See Also
215//! - [`fastwebsockets`](https://docs.rs/fastwebsockets)
216//! - [`tokio::io::AsyncRead`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html)
217//! - [`tokio::io::AsyncWrite`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncWrite.html)
218//!
219//! ## License
220//!
221//! Licensed under the [MIT License](https://opensource.org/licenses/MIT).
222//!
223//! See the `LICENSE` file in the repository root for details.
224
225mod stream;
226
227pub use stream::PayloadType;
228pub use stream::WebSocketStream;