binance_connect/futures_usd/
client.rs

1use std::io::ErrorKind;
2use std::net::TcpStream;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::mpsc::Sender;
5use std::sync::Arc;
6
7use log::{debug, info};
8use tungstenite::stream::MaybeTlsStream;
9use tungstenite::{connect, Message, WebSocket};
10use url::Url;
11
12use crate::error::BinanceConnectError;
13use crate::futures_usd::deserializer::deserialize;
14use crate::futures_usd::enums::events::Event;
15use crate::futures_usd::stream::WouldBlockConfig;
16
17/// Establishes a WebSocket connection to the provided URL, reads and processes messages,
18/// and sends events to the specified sender.
19///
20/// # Arguments
21///
22/// * `sender` - A `Sender<Event>` to send events to.
23/// * `url` - The URL to connect to.
24/// * `would_block_config` - Configuration for handling WouldBlock errors.
25/// * `subscribe_payload` - An optional JSON payload to subscribe to specific streams.
26///
27/// # Returns
28///
29/// This function returns `Ok(())` if the connection and processing were successful, or
30/// a `BinanceConnectError` if an error occurred.
31pub fn client(
32    sender: Sender<Event>,
33    url: Url,
34    stop_signal: Arc<AtomicBool>,
35    would_block_config: WouldBlockConfig,
36    subscribe_payload: Option<String>,
37) -> Result<(), BinanceConnectError> {
38    // Establish a WebSocket connection.
39    let mut socket: WebSocket<MaybeTlsStream<TcpStream>> = socket(url)?;
40
41    // If a subscribe payload is provided, send the subscription request.
42    if let Some(subscribe_payload) = subscribe_payload {
43        debug!("{:?}", subscribe_payload);
44        socket.send(Message::Text(subscribe_payload))?;
45    }
46
47    // Continuously read and process WebSocket messages.
48    while !stop_signal.load(Ordering::Relaxed) {
49        match socket.read() {
50            Ok(message) => match message {
51                // Handle incoming JSON messages.
52                Message::Text(json_response) => {
53                    // Stop signal might have been called
54                    if stop_signal.load(Ordering::Relaxed) {
55                        return Ok(());
56                    };
57
58                    // Deserialize the JSON into an `Event` and send it to the sender.
59                    let event: Event = deserialize(json_response)?;
60                    sender.send(event)?;
61                }
62                // Handle incoming Ping messages.
63                Message::Ping(ping) => {
64                    // Stop signal might have been called
65                    if stop_signal.load(Ordering::Relaxed) {
66                        return Ok(());
67                    };
68                    // Respond to Ping with Pong to keep the connection alive.
69                    socket.send(Message::Pong(ping))?;
70                    debug!("pong");
71                }
72                _ => {}
73            },
74            Err(err) => match err {
75                tungstenite::Error::Io(ref io_err) if io_err.kind() == ErrorKind::WouldBlock => {
76                    // Stop signal might have been called
77                    if stop_signal.load(Ordering::Relaxed) {
78                        return Ok(());
79                    };
80
81                    if would_block_config.error_on_block {
82                        // Return a SocketError if configured to do so.
83                        Err(BinanceConnectError::SocketError(err))?;
84                    }
85                    // Sleep for the specified time if a WouldBlock error occurs.
86                    debug!(
87                        "futures_usd client thread slept {:?} because of WouldBlock error",
88                        would_block_config.time_out
89                    );
90                    std::thread::sleep(would_block_config.time_out);
91                }
92                _ => {
93                    // Stop signal might have been called
94                    if stop_signal.load(Ordering::Relaxed) {
95                        return Ok(());
96                    };
97                    // Return a SocketError for other types of errors.
98                    Err(BinanceConnectError::SocketError(err))?;
99                }
100            },
101        }
102    }
103
104    Ok(())
105}
106
107/// Establishes a WebSocket connection to the provided URL.
108fn socket(url: Url) -> Result<WebSocket<MaybeTlsStream<TcpStream>>, BinanceConnectError> {
109    let (socket, _) = connect(url)?;
110    Ok(socket)
111}