ccxt_core/
ws_exchange.rs

1//! # WebSocket Exchange Trait
2//!
3//! This module defines the [`WsExchange`] trait for real-time data streaming
4//! via WebSocket connections, enabling live market data and account updates.
5//!
6//! ## Overview
7//!
8//! The `WsExchange` trait complements the REST-based [`Exchange`](crate::exchange::Exchange)
9//! trait by providing real-time streaming capabilities:
10//!
11//! - **Connection Management**: Connect, disconnect, and monitor WebSocket state
12//! - **Public Data Streams**: Real-time ticker, order book, trades, and OHLCV updates
13//! - **Private Data Streams**: Live balance, order, and trade updates (requires authentication)
14//! - **Subscription Management**: Subscribe/unsubscribe to specific channels
15//!
16//! ## Architecture
17//!
18//! ```text
19//! ┌─────────────────────────────────────────────────────────────┐
20//! │                     WsExchange Trait                        │
21//! ├─────────────────────────────────────────────────────────────┤
22//! │  Connection Management                                      │
23//! │  ├── ws_connect(), ws_disconnect()                          │
24//! │  └── ws_is_connected(), ws_state()                          │
25//! ├─────────────────────────────────────────────────────────────┤
26//! │  Public Data Streams                                        │
27//! │  ├── watch_ticker(), watch_tickers()                        │
28//! │  ├── watch_order_book(), watch_trades()                     │
29//! │  └── watch_ohlcv()                                          │
30//! ├─────────────────────────────────────────────────────────────┤
31//! │  Private Data Streams (Authenticated)                       │
32//! │  ├── watch_balance()                                        │
33//! │  ├── watch_orders()                                         │
34//! │  └── watch_my_trades()                                      │
35//! ├─────────────────────────────────────────────────────────────┤
36//! │  Subscription Management                                    │
37//! │  └── subscribe(), unsubscribe(), subscriptions()            │
38//! └─────────────────────────────────────────────────────────────┘
39//! ```
40//!
41//! ## Key Types
42//!
43//! - [`WsExchange`]: The WebSocket streaming trait
44//! - [`MessageStream<T>`]: A pinned, boxed async stream yielding `Result<T>` items
45//! - [`FullExchange`]: Combined trait for exchanges supporting both REST and WebSocket
46//!
47//! ## Usage Examples
48//!
49//! ### Watching Real-Time Ticker Updates
50//!
51//! ```rust,no_run
52//! use ccxt_core::ws_exchange::WsExchange;
53//! use futures::StreamExt;
54//!
55//! async fn watch_ticker(exchange: &dyn WsExchange, symbol: &str) {
56//!     // Connect to WebSocket
57//!     exchange.ws_connect().await.unwrap();
58//!     
59//!     // Watch ticker updates
60//!     let mut stream = exchange.watch_ticker(symbol).await.unwrap();
61//!     
62//!     while let Some(ticker) = stream.next().await {
63//!         match ticker {
64//!             Ok(t) => println!("Price: {:?}", t.last),
65//!             Err(e) => eprintln!("Error: {}", e),
66//!         }
67//!     }
68//! }
69//! ```
70//!
71//! ### Watching Order Book Depth
72//!
73//! ```rust,no_run
74//! use ccxt_core::ws_exchange::WsExchange;
75//! use futures::StreamExt;
76//!
77//! async fn watch_orderbook(exchange: &dyn WsExchange, symbol: &str) {
78//!     exchange.ws_connect().await.unwrap();
79//!     
80//!     let mut stream = exchange.watch_order_book(symbol, Some(10)).await.unwrap();
81//!     
82//!     while let Some(result) = stream.next().await {
83//!         if let Ok(orderbook) = result {
84//!             if let (Some(best_bid), Some(best_ask)) = (
85//!                 orderbook.bids.first(),
86//!                 orderbook.asks.first()
87//!             ) {
88//!                 println!("Spread: {} - {}", best_bid.price, best_ask.price);
89//!             }
90//!         }
91//!     }
92//! }
93//! ```
94//!
95//! ### Monitoring Connection State
96//!
97//! ```rust,no_run
98//! use ccxt_core::ws_exchange::WsExchange;
99//! use ccxt_core::ws_client::WsConnectionState;
100//!
101//! async fn ensure_connected(exchange: &dyn WsExchange) -> ccxt_core::Result<()> {
102//!     if !exchange.ws_is_connected() {
103//!         println!("Connecting to WebSocket...");
104//!         exchange.ws_connect().await?;
105//!     }
106//!     
107//!     match exchange.ws_state() {
108//!         WsConnectionState::Connected => println!("Connected!"),
109//!         WsConnectionState::Connecting => println!("Still connecting..."),
110//!         WsConnectionState::Disconnected => println!("Disconnected"),
111//!         WsConnectionState::Reconnecting => println!("Reconnecting..."),
112//!         WsConnectionState::Error => println!("Error state"),
113//!     }
114//!     
115//!     Ok(())
116//! }
117//! ```
118//!
119//! ### Using FullExchange for REST + WebSocket
120//!
121//! ```rust,no_run
122//! use ccxt_core::ws_exchange::FullExchange;
123//! use futures::StreamExt;
124//!
125//! async fn hybrid_trading(exchange: &dyn FullExchange, symbol: &str) {
126//!     // Use REST API to get initial state
127//!     let ticker = exchange.fetch_ticker(symbol).await.unwrap();
128//!     println!("Initial price: {:?}", ticker.last);
129//!     
130//!     // Switch to WebSocket for real-time updates
131//!     exchange.ws_connect().await.unwrap();
132//!     let mut stream = exchange.watch_ticker(symbol).await.unwrap();
133//!     
134//!     while let Some(Ok(update)) = stream.next().await {
135//!         println!("Live price: {:?}", update.last);
136//!     }
137//! }
138//! ```
139//!
140//! ### Watching Private Account Updates
141//!
142//! ```rust,no_run
143//! use ccxt_core::ws_exchange::WsExchange;
144//! use futures::StreamExt;
145//!
146//! async fn watch_account(exchange: &dyn WsExchange) {
147//!     exchange.ws_connect().await.unwrap();
148//!     
149//!     // Watch balance updates (requires authentication)
150//!     let mut balance_stream = exchange.watch_balance().await.unwrap();
151//!     
152//!     // Watch order updates
153//!     let mut order_stream = exchange.watch_orders(None).await.unwrap();
154//!     
155//!     tokio::select! {
156//!         Some(Ok(balance)) = balance_stream.next() => {
157//!             println!("Balance update: {:?}", balance);
158//!         }
159//!         Some(Ok(order)) = order_stream.next() => {
160//!             println!("Order update: {} - {:?}", order.id, order.status);
161//!         }
162//!     }
163//! }
164//! ```
165//!
166//! ## MessageStream Type
167//!
168//! The [`MessageStream<T>`] type alias represents an async stream of results:
169//!
170//! ```rust,ignore
171//! pub type MessageStream<T> = Pin<Box<dyn Stream<Item = Result<T>> + Send>>;
172//! ```
173//!
174//! This type is:
175//! - **Pinned**: Required for async iteration
176//! - **Boxed**: Allows for dynamic dispatch and type erasure
177//! - **Send**: Can be sent across thread boundaries
178//! - **Yields Results**: Each item is `Result<T>` to handle stream errors
179//!
180//! ## Connection States
181//!
182//! WebSocket connections can be in one of several states:
183//!
//! - `Disconnected`: Not connected to the server
//! - `Connecting`: Connection in progress
//! - `Connected`: Active connection ready for use
//! - `Reconnecting`: Automatic reconnection in progress
//! - `Error`: The connection is in an error state
188//!
189//! ## Error Handling
190//!
191//! WebSocket operations can fail with:
192//!
193//! - `WebSocket`: Connection or protocol errors
194//! - `Authentication`: Invalid credentials for private streams
195//! - `Network`: Network connectivity issues
196//! - `Timeout`: Connection or subscription timeout
197//!
198//! ## Thread Safety
199//!
200//! Like the `Exchange` trait, `WsExchange` requires `Send + Sync` bounds,
201//! ensuring compatibility with async runtimes and multi-threaded applications.
202//!
203//! ## See Also
204//!
205//! - [`crate::exchange::Exchange`]: REST API trait
206//! - [`crate::ws_client::WsClient`]: Low-level WebSocket client
207//! - [`crate::ws_client::WsConnectionState`]: Connection state enum
208
209use async_trait::async_trait;
210use futures::Stream;
211use std::pin::Pin;
212
213use crate::error::Result;
214use crate::exchange::Exchange;
215use crate::types::*;
216use crate::ws_client::WsConnectionState;
217
218// ============================================================================
219// Type Aliases
220// ============================================================================
221
222/// WebSocket message stream type
223///
224/// A pinned, boxed stream that yields `Result<T>` items.
225/// This type is used for all WebSocket data streams.
226pub type MessageStream<T> = Pin<Box<dyn Stream<Item = Result<T>> + Send>>;
227
228// ============================================================================
229// WsExchange Trait
230// ============================================================================
231
232/// WebSocket exchange trait for real-time data streaming
233///
234/// This trait defines the WebSocket API that exchanges can implement
235/// for real-time market data and account updates.
236///
237/// # Thread Safety
238///
239/// All implementations must be `Send + Sync` to allow safe usage across
240/// thread boundaries.
241///
242/// # Example
243///
244/// ```rust,no_run
245/// use ccxt_core::ws_exchange::WsExchange;
246///
247/// async fn check_connection(exchange: &dyn WsExchange) {
248///     if !exchange.ws_is_connected() {
249///         exchange.ws_connect().await.unwrap();
250///     }
251///     println!("Connection state: {:?}", exchange.ws_state());
252/// }
253/// ```
254#[async_trait]
255pub trait WsExchange: Send + Sync {
256    // ==================== Connection Management ====================
257
258    /// Connect to the WebSocket server
259    ///
260    /// Establishes a WebSocket connection to the exchange.
261    /// If already connected, this may be a no-op or reconnect.
262    ///
263    /// # Errors
264    ///
265    /// Returns an error if the connection fails.
266    async fn ws_connect(&self) -> Result<()>;
267
268    /// Disconnect from the WebSocket server
269    ///
270    /// Closes the WebSocket connection gracefully.
271    ///
272    /// # Errors
273    ///
274    /// Returns an error if disconnection fails.
275    async fn ws_disconnect(&self) -> Result<()>;
276
277    /// Check if WebSocket is connected
278    ///
279    /// # Returns
280    ///
281    /// True if the WebSocket connection is active.
282    fn ws_is_connected(&self) -> bool;
283
284    /// Get WebSocket connection state
285    ///
286    /// # Returns
287    ///
288    /// The current connection state.
289    fn ws_state(&self) -> WsConnectionState;
290
291    // ==================== Public Data Streams ====================
292
293    /// Watch ticker updates for a symbol
294    ///
295    /// # Arguments
296    ///
297    /// * `symbol` - Trading pair symbol (e.g., "BTC/USDT")
298    ///
299    /// # Returns
300    ///
301    /// A stream of ticker updates.
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if subscription fails.
306    async fn watch_ticker(&self, symbol: &str) -> Result<MessageStream<Ticker>>;
307
308    /// Watch tickers for multiple symbols
309    ///
310    /// # Arguments
311    ///
312    /// * `symbols` - List of trading pair symbols
313    ///
314    /// # Returns
315    ///
316    /// A stream of ticker vectors.
317    async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>>;
318
319    /// Watch order book updates
320    ///
321    /// # Arguments
322    ///
323    /// * `symbol` - Trading pair symbol
324    /// * `limit` - Optional depth limit
325    ///
326    /// # Returns
327    ///
328    /// A stream of order book updates.
329    async fn watch_order_book(
330        &self,
331        symbol: &str,
332        limit: Option<u32>,
333    ) -> Result<MessageStream<OrderBook>>;
334
335    /// Watch public trades
336    ///
337    /// # Arguments
338    ///
339    /// * `symbol` - Trading pair symbol
340    ///
341    /// # Returns
342    ///
343    /// A stream of trade vectors.
344    async fn watch_trades(&self, symbol: &str) -> Result<MessageStream<Vec<Trade>>>;
345
346    /// Watch OHLCV candlestick updates
347    ///
348    /// # Arguments
349    ///
350    /// * `symbol` - Trading pair symbol
351    /// * `timeframe` - Candlestick timeframe
352    ///
353    /// # Returns
354    ///
355    /// A stream of OHLCV updates.
356    async fn watch_ohlcv(&self, symbol: &str, timeframe: Timeframe)
357    -> Result<MessageStream<Ohlcv>>;
358
359    // ==================== Private Data Streams ====================
360
361    /// Watch account balance updates
362    ///
363    /// Requires authentication.
364    ///
365    /// # Returns
366    ///
367    /// A stream of balance updates.
368    ///
369    /// # Errors
370    ///
371    /// Returns an error if not authenticated or subscription fails.
372    async fn watch_balance(&self) -> Result<MessageStream<Balance>>;
373
374    /// Watch order updates
375    ///
376    /// Requires authentication.
377    ///
378    /// # Arguments
379    ///
380    /// * `symbol` - Optional symbol to filter orders
381    ///
382    /// # Returns
383    ///
384    /// A stream of order updates.
385    async fn watch_orders(&self, symbol: Option<&str>) -> Result<MessageStream<Order>>;
386
387    /// Watch user trade updates
388    ///
389    /// Requires authentication.
390    ///
391    /// # Arguments
392    ///
393    /// * `symbol` - Optional symbol to filter trades
394    ///
395    /// # Returns
396    ///
397    /// A stream of trade updates.
398    async fn watch_my_trades(&self, symbol: Option<&str>) -> Result<MessageStream<Trade>>;
399
400    // ==================== Subscription Management ====================
401
402    /// Subscribe to a channel
403    ///
404    /// # Arguments
405    ///
406    /// * `channel` - Channel name to subscribe to
407    /// * `symbol` - Optional symbol for the subscription
408    ///
409    /// # Errors
410    ///
411    /// Returns an error if subscription fails.
412    async fn subscribe(&self, channel: &str, symbol: Option<&str>) -> Result<()>;
413
414    /// Unsubscribe from a channel
415    ///
416    /// # Arguments
417    ///
418    /// * `channel` - Channel name to unsubscribe from
419    /// * `symbol` - Optional symbol for the subscription
420    ///
421    /// # Errors
422    ///
423    /// Returns an error if unsubscription fails.
424    async fn unsubscribe(&self, channel: &str, symbol: Option<&str>) -> Result<()>;
425
426    /// Get list of active subscriptions
427    ///
428    /// # Returns
429    ///
430    /// A vector of subscription identifiers.
431    fn subscriptions(&self) -> Vec<String>;
432}
433
434// ============================================================================
435// FullExchange Trait
436// ============================================================================
437
438/// Combined trait for exchanges that support both REST and WebSocket
439///
440/// This trait is automatically implemented for any type that implements
441/// both `Exchange` and `WsExchange`.
442///
443/// # Example
444///
445/// ```rust,no_run
446/// use ccxt_core::ws_exchange::FullExchange;
447///
448/// async fn use_full_exchange(exchange: &dyn FullExchange) {
449///     // Use REST API
450///     let ticker = exchange.fetch_ticker("BTC/USDT").await.unwrap();
451///     
452///     // Use WebSocket API
453///     exchange.ws_connect().await.unwrap();
454///     let stream = exchange.watch_ticker("BTC/USDT").await.unwrap();
455/// }
456/// ```
457pub trait FullExchange: Exchange + WsExchange {}
458
459// Blanket implementation for any type that implements both traits
460impl<T: Exchange + WsExchange> FullExchange for T {}
461
462// ============================================================================
463// Tests
464// ============================================================================
465
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Debug` rendering of `WsConnectionState::Disconnected` is expected
    /// to be the bare variant name.
    #[test]
    fn test_ws_connection_state_debug() {
        let rendered = format!("{:?}", WsConnectionState::Disconnected);
        assert_eq!(rendered, "Disconnected");
    }
}