lib/binance/
ticker.rs

1use failure::Fail;
2use futures_util::StreamExt;
3use log;
4use serde::de::DeserializeOwned;
5use serde_json::Result as SerdeResult;
6use tokio_tungstenite::connect_async;
7use tokio_tungstenite::tungstenite::error::Error as WebSocketError;
8use tokio_tungstenite::tungstenite::protocol::Message as WebSocketMessage;
9use url::Url;
10
11#[derive(Debug, Clone, Fail)]
12pub enum TickerError {
13  #[fail(display = "Error when deserializing message {}", message)]
14  ParsingError { message: WebSocketMessage },
15
16  #[fail(display = "Error when receiving message {}", message)]
17  SocketError { message: String },
18}
19
20pub async fn start<OnMessage, OnError, T>(
21  symbol_pairs: Vec<String>,
22  on_message: OnMessage,
23  on_error: OnError,
24) -> Result<(), Box<dyn std::error::Error>>
25where
26  T: DeserializeOwned, // https://serde.rs/lifetimes.html#trait-bounds
27  OnMessage: Fn(T) -> (),
28  OnError: Fn(TickerError) -> (),
29{
30  let symbol_pairs: Vec<String> = symbol_pairs
31    .iter()
32    .map(|symbol_pair| {
33      return format!("{}@ticker", symbol_pair);
34    })
35    .collect();
36
37  let stream_params: String = symbol_pairs.join("/");
38
39  let url = format!(
40    "wss://stream.binance.com:9443/stream?streams={}",
41    stream_params
42  );
43
44  log::debug!("Connecting to {} ...", url);
45
46  let url = Url::parse(&url).unwrap();
47
48  log::debug!("Opening websocket...");
49
50  let (ws_stream, res) = connect_async(url).await?;
51
52  log::debug!("Connected to {:?}", res);
53
54  let (_, ws_stream_reader) = ws_stream.split();
55
56  ws_stream_reader
57    .for_each(|message: Result<WebSocketMessage, WebSocketError>| {
58      async {
59        log::debug!("Got message {:?}", message);
60
61        if let Err(error) = message {
62          on_error(TickerError::SocketError {
63            message: format!("{}", error),
64          });
65
66          return;
67        }
68
69        let message = message.unwrap();
70
71        // We'll just ignore ping message.
72        // If I'm not mistaken the websocket library will automatically
73        // send pong frame back.
74        if message.is_ping() {
75          log::debug!("Receive ping {}", message);
76          return;
77        }
78
79        let response_text = message.to_text();
80
81        if response_text.is_err() {
82          on_error(TickerError::ParsingError { message });
83          return;
84        }
85
86        let body: SerdeResult<T> = serde_json::from_str(response_text.unwrap());
87
88        if body.is_err() {
89          on_error(TickerError::ParsingError { message });
90          return;
91        }
92
93        let body = body.unwrap();
94        on_message(body);
95      }
96    })
97    .await;
98
99  return Ok(());
100}