pub struct WebSocketStream<S> { /* private fields */ }Expand description
An AsyncRead / AsyncWrite adapter over a fastwebsockets::WebSocket.
WebSocketStream<S> wraps a WebSocket<S> and exposes a byte-stream view
(implementing tokio::io::AsyncRead and tokio::io::AsyncWrite) so that
websocket application payloads can be used with existing I/O and codec
infrastructure such as tokio_util::codec::Framed.
§Behavior
- Incoming WebSocket data frames (Text or Binary depending on the stream’s
PayloadType) are presented as a continuous byte stream. Each data frame’s payload is returned in-order; if a read buffer provided by the caller is smaller than a frame payload, the remainder is buffered internally and served on subsequent reads. - Control frames (Ping/Pong) are handled by the underlying
WebSocket(auto-pong) or ignored by this adapter. ACloseframe marks EOF and subsequent reads returnOk(())with zero bytes (standard EOF semantics). - Writes produce single complete WebSocket data frames of the configured
PayloadType. Eachpoll_writecall sends one WebSocket data frame with the provided bytes as payload. The number of bytes reported as written is the length ofbufsupplied topoll_write.
§Notes on threading and ownership
The adapter temporarily takes ownership of the inner WebSocket when it
needs to perform an asynchronous read or write operation. To achieve this
without requiring WebSocket itself to be Sync/Send across await points
we spawn a boxed future that owns the websocket and returns it when the
operation completes. This is implemented internally using ReadState and
WriteState.
§Example
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use fastwebsockets::WebSocket;
use fastwebsockets_stream::{WebSocketStream, PayloadType};
// Wrap the websocket and apply a line-based codec:
async fn example<S>(_ws: WebSocket<S>)
where S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static {
// This example is illustrative: constructing a real `WebSocket` requires
// an underlying transport (e.g. a `TcpStream`) and the fastwebsockets
// connection/handshake. Assume `ws` is a valid WebSocket<TcpStream>.
let ws: WebSocket<S> = unimplemented!();
let mut ws_stream = WebSocketStream::new(ws, PayloadType::Binary);
// Write bytes -> sends a Binary frame
let _n = ws_stream.write(b"hello").await;
// Read bytes
let mut buf = vec![0_u8; 1024];
let _ = ws_stream.read(&mut buf).await;
// Shutdown (sends Close)
let _ = ws_stream.shutdown().await;
}Another common usage is to use tokio_util::codec::Framed to apply a codec
on top of WebSocketStream (for example a length-delimited or line-based
codec). Example:
use tokio_util::codec::{Framed, LinesCodec};
use fastwebsockets::WebSocket;
use fastwebsockets_stream::{WebSocketStream, PayloadType};
// Wrap the websocket and apply a line-based codec:
async fn example<S>(_ws: WebSocket<S>)
where S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static {
let ws: WebSocket<S> = unimplemented!();
let stream = WebSocketStream::new(ws, PayloadType::Text);
let mut framed = Framed::new(stream, LinesCodec::new());
// Now you can use framed.read() / framed.send() to work with String frames.
}Implementations§
Source§impl<S> WebSocketStream<S>
impl<S> WebSocketStream<S>
Sourcepub fn new(websocket: WebSocket<S>, payload_type: PayloadType) -> Self
pub fn new(websocket: WebSocket<S>, payload_type: PayloadType) -> Self
Create a new WebSocketStream wrapping the provided WebSocket.
This will enable automatic Pong replies and automatic Close handling on
the wrapped WebSocket and initialize internal buffers and state.
payload_type selects whether this stream should read/write Text or
Binary data. If the peer sends data frames with an opcode that does not
match payload_type, reads will return an error.
Sourcepub fn into_inner(self) -> Option<WebSocket<S>>
pub fn into_inner(self) -> Option<WebSocket<S>>
Consume the adapter and attempt to return the inner WebSocket.
This returns Some(WebSocket<S>) if the websocket currently resides in
the adapter. If there is an outstanding future that currently owns the
websocket (i.e. a read or write in progress) this method will return
None because the adapter cannot recover the websocket until that
future completes.
Trait Implementations§
Source§impl<S> AsyncRead for WebSocketStream<S>
impl<S> AsyncRead for WebSocketStream<S>
Source§impl<S> AsyncWrite for WebSocketStream<S>
impl<S> AsyncWrite for WebSocketStream<S>
Source§fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>>
fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize>>
buf into the object. Read moreSource§fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>
Source§fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>
Source§fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize, Error>>
fn poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize, Error>>
poll_write, except that it writes from a slice of buffers. Read moreSource§fn is_write_vectored(&self) -> bool
fn is_write_vectored(&self) -> bool
poll_write_vectored
implementation. Read moreAuto Trait Implementations§
impl<S> Freeze for WebSocketStream<S>where
S: Freeze,
impl<S> !RefUnwindSafe for WebSocketStream<S>
impl<S> Send for WebSocketStream<S>where
S: Send,
impl<S> !Sync for WebSocketStream<S>
impl<S> Unpin for WebSocketStream<S>where
S: Unpin,
impl<S> !UnwindSafe for WebSocketStream<S>
Blanket Implementations§
Source§impl<R> AsyncReadExt for R
impl<R> AsyncReadExt for R
Source§fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>where
Self: Unpin,
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>where
Self: Unpin,
Source§fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
Source§fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>where
Self: Unpin,
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>where
Self: Unpin,
buf. Read moreSource§fn read_u8(&mut self) -> ReadU8<&mut Self>where
Self: Unpin,
fn read_u8(&mut self) -> ReadU8<&mut Self>where
Self: Unpin,
Source§fn read_i8(&mut self) -> ReadI8<&mut Self>where
Self: Unpin,
fn read_i8(&mut self) -> ReadI8<&mut Self>where
Self: Unpin,
Source§fn read_u16(&mut self) -> ReadU16<&mut Self>where
Self: Unpin,
fn read_u16(&mut self) -> ReadU16<&mut Self>where
Self: Unpin,
Source§fn read_i16(&mut self) -> ReadI16<&mut Self>where
Self: Unpin,
fn read_i16(&mut self) -> ReadI16<&mut Self>where
Self: Unpin,
Source§fn read_u32(&mut self) -> ReadU32<&mut Self>where
Self: Unpin,
fn read_u32(&mut self) -> ReadU32<&mut Self>where
Self: Unpin,
Source§fn read_i32(&mut self) -> ReadI32<&mut Self>where
Self: Unpin,
fn read_i32(&mut self) -> ReadI32<&mut Self>where
Self: Unpin,
Source§fn read_u64(&mut self) -> ReadU64<&mut Self>where
Self: Unpin,
fn read_u64(&mut self) -> ReadU64<&mut Self>where
Self: Unpin,
Source§fn read_i64(&mut self) -> ReadI64<&mut Self>where
Self: Unpin,
fn read_i64(&mut self) -> ReadI64<&mut Self>where
Self: Unpin,
Source§fn read_u128(&mut self) -> ReadU128<&mut Self>where
Self: Unpin,
fn read_u128(&mut self) -> ReadU128<&mut Self>where
Self: Unpin,
Source§fn read_i128(&mut self) -> ReadI128<&mut Self>where
Self: Unpin,
fn read_i128(&mut self) -> ReadI128<&mut Self>where
Self: Unpin,
Source§fn read_f32(&mut self) -> ReadF32<&mut Self>where
Self: Unpin,
fn read_f32(&mut self) -> ReadF32<&mut Self>where
Self: Unpin,
Source§fn read_f64(&mut self) -> ReadF64<&mut Self>where
Self: Unpin,
fn read_f64(&mut self) -> ReadF64<&mut Self>where
Self: Unpin,
Source§fn read_u16_le(&mut self) -> ReadU16Le<&mut Self>where
Self: Unpin,
fn read_u16_le(&mut self) -> ReadU16Le<&mut Self>where
Self: Unpin,
Source§fn read_i16_le(&mut self) -> ReadI16Le<&mut Self>where
Self: Unpin,
fn read_i16_le(&mut self) -> ReadI16Le<&mut Self>where
Self: Unpin,
Source§fn read_u32_le(&mut self) -> ReadU32Le<&mut Self>where
Self: Unpin,
fn read_u32_le(&mut self) -> ReadU32Le<&mut Self>where
Self: Unpin,
Source§fn read_i32_le(&mut self) -> ReadI32Le<&mut Self>where
Self: Unpin,
fn read_i32_le(&mut self) -> ReadI32Le<&mut Self>where
Self: Unpin,
Source§fn read_u64_le(&mut self) -> ReadU64Le<&mut Self>where
Self: Unpin,
fn read_u64_le(&mut self) -> ReadU64Le<&mut Self>where
Self: Unpin,
Source§fn read_i64_le(&mut self) -> ReadI64Le<&mut Self>where
Self: Unpin,
fn read_i64_le(&mut self) -> ReadI64Le<&mut Self>where
Self: Unpin,
Source§fn read_u128_le(&mut self) -> ReadU128Le<&mut Self>where
Self: Unpin,
fn read_u128_le(&mut self) -> ReadU128Le<&mut Self>where
Self: Unpin,
Source§fn read_i128_le(&mut self) -> ReadI128Le<&mut Self>where
Self: Unpin,
fn read_i128_le(&mut self) -> ReadI128Le<&mut Self>where
Self: Unpin,
Source§fn read_f32_le(&mut self) -> ReadF32Le<&mut Self>where
Self: Unpin,
fn read_f32_le(&mut self) -> ReadF32Le<&mut Self>where
Self: Unpin,
Source§fn read_f64_le(&mut self) -> ReadF64Le<&mut Self>where
Self: Unpin,
fn read_f64_le(&mut self) -> ReadF64Le<&mut Self>where
Self: Unpin,
Source§fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>where
Self: Unpin,
fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>where
Self: Unpin,
buf. Read more