fastwebsockets_stream/
lib.rs

1//! # fastwebsockets-stream
2//!
3//! `fastwebsockets-stream` provides an adapter that exposes a `fastwebsockets::WebSocket`
4//! as an `AsyncRead` / `AsyncWrite` byte stream compatible with `tokio` and
5//! utilities such as `tokio_util::codec::Framed`.
6//!
7//! ## Overview
8//!
9//! The adapter type is [`WebSocketStream`], which wraps a `fastwebsockets::WebSocket<S>`
10//! and presents websocket application payloads as a continuous byte stream.
11//! This is useful when you want to reuse existing codecs (length-delimited,
12//! line-based, protobuf, etc.) or any code that operates on `AsyncRead` /
13//! `AsyncWrite` without reimplementing websocket framing logic.
14//!
15//! The adapter supports both text and binary payloads (selected via
16//! [`PayloadType`]) and validates that incoming data frames match the
17//! configured payload type. Control frames (Ping/Pong) are handled automatically
18//! by the underlying `fastwebsockets::WebSocket` (auto-pong), and `Close` frames
19//! are translated to EOF for `AsyncRead` consumers.
20//!
21//! ## Key types
22//!
23//! * [`WebSocketStream<S>`] — the main adapter implementing `tokio::io::AsyncRead`
24//!   and `tokio::io::AsyncWrite`.
25//! * [`PayloadType`] — selects whether the stream works with Text or Binary
26//!   application frames.
27//!
28//! ## Examples
29//!
30//! The examples below demonstrate a minimal echo server and a client that speak
31//! a simple binary protocol. The server binds a real TCP listener, upgrades the
32//! connection with `fastwebsockets` + `hyper`, and echoes one frame back; the
33//! client wraps its socket in `WebSocketStream` and uses `AsyncRead`/`AsyncWrite`.
34//!
35//! > Note: these examples are integration-style and spawn network tasks. They
36//! > are intended to be runnable in a test/runtime environment (they use
37//! > `tokio` and `hyper` helpers).
38//!
39//! ### Server example
40//!
41//! ```rust
42//! use fastwebsockets::{Frame, OpCode, WebSocketError, upgrade};
43//! use fastwebsockets_stream::{PayloadType, WebSocketStream};
44//! use http_body_util::Empty;
45//! use hyper::Request;
46//! use hyper::Response;
47//! use hyper::body::Bytes;
48//! use hyper::body::Incoming;
49//! use hyper::header::CONNECTION;
50//! use hyper::header::UPGRADE;
51//! use hyper::server::conn::http1;
52//! use hyper::service::service_fn;
53//! use hyper_util::rt::TokioIo;
54//! use std::future::Future;
55//! use std::net::Ipv4Addr;
56//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
57//! use tokio::net::TcpListener;
58//!
59//! struct SpawnExecutor;
60//!
61//! impl<F> hyper::rt::Executor<F> for SpawnExecutor
62//! where
63//!     F: Future + Send + 'static,
64//!     F::Output: Send + 'static,
65//! {
66//!     fn execute(&self, fut: F) {
67//!         // Spawn the provided future onto tokio's executor.
68//!         tokio::task::spawn(fut);
69//!     }
70//! }
71//!
72//! #[tokio::test]
73//! async fn server_example() {
74//!     // Bind a local ephemeral port.
75//!     let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0u16))
76//!         .await
77//!         .unwrap();
78//!     let addr = listener.local_addr().unwrap();
79//!
80//!     // Spawn a task that accepts incoming TCP connections and upgrades them.
81//!     tokio::spawn(async move {
82//!         loop {
83//!             let (stream, _) = listener.accept().await.unwrap();
84//!             let io = TokioIo::new(stream);
85//!
86//!             // Serve a single HTTP/1.1 connection that supports upgrades.
87//!             tokio::spawn(async move {
88//!                 if let Err(err) = http1::Builder::new()
89//!                     .serve_connection(io, service_fn(handle))
90//!                     .with_upgrades()
91//!                     .await
92//!                 {
93//!                     eprintln!("Error serving connection: {:?}", err);
94//!                 }
95//!             });
96//!         }
97//!     });
98//!
99//!     // The server runs in the background; we just ensure it starts successfully.
100//!     println!("Server listening on {}", addr);
101//! }
102//!
103//! async fn handle(mut request: Request<Incoming>) -> Result<Response<Empty<Bytes>>, WebSocketError> {
104//!     // Ensure this is an upgrade request for WebSocket.
105//!     assert!(upgrade::is_upgrade_request(&request));
106//!
107//!     // Convert the request into a WebSocket upgrade response and a future
108//!     // that resolves to the established WebSocket once the upgrade completes.
109//!     let (response, ws_fut) = upgrade::upgrade(&mut request)?;
110//!
111//!     // Spawn the application logic that talks over the websocket.
112//!     tokio::spawn(async move {
113//!         // Allocate a small buffer for reads.
114//!         let mut buf = [0u8; 6];
115//!
116//!         // Await the completed websocket.
117//!         let mut websocket = ws_fut.await.unwrap();
118//!
119//!         let message = websocket.read_frame().await.unwrap();
120//!         assert_eq!(message.opcode, OpCode::Binary);
121//!
122//!         // Echo the received payload back to the client.
123//!         let _ = websocket.write_frame(Frame::binary(message.payload))
124//!             .await
125//!             .unwrap();
126//!
127//!         let message = websocket.read_frame().await.unwrap();
128//!         assert_eq!(message.opcode, OpCode::Close);
129//!     });
130//!
131//!     Ok(response)
132//! }
133//! ```
134//!
135//! ### Client example
136//!
137//! ```rust
138//! use fastwebsockets::handshake::client;
139//! use fastwebsockets::handshake;
140//! use fastwebsockets::{Frame, OpCode};
141//! use fastwebsockets_stream::{PayloadType, WebSocketStream};
142//! use http_body_util::Empty;
143//! use hyper::header::CONNECTION;
144//! use hyper::header::UPGRADE;
145//! use hyper::Request;
146//! use hyper::body::Bytes;
147//! use tokio::io::{AsyncReadExt, AsyncWriteExt};
148//! use tokio::net::TcpStream;
149//! use std::future::Future;
150//!
151//! struct SpawnExecutor;
152//!
153//! impl<F> hyper::rt::Executor<F> for SpawnExecutor
154//! where
155//!     F: Future + Send + 'static,
156//!     F::Output: Send + 'static,
157//! {
158//!     fn execute(&self, fut: F) {
159//!         tokio::task::spawn(fut);
160//!     }
161//! }
162//!
163//! #[tokio::test]
164//! async fn client_example() {
165//!     // Connect to a running server; replace the address with your server's.
166//!     // The server example binds an ephemeral port, so 9000 is only a placeholder.
167//!     let addr = "127.0.0.1:9000";
168//!     let stream = TcpStream::connect(addr).await.unwrap();
169//!
170//!     // Build a WebSocket client handshake request.
171//!     let request = Request::builder()
172//!         .method("GET")
173//!         .uri("ws://127.0.0.1:9000")
174//!         .header("Host", "127.0.0.1")
175//!         .header(UPGRADE, "websocket")
176//!         .header(CONNECTION, "upgrade")
177//!         .header("Sec-WebSocket-Key", handshake::generate_key())
178//!         .header("Sec-WebSocket-Version", "13")
179//!         .body(Empty::<Bytes>::new())
180//!         .unwrap();
181//!
182//!     // Perform the client handshake. `SpawnExecutor` is used by `hyper` to run
183//!     // internal tasks that may be required during the handshake.
184//!     let (stream, _response) = client(&SpawnExecutor, request, stream)
185//!         .await
186//!         .unwrap();
187//!
188//!     // Wrap websocket with the adapter and perform a small exchange.
189//!     let mut ws_stream = WebSocketStream::new(stream, PayloadType::Binary);
190//!     let mut buf = [0 as u8; 6];
191//!
192//!     let mut bytes = ws_stream.write(b"Hello!").await.unwrap();
193//!     assert_eq!(bytes, 6);
194//!
195//!     bytes = ws_stream.read(&mut buf).await.unwrap();
196//!     assert_eq!(bytes, 6);
197//!     assert_eq!(&buf, b"Hello!");
198//! }
199//! ```
200//!
201//! ## Notes and caveats
202//!
203//! * Each call to `AsyncWrite::poll_write` will produce a single WebSocket data
204//!   frame containing exactly the bytes provided in that call. If you need to
205//!   stream a large logical message across multiple websocket frames, you
206//!   should implement framing at the codec layer above the `WebSocketStream`.
207//! * If the peer sends a data frame with an opcode that doesn't match the
208//!   configured `PayloadType` (e.g. the stream is configured `Binary` but the
209//!   peer sends `Text`), reads will return an error.
210//! * `into_inner` on the adapter returns the inner `WebSocket` only if no
211//!   read or write future currently owns it (i.e. there is no in-flight read or
212//!   write). If an operation is in progress, `into_inner` returns `None`.
213//!
214//! ## See also
215//! - [`fastwebsockets`](https://docs.rs/fastwebsockets)
216//! - [`tokio::io::AsyncRead`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html)
217//! - [`tokio::io::AsyncWrite`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncWrite.html)
218//!
219//! ## License
220//!
221//! Licensed under the [MIT License](https://opensource.org/licenses/MIT).
222//!
223//! See the `LICENSE` file in the repository root for details.
224
225mod stream;
226
227pub use stream::PayloadType;
228pub use stream::WebSocketStream;