crypto_ws_client/clients/
coinbase_pro.rs1use async_trait::async_trait;
2use std::collections::{BTreeMap, HashMap};
3use tokio_tungstenite::tungstenite::Message;
4
5use crate::{
6 clients::common_traits::{
7 Candlestick, Level3OrderBook, OrderBook, OrderBookTopK, Ticker, Trade, BBO,
8 },
9 common::{
10 command_translator::CommandTranslator,
11 message_handler::{MessageHandler, MiscMessage},
12 ws_client_internal::WSClientInternal,
13 },
14 WSClient,
15};
16use log::*;
17use serde_json::Value;
18
19pub(super) const EXCHANGE_NAME: &str = "coinbase_pro";
20
/// WebSocket endpoint handed to the client constructor.
const WEBSOCKET_URL: &str = "wss://ws-feed.exchange.coinbase.com";
22
23pub struct CoinbaseProWSClient {
30 client: WSClientInternal<CoinbaseProMessageHandler>,
31 translator: CoinbaseProCommandTranslator,
32}
33
34impl_new_constructor!(
35 CoinbaseProWSClient,
36 EXCHANGE_NAME,
37 WEBSOCKET_URL,
38 CoinbaseProMessageHandler {},
39 CoinbaseProCommandTranslator {}
40);
41
42impl_trait!(Trade, CoinbaseProWSClient, subscribe_trade, "matches");
43impl_trait!(Ticker, CoinbaseProWSClient, subscribe_ticker, "ticker");
44#[rustfmt::skip]
45impl_trait!(OrderBook, CoinbaseProWSClient, subscribe_orderbook, "level2");
46#[rustfmt::skip]
47impl_trait!(Level3OrderBook, CoinbaseProWSClient, subscribe_l3_orderbook, "full");
48
49panic_bbo!(CoinbaseProWSClient);
50panic_candlestick!(CoinbaseProWSClient);
51panic_l2_topk!(CoinbaseProWSClient);
52
53impl_ws_client_trait!(CoinbaseProWSClient);
54
/// Stateless classifier for raw text messages received from the exchange.
struct CoinbaseProMessageHandler {}

/// Stateless builder of subscribe/unsubscribe JSON commands.
struct CoinbaseProCommandTranslator {}
57
58impl MessageHandler for CoinbaseProMessageHandler {
59 fn handle_message(&mut self, msg: &str) -> MiscMessage {
60 let resp = serde_json::from_str::<HashMap<String, Value>>(msg);
61 if resp.is_err() {
62 error!("{} is not a JSON string, {}", msg, EXCHANGE_NAME);
63 return MiscMessage::Other;
64 }
65 let obj = resp.unwrap();
66
67 match obj.get("type").unwrap().as_str().unwrap() {
68 "error" => {
69 error!("Received {} from {}", msg, EXCHANGE_NAME);
70 if obj.contains_key("reason")
71 && obj
72 .get("reason")
73 .unwrap()
74 .as_str()
75 .unwrap()
76 .contains("is not a valid product")
77 {
78 panic!("Received {msg} from {EXCHANGE_NAME}");
79 } else {
80 MiscMessage::Other
81 }
82 }
83 "subscriptions" => {
84 info!("Received {} from {}", msg, EXCHANGE_NAME);
85 MiscMessage::Other
86 }
87 "heartbeat" => {
88 debug!("Received {} from {}", msg, EXCHANGE_NAME);
89 MiscMessage::Other
90 }
91 _ => MiscMessage::Normal,
92 }
93 }
94
95 fn get_ping_msg_and_interval(&self) -> Option<(Message, u64)> {
96 None
97 }
98}
99
100impl CommandTranslator for CoinbaseProCommandTranslator {
101 fn translate_to_commands(&self, subscribe: bool, topics: &[(String, String)]) -> Vec<String> {
102 let mut commands: Vec<String> = Vec::new();
103
104 let mut channel_symbols = BTreeMap::<String, Vec<String>>::new();
105 for (channel, symbol) in topics {
106 match channel_symbols.get_mut(channel) {
107 Some(symbols) => symbols.push(symbol.to_string()),
108 None => {
109 channel_symbols.insert(channel.to_string(), vec![symbol.to_string()]);
110 }
111 }
112 }
113
114 if !channel_symbols.is_empty() {
115 let mut command = String::new();
116 command.push_str(
117 format!(
118 r#"{{"type":"{}","channels": ["#,
119 if subscribe { "subscribe" } else { "unsubscribe" }
120 )
121 .as_str(),
122 );
123 for (channel, symbols) in channel_symbols.iter() {
124 command.push_str(
125 format!(
126 r#"{{"name":"{}","product_ids":{}}}"#,
127 channel,
128 serde_json::to_string(symbols).unwrap(),
129 )
130 .as_str(),
131 );
132 command.push(',')
133 }
134 command.pop();
135 command.push_str("]}");
136
137 commands.push(command);
138 }
139
140 commands
141 }
142
143 fn translate_to_candlestick_commands(
144 &self,
145 _subscribe: bool,
146 _symbol_interval_list: &[(String, usize)],
147 ) -> Vec<String> {
148 panic!("CoinbasePro does NOT have candlestick channel");
149 }
150}
151
#[cfg(test)]
mod tests {
    use crate::common::command_translator::CommandTranslator;

    /// Converts borrowed `(channel, symbol)` pairs into the owned form the
    /// translator expects.
    fn owned_topics(pairs: &[(&str, &str)]) -> Vec<(String, String)> {
        pairs
            .iter()
            .map(|(channel, symbol)| (channel.to_string(), symbol.to_string()))
            .collect()
    }

    /// Two symbols on the same channel end up in one channel entry.
    #[test]
    fn test_two_symbols() {
        let topics = owned_topics(&[("matches", "BTC-USD"), ("matches", "ETH-USD")]);
        let translator = super::CoinbaseProCommandTranslator {};

        let commands = translator.translate_to_commands(true, &topics);

        let expected = r#"{"type":"subscribe","channels": [{"name":"matches","product_ids":["BTC-USD","ETH-USD"]}]}"#;
        assert_eq!(1, commands.len());
        assert_eq!(expected, commands[0]);
    }

    /// Two channels share one command and are emitted in sorted order.
    #[test]
    fn test_two_channels() {
        let topics = owned_topics(&[("matches", "BTC-USD"), ("level2", "BTC-USD")]);
        let translator = super::CoinbaseProCommandTranslator {};

        let commands = translator.translate_to_commands(true, &topics);

        let expected = r#"{"type":"subscribe","channels": [{"name":"level2","product_ids":["BTC-USD"]},{"name":"matches","product_ids":["BTC-USD"]}]}"#;
        assert_eq!(1, commands.len());
        assert_eq!(expected, commands[0]);
    }
}