1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//!
//! The Binance WebSocket adapter.
//!

use std::sync::mpsc;
use std::thread;

use websocket::client::ClientBuilder;
use websocket::ws::dataframe::DataFrame;
use websocket::OwnedMessage;

use crate::data::event::depth::Depth;
use crate::data::event::trade::Trade;
use crate::data::event::Event;
use crate::error::Error;

///
/// The Binance WebSocket client.
///
#[derive(Debug, Clone)]
pub struct Client {}

impl Client {
    ///
    /// Subscribes to a `symbol`-dedicated trade and depth streams.
    ///
    pub fn subscribe(symbol: &str) -> Result<mpsc::Receiver<Event>, Error> {
        let (tx, rx) = mpsc::channel();

        {
            let address = format!(
                "wss://stream.binance.com:9443/ws/{}@trade",
                symbol.to_ascii_lowercase()
            );
            let mut client = ClientBuilder::new(&address)
                .expect("WebSocket address is valid")
                .connect_secure(None)
                .map_err(Error::WebSocket)?;

            let tx = tx.clone();
            thread::spawn(move || loop {
                let message = match client.recv_message() {
                    Ok(message) => {
                        if message.is_ping() {
                            log::debug!("Received ping");
                            match client.send_message(&OwnedMessage::Pong(b"pong frame".to_vec())) {
                                Ok(()) => log::debug!("Sent pong"),
                                Err(error) => log::warn!("Pong sending error: {}", error),
                            }
                            continue;
                        }

                        message.take_payload()
                    }
                    Err(error) => {
                        log::error!("Websocket error: {}", error);
                        return;
                    }
                };

                if message.is_empty() {
                    continue;
                }

                match serde_json::from_slice::<Trade>(&message) {
                    Ok(trade) => match tx.send(Event::Trade(trade)) {
                        Ok(()) => {}
                        Err(_) => break,
                    },
                    Err(error) => log::warn!("Parsing error: {} ({:?})", error, message),
                }
            });
        }

        {
            let address = format!(
                "wss://stream.binance.com:9443/ws/{}@depth@100ms",
                symbol.to_ascii_lowercase()
            );
            let mut client = ClientBuilder::new(&address)
                .expect("WebSocket address is valid")
                .connect_secure(None)
                .map_err(Error::WebSocket)?;

            thread::spawn(move || loop {
                let message = match client.recv_message() {
                    Ok(message) => {
                        if message.is_ping() {
                            log::debug!("Received ping");
                            match client.send_message(&OwnedMessage::Pong(b"pong frame".to_vec())) {
                                Ok(()) => log::debug!("Sent pong"),
                                Err(error) => log::warn!("Pong sending error: {}", error),
                            }
                            continue;
                        }

                        message.take_payload()
                    }
                    Err(error) => {
                        log::error!("Websocket error: {}", error);
                        return;
                    }
                };

                if message.is_empty() {
                    continue;
                }

                match serde_json::from_slice::<Depth>(&message) {
                    Ok(depth) => match tx.send(Event::Depth(depth)) {
                        Ok(()) => {}
                        Err(_) => break,
                    },
                    Err(error) => log::warn!("Parsing error: {} ({:?})", error, message),
                }
            });
        }

        Ok(rx)
    }
}