crypto_ws_client/clients/
coinbase_pro.rs

1use async_trait::async_trait;
2use std::collections::{BTreeMap, HashMap};
3use tokio_tungstenite::tungstenite::Message;
4
5use crate::{
6    clients::common_traits::{
7        Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
8    },
9    common::{
10        command_translator::CommandTranslator,
11        message_handler::{MessageHandler, MiscMessage},
12        ws_client_internal::WSClientInternal,
13    },
14    WSClient,
15};
16use log::*;
17use serde_json::Value;
18
19pub(super) const EXCHANGE_NAME: &str = "coinbase_pro";
20
21const WEBSOCKET_URL: &str = "wss://ws-feed.exchange.coinbase.com";
22
23/// The WebSocket client for CoinbasePro.
24///
25/// CoinbasePro has only Spot market.
26///
27///   * WebSocket API doc: <https://docs.cloud.coinbase.com/exchange/docs/websocket-overview>
28///   * Trading at: <https://pro.coinbase.com/>
29pub struct CoinbaseProWSClient {
30    client: WSClientInternal<CoinbaseProMessageHandler>,
31    translator: CoinbaseProCommandTranslator,
32}
33
34impl_new_constructor!(
35    CoinbaseProWSClient,
36    EXCHANGE_NAME,
37    WEBSOCKET_URL,
38    CoinbaseProMessageHandler {},
39    CoinbaseProCommandTranslator {}
40);
41
42impl_trait!(Trade, CoinbaseProWSClient, subscribe_trade, "matches");
43impl_trait!(Ticker, CoinbaseProWSClient, subscribe_ticker, "ticker");
44#[rustfmt::skip]
45impl_trait!(OrderBook, CoinbaseProWSClient, subscribe_orderbook, "level2");
46#[rustfmt::skip]
47impl_trait!(Level3OrderBook, CoinbaseProWSClient, subscribe_l3_orderbook, "full");
48
49panic_bbo!(CoinbaseProWSClient);
50panic_candlestick!(CoinbaseProWSClient);
51panic_l2_topk!(CoinbaseProWSClient);
52
53impl_ws_client_trait!(CoinbaseProWSClient);
54
/// Stateless classifier for raw messages received from the CoinbasePro feed.
struct CoinbaseProMessageHandler {}

/// Stateless builder of CoinbasePro subscribe/unsubscribe commands.
struct CoinbaseProCommandTranslator {}
57
58impl MessageHandler for CoinbaseProMessageHandler {
59    fn handle_message(&mut self, msg: &str) -> MiscMessage {
60        let resp = serde_json::from_str::<HashMap<String, Value>>(msg);
61        if resp.is_err() {
62            error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
63            return MiscMessage::Other;
64        }
65        let obj = resp.unwrap();
66
67        match obj.get("type").unwrap().as_str().unwrap() {
68            "error" => {
69                error!("Received {} from {}", msg, EXCHANGE_NAME);
70                if obj.contains_key("reason")
71                    && obj
72                        .get("reason")
73                        .unwrap()
74                        .as_str()
75                        .unwrap()
76                        .contains("is not a valid product")
77                {
78                    panic!("Received {msg} from {EXCHANGE_NAME}");
79                } else {
80                    MiscMessage::Other
81                }
82            }
83            "subscriptions" => {
84                info!("Received {} from {}", msg, EXCHANGE_NAME);
85                MiscMessage::Other
86            }
87            "heartbeat" => {
88                debug!("Received {} from {}", msg, EXCHANGE_NAME);
89                MiscMessage::Other
90            }
91            _ => MiscMessage::Normal,
92        }
93    }
94
95    fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
96        None
97    }
98}
99
100impl CommandTranslator for CoinbaseProCommandTranslator {
101    fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
102        let mut commands: Vec<String> = Vec::new();
103
104        let mut channel_symbols = BTreeMap::<String, Vec<String>>::new();
105        for (channel, symbol) in topics {
106            match channel_symbols.get_mut(channel) {
107                Some(symbols) => symbols.push(symbol.to_string()),
108                None => {
109                    channel_symbols.insert(channel.to_string(), vec![symbol.to_string()]);
110                }
111            }
112        }
113
114        if !channel_symbols.is_empty() {
115            let mut command = String::new();
116            command.push_str(
117                format!(
118                    r#"{{"type":"{}","channels": ["#,
119                    if subscribe { "subscribe" } else { "unsubscribe" }
120                )
121                .as_str(),
122            );
123            for (channel, symbols) in channel_symbols.iter() {
124                command.push_str(
125                    format!(
126                        r#"{{"name":"{}","product_ids":{}}}"#,
127                        channel,
128                        serde_json::to_string(symbols).unwrap(),
129                    )
130                    .as_str(),
131                );
132                command.push(',')
133            }
134            command.pop();
135            command.push_str("]}");
136
137            commands.push(command);
138        }
139
140        commands
141    }
142
143    fn translate_to_candlestick_commands(
144        &self,
145        _subscribe: bool,
146        _symbol_interval_list: &[(String, usize)],
147    ) -> Vec<String> {
148        panic!("CoinbasePro does NOT have candlestick channel");
149    }
150}
151
#[cfg(test)]
mod tests {
    use crate::common::command_translator::CommandTranslator;

    /// Translates the given `(channel, symbol)` pairs into subscribe commands.
    fn subscribe_commands(pairs: &[(&str, &str)]) -> Vec<String> {
        let topics: Vec<(String, String)> = pairs
            .iter()
            .map(|(channel, symbol)| (channel.to_string(), symbol.to_string()))
            .collect();
        let translator = super::CoinbaseProCommandTranslator {};
        translator.translate_to_commands(true, &topics)
    }

    /// Two symbols on the same channel end up in one channel entry.
    #[test]
    fn test_two_symbols() {
        let commands = subscribe_commands(&[("matches", "BTC-USD"), ("matches", "ETH-USD")]);

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"type":"subscribe","channels": [{"name":"matches","product_ids":["BTC-USD","ETH-USD"]}]}"#,
            commands[0]
        );
    }

    /// Two channels share a single command and are listed in sorted order.
    #[test]
    fn test_two_channels() {
        let commands = subscribe_commands(&[("matches", "BTC-USD"), ("level2", "BTC-USD")]);

        assert_eq!(1, commands.len());
        assert_eq!(
            r#"{"type":"subscribe","channels": [{"name":"level2","product_ids":["BTC-USD"]},{"name":"matches","product_ids":["BTC-USD"]}]}"#,
            commands[0]
        );
    }
}