1use failure::Fail;
2use futures_util::StreamExt;
3use log;
4use serde::de::DeserializeOwned;
5use serde_json::Result as SerdeResult;
6use tokio_tungstenite::connect_async;
7use tokio_tungstenite::tungstenite::error::Error as WebSocketError;
8use tokio_tungstenite::tungstenite::protocol::Message as WebSocketMessage;
9use url::Url;
10
11#[derive(Debug, Clone, Fail)]
12pub enum TickerError {
13 #[fail(display = "Error when deserializing message {}", message)]
14 ParsingError { message: WebSocketMessage },
15
16 #[fail(display = "Error when receiving message {}", message)]
17 SocketError { message: String },
18}
19
20pub async fn start<OnMessage, OnError, T>(
21 symbol_pairs: Vec<String>,
22 on_message: OnMessage,
23 on_error: OnError,
24) -> Result<(), Box<dyn std::error::Error>>
25where
26 T: DeserializeOwned, OnMessage: Fn(T) -> (),
28 OnError: Fn(TickerError) -> (),
29{
30 let symbol_pairs: Vec<String> = symbol_pairs
31 .iter()
32 .map(|symbol_pair| {
33 return format!("{}@ticker", symbol_pair);
34 })
35 .collect();
36
37 let stream_params: String = symbol_pairs.join("/");
38
39 let url = format!(
40 "wss://stream.binance.com:9443/stream?streams={}",
41 stream_params
42 );
43
44 log::debug!("Connecting to {} ...", url);
45
46 let url = Url::parse(&url).unwrap();
47
48 log::debug!("Opening websocket...");
49
50 let (ws_stream, res) = connect_async(url).await?;
51
52 log::debug!("Connected to {:?}", res);
53
54 let (_, ws_stream_reader) = ws_stream.split();
55
56 ws_stream_reader
57 .for_each(|message: Result<WebSocketMessage, WebSocketError>| {
58 async {
59 log::debug!("Got message {:?}", message);
60
61 if let Err(error) = message {
62 on_error(TickerError::SocketError {
63 message: format!("{}", error),
64 });
65
66 return;
67 }
68
69 let message = message.unwrap();
70
71 if message.is_ping() {
75 log::debug!("Receive ping {}", message);
76 return;
77 }
78
79 let response_text = message.to_text();
80
81 if response_text.is_err() {
82 on_error(TickerError::ParsingError { message });
83 return;
84 }
85
86 let body: SerdeResult<T> = serde_json::from_str(response_text.unwrap());
87
88 if body.is_err() {
89 on_error(TickerError::ParsingError { message });
90 return;
91 }
92
93 let body = body.unwrap();
94 on_message(body);
95 }
96 })
97 .await;
98
99 return Ok(());
100}