Skip to main content

sandbox_quant/binance/
ws.rs

1use anyhow::Result;
2use futures_util::{SinkExt, StreamExt};
3use std::time::Duration;
4use tokio::sync::{mpsc, watch};
5use tokio_tungstenite::tungstenite;
6use tungstenite::error::{Error as WsError, ProtocolError, UrlError};
7use tungstenite::protocol::frame::coding::CloseCode;
8
9use super::types::BinanceTradeEvent;
10use crate::event::{AppEvent, LogDomain, LogLevel, LogRecord, WsConnectionStatus};
11use crate::model::tick::Tick;
12
/// Exponential backoff for reconnection.
///
/// Every call to `next_delay` hands out the current delay and then multiplies
/// it by `factor` for the following call. No delay handed out ever exceeds `max`.
struct ExponentialBackoff {
    /// Delay the next call to `next_delay` will return (before clamping to `max`).
    current: Duration,
    /// Upper bound applied to every returned delay.
    max: Duration,
    /// Multiplier applied to the delay after each call.
    factor: f64,
}

impl ExponentialBackoff {
    /// Creates a backoff that starts at `initial`, grows by `factor` per call
    /// and is capped at `max`.
    fn new(initial: Duration, max: Duration, factor: f64) -> Self {
        Self {
            current: initial,
            max,
            factor,
        }
    }

    /// Returns the delay to wait now and advances the internal state.
    ///
    /// The returned delay is clamped to `max`, so an `initial` larger than
    /// `max` cannot leak through on the first call. If the scaled value is
    /// negative, NaN, or not below the cap (e.g. a misconfigured `factor`),
    /// the next delay becomes `max` instead of panicking inside
    /// `Duration::from_secs_f64`.
    fn next_delay(&mut self) -> Duration {
        let delay = self.current.min(self.max);
        let scaled_secs = delay.as_secs_f64() * self.factor;
        // Both comparisons are false for NaN, so NaN also lands on the `max` branch.
        self.current = if scaled_secs >= 0.0 && scaled_secs < self.max.as_secs_f64() {
            Duration::from_secs_f64(scaled_secs)
        } else {
            self.max
        };
        delay
    }
}
37
/// WebSocket client holding the base URLs used to open trade streams.
#[derive(Debug, Clone)]
pub struct BinanceWsClient {
    /// Base WebSocket URL used for spot instruments.
    spot_url: String,
    /// Base WebSocket URL used for futures instruments.
    futures_url: String,
}
43
44impl BinanceWsClient {
45    /// Create a new WebSocket client.
46    ///
47    /// `ws_base_url` — e.g. `wss://stream.testnet.binance.vision/ws`
48    pub fn new(ws_base_url: &str, futures_ws_base_url: &str) -> Self {
49        Self {
50            spot_url: ws_base_url.to_string(),
51            futures_url: futures_ws_base_url.to_string(),
52        }
53    }
54
55    /// Connect and run the WebSocket loop with automatic reconnection.
56    /// Sends WsStatus events through `status_tx` and ticks through `tick_tx`.
57    pub async fn connect_and_run(
58        &self,
59        tick_tx: mpsc::Sender<Tick>,
60        status_tx: mpsc::Sender<AppEvent>,
61        mut symbol_rx: watch::Receiver<String>,
62        mut shutdown: watch::Receiver<bool>,
63    ) -> Result<()> {
64        let mut backoff =
65            ExponentialBackoff::new(Duration::from_secs(1), Duration::from_secs(60), 2.0);
66        let mut attempt: u32 = 0;
67
68        loop {
69            attempt += 1;
70            let instrument = symbol_rx.borrow().clone();
71            let (symbol, is_futures) = parse_instrument_symbol(&instrument);
72            let streams = vec![format!("{}@trade", symbol.to_lowercase())];
73            let ws_url = if is_futures {
74                &self.futures_url
75            } else {
76                &self.spot_url
77            };
78            match self
79                .connect_once(
80                    ws_url,
81                    &streams,
82                    &instrument,
83                    &tick_tx,
84                    &status_tx,
85                    &mut symbol_rx,
86                    &mut shutdown,
87                )
88                .await
89            {
90                Ok(()) => {
91                    // Clean shutdown requested
92                    let _ = status_tx
93                        .send(AppEvent::WsStatus(WsConnectionStatus::Disconnected))
94                        .await;
95                    break;
96                }
97                Err(e) => {
98                    let _ = status_tx
99                        .send(AppEvent::WsStatus(WsConnectionStatus::Disconnected))
100                        .await;
101                    tracing::warn!(attempt, error = %e, "WS connection attempt failed");
102                    let _ = status_tx
103                        .send(AppEvent::LogRecord(ws_log(
104                            LogLevel::Warn,
105                            "connect.fail",
106                            &instrument,
107                            format!("attempt={} error={}", attempt, e),
108                        )))
109                        .await;
110
111                    let delay = backoff.next_delay();
112                    let _ = status_tx
113                        .send(AppEvent::WsStatus(WsConnectionStatus::Reconnecting {
114                            attempt,
115                            delay_ms: delay.as_millis() as u64,
116                        }))
117                        .await;
118
119                    tokio::select! {
120                        _ = tokio::time::sleep(delay) => continue,
121                        _ = shutdown.changed() => {
122                            let _ = status_tx
123                                .send(AppEvent::LogRecord(ws_log(
124                                    LogLevel::Info,
125                                    "shutdown.during_reconnect",
126                                    &instrument,
127                                    "shutdown signal received during reconnect wait".to_string(),
128                                )))
129                                .await;
130                            break;
131                        }
132                    }
133                }
134            }
135        }
136        Ok(())
137    }
138
    /// Runs a single WebSocket session: connect to `ws_url`, send a SUBSCRIBE
    /// request for `streams`, then process incoming frames until the session ends.
    ///
    /// Log records and status events are emitted on `status_tx` (tagged with
    /// `display_symbol`); text frames are handed to `handle_text_message`,
    /// which forwards ticks on `tick_tx`.
    ///
    /// Returns `Ok(())` when `shutdown.changed()` or `symbol_rx.changed()`
    /// resolves — the result of `changed()` is discarded, so either outcome of
    /// that call ends the session. Returns `Err` when the connection or the
    /// SUBSCRIBE send fails, the server sends a Close frame, a read error
    /// occurs, or the stream ends.
    async fn connect_once(
        &self,
        ws_url: &str,
        streams: &[String],
        display_symbol: &str,
        tick_tx: &mpsc::Sender<Tick>,
        status_tx: &mpsc::Sender<AppEvent>,
        symbol_rx: &mut watch::Receiver<String>,
        shutdown: &mut watch::Receiver<bool>,
    ) -> Result<()> {
        let _ = status_tx
            .send(AppEvent::LogRecord(ws_log(
                LogLevel::Info,
                "connect.start",
                display_symbol,
                format!("url={}", ws_url),
            )))
            .await;

        let (ws_stream, resp) = tokio_tungstenite::connect_async(ws_url)
            .await
            .map_err(|e| {
                let detail = format_ws_error(&e);
                // The closure is synchronous, so the detail record is sent
                // with `try_send` (best effort) rather than `.send().await`.
                let _ = status_tx.try_send(AppEvent::LogRecord(ws_log(
                    LogLevel::Warn,
                    "connect.detail",
                    display_symbol,
                    detail.clone(),
                )));
                anyhow::anyhow!("WebSocket connect failed: {}", detail)
            })?;

        tracing::debug!(status = %resp.status(), "WebSocket HTTP upgrade response");

        let (mut write, mut read) = ws_stream.split();

        // Send SUBSCRIBE message per Binance WebSocket API spec
        let subscribe_msg = serde_json::json!({
            "method": "SUBSCRIBE",
            "params": streams,
            "id": 1
        });
        write
            .send(tungstenite::Message::Text(subscribe_msg.to_string()))
            .await
            .map_err(|e| {
                let detail = format_ws_error(&e);
                anyhow::anyhow!("Failed to send SUBSCRIBE: {}", detail)
            })?;

        // "subscribe.ok" means the request was written; the server's reply is
        // not awaited here (it is recognised later in `handle_text_message`).
        let _ = status_tx
            .send(AppEvent::LogRecord(ws_log(
                LogLevel::Info,
                "subscribe.ok",
                display_symbol,
                format!("streams={}", streams.join(",")),
            )))
            .await;

        // Send Connected AFTER successful subscription
        let _ = status_tx
            .send(AppEvent::WsStatus(WsConnectionStatus::Connected))
            .await;

        // Session loop: whichever of the three futures resolves first wins.
        loop {
            tokio::select! {
                msg = read.next() => {
                    match msg {
                        Some(Ok(tungstenite::Message::Text(text))) => {
                            self.handle_text_message(&text, display_symbol, tick_tx, status_tx).await;
                        }
                        Some(Ok(tungstenite::Message::Ping(_))) => {
                            // tokio-tungstenite handles pong automatically
                        }
                        Some(Ok(tungstenite::Message::Close(frame))) => {
                            // A server-initiated close is reported as an error
                            // so the caller treats it as a failed session.
                            let detail = match &frame {
                                Some(cf) => format!(
                                    "Server closed: code={} reason=\"{}\"",
                                    format_close_code(&cf.code),
                                    cf.reason
                                ),
                                None => "Server closed: no close frame".to_string(),
                            };
                            let _ = status_tx
                                .send(AppEvent::LogRecord(ws_log(
                                    LogLevel::Warn,
                                    "server.closed",
                                    display_symbol,
                                    detail.clone(),
                                )))
                                .await;
                            return Err(anyhow::anyhow!("{}", detail));
                        }
                        Some(Ok(other)) => {
                            // Any other frame type is only traced.
                            tracing::trace!(msg_type = ?other, "Unhandled WS message type");
                        }
                        Some(Err(e)) => {
                            let detail = format_ws_error(&e);
                            let _ = status_tx
                                .send(AppEvent::LogRecord(ws_log(
                                    LogLevel::Warn,
                                    "read.error",
                                    display_symbol,
                                    detail.clone(),
                                )))
                                .await;
                            return Err(anyhow::anyhow!("WebSocket read error: {}", detail));
                        }
                        None => {
                            // The stream yielded `None`: no more frames will arrive.
                            return Err(anyhow::anyhow!(
                                "WebSocket stream ended unexpectedly (connection dropped)"
                            ));
                        }
                    }
                }
                _ = shutdown.changed() => {
                    // Send UNSUBSCRIBE before closing
                    // Both sends are best effort; failures are ignored because
                    // the session is ending anyway.
                    let unsub_msg = serde_json::json!({
                        "method": "UNSUBSCRIBE",
                        "params": streams,
                        "id": 2
                    });
                    let _ = write
                        .send(tungstenite::Message::Text(unsub_msg.to_string()))
                        .await;
                    let _ = write.send(tungstenite::Message::Close(None)).await;
                    return Ok(());
                }
                _ = symbol_rx.changed() => {
                    let _ = write.send(tungstenite::Message::Close(None)).await;
                    // In multi-worker mode, symbol channel closure means this worker is being retired.
                    // NOTE(review): this arm also runs when the symbol value merely
                    // changes, yet the record says "symbol channel closed" and the
                    // caller stops on `Ok(())` — confirm a live symbol switch is
                    // meant to retire the worker rather than resubscribe.
                    let _ = status_tx
                        .send(AppEvent::LogRecord(ws_log(
                            LogLevel::Info,
                            "worker.retired",
                            display_symbol,
                            "symbol channel closed".to_string(),
                        )))
                        .await;
                    return Ok(());
                }
            }
        }
    }
283
284    async fn handle_text_message(
285        &self,
286        text: &str,
287        display_symbol: &str,
288        tick_tx: &mpsc::Sender<Tick>,
289        status_tx: &mpsc::Sender<AppEvent>,
290    ) {
291        // Skip subscription confirmation responses like {"result":null,"id":1}
292        if let Ok(val) = serde_json::from_str::<serde_json::Value>(text) {
293            if val.get("result").is_some() && val.get("id").is_some() {
294                tracing::debug!(id = %val["id"], "Subscription response received");
295                return;
296            }
297        }
298
299        match serde_json::from_str::<BinanceTradeEvent>(text) {
300            Ok(event) => {
301                let tick = Tick {
302                    symbol: display_symbol.to_string(),
303                    price: event.price,
304                    qty: event.qty,
305                    timestamp_ms: event.event_time,
306                    is_buyer_maker: event.is_buyer_maker,
307                    trade_id: event.trade_id,
308                };
309                if tick_tx.try_send(tick).is_err() {
310                    tracing::warn!("Tick channel full, dropping tick");
311                    let _ = status_tx.try_send(AppEvent::TickDropped);
312                }
313            }
314            Err(e) => {
315                tracing::debug!(error = %e, raw = %text, "Failed to parse WS message");
316                let _ = status_tx
317                    .send(AppEvent::LogRecord(ws_log(
318                        LogLevel::Debug,
319                        "parse.skip",
320                        display_symbol,
321                        format!("payload={}", &text[..text.len().min(80)]),
322                    )))
323                    .await;
324            }
325        }
326    }
327}
328
/// Splits an instrument label into its upper-cased symbol and a futures flag.
///
/// Surrounding whitespace is trimmed first. A label ending in the exact
/// suffix `" (FUT)"` yields the part before the suffix and `true`; any other
/// label is returned whole (upper-cased) with `false`.
fn parse_instrument_symbol(instrument: &str) -> (String, bool) {
    let label = instrument.trim();
    let (base, is_futures) = match label.strip_suffix(" (FUT)") {
        Some(base) => (base, true),
        None => (label, false),
    };
    (base.to_ascii_uppercase(), is_futures)
}
336
337fn ws_log(level: LogLevel, event: &'static str, symbol: &str, msg: String) -> LogRecord {
338    let mut record = LogRecord::new(level, LogDomain::Ws, event, msg);
339    record.symbol = Some(symbol.to_string());
340    record
341}
342
343/// Format a tungstenite WebSocket error into a detailed, human-readable string.
344fn format_ws_error(err: &WsError) -> String {
345    match err {
346        WsError::ConnectionClosed => "Connection closed normally".to_string(),
347        WsError::AlreadyClosed => "Attempted operation on already-closed connection".to_string(),
348        WsError::Io(io_err) => {
349            format!("IO error [kind={}]: {}", io_err.kind(), io_err)
350        }
351        WsError::Tls(tls_err) => format!("TLS error: {}", tls_err),
352        WsError::Capacity(cap_err) => format!("Capacity error: {}", cap_err),
353        WsError::Protocol(proto_err) => {
354            let detail = match proto_err {
355                ProtocolError::ResetWithoutClosingHandshake => {
356                    "connection reset without closing handshake (server may have dropped)"
357                }
358                ProtocolError::SendAfterClosing => "tried to send after close frame",
359                ProtocolError::ReceivedAfterClosing => "received data after close frame",
360                ProtocolError::HandshakeIncomplete => "handshake incomplete",
361                _ => "",
362            };
363            if detail.is_empty() {
364                format!("Protocol error: {}", proto_err)
365            } else {
366                format!("Protocol error: {} ({})", proto_err, detail)
367            }
368        }
369        WsError::WriteBufferFull(_) => "Write buffer full (backpressure)".to_string(),
370        WsError::Utf8 => "UTF-8 encoding error in frame data".to_string(),
371        WsError::AttackAttempt => "Attack attempt detected by WebSocket library".to_string(),
372        WsError::Url(url_err) => {
373            let hint = match url_err {
374                UrlError::TlsFeatureNotEnabled => "TLS feature not compiled in",
375                UrlError::NoHostName => "no host name in URL",
376                UrlError::UnableToConnect(addr) => {
377                    return format!(
378                        "URL error: unable to connect to {} (DNS/network failure?)",
379                        addr
380                    );
381                }
382                UrlError::UnsupportedUrlScheme => "only ws:// or wss:// are supported",
383                UrlError::EmptyHostName => "empty host name in URL",
384                UrlError::NoPathOrQuery => "no path/query in URL",
385            };
386            format!("URL error: {} — {}", url_err, hint)
387        }
388        WsError::Http(resp) => {
389            let status = resp.status();
390            let body_preview = resp
391                .body()
392                .as_ref()
393                .and_then(|b| std::str::from_utf8(b).ok())
394                .unwrap_or("")
395                .chars()
396                .take(200)
397                .collect::<String>();
398            format!(
399                "HTTP error: status={} ({}), body=\"{}\"",
400                status.as_u16(),
401                status.canonical_reason().unwrap_or("unknown"),
402                body_preview
403            )
404        }
405        WsError::HttpFormat(e) => format!("HTTP format error: {}", e),
406    }
407}
408
409/// Format a WebSocket close code into a readable string with numeric value.
410fn format_close_code(code: &CloseCode) -> String {
411    let (num, label) = match code {
412        CloseCode::Normal => (1000, "Normal"),
413        CloseCode::Away => (1001, "Going Away"),
414        CloseCode::Protocol => (1002, "Protocol Error"),
415        CloseCode::Unsupported => (1003, "Unsupported Data"),
416        CloseCode::Status => (1005, "No Status"),
417        CloseCode::Abnormal => (1006, "Abnormal Closure"),
418        CloseCode::Invalid => (1007, "Invalid Payload"),
419        CloseCode::Policy => (1008, "Policy Violation"),
420        CloseCode::Size => (1009, "Message Too Big"),
421        CloseCode::Extension => (1010, "Extension Required"),
422        CloseCode::Error => (1011, "Internal Error"),
423        CloseCode::Restart => (1012, "Service Restart"),
424        CloseCode::Again => (1013, "Try Again Later"),
425        CloseCode::Tls => (1015, "TLS Handshake Failure"),
426        CloseCode::Reserved(n) => (*n, "Reserved"),
427        CloseCode::Iana(n) => (*n, "IANA"),
428        CloseCode::Library(n) => (*n, "Library"),
429        CloseCode::Bad(n) => (*n, "Bad"),
430    };
431    format!("{} ({})", num, label)
432}