WsClient

Struct WsClient 

pub struct WsClient { /* private fields */ }

Async WebSocket client for exchange streaming APIs.

Implementations§

impl WsClient

pub fn new(config: WsConfig) -> WsClient

Creates a new WebSocket client instance.

§Arguments
  • config - WebSocket connection configuration
§Returns

A new WsClient instance ready to connect

pub async fn set_event_callback( &self, callback: Arc<dyn Fn(WsEvent) + Sync + Send>, )

Sets the event callback for connection lifecycle events.

The callback is invoked for lifecycle events such as Connected and Shutdown, which lets the application react to connection state changes.

§Arguments
  • callback - The event callback function
§Example
use ccxt_core::ws_client::{WsClient, WsConfig, WsEvent};
use std::sync::Arc;

let client = WsClient::new(WsConfig::default());

client.set_event_callback(Arc::new(|event| {
    match event {
        WsEvent::Shutdown => println!("Client shutdown"),
        WsEvent::Connected => println!("Client connected"),
        _ => {}
    }
})).await;

pub async fn clear_event_callback(&self)

Clears the event callback.

After calling this method, no events will be emitted to the callback.
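
As a rough illustration of the set/clear semantics described above, the following std-only sketch stores an optional callback and delivers events only while one is set. `CallbackSlot` and `Event` are hypothetical stand-ins, not types from this crate, and the real client may store its callback differently.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `WsEvent`; only the two variants named in the
/// documentation are modeled.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Event {
    Connected,
    Shutdown,
}

type Callback = Arc<dyn Fn(Event) + Send + Sync>;

/// Hypothetical holder for an optional, replaceable callback.
#[derive(Default)]
pub struct CallbackSlot {
    inner: Mutex<Option<Callback>>,
}

impl CallbackSlot {
    /// Installs (or replaces) the callback.
    pub fn set(&self, callback: Callback) {
        *self.inner.lock().unwrap() = Some(callback);
    }

    /// Removes the callback; later events are dropped.
    pub fn clear(&self) {
        *self.inner.lock().unwrap() = None;
    }

    /// Invokes the callback if one is set and reports whether it was delivered.
    pub fn emit(&self, event: Event) -> bool {
        // Clone the Arc so the lock is released before user code runs.
        let callback = self.inner.lock().unwrap().clone();
        match callback {
            Some(cb) => {
                cb(event);
                true
            }
            None => false,
        }
    }
}
```

Cloning the `Arc` before invoking it means a callback that calls `set` or `clear` itself would not deadlock in this sketch.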

pub async fn set_cancel_token(&self, token: CancellationToken)

Sets the cancellation token for this client.

The cancellation token can be used to cancel long-running operations like connect, reconnect, and subscribe. When the token is cancelled, these operations will return Error::Cancelled.

§Arguments
  • token - The cancellation token to use
§Example
use ccxt_core::ws_client::{WsClient, WsConfig};
use tokio_util::sync::CancellationToken;

let client = WsClient::new(WsConfig::default());
let token = CancellationToken::new();

// Set the cancellation token
client.set_cancel_token(token.clone()).await;

// Later, cancel all operations
token.cancel();

pub async fn clear_cancel_token(&self)

Clears the cancellation token.

After calling this method, operations will no longer be cancellable via the previously set token.

pub async fn get_cancel_token(&self) -> Option<CancellationToken>

Returns a clone of the current cancellation token, if set.

This is useful for sharing the token with other components or for checking if a token is currently set.

§Returns

Some(CancellationToken) if a token is set, None otherwise.

pub async fn connect(&self) -> Result<(), Error>

Establishes connection to the WebSocket server.

Returns immediately if already connected. On success, automatically starts the message processing loop and resubscribes to previously subscribed channels.

§Errors

Returns an error if:

  • The connection timeout is exceeded
  • A network error occurs
  • The server rejects the connection

pub async fn connect_with_cancel( &self, cancel_token: Option<CancellationToken>, ) -> Result<(), Error>

Establishes connection to the WebSocket server with cancellation support.

This method is similar to connect, but accepts an optional CancellationToken that can be used to cancel the connection attempt.

If no token is provided, the method will use the client’s internal token (if set via set_cancel_token).

§Arguments
  • cancel_token - Optional cancellation token. If None, uses the client’s internal token (if set).
§Errors

Returns an error if:

  • The connection timeout is exceeded
  • A network error occurs
  • The server rejects the connection
  • The operation is cancelled via the cancellation token
§Example
use ccxt_core::ws_client::{WsClient, WsConfig};
use std::time::Duration;
use tokio_util::sync::CancellationToken;

let client = WsClient::new(WsConfig {
    url: "wss://stream.example.com/ws".to_string(),
    ..Default::default()
});

let token = CancellationToken::new();
let token_clone = token.clone();

// Spawn a task to cancel after 5 seconds
tokio::spawn(async move {
    tokio::time::sleep(Duration::from_secs(5)).await;
    token_clone.cancel();
});

// Connect with cancellation support
match client.connect_with_cancel(Some(token)).await {
    Ok(()) => println!("Connected!"),
    Err(e) if e.as_cancelled().is_some() => println!("Connection cancelled"),
    Err(e) => println!("Connection failed: {}", e),
}
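
The token fallback described for `cancel_token` can be pictured with a small std-only sketch. It assumes an explicitly passed token takes precedence over the client's internal one, which is how the documentation reads but is not confirmed by the source. `Token` and `effective_token` are hypothetical names; `Token` is a simplified stand-in for `tokio_util::sync::CancellationToken`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical, simplified stand-in for a cancellation token: clones share
/// one flag, so cancelling any clone cancels all of them.
#[derive(Clone, Default)]
pub struct Token(Arc<AtomicBool>);

impl Token {
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Picks the token that governs an operation: the explicit argument wins,
/// otherwise the internal token (if any) is used. Assumed precedence.
pub fn effective_token(explicit: Option<Token>, internal: Option<&Token>) -> Option<Token> {
    explicit.or_else(|| internal.cloned())
}
```

With neither token present the function returns `None`, which corresponds to an operation that cannot be cancelled.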

pub async fn disconnect(&self) -> Result<(), Error>

Closes the WebSocket connection gracefully.

Sends a shutdown signal to background tasks and clears internal state.

pub async fn shutdown(&self)

Gracefully shuts down the WebSocket client.

This method performs a complete shutdown of the WebSocket client:

  1. Cancels all pending reconnection attempts
  2. Sends WebSocket close frame to the server
  3. Waits for pending operations to complete (with timeout)
  4. Clears all resources (subscriptions, channels, etc.)
  5. Emits a Shutdown event
§Behavior
  • If a cancellation token is set, it will be cancelled to stop any in-progress reconnection attempts.
  • The method waits up to shutdown_timeout milliseconds for pending operations to complete before proceeding with cleanup.
  • All subscriptions are cleared during shutdown.
  • The connection state is set to Disconnected.
§Errors

This method does not return errors. All cleanup operations are performed on a best-effort basis.

§Example
use ccxt_core::ws_client::{WsClient, WsConfig, WsEvent};
use std::sync::Arc;

let client = WsClient::new(WsConfig {
    url: "wss://stream.example.com/ws".to_string(),
    shutdown_timeout: 5000, // 5 seconds
    ..Default::default()
});

// Set up event callback to know when shutdown completes
client.set_event_callback(Arc::new(|event| {
    if let WsEvent::Shutdown = event {
        println!("Shutdown completed");
    }
})).await;

// Connect and do work...
client.connect().await?;

// Gracefully shutdown
client.shutdown().await;

pub async fn reconnect(&self) -> Result<(), Error>

Attempts to reconnect to the WebSocket server.

Respects the max_reconnect_attempts configuration and waits for reconnect_interval before attempting to connect.

§Errors

Returns an error if the maximum number of reconnection attempts is exceeded or the connection fails.

pub async fn reconnect_with_cancel( &self, cancel_token: Option<CancellationToken>, ) -> Result<(), Error>

Attempts to reconnect to the WebSocket server with cancellation support.

This method uses exponential backoff for retry delays and classifies errors to determine if reconnection should continue. It supports cancellation via an optional CancellationToken.

§Arguments
  • cancel_token - Optional cancellation token. If None, uses the client’s internal token (if set via set_cancel_token).
§Behavior
  1. Calculates retry delay using exponential backoff strategy
  2. Waits for the calculated delay (can be cancelled)
  3. Attempts to connect (can be cancelled)
  4. On success: resets reconnect counter and returns Ok(())
  5. On transient error: continues retry loop
  6. On permanent error: stops retrying and returns error
  7. On max attempts reached: returns error
§Errors

Returns an error if:

  • The maximum number of reconnection attempts is exceeded
  • A permanent error occurs (authentication failure, protocol error)
  • The operation is cancelled via the cancellation token
§Example
use ccxt_core::ws_client::{WsClient, WsConfig};
use tokio_util::sync::CancellationToken;

let client = WsClient::new(WsConfig {
    url: "wss://stream.example.com/ws".to_string(),
    max_reconnect_attempts: 5,
    ..Default::default()
});

let token = CancellationToken::new();

// Reconnect with cancellation support
match client.reconnect_with_cancel(Some(token)).await {
    Ok(()) => println!("Reconnected!"),
    Err(e) if e.as_cancelled().is_some() => println!("Reconnection cancelled"),
    Err(e) => println!("Reconnection failed: {}", e),
}
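
The retry behavior listed for `reconnect_with_cancel` can be sketched without any async runtime. The version below is a minimal illustration, not the crate's implementation: the doubling factor, the delay cap, and the names `backoff_delay`, `retry_with_backoff`, `AttemptError`, and `RetryError` are all assumptions, and cancellation is left out for brevity.

```rust
use std::time::Duration;

/// Hypothetical classification of a single failed attempt.
#[derive(Debug, PartialEq)]
pub enum AttemptError {
    Transient,
    Permanent,
}

/// Hypothetical overall outcome when reconnection gives up.
#[derive(Debug, PartialEq)]
pub enum RetryError {
    MaxAttemptsExceeded,
    Permanent,
}

/// Delay before attempt `attempt` (0-based): base * 2^attempt, capped at `max`.
pub fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    let factor = 2u32.saturating_pow(attempt);
    base.saturating_mul(factor).min(max)
}

/// Runs `connect` until it succeeds, fails permanently, or `max_attempts` is
/// used up. On success, returns how many attempts failed first. `wait` receives
/// each delay so the caller decides how to sleep.
pub fn retry_with_backoff(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut connect: impl FnMut() -> Result<(), AttemptError>,
    mut wait: impl FnMut(Duration),
) -> Result<u32, RetryError> {
    for attempt in 0..max_attempts {
        wait(backoff_delay(base, max, attempt));
        match connect() {
            Ok(()) => return Ok(attempt),
            // Transient failures fall through to the next, longer delay.
            Err(AttemptError::Transient) => continue,
            // Permanent failures stop the loop immediately.
            Err(AttemptError::Permanent) => return Err(RetryError::Permanent),
        }
    }
    Err(RetryError::MaxAttemptsExceeded)
}
```

Passing the wait step in as a closure keeps the sketch testable; in an async client the same point is where a cancellable sleep would go.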

pub fn reconnect_count(&self) -> u32

Returns the current reconnection attempt count (lock-free).

pub fn reset_reconnect_count(&self)

Resets the reconnection attempt counter to zero (lock-free).

pub fn stats(&self) -> WsStatsSnapshot

Returns a snapshot of connection statistics.

This method is lock-free and can be called from any context without blocking or risking deadlocks.

§Returns

A WsStatsSnapshot containing the current values of all statistics.
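
One common way to provide a lock-free snapshot is to keep each counter in an atomic and copy the values into a plain struct on demand. The sketch below shows that pattern. The field names are hypothetical, since the actual fields of `WsStatsSnapshot` are not listed here.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical live counters, updated from any thread without locks.
#[derive(Default)]
pub struct Stats {
    messages_received: AtomicU64,
    bytes_received: AtomicU64,
}

/// Hypothetical plain-data copy of the counters at one point in time.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct StatsSnapshot {
    pub messages_received: u64,
    pub bytes_received: u64,
}

impl Stats {
    /// Records one incoming message of `bytes` length.
    pub fn record_message(&self, bytes: u64) {
        self.messages_received.fetch_add(1, Ordering::Relaxed);
        self.bytes_received.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Copies the current values. Each field is read atomically, but the
    /// fields are not read together as one atomic unit.
    pub fn snapshot(&self) -> StatsSnapshot {
        StatsSnapshot {
            messages_received: self.messages_received.load(Ordering::Relaxed),
            bytes_received: self.bytes_received.load(Ordering::Relaxed),
        }
    }

    /// Resets every counter to zero.
    pub fn reset(&self) {
        self.messages_received.store(0, Ordering::Relaxed);
        self.bytes_received.store(0, Ordering::Relaxed);
    }
}
```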

pub fn reset_stats(&self)

Resets all connection statistics to default values.

This method is lock-free and can be called from any context.

pub fn latency(&self) -> Option<i64>

Calculates current connection latency in milliseconds.

This method is lock-free and can be called from any context.

§Returns

Time difference in milliseconds between the last pong and the last ping, or None if no data is available.
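
A lock-free latency calculation of this kind might look like the sketch below, which keeps the two timestamps in atomics and uses a sentinel for "not recorded yet". The `Heartbeat` type, the sentinel, and the choice to return None while a pong is still outstanding are assumptions made for illustration.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Sentinel meaning "no timestamp recorded yet" (assumption of this sketch).
const UNSET: i64 = i64::MIN;

/// Hypothetical holder for the most recent ping and pong timestamps (ms).
pub struct Heartbeat {
    last_ping_ms: AtomicI64,
    last_pong_ms: AtomicI64,
}

impl Heartbeat {
    pub fn new() -> Self {
        Heartbeat {
            last_ping_ms: AtomicI64::new(UNSET),
            last_pong_ms: AtomicI64::new(UNSET),
        }
    }

    pub fn record_ping(&self, now_ms: i64) {
        self.last_ping_ms.store(now_ms, Ordering::Relaxed);
    }

    pub fn record_pong(&self, now_ms: i64) {
        self.last_pong_ms.store(now_ms, Ordering::Relaxed);
    }

    /// Pong time minus ping time, or `None` when either timestamp is missing
    /// or the latest ping has not been answered yet.
    pub fn latency(&self) -> Option<i64> {
        let ping = self.last_ping_ms.load(Ordering::Relaxed);
        let pong = self.last_pong_ms.load(Ordering::Relaxed);
        if ping == UNSET || pong == UNSET || pong < ping {
            return None;
        }
        Some(pong - ping)
    }
}
```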

pub fn create_auto_reconnect_coordinator( self: Arc<WsClient>, ) -> AutoReconnectCoordinator

Creates an automatic reconnection coordinator.

§Returns

A new AutoReconnectCoordinator instance for managing reconnection logic.

pub async fn subscribe( &self, channel: String, symbol: Option<String>, params: Option<HashMap<String, Value>>, ) -> Result<(), Error>

Subscribes to a WebSocket channel.

The subscription is persisted and automatically re-established on reconnection. The number of subscriptions is limited by max_subscriptions in WsConfig.

§Arguments
  • channel - Channel name to subscribe to
  • symbol - Optional trading pair symbol
  • params - Optional additional subscription parameters
§Errors

Returns an error if:

  • The maximum subscription limit is reached (Error::ResourceExhausted)
  • The subscription message fails to send
§Example
use ccxt_core::ws_client::{WsClient, WsConfig};

let client = WsClient::new(WsConfig {
    url: "wss://stream.example.com/ws".to_string(),
    max_subscriptions: 50,
    ..Default::default()
});

// Subscribe to a channel
client.subscribe("ticker".to_string(), Some("BTC/USDT".to_string()), None).await?;

// Check remaining capacity
println!("Remaining capacity: {}", client.remaining_capacity());

pub async fn unsubscribe( &self, channel: String, symbol: Option<String>, ) -> Result<(), Error>

Unsubscribes from a WebSocket channel.

Removes the subscription from internal state and sends an unsubscribe message if connected. The subscription slot is freed immediately for new subscriptions.

§Arguments
  • channel - Channel name to unsubscribe from
  • symbol - Optional trading pair symbol
§Errors

Returns an error if the unsubscribe message fails to send.

pub async fn receive(&self) -> Option<Value>

Receives the next available message from the WebSocket stream.

§Returns

The received JSON message, or None if the channel is closed.
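
Because None signals a closed channel, a consumer would typically loop until it sees None. The std-only analogy below uses `std::sync::mpsc` in place of the client's internal channel. `next_message` and `drain` are hypothetical helpers, and the real method is async and yields JSON values rather than strings.

```rust
use std::sync::mpsc::{self, Receiver};

/// Returns the next message, or `None` once every sender has been dropped.
pub fn next_message(rx: &Receiver<String>) -> Option<String> {
    rx.recv().ok()
}

/// Collects messages until the channel closes, mirroring a
/// `while let Some(msg) = client.receive().await` loop.
pub fn drain(rx: Receiver<String>) -> Vec<String> {
    let mut out = Vec::new();
    while let Some(msg) = next_message(&rx) {
        out.push(msg);
    }
    out
}
```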

pub fn state(&self) -> WsConnectionState

Returns the current connection state (lock-free).

pub fn config(&self) -> &WsConfig

Returns a reference to the WebSocket configuration.

§Example
use ccxt_core::ws_client::{WsClient, WsConfig};

let config = WsConfig {
    url: "wss://stream.example.com/ws".to_string(),
    max_reconnect_attempts: 10,
    ..Default::default()
};
let client = WsClient::new(config);

assert_eq!(client.config().max_reconnect_attempts, 10);

pub fn set_state(&self, state: WsConnectionState)

Sets the connection state (lock-free).

This method is used internally and by the AutoReconnectCoordinator to update the connection state.

§Arguments
  • state - The new connection state
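
Lock-free state accessors like these are often built on a single atomic integer that encodes the enum. The sketch below shows that approach under stated assumptions: `ConnState` and `StateCell` are hypothetical, and the `Connecting` variant is invented for illustration (only Disconnected is named in this documentation as a connection state).

```rust
use std::sync::atomic::{AtomicU8, Ordering};

/// Hypothetical connection state; the crate's `WsConnectionState` may have
/// different or additional variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ConnState {
    Disconnected = 0,
    Connecting = 1,
    Connected = 2,
}

impl ConnState {
    /// Decodes a stored byte; unknown values map to `Disconnected`.
    fn from_u8(raw: u8) -> ConnState {
        match raw {
            1 => ConnState::Connecting,
            2 => ConnState::Connected,
            _ => ConnState::Disconnected,
        }
    }
}

/// State stored in one atomic byte, so reads and writes never take a lock.
pub struct StateCell(AtomicU8);

impl StateCell {
    pub fn new() -> Self {
        StateCell(AtomicU8::new(ConnState::Disconnected as u8))
    }

    pub fn get(&self) -> ConnState {
        ConnState::from_u8(self.0.load(Ordering::Acquire))
    }

    pub fn set(&self, state: ConnState) {
        self.0.store(state as u8, Ordering::Release);
    }

    pub fn is_connected(&self) -> bool {
        self.get() == ConnState::Connected
    }
}
```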

pub fn is_connected(&self) -> bool

Checks whether the WebSocket is currently connected (lock-free).

pub fn is_subscribed(&self, channel: &str, symbol: Option<&String>) -> bool

Checks whether the client is subscribed to a specific channel (lock-free).

§Arguments
  • channel - Channel name to check
  • symbol - Optional trading pair symbol
§Returns

true if subscribed to the channel, false otherwise.

pub fn subscription_count(&self) -> usize

Returns the number of active subscriptions (lock-free).

This method delegates to the internal SubscriptionManager to get the current subscription count.

§Example
use ccxt_core::ws_client::{WsClient, WsConfig};

let client = WsClient::new(WsConfig::default());
assert_eq!(client.subscription_count(), 0);

pub fn remaining_capacity(&self) -> usize

Returns the remaining capacity for new subscriptions (lock-free).

This is calculated as max_subscriptions - current_count. Use this method to check if there’s room for more subscriptions before attempting to subscribe.

§Example
use ccxt_core::ws_client::{WsClient, WsConfig};

let client = WsClient::new(WsConfig {
    max_subscriptions: 50,
    ..Default::default()
});
assert_eq!(client.remaining_capacity(), 50);
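
Taken together, subscribe, unsubscribe, is_subscribed, subscription_count, and remaining_capacity behave like a bounded set keyed by channel and symbol. The sketch below models that bookkeeping only. `Registry` and `SubscribeError` are hypothetical, it uses `&mut self` rather than a lock-free structure, and treating a repeated subscribe as a no-op is an assumption.

```rust
use std::collections::HashSet;

/// Hypothetical error mirroring the documented `Error::ResourceExhausted` case.
#[derive(Debug, PartialEq)]
pub enum SubscribeError {
    ResourceExhausted,
}

/// Hypothetical bounded set of (channel, symbol) subscriptions.
pub struct Registry {
    max_subscriptions: usize,
    active: HashSet<(String, Option<String>)>,
}

fn key(channel: &str, symbol: Option<&str>) -> (String, Option<String>) {
    (channel.to_string(), symbol.map(String::from))
}

impl Registry {
    pub fn new(max_subscriptions: usize) -> Self {
        Registry { max_subscriptions, active: HashSet::new() }
    }

    /// Adds a subscription unless the limit has been reached.
    pub fn subscribe(&mut self, channel: &str, symbol: Option<&str>) -> Result<(), SubscribeError> {
        let k = key(channel, symbol);
        if self.active.contains(&k) {
            return Ok(()); // already subscribed: no extra slot used (assumption)
        }
        if self.active.len() >= self.max_subscriptions {
            return Err(SubscribeError::ResourceExhausted);
        }
        self.active.insert(k);
        Ok(())
    }

    /// Removes a subscription and frees its slot; returns whether it existed.
    pub fn unsubscribe(&mut self, channel: &str, symbol: Option<&str>) -> bool {
        self.active.remove(&key(channel, symbol))
    }

    pub fn is_subscribed(&self, channel: &str, symbol: Option<&str>) -> bool {
        self.active.contains(&key(channel, symbol))
    }

    pub fn count(&self) -> usize {
        self.active.len()
    }

    /// max_subscriptions - current_count, never underflowing.
    pub fn remaining_capacity(&self) -> usize {
        self.max_subscriptions.saturating_sub(self.active.len())
    }
}
```

Using `saturating_sub` keeps the capacity at zero even if the limit were lowered below the current count.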

pub async fn send(&self, message: Message) -> Result<(), Error>

Sends a raw WebSocket message.

§Arguments
  • message - WebSocket message to send
§Errors

Returns an error if the client is not connected or message transmission fails.

pub async fn send_text(&self, text: String) -> Result<(), Error>

Sends a text message over the WebSocket connection.

§Arguments
  • text - Text content to send
§Errors

Returns an error if the client is not connected or transmission fails.

pub async fn send_json(&self, json: &Value) -> Result<(), Error>

Sends a JSON-encoded message over the WebSocket connection.

§Arguments
  • json - JSON value to serialize and send
§Errors

Returns an error if serialization fails, the client is not connected, or transmission fails.

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.