Skip to main content

sandbox_quant/binance/
ws.rs

1use anyhow::Result;
2use futures_util::{SinkExt, StreamExt};
3use std::time::Duration;
4use tokio::sync::{mpsc, watch};
5use tokio_tungstenite::tungstenite;
6use tungstenite::error::{Error as WsError, ProtocolError, UrlError};
7use tungstenite::protocol::frame::coding::CloseCode;
8
9use super::types::BinanceTradeEvent;
10use crate::event::{AppEvent, WsConnectionStatus};
11use crate::model::tick::Tick;
12
13/// Exponential backoff for reconnection.
14struct ExponentialBackoff {
15    current: Duration,
16    initial: Duration,
17    max: Duration,
18    factor: f64,
19}
20
21impl ExponentialBackoff {
22    fn new(initial: Duration, max: Duration, factor: f64) -> Self {
23        Self {
24            current: initial,
25            initial,
26            max,
27            factor,
28        }
29    }
30
31    fn next_delay(&mut self) -> Duration {
32        let delay = self.current;
33        self.current = Duration::from_secs_f64(
34            (self.current.as_secs_f64() * self.factor).min(self.max.as_secs_f64()),
35        );
36        delay
37    }
38
39    fn reset(&mut self) {
40        self.current = self.initial;
41    }
42}
43
44#[derive(Clone)]
45pub struct BinanceWsClient {
46    spot_url: String,
47    futures_url: String,
48}
49
50impl BinanceWsClient {
51    /// Create a new WebSocket client.
52    ///
53    /// `ws_base_url` — e.g. `wss://stream.testnet.binance.vision/ws`
54    pub fn new(ws_base_url: &str, futures_ws_base_url: &str) -> Self {
55        Self {
56            spot_url: ws_base_url.to_string(),
57            futures_url: futures_ws_base_url.to_string(),
58        }
59    }
60
61    /// Connect and run the WebSocket loop with automatic reconnection.
62    /// Sends WsStatus events through `status_tx` and ticks through `tick_tx`.
63    pub async fn connect_and_run(
64        &self,
65        tick_tx: mpsc::Sender<Tick>,
66        status_tx: mpsc::Sender<AppEvent>,
67        mut symbol_rx: watch::Receiver<String>,
68        mut shutdown: watch::Receiver<bool>,
69    ) -> Result<()> {
70        let mut backoff =
71            ExponentialBackoff::new(Duration::from_secs(1), Duration::from_secs(60), 2.0);
72        let mut attempt: u32 = 0;
73
74        loop {
75            attempt += 1;
76            let instrument = symbol_rx.borrow().clone();
77            let (symbol, is_futures) = parse_instrument_symbol(&instrument);
78            let streams = vec![format!("{}@trade", symbol.to_lowercase())];
79            let ws_url = if is_futures {
80                &self.futures_url
81            } else {
82                &self.spot_url
83            };
84            match self
85                .connect_once(
86                    ws_url,
87                    &streams,
88                    &instrument,
89                    &tick_tx,
90                    &status_tx,
91                    &mut symbol_rx,
92                    &mut shutdown,
93                )
94                .await
95            {
96                Ok(()) => {
97                    // Clean shutdown requested
98                    let _ = status_tx
99                        .send(AppEvent::WsStatus(WsConnectionStatus::Disconnected))
100                        .await;
101                    break;
102                }
103                Err(e) => {
104                    let _ = status_tx
105                        .send(AppEvent::WsStatus(WsConnectionStatus::Disconnected))
106                        .await;
107                    tracing::warn!(attempt, error = %e, "WS connection attempt failed");
108                    let _ = status_tx
109                        .send(AppEvent::LogMessage(format!(
110                            "WS error (attempt #{}): {}",
111                            attempt, e
112                        )))
113                        .await;
114
115                    let delay = backoff.next_delay();
116                    let _ = status_tx
117                        .send(AppEvent::WsStatus(WsConnectionStatus::Reconnecting {
118                            attempt,
119                            delay_ms: delay.as_millis() as u64,
120                        }))
121                        .await;
122
123                    tokio::select! {
124                        _ = tokio::time::sleep(delay) => continue,
125                        _ = shutdown.changed() => {
126                            let _ = status_tx
127                                .send(AppEvent::LogMessage("Shutdown during reconnect".to_string()))
128                                .await;
129                            break;
130                        }
131                    }
132                }
133            }
134        }
135        Ok(())
136    }
137
138    async fn connect_once(
139        &self,
140        ws_url: &str,
141        streams: &[String],
142        display_symbol: &str,
143        tick_tx: &mpsc::Sender<Tick>,
144        status_tx: &mpsc::Sender<AppEvent>,
145        symbol_rx: &mut watch::Receiver<String>,
146        shutdown: &mut watch::Receiver<bool>,
147    ) -> Result<()> {
148        let _ = status_tx
149            .send(AppEvent::LogMessage(format!("Connecting to {}", ws_url)))
150            .await;
151
152        let (ws_stream, resp) = tokio_tungstenite::connect_async(ws_url)
153            .await
154            .map_err(|e| {
155                let detail = format_ws_error(&e);
156                let _ = status_tx.try_send(AppEvent::LogMessage(detail.clone()));
157                anyhow::anyhow!("WebSocket connect failed: {}", detail)
158            })?;
159
160        tracing::debug!(status = %resp.status(), "WebSocket HTTP upgrade response");
161
162        let (mut write, mut read) = ws_stream.split();
163
164        // Send SUBSCRIBE message per Binance WebSocket API spec
165        let subscribe_msg = serde_json::json!({
166            "method": "SUBSCRIBE",
167            "params": streams,
168            "id": 1
169        });
170        write
171            .send(tungstenite::Message::Text(subscribe_msg.to_string()))
172            .await
173            .map_err(|e| {
174                let detail = format_ws_error(&e);
175                anyhow::anyhow!("Failed to send SUBSCRIBE: {}", detail)
176            })?;
177
178        let _ = status_tx
179            .send(AppEvent::LogMessage(format!(
180                "Subscribed to: {}",
181                streams.join(", ")
182            )))
183            .await;
184
185        // Send Connected AFTER successful subscription
186        let _ = status_tx
187            .send(AppEvent::WsStatus(WsConnectionStatus::Connected))
188            .await;
189
190        loop {
191            tokio::select! {
192                msg = read.next() => {
193                    match msg {
194                        Some(Ok(tungstenite::Message::Text(text))) => {
195                            self.handle_text_message(&text, display_symbol, tick_tx, status_tx).await;
196                        }
197                        Some(Ok(tungstenite::Message::Ping(_))) => {
198                            // tokio-tungstenite handles pong automatically
199                        }
200                        Some(Ok(tungstenite::Message::Close(frame))) => {
201                            let detail = match &frame {
202                                Some(cf) => format!(
203                                    "Server closed: code={} reason=\"{}\"",
204                                    format_close_code(&cf.code),
205                                    cf.reason
206                                ),
207                                None => "Server closed: no close frame".to_string(),
208                            };
209                            let _ = status_tx
210                                .send(AppEvent::LogMessage(detail.clone()))
211                                .await;
212                            return Err(anyhow::anyhow!("{}", detail));
213                        }
214                        Some(Ok(other)) => {
215                            tracing::trace!(msg_type = ?other, "Unhandled WS message type");
216                        }
217                        Some(Err(e)) => {
218                            let detail = format_ws_error(&e);
219                            let _ = status_tx
220                                .send(AppEvent::LogMessage(format!("WS read error: {}", detail)))
221                                .await;
222                            return Err(anyhow::anyhow!("WebSocket read error: {}", detail));
223                        }
224                        None => {
225                            return Err(anyhow::anyhow!(
226                                "WebSocket stream ended unexpectedly (connection dropped)"
227                            ));
228                        }
229                    }
230                }
231                _ = shutdown.changed() => {
232                    // Send UNSUBSCRIBE before closing
233                    let unsub_msg = serde_json::json!({
234                        "method": "UNSUBSCRIBE",
235                        "params": streams,
236                        "id": 2
237                    });
238                    let _ = write
239                        .send(tungstenite::Message::Text(unsub_msg.to_string()))
240                        .await;
241                    let _ = write.send(tungstenite::Message::Close(None)).await;
242                    return Ok(());
243                }
244                _ = symbol_rx.changed() => {
245                    let _ = write.send(tungstenite::Message::Close(None)).await;
246                    // In multi-worker mode, symbol channel closure means this worker is being retired.
247                    return Ok(());
248                }
249            }
250        }
251    }
252
253    async fn handle_text_message(
254        &self,
255        text: &str,
256        display_symbol: &str,
257        tick_tx: &mpsc::Sender<Tick>,
258        status_tx: &mpsc::Sender<AppEvent>,
259    ) {
260        // Skip subscription confirmation responses like {"result":null,"id":1}
261        if let Ok(val) = serde_json::from_str::<serde_json::Value>(text) {
262            if val.get("result").is_some() && val.get("id").is_some() {
263                tracing::debug!(id = %val["id"], "Subscription response received");
264                return;
265            }
266        }
267
268        match serde_json::from_str::<BinanceTradeEvent>(text) {
269            Ok(event) => {
270                let tick = Tick {
271                    symbol: display_symbol.to_string(),
272                    price: event.price,
273                    qty: event.qty,
274                    timestamp_ms: event.event_time,
275                    is_buyer_maker: event.is_buyer_maker,
276                    trade_id: event.trade_id,
277                };
278                if tick_tx.try_send(tick).is_err() {
279                    tracing::warn!("Tick channel full, dropping tick");
280                    let _ = status_tx.try_send(AppEvent::TickDropped);
281                }
282            }
283            Err(e) => {
284                tracing::debug!(error = %e, raw = %text, "Failed to parse WS message");
285                let _ = status_tx
286                    .send(AppEvent::LogMessage(format!(
287                        "WS parse skip: {}",
288                        &text[..text.len().min(80)]
289                    )))
290                    .await;
291            }
292        }
293    }
294}
295
296fn parse_instrument_symbol(instrument: &str) -> (String, bool) {
297    let trimmed = instrument.trim();
298    if let Some(symbol) = trimmed.strip_suffix(" (FUT)") {
299        return (symbol.to_ascii_uppercase(), true);
300    }
301    (trimmed.to_ascii_uppercase(), false)
302}
303
304/// Format a tungstenite WebSocket error into a detailed, human-readable string.
305fn format_ws_error(err: &WsError) -> String {
306    match err {
307        WsError::ConnectionClosed => "Connection closed normally".to_string(),
308        WsError::AlreadyClosed => "Attempted operation on already-closed connection".to_string(),
309        WsError::Io(io_err) => {
310            format!("IO error [kind={}]: {}", io_err.kind(), io_err)
311        }
312        WsError::Tls(tls_err) => format!("TLS error: {}", tls_err),
313        WsError::Capacity(cap_err) => format!("Capacity error: {}", cap_err),
314        WsError::Protocol(proto_err) => {
315            let detail = match proto_err {
316                ProtocolError::ResetWithoutClosingHandshake => {
317                    "connection reset without closing handshake (server may have dropped)"
318                }
319                ProtocolError::SendAfterClosing => "tried to send after close frame",
320                ProtocolError::ReceivedAfterClosing => "received data after close frame",
321                ProtocolError::HandshakeIncomplete => "handshake incomplete",
322                _ => "",
323            };
324            if detail.is_empty() {
325                format!("Protocol error: {}", proto_err)
326            } else {
327                format!("Protocol error: {} ({})", proto_err, detail)
328            }
329        }
330        WsError::WriteBufferFull(_) => "Write buffer full (backpressure)".to_string(),
331        WsError::Utf8 => "UTF-8 encoding error in frame data".to_string(),
332        WsError::AttackAttempt => "Attack attempt detected by WebSocket library".to_string(),
333        WsError::Url(url_err) => {
334            let hint = match url_err {
335                UrlError::TlsFeatureNotEnabled => "TLS feature not compiled in",
336                UrlError::NoHostName => "no host name in URL",
337                UrlError::UnableToConnect(addr) => {
338                    return format!(
339                        "URL error: unable to connect to {} (DNS/network failure?)",
340                        addr
341                    );
342                }
343                UrlError::UnsupportedUrlScheme => "only ws:// or wss:// are supported",
344                UrlError::EmptyHostName => "empty host name in URL",
345                UrlError::NoPathOrQuery => "no path/query in URL",
346            };
347            format!("URL error: {} — {}", url_err, hint)
348        }
349        WsError::Http(resp) => {
350            let status = resp.status();
351            let body_preview = resp
352                .body()
353                .as_ref()
354                .and_then(|b| std::str::from_utf8(b).ok())
355                .unwrap_or("")
356                .chars()
357                .take(200)
358                .collect::<String>();
359            format!(
360                "HTTP error: status={} ({}), body=\"{}\"",
361                status.as_u16(),
362                status.canonical_reason().unwrap_or("unknown"),
363                body_preview
364            )
365        }
366        WsError::HttpFormat(e) => format!("HTTP format error: {}", e),
367    }
368}
369
370/// Format a WebSocket close code into a readable string with numeric value.
371fn format_close_code(code: &CloseCode) -> String {
372    let (num, label) = match code {
373        CloseCode::Normal => (1000, "Normal"),
374        CloseCode::Away => (1001, "Going Away"),
375        CloseCode::Protocol => (1002, "Protocol Error"),
376        CloseCode::Unsupported => (1003, "Unsupported Data"),
377        CloseCode::Status => (1005, "No Status"),
378        CloseCode::Abnormal => (1006, "Abnormal Closure"),
379        CloseCode::Invalid => (1007, "Invalid Payload"),
380        CloseCode::Policy => (1008, "Policy Violation"),
381        CloseCode::Size => (1009, "Message Too Big"),
382        CloseCode::Extension => (1010, "Extension Required"),
383        CloseCode::Error => (1011, "Internal Error"),
384        CloseCode::Restart => (1012, "Service Restart"),
385        CloseCode::Again => (1013, "Try Again Later"),
386        CloseCode::Tls => (1015, "TLS Handshake Failure"),
387        CloseCode::Reserved(n) => (*n, "Reserved"),
388        CloseCode::Iana(n) => (*n, "IANA"),
389        CloseCode::Library(n) => (*n, "Library"),
390        CloseCode::Bad(n) => (*n, "Bad"),
391    };
392    format!("{} ({})", num, label)
393}