Skip to main content

sandbox_quant/binance/
ws.rs

1use anyhow::Result;
2use futures_util::{SinkExt, StreamExt};
3use std::time::Duration;
4use tokio::sync::{mpsc, watch};
5use tokio_tungstenite::tungstenite;
6use tungstenite::error::{Error as WsError, ProtocolError, UrlError};
7use tungstenite::protocol::frame::coding::CloseCode;
8
9use super::types::BinanceTradeEvent;
10use crate::event::{AppEvent, WsConnectionStatus};
11use crate::model::tick::Tick;
12
13/// Exponential backoff for reconnection.
14struct ExponentialBackoff {
15    current: Duration,
16    initial: Duration,
17    max: Duration,
18    factor: f64,
19}
20
21impl ExponentialBackoff {
22    fn new(initial: Duration, max: Duration, factor: f64) -> Self {
23        Self {
24            current: initial,
25            initial,
26            max,
27            factor,
28        }
29    }
30
31    fn next_delay(&mut self) -> Duration {
32        let delay = self.current;
33        self.current = Duration::from_secs_f64(
34            (self.current.as_secs_f64() * self.factor).min(self.max.as_secs_f64()),
35        );
36        delay
37    }
38
39    fn reset(&mut self) {
40        self.current = self.initial;
41    }
42}
43
44#[derive(Clone)]
45pub struct BinanceWsClient {
46    spot_url: String,
47    futures_url: String,
48}
49
50impl BinanceWsClient {
51    /// Create a new WebSocket client.
52    ///
53    /// `ws_base_url` — e.g. `wss://stream.testnet.binance.vision/ws`
54    pub fn new(ws_base_url: &str, futures_ws_base_url: &str) -> Self {
55        Self {
56            spot_url: ws_base_url.to_string(),
57            futures_url: futures_ws_base_url.to_string(),
58        }
59    }
60
61    /// Connect and run the WebSocket loop with automatic reconnection.
62    /// Sends WsStatus events through `status_tx` and ticks through `tick_tx`.
63    pub async fn connect_and_run(
64        &self,
65        tick_tx: mpsc::Sender<Tick>,
66        status_tx: mpsc::Sender<AppEvent>,
67        mut symbol_rx: watch::Receiver<String>,
68        mut shutdown: watch::Receiver<bool>,
69    ) -> Result<()> {
70        let mut backoff =
71            ExponentialBackoff::new(Duration::from_secs(1), Duration::from_secs(60), 2.0);
72        let mut attempt: u32 = 0;
73
74        loop {
75            attempt += 1;
76            let instrument = symbol_rx.borrow().clone();
77            let (symbol, is_futures) = parse_instrument_symbol(&instrument);
78            let streams = vec![format!("{}@trade", symbol.to_lowercase())];
79            let ws_url = if is_futures {
80                &self.futures_url
81            } else {
82                &self.spot_url
83            };
84            match self
85                .connect_once(
86                    ws_url,
87                    &streams,
88                    &instrument,
89                    &tick_tx,
90                    &status_tx,
91                    &mut symbol_rx,
92                    &mut shutdown,
93                )
94                .await
95            {
96                Ok(()) => {
97                    // Clean shutdown requested
98                    let _ = status_tx
99                        .send(AppEvent::WsStatus(WsConnectionStatus::Disconnected))
100                        .await;
101                    break;
102                }
103                Err(e) => {
104                    let _ = status_tx
105                        .send(AppEvent::WsStatus(WsConnectionStatus::Disconnected))
106                        .await;
107                    tracing::warn!(attempt, error = %e, "WS connection attempt failed");
108                    let _ = status_tx
109                        .send(AppEvent::LogMessage(format!(
110                            "WS error (attempt #{}): {}",
111                            attempt, e
112                        )))
113                        .await;
114
115                    let delay = backoff.next_delay();
116                    let _ = status_tx
117                        .send(AppEvent::WsStatus(WsConnectionStatus::Reconnecting {
118                            attempt,
119                            delay_ms: delay.as_millis() as u64,
120                        }))
121                        .await;
122
123                    tokio::select! {
124                        _ = tokio::time::sleep(delay) => continue,
125                        _ = shutdown.changed() => {
126                            let _ = status_tx
127                                .send(AppEvent::LogMessage("Shutdown during reconnect".to_string()))
128                                .await;
129                            break;
130                        }
131                    }
132                }
133            }
134        }
135        Ok(())
136    }
137
138    async fn connect_once(
139        &self,
140        ws_url: &str,
141        streams: &[String],
142        display_symbol: &str,
143        tick_tx: &mpsc::Sender<Tick>,
144        status_tx: &mpsc::Sender<AppEvent>,
145        symbol_rx: &mut watch::Receiver<String>,
146        shutdown: &mut watch::Receiver<bool>,
147    ) -> Result<()> {
148        let _ = status_tx
149            .send(AppEvent::LogMessage(format!("Connecting to {}", ws_url)))
150            .await;
151
152        let (ws_stream, resp) = tokio_tungstenite::connect_async(ws_url)
153            .await
154            .map_err(|e| {
155                let detail = format_ws_error(&e);
156                let _ = status_tx.try_send(AppEvent::LogMessage(detail.clone()));
157                anyhow::anyhow!("WebSocket connect failed: {}", detail)
158            })?;
159
160        tracing::debug!(status = %resp.status(), "WebSocket HTTP upgrade response");
161
162        let (mut write, mut read) = ws_stream.split();
163
164        // Send SUBSCRIBE message per Binance WebSocket API spec
165        let subscribe_msg = serde_json::json!({
166            "method": "SUBSCRIBE",
167            "params": streams,
168            "id": 1
169        });
170        write
171            .send(tungstenite::Message::Text(subscribe_msg.to_string()))
172            .await
173            .map_err(|e| {
174                let detail = format_ws_error(&e);
175                anyhow::anyhow!("Failed to send SUBSCRIBE: {}", detail)
176            })?;
177
178        let _ = status_tx
179            .send(AppEvent::LogMessage(format!(
180                "Subscribed to: {}",
181                streams.join(", ")
182            )))
183            .await;
184
185        // Send Connected AFTER successful subscription
186        let _ = status_tx
187            .send(AppEvent::WsStatus(WsConnectionStatus::Connected))
188            .await;
189        let _ = status_tx
190            .send(AppEvent::LogMessage("WebSocket connected".to_string()))
191            .await;
192
193        loop {
194            tokio::select! {
195                msg = read.next() => {
196                    match msg {
197                        Some(Ok(tungstenite::Message::Text(text))) => {
198                            self.handle_text_message(&text, display_symbol, tick_tx, status_tx).await;
199                        }
200                        Some(Ok(tungstenite::Message::Ping(_))) => {
201                            // tokio-tungstenite handles pong automatically
202                        }
203                        Some(Ok(tungstenite::Message::Close(frame))) => {
204                            let detail = match &frame {
205                                Some(cf) => format!(
206                                    "Server closed: code={} reason=\"{}\"",
207                                    format_close_code(&cf.code),
208                                    cf.reason
209                                ),
210                                None => "Server closed: no close frame".to_string(),
211                            };
212                            let _ = status_tx
213                                .send(AppEvent::LogMessage(detail.clone()))
214                                .await;
215                            return Err(anyhow::anyhow!("{}", detail));
216                        }
217                        Some(Ok(other)) => {
218                            tracing::trace!(msg_type = ?other, "Unhandled WS message type");
219                        }
220                        Some(Err(e)) => {
221                            let detail = format_ws_error(&e);
222                            let _ = status_tx
223                                .send(AppEvent::LogMessage(format!("WS read error: {}", detail)))
224                                .await;
225                            return Err(anyhow::anyhow!("WebSocket read error: {}", detail));
226                        }
227                        None => {
228                            return Err(anyhow::anyhow!(
229                                "WebSocket stream ended unexpectedly (connection dropped)"
230                            ));
231                        }
232                    }
233                }
234                _ = shutdown.changed() => {
235                    // Send UNSUBSCRIBE before closing
236                    let unsub_msg = serde_json::json!({
237                        "method": "UNSUBSCRIBE",
238                        "params": streams,
239                        "id": 2
240                    });
241                    let _ = write
242                        .send(tungstenite::Message::Text(unsub_msg.to_string()))
243                        .await;
244                    let _ = write.send(tungstenite::Message::Close(None)).await;
245                    return Ok(());
246                }
247                _ = symbol_rx.changed() => {
248                    let _ = write.send(tungstenite::Message::Close(None)).await;
249                    return Err(anyhow::anyhow!("Symbol changed, reconnecting WebSocket"));
250                }
251            }
252        }
253    }
254
255    async fn handle_text_message(
256        &self,
257        text: &str,
258        display_symbol: &str,
259        tick_tx: &mpsc::Sender<Tick>,
260        status_tx: &mpsc::Sender<AppEvent>,
261    ) {
262        // Skip subscription confirmation responses like {"result":null,"id":1}
263        if let Ok(val) = serde_json::from_str::<serde_json::Value>(text) {
264            if val.get("result").is_some() && val.get("id").is_some() {
265                tracing::debug!(id = %val["id"], "Subscription response received");
266                return;
267            }
268        }
269
270        match serde_json::from_str::<BinanceTradeEvent>(text) {
271            Ok(event) => {
272                let tick = Tick {
273                    symbol: display_symbol.to_string(),
274                    price: event.price,
275                    qty: event.qty,
276                    timestamp_ms: event.event_time,
277                    is_buyer_maker: event.is_buyer_maker,
278                    trade_id: event.trade_id,
279                };
280                if tick_tx.try_send(tick).is_err() {
281                    tracing::warn!("Tick channel full, dropping tick");
282                }
283            }
284            Err(e) => {
285                tracing::debug!(error = %e, raw = %text, "Failed to parse WS message");
286                let _ = status_tx
287                    .send(AppEvent::LogMessage(format!(
288                        "WS parse skip: {}",
289                        &text[..text.len().min(80)]
290                    )))
291                    .await;
292            }
293        }
294    }
295}
296
297fn parse_instrument_symbol(instrument: &str) -> (String, bool) {
298    let trimmed = instrument.trim();
299    if let Some(symbol) = trimmed.strip_suffix(" (FUT)") {
300        return (symbol.to_ascii_uppercase(), true);
301    }
302    (trimmed.to_ascii_uppercase(), false)
303}
304
305/// Format a tungstenite WebSocket error into a detailed, human-readable string.
306fn format_ws_error(err: &WsError) -> String {
307    match err {
308        WsError::ConnectionClosed => "Connection closed normally".to_string(),
309        WsError::AlreadyClosed => "Attempted operation on already-closed connection".to_string(),
310        WsError::Io(io_err) => {
311            format!("IO error [kind={}]: {}", io_err.kind(), io_err)
312        }
313        WsError::Tls(tls_err) => format!("TLS error: {}", tls_err),
314        WsError::Capacity(cap_err) => format!("Capacity error: {}", cap_err),
315        WsError::Protocol(proto_err) => {
316            let detail = match proto_err {
317                ProtocolError::ResetWithoutClosingHandshake => {
318                    "connection reset without closing handshake (server may have dropped)"
319                }
320                ProtocolError::SendAfterClosing => "tried to send after close frame",
321                ProtocolError::ReceivedAfterClosing => "received data after close frame",
322                ProtocolError::HandshakeIncomplete => "handshake incomplete",
323                _ => "",
324            };
325            if detail.is_empty() {
326                format!("Protocol error: {}", proto_err)
327            } else {
328                format!("Protocol error: {} ({})", proto_err, detail)
329            }
330        }
331        WsError::WriteBufferFull(_) => "Write buffer full (backpressure)".to_string(),
332        WsError::Utf8 => "UTF-8 encoding error in frame data".to_string(),
333        WsError::AttackAttempt => "Attack attempt detected by WebSocket library".to_string(),
334        WsError::Url(url_err) => {
335            let hint = match url_err {
336                UrlError::TlsFeatureNotEnabled => "TLS feature not compiled in",
337                UrlError::NoHostName => "no host name in URL",
338                UrlError::UnableToConnect(addr) => {
339                    return format!(
340                        "URL error: unable to connect to {} (DNS/network failure?)",
341                        addr
342                    );
343                }
344                UrlError::UnsupportedUrlScheme => "only ws:// or wss:// are supported",
345                UrlError::EmptyHostName => "empty host name in URL",
346                UrlError::NoPathOrQuery => "no path/query in URL",
347            };
348            format!("URL error: {} — {}", url_err, hint)
349        }
350        WsError::Http(resp) => {
351            let status = resp.status();
352            let body_preview = resp
353                .body()
354                .as_ref()
355                .and_then(|b| std::str::from_utf8(b).ok())
356                .unwrap_or("")
357                .chars()
358                .take(200)
359                .collect::<String>();
360            format!(
361                "HTTP error: status={} ({}), body=\"{}\"",
362                status.as_u16(),
363                status.canonical_reason().unwrap_or("unknown"),
364                body_preview
365            )
366        }
367        WsError::HttpFormat(e) => format!("HTTP format error: {}", e),
368    }
369}
370
371/// Format a WebSocket close code into a readable string with numeric value.
372fn format_close_code(code: &CloseCode) -> String {
373    let (num, label) = match code {
374        CloseCode::Normal => (1000, "Normal"),
375        CloseCode::Away => (1001, "Going Away"),
376        CloseCode::Protocol => (1002, "Protocol Error"),
377        CloseCode::Unsupported => (1003, "Unsupported Data"),
378        CloseCode::Status => (1005, "No Status"),
379        CloseCode::Abnormal => (1006, "Abnormal Closure"),
380        CloseCode::Invalid => (1007, "Invalid Payload"),
381        CloseCode::Policy => (1008, "Policy Violation"),
382        CloseCode::Size => (1009, "Message Too Big"),
383        CloseCode::Extension => (1010, "Extension Required"),
384        CloseCode::Error => (1011, "Internal Error"),
385        CloseCode::Restart => (1012, "Service Restart"),
386        CloseCode::Again => (1013, "Try Again Later"),
387        CloseCode::Tls => (1015, "TLS Handshake Failure"),
388        CloseCode::Reserved(n) => (*n, "Reserved"),
389        CloseCode::Iana(n) => (*n, "IANA"),
390        CloseCode::Library(n) => (*n, "Library"),
391        CloseCode::Bad(n) => (*n, "Bad"),
392    };
393    format!("{} ({})", num, label)
394}