1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
use failure::Fail;
use futures_util::StreamExt;
use log;
use serde::de::DeserializeOwned;
use serde_json::Result as SerdeResult;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::error::Error as WebSocketError;
use tokio_tungstenite::tungstenite::protocol::Message as WebSocketMessage;
use url::Url;

/// Errors handed to the `on_error` callback of [`start`] while the ticker
/// stream is being consumed.
#[derive(Debug, Clone, Fail)]
pub enum TickerError {
  /// A received websocket message could not be read as text or could not be
  /// deserialized into the expected type. Carries the offending message.
  #[fail(display = "Error when deserializing message {}", message)]
  ParsingError { message: WebSocketMessage },

  /// The websocket stream yielded an error instead of a message. `message`
  /// holds the `Display` rendering of that error.
  #[fail(display = "Error when receiving message {}", message)]
  SocketError { message: String },
}

/// Subscribes to the `@ticker` stream of every given symbol pair on the
/// Binance combined-stream websocket endpoint and dispatches each incoming
/// message to the supplied callbacks.
///
/// * `symbol_pairs` - symbol pairs to subscribe to; each one is suffixed with
///   `@ticker` and the results are joined with `/` to build the `streams`
///   query parameter.
/// * `on_message` - called with every message that was successfully
///   deserialized into `T`.
/// * `on_error` - called with a [`TickerError`] whenever the stream yields an
///   error or a message cannot be read as text / deserialized into `T`.
///   Such errors do not stop the processing of subsequent messages.
///
/// The returned future resolves to `Ok(())` once the websocket stream ends.
///
/// # Errors
///
/// Returns an error if the stream URL cannot be parsed or if the websocket
/// connection cannot be established.
pub async fn start<OnMessage, OnError, T>(
  symbol_pairs: Vec<String>,
  on_message: OnMessage,
  on_error: OnError,
) -> Result<(), Box<dyn std::error::Error>>
where
  T: DeserializeOwned, // https://serde.rs/lifetimes.html#trait-bounds
  OnMessage: Fn(T),
  OnError: Fn(TickerError),
{
  // One `<pair>@ticker` stream name per symbol pair, joined with `/`.
  let stream_params: String = symbol_pairs
    .iter()
    .map(|symbol_pair| format!("{}@ticker", symbol_pair))
    .collect::<Vec<String>>()
    .join("/");

  let url = format!(
    "wss://stream.binance.com:9443/stream?streams={}",
    stream_params
  );

  log::debug!("Connecting to {} ...", url);

  // Propagate a malformed URL to the caller instead of panicking.
  let url = Url::parse(&url)?;

  log::debug!("Opening websocket...");

  let (ws_stream, res) = connect_async(url).await?;

  log::debug!("Connected to {:?}", res);

  // Only the read half is needed; nothing is ever sent explicitly.
  let (_, ws_stream_reader) = ws_stream.split();

  ws_stream_reader
    .for_each(|message: Result<WebSocketMessage, WebSocketError>| {
      async {
        log::debug!("Got message {:?}", message);

        let message = match message {
          Ok(message) => message,
          Err(error) => {
            on_error(TickerError::SocketError {
              message: error.to_string(),
            });
            return;
          }
        };

        // We'll just ignore ping messages.
        // Presumably the websocket library answers them with a pong frame
        // on its own -- TODO confirm.
        // NOTE(review): other non-JSON frames are not filtered here and fall
        // through to the parsing path below.
        if message.is_ping() {
          log::debug!("Receive ping {}", message);
          return;
        }

        // The text borrow of `message` ends with this statement, so the
        // message itself can still be handed to `on_error` afterwards.
        let parsed: SerdeResult<T> = match message.to_text() {
          Ok(text) => serde_json::from_str(text),
          Err(_) => {
            on_error(TickerError::ParsingError { message });
            return;
          }
        };

        match parsed {
          Ok(body) => on_message(body),
          Err(_) => on_error(TickerError::ParsingError { message }),
        }
      }
    })
    .await;

  Ok(())
}