1use anyhow::Result;
2use futures_util::{SinkExt, StreamExt};
3use std::time::Duration;
4use tokio::sync::{mpsc, watch};
5use tokio_tungstenite::tungstenite;
6use tungstenite::error::{Error as WsError, ProtocolError, UrlError};
7use tungstenite::protocol::frame::coding::CloseCode;
8
9use super::types::BinanceTradeEvent;
10use crate::event::{AppEvent, LogDomain, LogLevel, LogRecord, WsConnectionStatus};
11use crate::model::tick::Tick;
12
/// Reconnect delay schedule that grows geometrically up to a fixed ceiling.
///
/// Invariant: `current <= max` at all times, so no delay handed out ever
/// exceeds the ceiling.
struct ExponentialBackoff {
    /// Delay the next call to [`ExponentialBackoff::next_delay`] will return.
    current: Duration,
    /// Upper bound for every delay handed out.
    max: Duration,
    /// Multiplier applied to `current` after each call.
    factor: f64,
}

impl ExponentialBackoff {
    /// Creates a schedule starting at `initial`, multiplied by `factor` after
    /// every call and never exceeding `max`.
    ///
    /// An `initial` larger than `max` is clamped to `max` so that the very
    /// first delay already respects the ceiling.
    fn new(initial: Duration, max: Duration, factor: f64) -> Self {
        Self {
            current: initial.min(max),
            max,
            factor,
        }
    }

    /// Returns the delay to wait now and advances the schedule.
    ///
    /// Never panics: a negative, NaN or sub-1.0 `factor` leaves the delay
    /// unchanged instead of producing a value `Duration` cannot represent
    /// (or a delay that shrinks towards a busy loop).
    fn next_delay(&mut self) -> Duration {
        let delay = self.current;
        let delay_secs = delay.as_secs_f64();
        let cap_secs = self.max.as_secs_f64();

        // `f64::max`/`f64::min` ignore a NaN operand, so a NaN product falls
        // back to `delay_secs`; the result always lies in [delay, max].
        let grown_secs = (delay_secs * self.factor).max(delay_secs).min(cap_secs);

        // Use `max` itself once the ceiling is reached: this avoids float
        // rounding and keeps `from_secs_f64` away from values near
        // `Duration::MAX`, where the round-trip through f64 could overflow.
        self.current = if grown_secs >= cap_secs {
            self.max
        } else {
            Duration::from_secs_f64(grown_secs)
        };
        delay
    }
}
37
38#[derive(Clone)]
39pub struct BinanceWsClient {
40 spot_url: String,
41 futures_url: String,
42}
43
44impl BinanceWsClient {
45 pub fn new(ws_base_url: &str, futures_ws_base_url: &str) -> Self {
49 Self {
50 spot_url: ws_base_url.to_string(),
51 futures_url: futures_ws_base_url.to_string(),
52 }
53 }
54
55 pub async fn connect_and_run(
58 &self,
59 tick_tx: mpsc::Sender<Tick>,
60 status_tx: mpsc::Sender<AppEvent>,
61 mut symbol_rx: watch::Receiver<String>,
62 mut shutdown: watch::Receiver<bool>,
63 ) -> Result<()> {
64 let mut backoff =
65 ExponentialBackoff::new(Duration::from_secs(1), Duration::from_secs(60), 2.0);
66 let mut attempt: u32 = 0;
67
68 loop {
69 attempt += 1;
70 let instrument = symbol_rx.borrow().clone();
71 let (symbol, is_futures) = parse_instrument_symbol(&instrument);
72 let streams = vec![format!("{}@trade", symbol.to_lowercase())];
73 let ws_url = if is_futures {
74 &self.futures_url
75 } else {
76 &self.spot_url
77 };
78 match self
79 .connect_once(
80 ws_url,
81 &streams,
82 &instrument,
83 &tick_tx,
84 &status_tx,
85 &mut symbol_rx,
86 &mut shutdown,
87 )
88 .await
89 {
90 Ok(()) => {
91 let _ = status_tx
93 .send(AppEvent::WsStatus(WsConnectionStatus::Disconnected))
94 .await;
95 break;
96 }
97 Err(e) => {
98 let _ = status_tx
99 .send(AppEvent::WsStatus(WsConnectionStatus::Disconnected))
100 .await;
101 tracing::warn!(attempt, error = %e, "WS connection attempt failed");
102 let _ = status_tx
103 .send(AppEvent::LogRecord(ws_log(
104 LogLevel::Warn,
105 "connect.fail",
106 &instrument,
107 format!("attempt={} error={}", attempt, e),
108 )))
109 .await;
110
111 let delay = backoff.next_delay();
112 let _ = status_tx
113 .send(AppEvent::WsStatus(WsConnectionStatus::Reconnecting {
114 attempt,
115 delay_ms: delay.as_millis() as u64,
116 }))
117 .await;
118
119 tokio::select! {
120 _ = tokio::time::sleep(delay) => continue,
121 _ = shutdown.changed() => {
122 let _ = status_tx
123 .send(AppEvent::LogRecord(ws_log(
124 LogLevel::Info,
125 "shutdown.during_reconnect",
126 &instrument,
127 "shutdown signal received during reconnect wait".to_string(),
128 )))
129 .await;
130 break;
131 }
132 }
133 }
134 }
135 }
136 Ok(())
137 }
138
139 async fn connect_once(
140 &self,
141 ws_url: &str,
142 streams: &[String],
143 display_symbol: &str,
144 tick_tx: &mpsc::Sender<Tick>,
145 status_tx: &mpsc::Sender<AppEvent>,
146 symbol_rx: &mut watch::Receiver<String>,
147 shutdown: &mut watch::Receiver<bool>,
148 ) -> Result<()> {
149 let _ = status_tx
150 .send(AppEvent::LogRecord(ws_log(
151 LogLevel::Info,
152 "connect.start",
153 display_symbol,
154 format!("url={}", ws_url),
155 )))
156 .await;
157
158 let (ws_stream, resp) = tokio_tungstenite::connect_async(ws_url)
159 .await
160 .map_err(|e| {
161 let detail = format_ws_error(&e);
162 let _ = status_tx.try_send(AppEvent::LogRecord(ws_log(
163 LogLevel::Warn,
164 "connect.detail",
165 display_symbol,
166 detail.clone(),
167 )));
168 anyhow::anyhow!("WebSocket connect failed: {}", detail)
169 })?;
170
171 tracing::debug!(status = %resp.status(), "WebSocket HTTP upgrade response");
172
173 let (mut write, mut read) = ws_stream.split();
174
175 let subscribe_msg = serde_json::json!({
177 "method": "SUBSCRIBE",
178 "params": streams,
179 "id": 1
180 });
181 write
182 .send(tungstenite::Message::Text(subscribe_msg.to_string()))
183 .await
184 .map_err(|e| {
185 let detail = format_ws_error(&e);
186 anyhow::anyhow!("Failed to send SUBSCRIBE: {}", detail)
187 })?;
188
189 let _ = status_tx
190 .send(AppEvent::LogRecord(ws_log(
191 LogLevel::Info,
192 "subscribe.ok",
193 display_symbol,
194 format!("streams={}", streams.join(",")),
195 )))
196 .await;
197
198 let _ = status_tx
200 .send(AppEvent::WsStatus(WsConnectionStatus::Connected))
201 .await;
202
203 loop {
204 tokio::select! {
205 msg = read.next() => {
206 match msg {
207 Some(Ok(tungstenite::Message::Text(text))) => {
208 self.handle_text_message(&text, display_symbol, tick_tx, status_tx).await;
209 }
210 Some(Ok(tungstenite::Message::Ping(_))) => {
211 }
213 Some(Ok(tungstenite::Message::Close(frame))) => {
214 let detail = match &frame {
215 Some(cf) => format!(
216 "Server closed: code={} reason=\"{}\"",
217 format_close_code(&cf.code),
218 cf.reason
219 ),
220 None => "Server closed: no close frame".to_string(),
221 };
222 let _ = status_tx
223 .send(AppEvent::LogRecord(ws_log(
224 LogLevel::Warn,
225 "server.closed",
226 display_symbol,
227 detail.clone(),
228 )))
229 .await;
230 return Err(anyhow::anyhow!("{}", detail));
231 }
232 Some(Ok(other)) => {
233 tracing::trace!(msg_type = ?other, "Unhandled WS message type");
234 }
235 Some(Err(e)) => {
236 let detail = format_ws_error(&e);
237 let _ = status_tx
238 .send(AppEvent::LogRecord(ws_log(
239 LogLevel::Warn,
240 "read.error",
241 display_symbol,
242 detail.clone(),
243 )))
244 .await;
245 return Err(anyhow::anyhow!("WebSocket read error: {}", detail));
246 }
247 None => {
248 return Err(anyhow::anyhow!(
249 "WebSocket stream ended unexpectedly (connection dropped)"
250 ));
251 }
252 }
253 }
254 _ = shutdown.changed() => {
255 let unsub_msg = serde_json::json!({
257 "method": "UNSUBSCRIBE",
258 "params": streams,
259 "id": 2
260 });
261 let _ = write
262 .send(tungstenite::Message::Text(unsub_msg.to_string()))
263 .await;
264 let _ = write.send(tungstenite::Message::Close(None)).await;
265 return Ok(());
266 }
267 _ = symbol_rx.changed() => {
268 let _ = write.send(tungstenite::Message::Close(None)).await;
269 let _ = status_tx
271 .send(AppEvent::LogRecord(ws_log(
272 LogLevel::Info,
273 "worker.retired",
274 display_symbol,
275 "symbol channel closed".to_string(),
276 )))
277 .await;
278 return Ok(());
279 }
280 }
281 }
282 }
283
284 async fn handle_text_message(
285 &self,
286 text: &str,
287 display_symbol: &str,
288 tick_tx: &mpsc::Sender<Tick>,
289 status_tx: &mpsc::Sender<AppEvent>,
290 ) {
291 if let Ok(val) = serde_json::from_str::<serde_json::Value>(text) {
293 if val.get("result").is_some() && val.get("id").is_some() {
294 tracing::debug!(id = %val["id"], "Subscription response received");
295 return;
296 }
297 }
298
299 match serde_json::from_str::<BinanceTradeEvent>(text) {
300 Ok(event) => {
301 let tick = Tick {
302 symbol: display_symbol.to_string(),
303 price: event.price,
304 qty: event.qty,
305 timestamp_ms: event.event_time,
306 is_buyer_maker: event.is_buyer_maker,
307 trade_id: event.trade_id,
308 };
309 if tick_tx.try_send(tick).is_err() {
310 tracing::warn!("Tick channel full, dropping tick");
311 let _ = status_tx.try_send(AppEvent::TickDropped);
312 }
313 }
314 Err(e) => {
315 tracing::debug!(error = %e, raw = %text, "Failed to parse WS message");
316 let _ = status_tx
317 .send(AppEvent::LogRecord(ws_log(
318 LogLevel::Debug,
319 "parse.skip",
320 display_symbol,
321 format!("payload={}", &text[..text.len().min(80)]),
322 )))
323 .await;
324 }
325 }
326 }
327}
328
/// Splits an instrument label into its upper-cased symbol and a futures flag.
///
/// Surrounding whitespace is ignored. A label ending in the exact suffix
/// ` (FUT)` denotes a futures instrument; the suffix is removed from the
/// returned symbol. Any other label is returned whole with the flag `false`.
fn parse_instrument_symbol(instrument: &str) -> (String, bool) {
    let label = instrument.trim();
    match label.strip_suffix(" (FUT)") {
        Some(base) => (base.to_ascii_uppercase(), true),
        None => (label.to_ascii_uppercase(), false),
    }
}
336
337fn ws_log(level: LogLevel, event: &'static str, symbol: &str, msg: String) -> LogRecord {
338 let mut record = LogRecord::new(level, LogDomain::Ws, event, msg);
339 record.symbol = Some(symbol.to_string());
340 record
341}
342
343fn format_ws_error(err: &WsError) -> String {
345 match err {
346 WsError::ConnectionClosed => "Connection closed normally".to_string(),
347 WsError::AlreadyClosed => "Attempted operation on already-closed connection".to_string(),
348 WsError::Io(io_err) => {
349 format!("IO error [kind={}]: {}", io_err.kind(), io_err)
350 }
351 WsError::Tls(tls_err) => format!("TLS error: {}", tls_err),
352 WsError::Capacity(cap_err) => format!("Capacity error: {}", cap_err),
353 WsError::Protocol(proto_err) => {
354 let detail = match proto_err {
355 ProtocolError::ResetWithoutClosingHandshake => {
356 "connection reset without closing handshake (server may have dropped)"
357 }
358 ProtocolError::SendAfterClosing => "tried to send after close frame",
359 ProtocolError::ReceivedAfterClosing => "received data after close frame",
360 ProtocolError::HandshakeIncomplete => "handshake incomplete",
361 _ => "",
362 };
363 if detail.is_empty() {
364 format!("Protocol error: {}", proto_err)
365 } else {
366 format!("Protocol error: {} ({})", proto_err, detail)
367 }
368 }
369 WsError::WriteBufferFull(_) => "Write buffer full (backpressure)".to_string(),
370 WsError::Utf8 => "UTF-8 encoding error in frame data".to_string(),
371 WsError::AttackAttempt => "Attack attempt detected by WebSocket library".to_string(),
372 WsError::Url(url_err) => {
373 let hint = match url_err {
374 UrlError::TlsFeatureNotEnabled => "TLS feature not compiled in",
375 UrlError::NoHostName => "no host name in URL",
376 UrlError::UnableToConnect(addr) => {
377 return format!(
378 "URL error: unable to connect to {} (DNS/network failure?)",
379 addr
380 );
381 }
382 UrlError::UnsupportedUrlScheme => "only ws:// or wss:// are supported",
383 UrlError::EmptyHostName => "empty host name in URL",
384 UrlError::NoPathOrQuery => "no path/query in URL",
385 };
386 format!("URL error: {} — {}", url_err, hint)
387 }
388 WsError::Http(resp) => {
389 let status = resp.status();
390 let body_preview = resp
391 .body()
392 .as_ref()
393 .and_then(|b| std::str::from_utf8(b).ok())
394 .unwrap_or("")
395 .chars()
396 .take(200)
397 .collect::<String>();
398 format!(
399 "HTTP error: status={} ({}), body=\"{}\"",
400 status.as_u16(),
401 status.canonical_reason().unwrap_or("unknown"),
402 body_preview
403 )
404 }
405 WsError::HttpFormat(e) => format!("HTTP format error: {}", e),
406 }
407}
408
409fn format_close_code(code: &CloseCode) -> String {
411 let (num, label) = match code {
412 CloseCode::Normal => (1000, "Normal"),
413 CloseCode::Away => (1001, "Going Away"),
414 CloseCode::Protocol => (1002, "Protocol Error"),
415 CloseCode::Unsupported => (1003, "Unsupported Data"),
416 CloseCode::Status => (1005, "No Status"),
417 CloseCode::Abnormal => (1006, "Abnormal Closure"),
418 CloseCode::Invalid => (1007, "Invalid Payload"),
419 CloseCode::Policy => (1008, "Policy Violation"),
420 CloseCode::Size => (1009, "Message Too Big"),
421 CloseCode::Extension => (1010, "Extension Required"),
422 CloseCode::Error => (1011, "Internal Error"),
423 CloseCode::Restart => (1012, "Service Restart"),
424 CloseCode::Again => (1013, "Try Again Later"),
425 CloseCode::Tls => (1015, "TLS Handshake Failure"),
426 CloseCode::Reserved(n) => (*n, "Reserved"),
427 CloseCode::Iana(n) => (*n, "IANA"),
428 CloseCode::Library(n) => (*n, "Library"),
429 CloseCode::Bad(n) => (*n, "Bad"),
430 };
431 format!("{} ({})", num, label)
432}