pub struct WsClient { /* private fields */ }Expand description
Async WebSocket client for exchange streaming APIs.
Implementations§
Source§impl WsClient
impl WsClient
Sourcepub async fn set_event_callback(
&self,
callback: Arc<dyn Fn(WsEvent) + Sync + Send>,
)
pub async fn set_event_callback( &self, callback: Arc<dyn Fn(WsEvent) + Sync + Send>, )
Sets the event callback for connection lifecycle events.
The callback will be invoked for events like Shutdown, Connected, etc.
This allows the application to react to connection state changes.
§Arguments
callback- The event callback function
§Example
use ccxt_core::ws_client::{WsClient, WsConfig, WsEvent};
use std::sync::Arc;
let client = WsClient::new(WsConfig::default());
client.set_event_callback(Arc::new(|event| {
match event {
WsEvent::Shutdown => println!("Client shutdown"),
WsEvent::Connected => println!("Client connected"),
_ => {}
}
})).await;Sourcepub async fn clear_event_callback(&self)
pub async fn clear_event_callback(&self)
Clears the event callback.
After calling this method, no events will be emitted to the callback.
Sourcepub async fn set_cancel_token(&self, token: CancellationToken)
pub async fn set_cancel_token(&self, token: CancellationToken)
Sets the cancellation token for this client.
The cancellation token can be used to cancel long-running operations
like connect, reconnect, and subscribe. When the token is cancelled,
these operations will return Error::Cancelled.
§Arguments
token- The cancellation token to use
§Example
use ccxt_core::ws_client::{WsClient, WsConfig};
use tokio_util::sync::CancellationToken;
let client = WsClient::new(WsConfig::default());
let token = CancellationToken::new();
// Set the cancellation token
client.set_cancel_token(token.clone()).await;
// Later, cancel all operations
token.cancel();Sourcepub async fn clear_cancel_token(&self)
pub async fn clear_cancel_token(&self)
Clears the cancellation token.
After calling this method, operations will no longer be cancellable via the previously set token.
Sourcepub async fn get_cancel_token(&self) -> Option<CancellationToken>
pub async fn get_cancel_token(&self) -> Option<CancellationToken>
Returns a clone of the current cancellation token, if set.
This is useful for sharing the token with other components or for checking if a token is currently set.
§Returns
Some(CancellationToken) if a token is set, None otherwise.
Sourcepub async fn connect(&self) -> Result<(), Error>
pub async fn connect(&self) -> Result<(), Error>
Establishes connection to the WebSocket server.
Returns immediately if already connected. Automatically starts message processing loop and resubscribes to previous channels on success.
§Errors
Returns error if:
- Connection timeout exceeded
- Network error occurs
- Server rejects connection
Sourcepub async fn connect_with_cancel(
&self,
cancel_token: Option<CancellationToken>,
) -> Result<(), Error>
pub async fn connect_with_cancel( &self, cancel_token: Option<CancellationToken>, ) -> Result<(), Error>
Establishes connection to the WebSocket server with cancellation support.
This method is similar to connect, but accepts an optional
CancellationToken that can be used to cancel the connection attempt.
If no token is provided, the method will use the client’s internal token
(if set via set_cancel_token).
§Arguments
cancel_token- Optional cancellation token. IfNone, uses the client’s internal token (if set).
§Errors
Returns error if:
- Connection timeout exceeded
- Network error occurs
- Server rejects connection
- Operation was cancelled via the cancellation token
§Example
use ccxt_core::ws_client::{WsClient, WsConfig};
use tokio_util::sync::CancellationToken;
let client = WsClient::new(WsConfig {
url: "wss://stream.example.com/ws".to_string(),
..Default::default()
});
let token = CancellationToken::new();
let token_clone = token.clone();
// Spawn a task to cancel after 5 seconds
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
token_clone.cancel();
});
// Connect with cancellation support
match client.connect_with_cancel(Some(token)).await {
Ok(()) => println!("Connected!"),
Err(e) if e.as_cancelled().is_some() => println!("Connection cancelled"),
Err(e) => println!("Connection failed: {}", e),
}Sourcepub async fn disconnect(&self) -> Result<(), Error>
pub async fn disconnect(&self) -> Result<(), Error>
Closes the WebSocket connection gracefully.
Sends shutdown signal to background tasks and clears internal state.
Sourcepub async fn shutdown(&self)
pub async fn shutdown(&self)
Gracefully shuts down the WebSocket client.
This method performs a complete shutdown of the WebSocket client:
- Cancels all pending reconnection attempts
- Sends WebSocket close frame to the server
- Waits for pending operations to complete (with timeout)
- Clears all resources (subscriptions, channels, etc.)
- Emits a
Shutdownevent
§Behavior
- If a cancellation token is set, it will be cancelled to stop any in-progress reconnection attempts.
- The method waits up to
shutdown_timeoutmilliseconds for pending operations to complete before proceeding with cleanup. - All subscriptions are cleared during shutdown.
- The connection state is set to
Disconnected.
§Errors
This method does not return errors. All cleanup operations are performed on a best-effort basis.
§Example
use ccxt_core::ws_client::{WsClient, WsConfig, WsEvent};
use std::sync::Arc;
let client = WsClient::new(WsConfig {
url: "wss://stream.example.com/ws".to_string(),
shutdown_timeout: 5000, // 5 seconds
..Default::default()
});
// Set up event callback to know when shutdown completes
client.set_event_callback(Arc::new(|event| {
if let WsEvent::Shutdown = event {
println!("Shutdown completed");
}
})).await;
// Connect and do work...
client.connect().await?;
// Gracefully shutdown
client.shutdown().await;Sourcepub async fn reconnect(&self) -> Result<(), Error>
pub async fn reconnect(&self) -> Result<(), Error>
Attempts to reconnect to the WebSocket server.
Respects max_reconnect_attempts configuration and waits for
reconnect_interval before attempting connection.
§Errors
Returns error if maximum reconnection attempts exceeded or connection fails.
Sourcepub async fn reconnect_with_cancel(
&self,
cancel_token: Option<CancellationToken>,
) -> Result<(), Error>
pub async fn reconnect_with_cancel( &self, cancel_token: Option<CancellationToken>, ) -> Result<(), Error>
Attempts to reconnect to the WebSocket server with cancellation support.
This method uses exponential backoff for retry delays and classifies errors
to determine if reconnection should continue. It supports cancellation via
an optional CancellationToken.
§Arguments
cancel_token- Optional cancellation token. IfNone, uses the client’s internal token (if set viaset_cancel_token).
§Behavior
- Calculates retry delay using exponential backoff strategy
- Waits for the calculated delay (can be cancelled)
- Attempts to connect (can be cancelled)
- On success: resets reconnect counter and returns
Ok(()) - On transient error: continues retry loop
- On permanent error: stops retrying and returns error
- On max attempts reached: returns error
§Errors
Returns error if:
- Maximum reconnection attempts exceeded
- Permanent error occurs (authentication failure, protocol error)
- Operation was cancelled via the cancellation token
§Example
use ccxt_core::ws_client::{WsClient, WsConfig};
use tokio_util::sync::CancellationToken;
let client = WsClient::new(WsConfig {
url: "wss://stream.example.com/ws".to_string(),
max_reconnect_attempts: 5,
..Default::default()
});
let token = CancellationToken::new();
// Reconnect with cancellation support
match client.reconnect_with_cancel(Some(token)).await {
Ok(()) => println!("Reconnected!"),
Err(e) if e.as_cancelled().is_some() => println!("Reconnection cancelled"),
Err(e) => println!("Reconnection failed: {}", e),
}Sourcepub fn reconnect_count(&self) -> u32
pub fn reconnect_count(&self) -> u32
Returns the current reconnection attempt count (lock-free).
Sourcepub fn reset_reconnect_count(&self)
pub fn reset_reconnect_count(&self)
Resets the reconnection attempt counter to zero (lock-free).
Sourcepub fn stats(&self) -> WsStatsSnapshot
pub fn stats(&self) -> WsStatsSnapshot
Returns a snapshot of connection statistics.
This method is lock-free and can be called from any context without blocking or risking deadlocks.
§Returns
A WsStatsSnapshot containing the current values of all statistics.
Sourcepub fn reset_stats(&self)
pub fn reset_stats(&self)
Resets all connection statistics to default values.
This method is lock-free and can be called from any context.
Sourcepub fn latency(&self) -> Option<i64>
pub fn latency(&self) -> Option<i64>
Calculates current connection latency in milliseconds.
This method is lock-free and can be called from any context.
§Returns
Time difference between last pong and ping, or None if no data available.
Sourcepub fn create_auto_reconnect_coordinator(
self: Arc<WsClient>,
) -> AutoReconnectCoordinator
pub fn create_auto_reconnect_coordinator( self: Arc<WsClient>, ) -> AutoReconnectCoordinator
Creates an automatic reconnection coordinator.
§Returns
A new AutoReconnectCoordinator instance for managing reconnection logic.
Sourcepub async fn subscribe(
&self,
channel: String,
symbol: Option<String>,
params: Option<HashMap<String, Value>>,
) -> Result<(), Error>
pub async fn subscribe( &self, channel: String, symbol: Option<String>, params: Option<HashMap<String, Value>>, ) -> Result<(), Error>
Subscribes to a WebSocket channel.
Subscription is persisted and automatically reestablished on reconnection.
The subscription count is limited by max_subscriptions in WsConfig.
§Arguments
channel- Channel name to subscribe tosymbol- Optional trading pair symbolparams- Optional additional subscription parameters
§Errors
Returns error if:
- Maximum subscription limit is reached (
Error::ResourceExhausted) - Subscription message fails to send
§Example
use ccxt_core::ws_client::{WsClient, WsConfig};
let client = WsClient::new(WsConfig {
url: "wss://stream.example.com/ws".to_string(),
max_subscriptions: 50,
..Default::default()
});
// Subscribe to a channel
client.subscribe("ticker".to_string(), Some("BTC/USDT".to_string()), None).await?;
// Check remaining capacity
println!("Remaining capacity: {}", client.remaining_capacity());Sourcepub async fn unsubscribe(
&self,
channel: String,
symbol: Option<String>,
) -> Result<(), Error>
pub async fn unsubscribe( &self, channel: String, symbol: Option<String>, ) -> Result<(), Error>
Unsubscribes from a WebSocket channel.
Removes subscription from internal state and sends unsubscribe message if connected. The subscription slot is immediately freed for new subscriptions (Requirements 4.5).
§Arguments
channel- Channel name to unsubscribe fromsymbol- Optional trading pair symbol
§Errors
Returns error if unsubscribe message fails to send.
Sourcepub async fn receive(&self) -> Option<Value>
pub async fn receive(&self) -> Option<Value>
Receives the next available message from the WebSocket stream.
§Returns
The received JSON message, or None if the channel is closed.
Sourcepub fn state(&self) -> WsConnectionState
pub fn state(&self) -> WsConnectionState
Returns the current connection state (lock-free).
Sourcepub fn config(&self) -> &WsConfig
pub fn config(&self) -> &WsConfig
Returns a reference to the WebSocket configuration.
§Example
use ccxt_core::ws_client::{WsClient, WsConfig};
let config = WsConfig {
url: "wss://stream.example.com/ws".to_string(),
max_reconnect_attempts: 10,
..Default::default()
};
let client = WsClient::new(config);
assert_eq!(client.config().max_reconnect_attempts, 10);Sourcepub fn set_state(&self, state: WsConnectionState)
pub fn set_state(&self, state: WsConnectionState)
Sets the connection state (lock-free).
This method is used internally and by the AutoReconnectCoordinator
to update the connection state.
§Arguments
state- The new connection state
Sourcepub fn is_connected(&self) -> bool
pub fn is_connected(&self) -> bool
Checks whether the WebSocket is currently connected (lock-free).
Sourcepub fn subscription_count(&self) -> usize
pub fn subscription_count(&self) -> usize
Returns the number of active subscriptions (lock-free).
This method delegates to the internal SubscriptionManager to get
the current subscription count.
§Example
use ccxt_core::ws_client::{WsClient, WsConfig};
let client = WsClient::new(WsConfig::default());
assert_eq!(client.subscription_count(), 0);Sourcepub fn remaining_capacity(&self) -> usize
pub fn remaining_capacity(&self) -> usize
Returns the remaining capacity for new subscriptions (lock-free).
This is calculated as max_subscriptions - current_count.
Use this method to check if there’s room for more subscriptions
before attempting to subscribe.
§Example
use ccxt_core::ws_client::{WsClient, WsConfig};
let client = WsClient::new(WsConfig {
max_subscriptions: 50,
..Default::default()
});
assert_eq!(client.remaining_capacity(), 50);