binance_client/websocket/
mod.rs

1//!
2//! The Binance WebSocket adapter.
3//!
4
5pub mod event;
6
7use std::sync::mpsc;
8use std::thread;
9
10use websocket::client::ClientBuilder;
11use websocket::ws::dataframe::DataFrame;
12use websocket::OwnedMessage;
13
14use crate::error::Error;
15
16use self::event::depth::Depth;
17use self::event::trade::Trade;
18use self::event::Event;
19
20///
21/// The Binance WebSocket client.
22///
23#[derive(Debug, Clone)]
24pub struct Client {}
25
26impl Client {
27    ///
28    /// Runs the `symbol`-dedicated trade and depth streams.
29    ///
30    /// If `depth` is `None`, it is not requested.
31    ///
32    pub fn run(symbol: &str, depth_period: Option<u64>) -> Result<mpsc::Receiver<Event>, Error> {
33        let (tx, rx) = mpsc::channel();
34
35        Self::subscribe::<Trade>(
36            format!(
37                "wss://stream.binance.com:9443/ws/{}@trade",
38                symbol.to_ascii_lowercase()
39            )
40            .as_str(),
41            tx.clone(),
42        )?;
43
44        if let Some(depth_period) = depth_period {
45            Self::subscribe::<Depth>(
46                format!(
47                    "wss://stream.binance.com:9443/ws/{}@depth@{depth_period}ms",
48                    symbol.to_ascii_lowercase()
49                )
50                .as_str(),
51                tx.clone(),
52            )?;
53        }
54
55        Ok(rx)
56    }
57
58    ///
59    /// Subscribes to a particular stream.
60    ///
61    fn subscribe<E>(url: &str, tx: mpsc::Sender<Event>) -> Result<(), Error>
62    where
63        E: Into<Event> + serde::de::DeserializeOwned,
64    {
65        let mut client = ClientBuilder::new(url)
66            .expect("WebSocket address is valid")
67            .connect_secure(None)
68            .map_err(Error::WebSocket)?;
69
70        thread::spawn(move || loop {
71            let message = match client.recv_message() {
72                Ok(message) => {
73                    if message.is_ping() {
74                        log::debug!("Received ping");
75                        match client.send_message(&OwnedMessage::Pong(b"pong frame".to_vec())) {
76                            Ok(()) => log::debug!("Sent pong"),
77                            Err(error) => log::warn!("Pong sending error: {}", error),
78                        }
79                        continue;
80                    }
81
82                    message.take_payload()
83                }
84                Err(error) => {
85                    log::error!("Websocket error: {}", error);
86                    return;
87                }
88            };
89
90            if message.is_empty() {
91                continue;
92            }
93
94            match serde_json::from_slice::<E>(&message) {
95                Ok(event) => match tx.send(event.into()) {
96                    Ok(()) => {}
97                    Err(_) => break,
98                },
99                Err(error) => log::warn!("Parsing error: {} ({:?})", error, message),
100            }
101        });
102
103        Ok(())
104    }
105}