1use anyhow::Result;
2use futures_util::{SinkExt, StreamExt};
3use std::time::Duration;
4use tokio::sync::{mpsc, watch};
5use tokio_tungstenite::tungstenite;
6use tungstenite::error::{Error as WsError, ProtocolError, UrlError};
7use tungstenite::protocol::frame::coding::CloseCode;
8
9use super::types::BinanceTradeEvent;
10use crate::event::{AppEvent, WsConnectionStatus};
11use crate::model::tick::Tick;
12
/// Reconnect delay schedule that grows geometrically up to a fixed ceiling.
///
/// Each call to [`ExponentialBackoff::next_delay`] hands out the current delay
/// and multiplies the stored delay by `factor`, never exceeding `max`.
struct ExponentialBackoff {
    /// Delay the next call to `next_delay` will hand out (before capping).
    current: Duration,
    /// Delay restored by `reset`.
    initial: Duration,
    /// Upper bound applied to every delay that is handed out.
    max: Duration,
    /// Multiplier applied to the delay after each call to `next_delay`.
    factor: f64,
}

impl ExponentialBackoff {
    /// Creates a schedule that starts at `initial`, is capped at `max`, and
    /// scales by `factor` after every delay that is handed out.
    fn new(initial: Duration, max: Duration, factor: f64) -> Self {
        Self {
            current: initial,
            initial,
            max,
            factor,
        }
    }

    /// Returns the delay to wait now and advances the schedule.
    ///
    /// The returned delay never exceeds `max`, even when `initial` was larger.
    /// Degenerate factors cannot panic: a NaN or overflowing product saturates
    /// at `max`, and a non-positive product collapses to zero.
    fn next_delay(&mut self) -> Duration {
        let delay = self.current.min(self.max);
        let grown = delay.as_secs_f64() * self.factor;

        self.current = if grown.is_nan() || grown >= self.max.as_secs_f64() {
            // Use `max` itself rather than round-tripping it through f64.
            self.max
        } else if grown <= 0.0 {
            Duration::ZERO
        } else {
            // `grown` is finite, positive and below the cap here, so the
            // conversion cannot panic; `min` guards against f64 rounding.
            Duration::from_secs_f64(grown).min(self.max)
        };

        delay
    }

    /// Restarts the schedule from the initial delay.
    fn reset(&mut self) {
        self.current = self.initial;
    }
}
43
/// WebSocket client configuration holding the two endpoint URLs it can
/// connect to: one for spot instruments and one for futures instruments.
#[derive(Debug, Clone)]
pub struct BinanceWsClient {
    /// WebSocket URL used for spot instruments.
    spot_url: String,
    /// WebSocket URL used for futures instruments.
    futures_url: String,
}
49
50impl BinanceWsClient {
51 pub fn new(ws_base_url: &str, futures_ws_base_url: &str) -> Self {
55 Self {
56 spot_url: ws_base_url.to_string(),
57 futures_url: futures_ws_base_url.to_string(),
58 }
59 }
60
61 pub async fn connect_and_run(
64 &self,
65 tick_tx: mpsc::Sender<Tick>,
66 status_tx: mpsc::Sender<AppEvent>,
67 mut symbol_rx: watch::Receiver<String>,
68 mut shutdown: watch::Receiver<bool>,
69 ) -> Result<()> {
70 let mut backoff =
71 ExponentialBackoff::new(Duration::from_secs(1), Duration::from_secs(60), 2.0);
72 let mut attempt: u32 = 0;
73
74 loop {
75 attempt += 1;
76 let instrument = symbol_rx.borrow().clone();
77 let (symbol, is_futures) = parse_instrument_symbol(&instrument);
78 let streams = vec![format!("{}@trade", symbol.to_lowercase())];
79 let ws_url = if is_futures {
80 &self.futures_url
81 } else {
82 &self.spot_url
83 };
84 match self
85 .connect_once(
86 ws_url,
87 &streams,
88 &instrument,
89 &tick_tx,
90 &status_tx,
91 &mut symbol_rx,
92 &mut shutdown,
93 )
94 .await
95 {
96 Ok(()) => {
97 let _ = status_tx
99 .send(AppEvent::WsStatus(WsConnectionStatus::Disconnected))
100 .await;
101 break;
102 }
103 Err(e) => {
104 let _ = status_tx
105 .send(AppEvent::WsStatus(WsConnectionStatus::Disconnected))
106 .await;
107 tracing::warn!(attempt, error = %e, "WS connection attempt failed");
108 let _ = status_tx
109 .send(AppEvent::LogMessage(format!(
110 "WS error (attempt #{}): {}",
111 attempt, e
112 )))
113 .await;
114
115 let delay = backoff.next_delay();
116 let _ = status_tx
117 .send(AppEvent::WsStatus(WsConnectionStatus::Reconnecting {
118 attempt,
119 delay_ms: delay.as_millis() as u64,
120 }))
121 .await;
122
123 tokio::select! {
124 _ = tokio::time::sleep(delay) => continue,
125 _ = shutdown.changed() => {
126 let _ = status_tx
127 .send(AppEvent::LogMessage("Shutdown during reconnect".to_string()))
128 .await;
129 break;
130 }
131 }
132 }
133 }
134 }
135 Ok(())
136 }
137
138 async fn connect_once(
139 &self,
140 ws_url: &str,
141 streams: &[String],
142 display_symbol: &str,
143 tick_tx: &mpsc::Sender<Tick>,
144 status_tx: &mpsc::Sender<AppEvent>,
145 symbol_rx: &mut watch::Receiver<String>,
146 shutdown: &mut watch::Receiver<bool>,
147 ) -> Result<()> {
148 let _ = status_tx
149 .send(AppEvent::LogMessage(format!("Connecting to {}", ws_url)))
150 .await;
151
152 let (ws_stream, resp) = tokio_tungstenite::connect_async(ws_url)
153 .await
154 .map_err(|e| {
155 let detail = format_ws_error(&e);
156 let _ = status_tx.try_send(AppEvent::LogMessage(detail.clone()));
157 anyhow::anyhow!("WebSocket connect failed: {}", detail)
158 })?;
159
160 tracing::debug!(status = %resp.status(), "WebSocket HTTP upgrade response");
161
162 let (mut write, mut read) = ws_stream.split();
163
164 let subscribe_msg = serde_json::json!({
166 "method": "SUBSCRIBE",
167 "params": streams,
168 "id": 1
169 });
170 write
171 .send(tungstenite::Message::Text(subscribe_msg.to_string()))
172 .await
173 .map_err(|e| {
174 let detail = format_ws_error(&e);
175 anyhow::anyhow!("Failed to send SUBSCRIBE: {}", detail)
176 })?;
177
178 let _ = status_tx
179 .send(AppEvent::LogMessage(format!(
180 "Subscribed to: {}",
181 streams.join(", ")
182 )))
183 .await;
184
185 let _ = status_tx
187 .send(AppEvent::WsStatus(WsConnectionStatus::Connected))
188 .await;
189 let _ = status_tx
190 .send(AppEvent::LogMessage("WebSocket connected".to_string()))
191 .await;
192
193 loop {
194 tokio::select! {
195 msg = read.next() => {
196 match msg {
197 Some(Ok(tungstenite::Message::Text(text))) => {
198 self.handle_text_message(&text, display_symbol, tick_tx, status_tx).await;
199 }
200 Some(Ok(tungstenite::Message::Ping(_))) => {
201 }
203 Some(Ok(tungstenite::Message::Close(frame))) => {
204 let detail = match &frame {
205 Some(cf) => format!(
206 "Server closed: code={} reason=\"{}\"",
207 format_close_code(&cf.code),
208 cf.reason
209 ),
210 None => "Server closed: no close frame".to_string(),
211 };
212 let _ = status_tx
213 .send(AppEvent::LogMessage(detail.clone()))
214 .await;
215 return Err(anyhow::anyhow!("{}", detail));
216 }
217 Some(Ok(other)) => {
218 tracing::trace!(msg_type = ?other, "Unhandled WS message type");
219 }
220 Some(Err(e)) => {
221 let detail = format_ws_error(&e);
222 let _ = status_tx
223 .send(AppEvent::LogMessage(format!("WS read error: {}", detail)))
224 .await;
225 return Err(anyhow::anyhow!("WebSocket read error: {}", detail));
226 }
227 None => {
228 return Err(anyhow::anyhow!(
229 "WebSocket stream ended unexpectedly (connection dropped)"
230 ));
231 }
232 }
233 }
234 _ = shutdown.changed() => {
235 let unsub_msg = serde_json::json!({
237 "method": "UNSUBSCRIBE",
238 "params": streams,
239 "id": 2
240 });
241 let _ = write
242 .send(tungstenite::Message::Text(unsub_msg.to_string()))
243 .await;
244 let _ = write.send(tungstenite::Message::Close(None)).await;
245 return Ok(());
246 }
247 _ = symbol_rx.changed() => {
248 let _ = write.send(tungstenite::Message::Close(None)).await;
249 return Err(anyhow::anyhow!("Symbol changed, reconnecting WebSocket"));
250 }
251 }
252 }
253 }
254
255 async fn handle_text_message(
256 &self,
257 text: &str,
258 display_symbol: &str,
259 tick_tx: &mpsc::Sender<Tick>,
260 status_tx: &mpsc::Sender<AppEvent>,
261 ) {
262 if let Ok(val) = serde_json::from_str::<serde_json::Value>(text) {
264 if val.get("result").is_some() && val.get("id").is_some() {
265 tracing::debug!(id = %val["id"], "Subscription response received");
266 return;
267 }
268 }
269
270 match serde_json::from_str::<BinanceTradeEvent>(text) {
271 Ok(event) => {
272 let tick = Tick {
273 symbol: display_symbol.to_string(),
274 price: event.price,
275 qty: event.qty,
276 timestamp_ms: event.event_time,
277 is_buyer_maker: event.is_buyer_maker,
278 trade_id: event.trade_id,
279 };
280 if tick_tx.try_send(tick).is_err() {
281 tracing::warn!("Tick channel full, dropping tick");
282 }
283 }
284 Err(e) => {
285 tracing::debug!(error = %e, raw = %text, "Failed to parse WS message");
286 let _ = status_tx
287 .send(AppEvent::LogMessage(format!(
288 "WS parse skip: {}",
289 &text[..text.len().min(80)]
290 )))
291 .await;
292 }
293 }
294 }
295}
296
/// Splits an instrument label into its upper-cased symbol and a futures flag.
///
/// Surrounding whitespace is ignored. A label ending in the exact suffix
/// `" (FUT)"` is reported as futures with the suffix removed; any other label
/// is returned whole and reported as not futures.
fn parse_instrument_symbol(instrument: &str) -> (String, bool) {
    let label = instrument.trim();
    match label.strip_suffix(" (FUT)") {
        Some(base) => (base.to_ascii_uppercase(), true),
        None => (label.to_ascii_uppercase(), false),
    }
}
304
305fn format_ws_error(err: &WsError) -> String {
307 match err {
308 WsError::ConnectionClosed => "Connection closed normally".to_string(),
309 WsError::AlreadyClosed => "Attempted operation on already-closed connection".to_string(),
310 WsError::Io(io_err) => {
311 format!("IO error [kind={}]: {}", io_err.kind(), io_err)
312 }
313 WsError::Tls(tls_err) => format!("TLS error: {}", tls_err),
314 WsError::Capacity(cap_err) => format!("Capacity error: {}", cap_err),
315 WsError::Protocol(proto_err) => {
316 let detail = match proto_err {
317 ProtocolError::ResetWithoutClosingHandshake => {
318 "connection reset without closing handshake (server may have dropped)"
319 }
320 ProtocolError::SendAfterClosing => "tried to send after close frame",
321 ProtocolError::ReceivedAfterClosing => "received data after close frame",
322 ProtocolError::HandshakeIncomplete => "handshake incomplete",
323 _ => "",
324 };
325 if detail.is_empty() {
326 format!("Protocol error: {}", proto_err)
327 } else {
328 format!("Protocol error: {} ({})", proto_err, detail)
329 }
330 }
331 WsError::WriteBufferFull(_) => "Write buffer full (backpressure)".to_string(),
332 WsError::Utf8 => "UTF-8 encoding error in frame data".to_string(),
333 WsError::AttackAttempt => "Attack attempt detected by WebSocket library".to_string(),
334 WsError::Url(url_err) => {
335 let hint = match url_err {
336 UrlError::TlsFeatureNotEnabled => "TLS feature not compiled in",
337 UrlError::NoHostName => "no host name in URL",
338 UrlError::UnableToConnect(addr) => {
339 return format!(
340 "URL error: unable to connect to {} (DNS/network failure?)",
341 addr
342 );
343 }
344 UrlError::UnsupportedUrlScheme => "only ws:// or wss:// are supported",
345 UrlError::EmptyHostName => "empty host name in URL",
346 UrlError::NoPathOrQuery => "no path/query in URL",
347 };
348 format!("URL error: {} — {}", url_err, hint)
349 }
350 WsError::Http(resp) => {
351 let status = resp.status();
352 let body_preview = resp
353 .body()
354 .as_ref()
355 .and_then(|b| std::str::from_utf8(b).ok())
356 .unwrap_or("")
357 .chars()
358 .take(200)
359 .collect::<String>();
360 format!(
361 "HTTP error: status={} ({}), body=\"{}\"",
362 status.as_u16(),
363 status.canonical_reason().unwrap_or("unknown"),
364 body_preview
365 )
366 }
367 WsError::HttpFormat(e) => format!("HTTP format error: {}", e),
368 }
369}
370
371fn format_close_code(code: &CloseCode) -> String {
373 let (num, label) = match code {
374 CloseCode::Normal => (1000, "Normal"),
375 CloseCode::Away => (1001, "Going Away"),
376 CloseCode::Protocol => (1002, "Protocol Error"),
377 CloseCode::Unsupported => (1003, "Unsupported Data"),
378 CloseCode::Status => (1005, "No Status"),
379 CloseCode::Abnormal => (1006, "Abnormal Closure"),
380 CloseCode::Invalid => (1007, "Invalid Payload"),
381 CloseCode::Policy => (1008, "Policy Violation"),
382 CloseCode::Size => (1009, "Message Too Big"),
383 CloseCode::Extension => (1010, "Extension Required"),
384 CloseCode::Error => (1011, "Internal Error"),
385 CloseCode::Restart => (1012, "Service Restart"),
386 CloseCode::Again => (1013, "Try Again Later"),
387 CloseCode::Tls => (1015, "TLS Handshake Failure"),
388 CloseCode::Reserved(n) => (*n, "Reserved"),
389 CloseCode::Iana(n) => (*n, "IANA"),
390 CloseCode::Library(n) => (*n, "Library"),
391 CloseCode::Bad(n) => (*n, "Bad"),
392 };
393 format!("{} ({})", num, label)
394}