WebSocketStream

Struct WebSocketStream 

Source
pub struct WebSocketStream<S> { /* private fields */ }
Expand description

A WebSocket stream over an async transport

This type implements both Stream<Item = Result<Message>> for receiving and Sink<Message> for sending messages.

§Backpressure

The stream supports backpressure monitoring through is_backpressured() and write_buffer_len() methods. When the write buffer exceeds the high water mark, producers should pause sending until the buffer drains below the low water mark.

§Example

use futures_util::{SinkExt, StreamExt};
use sockudo_ws::WebSocketStream;

async fn handle(mut ws: WebSocketStream<TcpStream>) {
    while let Some(msg) = ws.next().await {
        match msg {
            Ok(Message::Text(text)) => {
                // Check backpressure before sending
                if ws.is_backpressured() {
                    ws.flush().await?;
                }
                ws.send(Message::Text(text)).await?;
            }
            Ok(Message::Close(_)) => break,
            _ => {}
        }
    }
}

Implementations§

Source§

impl<S> WebSocketStream<S>
where S: AsyncRead + AsyncWrite + Unpin,

Source

pub fn from_raw(inner: S, role: Role, config: Config) -> Self

Create a new WebSocket stream from an already-upgraded connection

Source

pub fn server(inner: S, config: Config) -> Self

Create a server-side WebSocket stream

Source

pub fn client(inner: S, config: Config) -> Self

Create a client-side WebSocket stream

Source

pub fn get_ref(&self) -> &S

Get a reference to the underlying stream

Source

pub fn get_mut(&mut self) -> &mut S

Get a mutable reference to the underlying stream

Source

pub fn into_inner(self) -> S

Consume the WebSocket stream and return the underlying stream

Source

pub fn is_closed(&self) -> bool

Check if the connection is closed

Source

pub fn is_backpressured(&self) -> bool

Check if the write buffer is backpressured

Returns true when the write buffer has exceeded the high water mark. Producers should pause sending new messages until is_write_buffer_low() returns true or until the buffer is flushed.

§Example
if ws.is_backpressured() {
    // Wait for buffer to drain before sending more
    ws.flush().await?;
}
Source

pub fn is_write_buffer_low(&self) -> bool

Check if the write buffer is below the low water mark

Returns true when the write buffer has drained below the low water mark. This can be used to resume sending after backpressure was detected.

Source

pub fn write_buffer_len(&self) -> usize

Get the current write buffer size in bytes

Useful for monitoring and debugging backpressure issues.

Source

pub fn read_buffer_len(&self) -> usize

Get the current read buffer size in bytes

Useful for monitoring memory usage and debugging.

Source

pub fn set_high_water_mark(&mut self, size: usize)

Set the high water mark for backpressure

When the write buffer exceeds this threshold, is_backpressured() returns true. Default is 64KB.

Source

pub fn set_low_water_mark(&mut self, size: usize)

Set the low water mark for backpressure

When the write buffer drops below this threshold, is_write_buffer_low() returns true. Default is 16KB.

Source

pub fn high_water_mark(&self) -> usize

Get the current high water mark

Source

pub fn low_water_mark(&self) -> usize

Get the current low water mark

Source

pub async fn close(&mut self, code: u16, reason: &str) -> Result<()>

Send a close frame

Source§

impl<S> WebSocketStream<S>
where S: AsyncRead + AsyncWrite + Unpin,

Source

pub fn split(self) -> (SplitReader<S>, SplitWriter<S>)

Split the WebSocket stream into separate read and write halves

This allows TRUE concurrent reading and writing from different tasks with ZERO lock contention. The underlying TCP stream is split at the OS level for maximum performance.

§Example
let (mut reader, mut writer) = ws.split();

// Read in one task - NEVER blocks writer
tokio::spawn(async move {
    while let Some(msg) = reader.next().await {
        println!("Got: {:?}", msg);
    }
});

// Write in another - NEVER blocks reader
writer.send(Message::Text("Hello".into())).await?;

Trait Implementations§

Source§

impl<S> Sink<Message> for WebSocketStream<S>
where S: AsyncRead + AsyncWrite + Unpin,

Source§

type Error = Error

The type of value produced by the sink when an error occurs.
Source§

fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>>

Attempts to prepare the Sink to receive a value. Read more
Source§

fn start_send(self: Pin<&mut Self>, item: Message) -> Result<()>

Begin the process of sending a value to the sink. Each call to this function must be preceded by a successful call to poll_ready which returned Poll::Ready(Ok(())). Read more
Source§

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>

Flush any remaining output from this sink. Read more
Source§

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>

Flush any remaining output and close this sink, if necessary. Read more
Source§

impl<S> Stream for WebSocketStream<S>
where S: AsyncRead + AsyncWrite + Unpin,

Source§

type Item = Result<Message, Error>

Values yielded by the stream.
Source§

fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>

Attempt to pull out the next value of this stream, registering the current task for wakeup if the value is not yet available, and returning None if the stream is exhausted. Read more
Source§

fn size_hint(&self) -> (usize, Option<usize>)

Returns the bounds on the remaining length of the stream. Read more
Source§

impl<'__pin, S> Unpin for WebSocketStream<S>
where PinnedFieldsOf<__Origin<'__pin, S>>: Unpin,

Auto Trait Implementations§

§

impl<S> Freeze for WebSocketStream<S>
where S: Freeze,

§

impl<S> RefUnwindSafe for WebSocketStream<S>
where S: RefUnwindSafe,

§

impl<S> Send for WebSocketStream<S>
where S: Send,

§

impl<S> Sync for WebSocketStream<S>
where S: Sync,

§

impl<S> UnwindSafe for WebSocketStream<S>
where S: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<S, T, E> TryStream for S
where S: Stream<Item = Result<T, E>> + ?Sized,

Source§

type Ok = T

The type of successful values yielded by this future
Source§

type Error = E

The type of failures yielded by this future
Source§

fn try_poll_next( self: Pin<&mut S>, cx: &mut Context<'_>, ) -> Poll<Option<Result<<S as TryStream>::Ok, <S as TryStream>::Error>>>

Poll this TryStream as if it were a Stream. Read more