binance_client/websocket/
mod.rs1pub mod event;
6
7use std::sync::mpsc;
8use std::thread;
9
10use websocket::client::ClientBuilder;
11use websocket::ws::dataframe::DataFrame;
12use websocket::OwnedMessage;
13
14use crate::error::Error;
15
16use self::event::depth::Depth;
17use self::event::trade::Trade;
18use self::event::Event;
19
20#[derive(Debug, Clone)]
24pub struct Client {}
25
26impl Client {
27 pub fn run(symbol: &str, depth_period: Option<u64>) -> Result<mpsc::Receiver<Event>, Error> {
33 let (tx, rx) = mpsc::channel();
34
35 Self::subscribe::<Trade>(
36 format!(
37 "wss://stream.binance.com:9443/ws/{}@trade",
38 symbol.to_ascii_lowercase()
39 )
40 .as_str(),
41 tx.clone(),
42 )?;
43
44 if let Some(depth_period) = depth_period {
45 Self::subscribe::<Depth>(
46 format!(
47 "wss://stream.binance.com:9443/ws/{}@depth@{depth_period}ms",
48 symbol.to_ascii_lowercase()
49 )
50 .as_str(),
51 tx.clone(),
52 )?;
53 }
54
55 Ok(rx)
56 }
57
58 fn subscribe<E>(url: &str, tx: mpsc::Sender<Event>) -> Result<(), Error>
62 where
63 E: Into<Event> + serde::de::DeserializeOwned,
64 {
65 let mut client = ClientBuilder::new(url)
66 .expect("WebSocket address is valid")
67 .connect_secure(None)
68 .map_err(Error::WebSocket)?;
69
70 thread::spawn(move || loop {
71 let message = match client.recv_message() {
72 Ok(message) => {
73 if message.is_ping() {
74 log::debug!("Received ping");
75 match client.send_message(&OwnedMessage::Pong(b"pong frame".to_vec())) {
76 Ok(()) => log::debug!("Sent pong"),
77 Err(error) => log::warn!("Pong sending error: {}", error),
78 }
79 continue;
80 }
81
82 message.take_payload()
83 }
84 Err(error) => {
85 log::error!("Websocket error: {}", error);
86 return;
87 }
88 };
89
90 if message.is_empty() {
91 continue;
92 }
93
94 match serde_json::from_slice::<E>(&message) {
95 Ok(event) => match tx.send(event.into()) {
96 Ok(()) => {}
97 Err(_) => break,
98 },
99 Err(error) => log::warn!("Parsing error: {} ({:?})", error, message),
100 }
101 });
102
103 Ok(())
104 }
105}