// ccxt_core/ws_exchange.rs
1//! # WebSocket Exchange Trait
2//!
3//! This module defines the [`WsExchange`] trait for real-time data streaming
4//! via WebSocket connections, enabling live market data and account updates.
5//!
6//! ## Overview
7//!
8//! The `WsExchange` trait complements the REST-based [`Exchange`](crate::exchange::Exchange)
9//! trait by providing real-time streaming capabilities:
10//!
11//! - **Connection Management**: Connect, disconnect, and monitor WebSocket state
12//! - **Public Data Streams**: Real-time ticker, order book, trades, and OHLCV updates
13//! - **Private Data Streams**: Live balance, order, and trade updates (requires authentication)
14//! - **Subscription Management**: Subscribe/unsubscribe to specific channels
15//!
16//! ## Architecture
17//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │                     WsExchange Trait                        │
//! ├─────────────────────────────────────────────────────────────┤
//! │  Connection Management                                      │
//! │  ├── ws_connect(), ws_disconnect()                          │
//! │  └── ws_is_connected(), ws_state()                          │
//! ├─────────────────────────────────────────────────────────────┤
//! │  Public Data Streams                                        │
//! │  ├── watch_ticker(), watch_tickers()                        │
//! │  ├── watch_order_book(), watch_trades()                     │
//! │  └── watch_ohlcv()                                          │
//! ├─────────────────────────────────────────────────────────────┤
//! │  Private Data Streams (Authenticated)                       │
//! │  ├── watch_balance()                                        │
//! │  ├── watch_orders()                                         │
//! │  └── watch_my_trades()                                      │
//! ├─────────────────────────────────────────────────────────────┤
//! │  Subscription Management                                    │
//! │  └── subscribe(), unsubscribe(), subscriptions()            │
//! └─────────────────────────────────────────────────────────────┘
//! ```
40//!
41//! ## Key Types
42//!
43//! - [`WsExchange`]: The WebSocket streaming trait
44//! - [`MessageStream<T>`]: A pinned, boxed async stream yielding `Result<T>` items
45//! - [`FullExchange`]: Combined trait for exchanges supporting both REST and WebSocket
46//!
47//! ## Usage Examples
48//!
49//! ### Watching Real-Time Ticker Updates
50//!
51//! ```rust,no_run
52//! use ccxt_core::ws_exchange::WsExchange;
53//! use futures::StreamExt;
54//!
55//! async fn watch_ticker(exchange: &dyn WsExchange, symbol: &str) {
56//! // Connect to WebSocket
57//! exchange.ws_connect().await.unwrap();
58//!
59//! // Watch ticker updates
60//! let mut stream = exchange.watch_ticker(symbol).await.unwrap();
61//!
62//! while let Some(ticker) = stream.next().await {
63//! match ticker {
64//! Ok(t) => println!("Price: {:?}", t.last),
65//! Err(e) => eprintln!("Error: {}", e),
66//! }
67//! }
68//! }
69//! ```
70//!
71//! ### Watching Order Book Depth
72//!
73//! ```rust,no_run
74//! use ccxt_core::ws_exchange::WsExchange;
75//! use futures::StreamExt;
76//!
77//! async fn watch_orderbook(exchange: &dyn WsExchange, symbol: &str) {
78//! exchange.ws_connect().await.unwrap();
79//!
80//! let mut stream = exchange.watch_order_book(symbol, Some(10)).await.unwrap();
81//!
82//! while let Some(result) = stream.next().await {
83//! if let Ok(orderbook) = result {
84//! if let (Some(best_bid), Some(best_ask)) = (
85//! orderbook.bids.first(),
86//! orderbook.asks.first()
87//! ) {
88//! println!("Spread: {} - {}", best_bid.price, best_ask.price);
89//! }
90//! }
91//! }
92//! }
93//! ```
94//!
95//! ### Monitoring Connection State
96//!
97//! ```rust,no_run
98//! use ccxt_core::ws_exchange::WsExchange;
99//! use ccxt_core::ws_client::WsConnectionState;
100//!
101//! async fn ensure_connected(exchange: &dyn WsExchange) -> ccxt_core::Result<()> {
102//! if !exchange.ws_is_connected() {
103//! println!("Connecting to WebSocket...");
104//! exchange.ws_connect().await?;
105//! }
106//!
107//! match exchange.ws_state() {
108//! WsConnectionState::Connected => println!("Connected!"),
109//! WsConnectionState::Connecting => println!("Still connecting..."),
110//! WsConnectionState::Disconnected => println!("Disconnected"),
111//! WsConnectionState::Reconnecting => println!("Reconnecting..."),
112//! WsConnectionState::Error => println!("Error state"),
113//! }
114//!
115//! Ok(())
116//! }
117//! ```
118//!
119//! ### Using FullExchange for REST + WebSocket
120//!
121//! ```rust,no_run
122//! use ccxt_core::ws_exchange::FullExchange;
123//! use futures::StreamExt;
124//!
125//! async fn hybrid_trading(exchange: &dyn FullExchange, symbol: &str) {
126//! // Use REST API to get initial state
127//! let ticker = exchange.fetch_ticker(symbol).await.unwrap();
128//! println!("Initial price: {:?}", ticker.last);
129//!
130//! // Switch to WebSocket for real-time updates
131//! exchange.ws_connect().await.unwrap();
132//! let mut stream = exchange.watch_ticker(symbol).await.unwrap();
133//!
134//! while let Some(Ok(update)) = stream.next().await {
135//! println!("Live price: {:?}", update.last);
136//! }
137//! }
138//! ```
139//!
140//! ### Watching Private Account Updates
141//!
142//! ```rust,no_run
143//! use ccxt_core::ws_exchange::WsExchange;
144//! use futures::StreamExt;
145//!
146//! async fn watch_account(exchange: &dyn WsExchange) {
147//! exchange.ws_connect().await.unwrap();
148//!
149//! // Watch balance updates (requires authentication)
150//! let mut balance_stream = exchange.watch_balance().await.unwrap();
151//!
152//! // Watch order updates
153//! let mut order_stream = exchange.watch_orders(None).await.unwrap();
154//!
155//! tokio::select! {
156//! Some(Ok(balance)) = balance_stream.next() => {
157//! println!("Balance update: {:?}", balance);
158//! }
159//! Some(Ok(order)) = order_stream.next() => {
160//! println!("Order update: {} - {:?}", order.id, order.status);
161//! }
162//! }
163//! }
164//! ```
165//!
166//! ## MessageStream Type
167//!
168//! The [`MessageStream<T>`] type alias represents an async stream of results:
169//!
170//! ```rust,ignore
171//! pub type MessageStream<T> = Pin<Box<dyn Stream<Item = Result<T>> + Send>>;
172//! ```
173//!
174//! This type is:
175//! - **Pinned**: Required for async iteration
176//! - **Boxed**: Allows for dynamic dispatch and type erasure
177//! - **Send**: Can be sent across thread boundaries
178//! - **Yields Results**: Each item is `Result<T>` to handle stream errors
179//!
180//! ## Connection States
181//!
//! WebSocket connections can be in one of several states:
//!
//! - `Disconnected`: Not connected to the server
//! - `Connecting`: Connection in progress
//! - `Connected`: Active connection ready for use
//! - `Reconnecting`: Automatic reconnection in progress
//! - `Error`: The connection is in an error state
188//!
189//! ## Error Handling
190//!
191//! WebSocket operations can fail with:
192//!
193//! - `WebSocket`: Connection or protocol errors
194//! - `Authentication`: Invalid credentials for private streams
195//! - `Network`: Network connectivity issues
196//! - `Timeout`: Connection or subscription timeout
197//!
198//! ## Thread Safety
199//!
200//! Like the `Exchange` trait, `WsExchange` requires `Send + Sync` bounds,
201//! ensuring compatibility with async runtimes and multi-threaded applications.
202//!
203//! ## See Also
204//!
205//! - [`crate::exchange::Exchange`]: REST API trait
206//! - [`crate::ws_client::WsClient`]: Low-level WebSocket client
207//! - [`crate::ws_client::WsConnectionState`]: Connection state enum
208
209use async_trait::async_trait;
210use futures::Stream;
211use std::pin::Pin;
212
213use crate::error::Result;
214use crate::exchange::Exchange;
215use crate::types::*;
216use crate::ws_client::WsConnectionState;
217
218// ============================================================================
219// Type Aliases
220// ============================================================================
221
222/// WebSocket message stream type
223///
224/// A pinned, boxed stream that yields `Result<T>` items.
225/// This type is used for all WebSocket data streams.
226pub type MessageStream<T> = Pin<Box<dyn Stream<Item = Result<T>> + Send>>;
227
228// ============================================================================
229// WsExchange Trait
230// ============================================================================
231
232/// WebSocket exchange trait for real-time data streaming
233///
234/// This trait defines the WebSocket API that exchanges can implement
235/// for real-time market data and account updates.
236///
237/// # Thread Safety
238///
239/// All implementations must be `Send + Sync` to allow safe usage across
240/// thread boundaries.
241///
242/// # Example
243///
244/// ```rust,no_run
245/// use ccxt_core::ws_exchange::WsExchange;
246///
247/// async fn check_connection(exchange: &dyn WsExchange) {
248/// if !exchange.ws_is_connected() {
249/// exchange.ws_connect().await.unwrap();
250/// }
251/// println!("Connection state: {:?}", exchange.ws_state());
252/// }
253/// ```
254#[async_trait]
255pub trait WsExchange: Send + Sync {
256 // ==================== Connection Management ====================
257
258 /// Connect to the WebSocket server
259 ///
260 /// Establishes a WebSocket connection to the exchange.
261 /// If already connected, this may be a no-op or reconnect.
262 ///
263 /// # Errors
264 ///
265 /// Returns an error if the connection fails.
266 async fn ws_connect(&self) -> Result<()>;
267
268 /// Disconnect from the WebSocket server
269 ///
270 /// Closes the WebSocket connection gracefully.
271 ///
272 /// # Errors
273 ///
274 /// Returns an error if disconnection fails.
275 async fn ws_disconnect(&self) -> Result<()>;
276
277 /// Check if WebSocket is connected
278 ///
279 /// # Returns
280 ///
281 /// True if the WebSocket connection is active.
282 fn ws_is_connected(&self) -> bool;
283
284 /// Get WebSocket connection state
285 ///
286 /// # Returns
287 ///
288 /// The current connection state.
289 fn ws_state(&self) -> WsConnectionState;
290
291 // ==================== Public Data Streams ====================
292
293 /// Watch ticker updates for a symbol
294 ///
295 /// # Arguments
296 ///
297 /// * `symbol` - Trading pair symbol (e.g., "BTC/USDT")
298 ///
299 /// # Returns
300 ///
301 /// A stream of ticker updates.
302 ///
303 /// # Errors
304 ///
305 /// Returns an error if subscription fails.
306 async fn watch_ticker(&self, symbol: &str) -> Result<MessageStream<Ticker>>;
307
308 /// Watch tickers for multiple symbols
309 ///
310 /// # Arguments
311 ///
312 /// * `symbols` - List of trading pair symbols
313 ///
314 /// # Returns
315 ///
316 /// A stream of ticker vectors.
317 async fn watch_tickers(&self, symbols: &[String]) -> Result<MessageStream<Vec<Ticker>>>;
318
319 /// Watch order book updates
320 ///
321 /// # Arguments
322 ///
323 /// * `symbol` - Trading pair symbol
324 /// * `limit` - Optional depth limit
325 ///
326 /// # Returns
327 ///
328 /// A stream of order book updates.
329 async fn watch_order_book(
330 &self,
331 symbol: &str,
332 limit: Option<u32>,
333 ) -> Result<MessageStream<OrderBook>>;
334
335 /// Watch public trades
336 ///
337 /// # Arguments
338 ///
339 /// * `symbol` - Trading pair symbol
340 ///
341 /// # Returns
342 ///
343 /// A stream of trade vectors.
344 async fn watch_trades(&self, symbol: &str) -> Result<MessageStream<Vec<Trade>>>;
345
346 /// Watch OHLCV candlestick updates
347 ///
348 /// # Arguments
349 ///
350 /// * `symbol` - Trading pair symbol
351 /// * `timeframe` - Candlestick timeframe
352 ///
353 /// # Returns
354 ///
355 /// A stream of OHLCV updates.
356 async fn watch_ohlcv(&self, symbol: &str, timeframe: Timeframe)
357 -> Result<MessageStream<Ohlcv>>;
358
359 // ==================== Private Data Streams ====================
360
361 /// Watch account balance updates
362 ///
363 /// Requires authentication.
364 ///
365 /// # Returns
366 ///
367 /// A stream of balance updates.
368 ///
369 /// # Errors
370 ///
371 /// Returns an error if not authenticated or subscription fails.
372 async fn watch_balance(&self) -> Result<MessageStream<Balance>>;
373
374 /// Watch order updates
375 ///
376 /// Requires authentication.
377 ///
378 /// # Arguments
379 ///
380 /// * `symbol` - Optional symbol to filter orders
381 ///
382 /// # Returns
383 ///
384 /// A stream of order updates.
385 async fn watch_orders(&self, symbol: Option<&str>) -> Result<MessageStream<Order>>;
386
387 /// Watch user trade updates
388 ///
389 /// Requires authentication.
390 ///
391 /// # Arguments
392 ///
393 /// * `symbol` - Optional symbol to filter trades
394 ///
395 /// # Returns
396 ///
397 /// A stream of trade updates.
398 async fn watch_my_trades(&self, symbol: Option<&str>) -> Result<MessageStream<Trade>>;
399
400 // ==================== Subscription Management ====================
401
402 /// Subscribe to a channel
403 ///
404 /// # Arguments
405 ///
406 /// * `channel` - Channel name to subscribe to
407 /// * `symbol` - Optional symbol for the subscription
408 ///
409 /// # Errors
410 ///
411 /// Returns an error if subscription fails.
412 async fn subscribe(&self, channel: &str, symbol: Option<&str>) -> Result<()>;
413
414 /// Unsubscribe from a channel
415 ///
416 /// # Arguments
417 ///
418 /// * `channel` - Channel name to unsubscribe from
419 /// * `symbol` - Optional symbol for the subscription
420 ///
421 /// # Errors
422 ///
423 /// Returns an error if unsubscription fails.
424 async fn unsubscribe(&self, channel: &str, symbol: Option<&str>) -> Result<()>;
425
426 /// Get list of active subscriptions
427 ///
428 /// # Returns
429 ///
430 /// A vector of subscription identifiers.
431 fn subscriptions(&self) -> Vec<String>;
432}
433
434// ============================================================================
435// FullExchange Trait
436// ============================================================================
437
438/// Combined trait for exchanges that support both REST and WebSocket
439///
440/// This trait is automatically implemented for any type that implements
441/// both `Exchange` and `WsExchange`.
442///
443/// # Example
444///
445/// ```rust,no_run
446/// use ccxt_core::ws_exchange::FullExchange;
447///
448/// async fn use_full_exchange(exchange: &dyn FullExchange) {
449/// // Use REST API
450/// let ticker = exchange.fetch_ticker("BTC/USDT").await.unwrap();
451///
452/// // Use WebSocket API
453/// exchange.ws_connect().await.unwrap();
454/// let stream = exchange.watch_ticker("BTC/USDT").await.unwrap();
455/// }
456/// ```
457pub trait FullExchange: Exchange + WsExchange {}
458
459// Blanket implementation for any type that implements both traits
460impl<T: Exchange + WsExchange> FullExchange for T {}
461
462// ============================================================================
463// Tests
464// ============================================================================
465
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Debug` rendering of the disconnected state is its variant name.
    #[test]
    fn test_ws_connection_state_debug() {
        let rendered = format!("{:?}", WsConnectionState::Disconnected);
        assert_eq!(rendered, "Disconnected");
    }
}
475}