binance_connect/futures_usd/client.rs
1use std::io::ErrorKind;
2use std::net::TcpStream;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::mpsc::Sender;
5use std::sync::Arc;
6
7use log::{debug, info};
8use tungstenite::stream::MaybeTlsStream;
9use tungstenite::{connect, Message, WebSocket};
10use url::Url;
11
12use crate::error::BinanceConnectError;
13use crate::futures_usd::deserializer::deserialize;
14use crate::futures_usd::enums::events::Event;
15use crate::futures_usd::stream::WouldBlockConfig;
16
17/// Establishes a WebSocket connection to the provided URL, reads and processes messages,
18/// and sends events to the specified sender.
19///
20/// # Arguments
21///
22/// * `sender` - A `Sender<Event>` to send events to.
23/// * `url` - The URL to connect to.
24/// * `would_block_config` - Configuration for handling WouldBlock errors.
25/// * `subscribe_payload` - An optional JSON payload to subscribe to specific streams.
26///
27/// # Returns
28///
29/// This function returns `Ok(())` if the connection and processing were successful, or
30/// a `BinanceConnectError` if an error occurred.
31pub fn client(
32 sender: Sender<Event>,
33 url: Url,
34 stop_signal: Arc<AtomicBool>,
35 would_block_config: WouldBlockConfig,
36 subscribe_payload: Option<String>,
37) -> Result<(), BinanceConnectError> {
38 // Establish a WebSocket connection.
39 let mut socket: WebSocket<MaybeTlsStream<TcpStream>> = socket(url)?;
40
41 // If a subscribe payload is provided, send the subscription request.
42 if let Some(subscribe_payload) = subscribe_payload {
43 debug!("{:?}", subscribe_payload);
44 socket.send(Message::Text(subscribe_payload))?;
45 }
46
47 // Continuously read and process WebSocket messages.
48 while !stop_signal.load(Ordering::Relaxed) {
49 match socket.read() {
50 Ok(message) => match message {
51 // Handle incoming JSON messages.
52 Message::Text(json_response) => {
53 // Stop signal might have been called
54 if stop_signal.load(Ordering::Relaxed) {
55 return Ok(());
56 };
57
58 // Deserialize the JSON into an `Event` and send it to the sender.
59 let event: Event = deserialize(json_response)?;
60 sender.send(event)?;
61 }
62 // Handle incoming Ping messages.
63 Message::Ping(ping) => {
64 // Stop signal might have been called
65 if stop_signal.load(Ordering::Relaxed) {
66 return Ok(());
67 };
68 // Respond to Ping with Pong to keep the connection alive.
69 socket.send(Message::Pong(ping))?;
70 debug!("pong");
71 }
72 _ => {}
73 },
74 Err(err) => match err {
75 tungstenite::Error::Io(ref io_err) if io_err.kind() == ErrorKind::WouldBlock => {
76 // Stop signal might have been called
77 if stop_signal.load(Ordering::Relaxed) {
78 return Ok(());
79 };
80
81 if would_block_config.error_on_block {
82 // Return a SocketError if configured to do so.
83 Err(BinanceConnectError::SocketError(err))?;
84 }
85 // Sleep for the specified time if a WouldBlock error occurs.
86 debug!(
87 "futures_usd client thread slept {:?} because of WouldBlock error",
88 would_block_config.time_out
89 );
90 std::thread::sleep(would_block_config.time_out);
91 }
92 _ => {
93 // Stop signal might have been called
94 if stop_signal.load(Ordering::Relaxed) {
95 return Ok(());
96 };
97 // Return a SocketError for other types of errors.
98 Err(BinanceConnectError::SocketError(err))?;
99 }
100 },
101 }
102 }
103
104 Ok(())
105}
106
107/// Establishes a WebSocket connection to the provided URL.
108fn socket(url: Url) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, BinanceConnectError> {
109 let (socket, _) = connect(url)?;
110 Ok(socket)
111}