# stream-tungstenite
A robust Rust WebSocket client library with automatic reconnection, customizable retry strategies, and an extensible architecture.
## Features
- **Automatic Reconnection**: Multiple retry strategies including exponential backoff
- **Connection State Management**: Real-time tracking of connection status and health
- **Extension System**: Hook into lifecycle events and message processing
- **Application-Level Handshakes**: Support for authentication and subscriptions
- **Builder Pattern API**: Fluent, type-safe configuration
- **Backpressure-Aware Sending**: Bounded send queue with non-blocking, blocking, and timeout APIs
## Architecture
```text
┌─────────────────────────────────────────────────────────────────┐
│ WebSocketClient │
│ - High-level API for WebSocket connections │
│ - Automatic reconnection with retry strategies │
│ - Message broadcasting via channels │
└─────────────────────────────────────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Handshake │ │ Extension │ │ Connection │
│ - Auth │ │ - Lifecycle │ │ - State │
│ - Subscribe │ │ - Messages │ │ - Retry │
│ - Chained │ │ - Logging │ │ - Supervisor │
└─────────────────┘ └─────────────────┘ └─────────────────┘
```
## Quick Start
### Basic Usage
```rust
use stream_tungstenite::prelude::*;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create client with builder pattern
let client = WebSocketClient::builder("wss://echo.websocket.org")
.receive_timeout(std::time::Duration::from_secs(30))
.build();
// Subscribe to messages before running
let mut messages = client.subscribe();
// Run client in background
let client = Arc::new(client);
tokio::spawn({
let client = client.clone();
async move { client.run().await }
});
// Receive messages (Arc<Message> for zero-copy)
while let Ok(msg) = messages.recv().await {
println!("Received: {:?}", msg);
// Access message: msg.as_ref() or &*msg
// Clone if needed: (*msg).clone()
}
Ok(())
}
```
### Sending Messages
```rust
use stream_tungstenite::prelude::*;
use std::sync::Arc;
use tokio_tungstenite::tungstenite::Message;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Arc::new(WebSocketClient::new("wss://echo.websocket.org"));
// Get a sender handle
if let Some(sender) = client.sender().await {
// Send text message
sender.send_text("Hello, WebSocket!")?;
// Send binary message
sender.send_binary(vec![1, 2, 3])?;
// Send ping
sender.ping(vec![])?;
}
Ok(())
}
```
### Backpressure and Timeout
- Non-blocking (default): `client.send(msg).await`
Returns immediately; if the queue is full, returns `SendError::ChannelFull`.
- Blocking: `client.send_async(msg).await`
Awaits capacity; returns `SendError::ChannelClosed` if not connected.
- Timeout: `client.send_timeout(msg, Duration::from_secs(1)).await`
Returns `SendError::Timeout(_)` if it expires.
Configure queue capacity:
```rust
let cfg = ClientConfig::new().with_send_queue_capacity(512);
```
Configure handshake retry delay:
```rust
let cfg = ClientConfig::new().with_handshake_retry_delay(std::time::Duration::from_millis(500));
```
## Configuration
### Using ClientConfig
```rust
use stream_tungstenite::prelude::*;
use std::time::Duration;
// Create custom configuration
let config = ClientConfig::new()
.with_receive_timeout(Duration::from_secs(30))
.with_exit_on_first_failure(false)
.with_nodelay(true)
.with_channel_buffer(512);
let client = WebSocketClient::builder("wss://example.com/ws")
.config(config)
.build();
```
### Configuration Presets
The library provides preset configurations for common scenarios:
#### Fast Reconnect
For low-latency scenarios requiring quick recovery:
```rust
let config = ClientConfig::fast_reconnect();
// - Receive timeout: 10 seconds
// - Connect timeout: 10 seconds
// - Nagle disabled: true
// - Channel buffer: 512
```
#### Stable Connection
For long-running, stable connections:
```rust
let config = ClientConfig::stable_connection();
// - Receive timeout: 60 seconds
// - Connect timeout: 60 seconds
// - Nagle disabled: false
// - Channel buffer: 128
```
## Retry Strategies
### Exponential Backoff (Default)
```rust
use stream_tungstenite::prelude::*;
use std::time::Duration;
let client = WebSocketClient::builder("wss://example.com/ws")
.exponential_backoff(
Duration::from_millis(100), // Initial delay
Duration::from_secs(30), // Maximum delay
2.0, // Multiplier
)
.build();
```
### Preset Strategies
```rust
// Fast reconnection - quick recovery
let strategy = ExponentialBackoff::fast();
// - 100ms initial, 5s max, 1.5x factor
// Standard - balanced approach
let strategy = ExponentialBackoff::standard();
// - 1s initial, 60s max, 2x factor
// Conservative - longer waits
let strategy = ExponentialBackoff::conservative();
// - 2s initial, 120s max, 2x factor
```
### Fixed Delay
```rust
use stream_tungstenite::prelude::*;
use std::time::Duration;
let strategy = FixedDelay::new(Duration::from_secs(5))
.with_max_attempts(10);
let client = WebSocketClient::builder("wss://example.com/ws")
.retry_strategy(strategy)
.build();
```
### No Retry
```rust
let client = WebSocketClient::builder("wss://example.com/ws")
.no_retry()
.build();
```
## Handshakes
Perform application-level handshakes after WebSocket connection is established.
### Authentication
```rust
use stream_tungstenite::prelude::*;
// Plain text token
let auth = AuthHandshaker::new("my-api-key");
// JSON format: {"type":"auth","token":"..."}
let auth = AuthHandshaker::new("my-api-key").json();
// Custom format
let auth = AuthHandshaker::new("my-api-key")
.custom_format("AUTH {}");
let client = WebSocketClient::builder("wss://example.com/ws")
.handshaker(auth)
.build();
```
### Connection Timeout and Handshake Retry
```rust
use stream_tungstenite::prelude::*;
use std::time::Duration;
let cfg = ClientConfig::new()
.with_connect_timeout(Duration::from_secs(15)) // TCP/WebSocket connect timeout
.with_handshake_retry_delay(Duration::from_millis(500)); // wait before retrying failed handshakes
```
## Extension Message Processing (Processor)
Extensions can transform or filter messages; the dispatcher applies them before broadcasting:
```rust
use async_trait::async_trait;
use stream_tungstenite::extension::Extension;
use stream_tungstenite::context::ConnectionContext;
use tungstenite::Message;
struct UppercaseExt;
#[async_trait]
impl Extension for UppercaseExt {
fn name(&self) -> &'static str { "upper" }
fn handles_messages(&self) -> bool { true }
// Receives &Message for zero-copy efficiency
async fn on_message(
&self,
msg: &Message,
_ctx: &ConnectionContext,
) -> Result<Option<Message>, stream_tungstenite::error::ExtensionError> {
if let Message::Text(t) = msg {
Ok(Some(Message::Text(t.to_uppercase().into())))
} else {
Ok(Some(msg.clone()))
}
}
}
```
### Channel Subscription
```rust
use stream_tungstenite::prelude::*;
let subscribe = SubscribeHandshaker::new(vec!["trades", "orderbook"])
.wait_confirmation();
let client = WebSocketClient::builder("wss://example.com/ws")
.handshaker(subscribe)
.build();
```
### Chained Handshakes
```rust
use stream_tungstenite::prelude::*;
// Chain multiple handshakers
let handshaker = ChainedHandshaker::new()
.then(AuthHandshaker::new("my-api-key").json())
.then(SubscribeHandshaker::new(vec!["trades", "orderbook"]));
let client = WebSocketClient::builder("wss://example.com/ws")
.handshaker(handshaker)
.build();
// Or use the macro
let handshaker = chain_handshakers!(
AuthHandshaker::new("my-api-key").json(),
SubscribeHandshaker::new(vec!["trades"]),
);
```
## Extension System
Add custom functionality through the extension system.
### Built-in Extensions
#### Logging Extension
```rust
use stream_tungstenite::prelude::*;
// Basic logging
let logging = LoggingExtension::new();
// Verbose with message logging
let logging = LoggingExtension::verbose();
// Custom configuration
let config = LoggingConfig::new()
.with_level(LogLevel::Debug)
.with_messages()
.with_prefix("ws-client");
let logging = LoggingExtension::with_config(config);
// Register with client
client.register_extension(logging).await?;
```
#### Status Viewer
```rust
use stream_tungstenite::prelude::*;
let status = StatusViewer::new();
// Check connection status
if status.is_connected().await {
println!("Connected!");
}
// Advanced status with history
let advanced = AdvancedStatusViewer::new();
let uptime = advanced.get_uptime().await;
let connection_count = advanced.get_connection_count().await;
```
### Custom Extensions
```rust
use stream_tungstenite::extension::Extension;
use stream_tungstenite::context::ConnectionContext;
use stream_tungstenite::error::ExtensionError;
use async_trait::async_trait;
use tokio_tungstenite::tungstenite::Message;
struct MyExtension;
#[async_trait]
impl Extension for MyExtension {
fn name(&self) -> &'static str {
"my_extension"
}
fn handles_lifecycle(&self) -> bool {
true
}
fn handles_messages(&self) -> bool {
true
}
async fn on_connect(&self, ctx: &ConnectionContext) -> Result<(), ExtensionError> {
println!("Connected! ID: {}", ctx.connection_id);
Ok(())
}
async fn on_disconnect(&self, ctx: &ConnectionContext) -> Result<(), ExtensionError> {
println!("Disconnected!");
Ok(())
}
async fn on_message(
&self,
message: &Message,
_ctx: &ConnectionContext,
) -> Result<Option<Message>, ExtensionError> {
// Return Some(message) to pass through
// Return None to filter out
// Receives &Message for zero-copy efficiency
Ok(Some(message.clone()))
}
}
```
## Connection Events
Monitor connection state changes:
```rust
use stream_tungstenite::prelude::*;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Arc::new(WebSocketClient::new("wss://example.com/ws"));
// Subscribe to connection events
let mut events = client.subscribe_events();
tokio::spawn({
let client = client.clone();
async move { client.run().await }
});
while let Ok(event) = events.recv().await {
match event {
ConnectionEvent::Connected { id } => {
println!("Connected: {}", id);
}
ConnectionEvent::Disconnected { reason } => {
println!("Disconnected: {:?}", reason);
}
ConnectionEvent::ReconnectScheduled { delay, attempt } => {
println!("Reconnecting in {:?} (attempt {})", delay, attempt);
}
ConnectionEvent::Error { error, attempt } => {
println!("Error on attempt {}: {}", attempt, error);
}
_ => {}
}
}
Ok(())
}
```
## Connection State
Access detailed connection state:
```rust
// Get current state snapshot
let state = client.state().await;
println!("Connection ID: {}", state.id);
println!("Status: {:?}", state.status);
println!("Reconnect count: {}", state.reconnect_count);
println!("Error count: {}", state.error_count);
if let Some(duration) = state.connection_duration {
println!("Connected for: {:?}", duration);
}
// Quick connection check
if client.is_connected() {
println!("Currently connected");
}
```
## Error Handling
The library provides structured error types:
```rust
use stream_tungstenite::error::*;
// Top-level errors
match client.run().await {
Ok(()) => println!("Client shutdown gracefully"),
Err(ClientError::Connect(e)) => println!("Connection error: {}", e),
Err(ClientError::AlreadyRunning) => println!("Client already running"),
Err(e) => println!("Other error: {}", e),
}
// Connection errors include retryability info
let error = ConnectError::TcpFailed("connection refused".into());
if error.is_retryable() {
println!("Will retry connection");
}
```
## TLS Configuration
The library supports multiple TLS backends via feature flags. Choose one based on your requirements:
### Available TLS Features
```toml
[dependencies]
# Default: native-tls (system TLS, cross-platform)
stream-tungstenite = "0.6"
# Rustls with system certificates (pure Rust, recommended)
stream-tungstenite = { version = "0.6", default-features = false, features = ["rustls-tls-native-roots"] }
# Rustls with embedded certificates (no system dependency)
stream-tungstenite = { version = "0.6", default-features = false, features = ["rustls-tls-webpki-roots"] }
# Native TLS with vendored dependencies (for static linking)
stream-tungstenite = { version = "0.6", default-features = false, features = ["native-tls-vendored"] }
```
### Custom TLS Configuration
#### Complete Control with TLS Connector
```rust
use stream_tungstenite::prelude::*;
use tokio_tungstenite::Connector;
// Native TLS example
#[cfg(feature = "native-tls")]
{
use native_tls::TlsConnector as NativeTlsConnector;
let tls = NativeTlsConnector::builder()
.min_protocol_version(Some(native_tls::Protocol::Tlsv12))
.build()?;
let connector = DefaultConnector::new()
.with_tls_connector(Connector::NativeTls(tls));
let client = WebSocketClient::builder("wss://example.com/ws")
.connector(connector)
.build();
}
// Rustls example
#[cfg(feature = "__rustls-tls")]
{
use rustls::ClientConfig;
use std::sync::Arc;
let mut config = ClientConfig::builder()
.with_root_certificates(/* your certs */)
.with_no_client_auth();
let connector = DefaultConnector::new()
.with_tls_connector(Connector::Rustls(Arc::new(config)));
}
```
#### Disable Certificate Verification (Testing Only!)
**Security Warning**: Never use in production!
```rust
use stream_tungstenite::prelude::*;
// For testing with self-signed certificates
let connector = DefaultConnector::new()
.danger_accept_invalid_certs()?;
let client = WebSocketClient::builder("wss://localhost:8080")
.connector(connector)
.build();
```
#### Custom CA Certificates
```rust
use stream_tungstenite::prelude::*;
#[cfg(feature = "native-tls")]
{
use native_tls::Certificate;
use std::fs;
// Load custom CA certificate
let ca_cert = fs::read("internal-ca.pem")?;
let cert = Certificate::from_pem(&ca_cert)?;
let connector = DefaultConnector::new()
.with_custom_ca_cert(cert)?;
let client = WebSocketClient::builder("wss://internal.example.com/ws")
.connector(connector)
.build();
}
```
#### Client Certificate Authentication (mTLS)
```rust
use stream_tungstenite::prelude::*;
#[cfg(feature = "native-tls")]
{
use native_tls::Identity;
use std::fs;
// Load client certificate (PKCS#12 format)
let pkcs12 = fs::read("client-cert.p12")?;
let identity = Identity::from_pkcs12(&pkcs12, "password")?;
let connector = DefaultConnector::new()
.with_client_identity(identity)?;
let client = WebSocketClient::builder("wss://api.example.com/ws")
.connector(connector)
.build();
}
```
### TLS Feature Comparison
| **Backend** | System (OpenSSL/Security.framework/SChannel) | Pure Rust | Pure Rust |
| **System Certs** | ✅ Yes | ✅ Yes | ❌ No (embedded) |
| **Binary Size** | Smaller | Medium | Medium |
| **Cross-compile** | Harder | Easier | Easier |
| **FIPS** | Depends on system | ❌ No | ❌ No |
| **Performance** | Good | Excellent | Excellent |
**Recommendation**: Use `rustls-tls-native-roots` for most projects (better security, pure Rust). Use `native-tls` if you need system certificate stores or FIPS compliance.
## Installation
Add to your `Cargo.toml`:
```toml
[dependencies]
stream-tungstenite = "0.6"
tokio = { version = "1.0", features = ["full"] }
```
## License
This project is licensed under the Apache License Version 2.0. See the LICENSE file for details.
## Contributing
Contributions are welcome! Please feel free to submit Pull Requests or create Issues.