pub struct WebSocketStream<S> { /* private fields */ }Expand description
A WebSocket stream over an async transport
This type implements both Stream<Item = Result<Message>> for receiving
and Sink<Message> for sending messages.
§Backpressure
The stream supports backpressure monitoring through is_backpressured() and
write_buffer_len() methods. When the write buffer exceeds the high water mark,
producers should pause sending until the buffer drains below the low water mark.
§Example
use futures_util::{SinkExt, StreamExt};
use sockudo_ws::WebSocketStream;
async fn handle(mut ws: WebSocketStream<TcpStream>) {
while let Some(msg) = ws.next().await {
match msg {
Ok(Message::Text(text)) => {
// Check backpressure before sending
if ws.is_backpressured() {
ws.flush().await?;
}
ws.send(Message::Text(text)).await?;
}
Ok(Message::Close(_)) => break,
_ => {}
}
}
}Implementations§
Source§impl<S> WebSocketStream<S>
impl<S> WebSocketStream<S>
Sourcepub fn from_raw(inner: S, role: Role, config: Config) -> Self
pub fn from_raw(inner: S, role: Role, config: Config) -> Self
Create a new WebSocket stream from an already-upgraded connection
Sourcepub fn into_inner(self) -> S
pub fn into_inner(self) -> S
Consume the WebSocket stream and return the underlying stream
Sourcepub fn is_backpressured(&self) -> bool
pub fn is_backpressured(&self) -> bool
Check if the write buffer is backpressured
Returns true when the write buffer has exceeded the high water mark.
Producers should pause sending new messages until is_write_buffer_low()
returns true or until the buffer is flushed.
§Example
if ws.is_backpressured() {
// Wait for buffer to drain before sending more
ws.flush().await?;
}Sourcepub fn is_write_buffer_low(&self) -> bool
pub fn is_write_buffer_low(&self) -> bool
Check if the write buffer is below the low water mark
Returns true when the write buffer has drained below the low water mark.
This can be used to resume sending after backpressure was detected.
Sourcepub fn write_buffer_len(&self) -> usize
pub fn write_buffer_len(&self) -> usize
Get the current write buffer size in bytes
Useful for monitoring and debugging backpressure issues.
Sourcepub fn read_buffer_len(&self) -> usize
pub fn read_buffer_len(&self) -> usize
Get the current read buffer size in bytes
Useful for monitoring memory usage and debugging.
Sourcepub fn set_high_water_mark(&mut self, size: usize)
pub fn set_high_water_mark(&mut self, size: usize)
Set the high water mark for backpressure
When the write buffer exceeds this threshold, is_backpressured() returns true.
Default is 64KB.
Sourcepub fn set_low_water_mark(&mut self, size: usize)
pub fn set_low_water_mark(&mut self, size: usize)
Set the low water mark for backpressure
When the write buffer drops below this threshold, is_write_buffer_low() returns true.
Default is 16KB.
Sourcepub fn high_water_mark(&self) -> usize
pub fn high_water_mark(&self) -> usize
Get the current high water mark
Sourcepub fn low_water_mark(&self) -> usize
pub fn low_water_mark(&self) -> usize
Get the current low water mark
Source§impl<S> WebSocketStream<S>
impl<S> WebSocketStream<S>
Sourcepub fn split(self) -> (SplitReader<S>, SplitWriter<S>)
pub fn split(self) -> (SplitReader<S>, SplitWriter<S>)
Split the WebSocket stream into separate read and write halves
This allows TRUE concurrent reading and writing from different tasks with ZERO lock contention. The underlying TCP stream is split at the OS level for maximum performance.
§Example
let (mut reader, mut writer) = ws.split();
// Read in one task - NEVER blocks writer
tokio::spawn(async move {
while let Some(msg) = reader.next().await {
println!("Got: {:?}", msg);
}
});
// Write in another - NEVER blocks reader
writer.send(Message::Text("Hello".into())).await?;Trait Implementations§
Source§impl<S> Sink<Message> for WebSocketStream<S>
impl<S> Sink<Message> for WebSocketStream<S>
Source§fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>>
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>>
Sink to receive a value. Read moreSource§fn start_send(self: Pin<&mut Self>, item: Message) -> Result<()>
fn start_send(self: Pin<&mut Self>, item: Message) -> Result<()>
poll_ready which returned Poll::Ready(Ok(())). Read more