bullet_rust_sdk/ws/managed.rs
//! Auto-reconnecting WebSocket client.
//!
//! [`ManagedWebsocket`] wraps the raw [`WebsocketHandle`](super::client::WebsocketHandle)
//! in a background task that handles reconnection with exponential backoff and
//! replays subscriptions after each reconnect.
//!
//! Unlike `WebsocketHandle`, `ManagedWebsocket` is `Send + Sync` — it communicates
//! with the background task via channels, so it can be shared across async tasks
//! without a `Mutex`.
//!
//! This module is portable across native and wasm32 targets: the background
//! task is spawned via [`tokio::spawn`] on native and
//! [`wasm_bindgen_futures::spawn_local`] on wasm, and all time/channel
//! primitives come from the `futures` crate.
//!
//! # Example
//!
//! ```ignore
//! use bullet_rust_sdk::{Client, Topic, OrderbookDepth};
//! use bullet_rust_sdk::ws::managed::{ManagedWebsocket, WsEvent};
//!
//! let client = Client::mainnet().await?;
//! let mut ws = ManagedWebsocket::connect(&client).await?;
//!
//! ws.subscribe([Topic::depth("BTC-USD", OrderbookDepth::D20)], None)?;
//!
//! while let Some(event) = ws.recv().await {
//!     match event {
//!         WsEvent::Message(msg) => { /* process msg */ }
//!         WsEvent::Reconnecting => { /* log reconnect */ }
//!         WsEvent::Disconnected(err) => { /* permanent failure */ break; }
//!     }
//! }
//! ```

use std::collections::HashSet;
use std::time::Duration;

use bon::bon;
use futures::channel::{mpsc, oneshot};
use futures::future::{self, Either, pending};
use futures::{FutureExt, StreamExt};
use futures_timer::Delay;
use thiserror::Error;
use tracing::{debug, info, warn};
use web_time::Instant;

use super::client::{WebsocketConfig, WebsocketHandle};
use super::models::ServerMessage;
use super::topics::Topic;
use crate::Client;
use crate::errors::WSErrors;
use crate::types::{ClientMessage, OrderParams, RequestId};

/// Errors from [`ManagedWebsocket`] operations.
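///
/// # Example
///
/// A minimal sketch of telling the two failure modes apart (the recovery
/// strategy in the comments is illustrative, not prescribed by the SDK):
///
/// ```ignore
/// match ws.subscribe([Topic::agg_trade("BTC-USD")], None) {
///     Ok(()) => {}
///     Err(ManagedWsError::Busy) => { /* command queue full: back off and retry */ }
///     Err(ManagedWsError::Stopped) => { /* background task gone: reconnect or bail */ }
/// }
/// ```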
#[derive(Debug, Error)]
pub enum ManagedWsError {
    /// The background task has stopped (disconnected or the handle was dropped).
    #[error("managed websocket is stopped")]
    Stopped,
    /// The command channel is full — the background task is not draining fast
    /// enough. Indicates a stuck task or a pathological caller; treat as
    /// backpressure.
    #[error("managed websocket command channel is full")]
    Busy,
}

/// Why a reconnect attempt gave up.
#[derive(Debug, Error)]
enum ReconnectError {
    /// The user-facing handle was dropped while reconnecting.
    #[error("managed websocket handle dropped")]
    HandleDropped,
    /// Ran out of retry attempts.
    #[error("exhausted {0} reconnect attempts")]
    RetriesExhausted(u32),
    /// Subscription replay failed after reconnect with a non-transport error.
    /// The underlying [`WSErrors`] is preserved so callers can distinguish
    /// transient network failures from protocol-level problems (bad topic,
    /// too many topics).
    #[error("subscription replay failed: {0}")]
    ReplayFailed(#[source] WSErrors),
}

/// Events delivered to the user from the managed WebSocket.
#[derive(Debug)]
pub enum WsEvent {
    /// A message from the server.
    Message(Box<ServerMessage>),
    /// The connection was lost and a reconnect is in progress.
    /// Subscriptions will be replayed automatically.
    Reconnecting,
    /// The connection was permanently lost after exhausting retries.
    Disconnected(String),
}

/// Minimum backoff floor. A zero `initial_backoff` would otherwise make
/// `backoff * 2` stay zero forever, producing a tight reconnect spin loop.
const MIN_BACKOFF: Duration = Duration::from_millis(10);

/// Default command channel capacity. Commands (subscribe, unsubscribe, order
/// send) are intrinsically low-rate; if you're queueing more than this, the
/// background task is stuck and the right answer is to surface
/// [`ManagedWsError::Busy`] rather than silently buffer.
const CMD_CHANNEL_CAPACITY: usize = 256;

/// Configuration for managed WebSocket reconnection behavior.
///
/// # Example
///
/// ```ignore
/// use bullet_rust_sdk::ManagedWsConfig;
/// use std::time::Duration;
///
/// let config = ManagedWsConfig::builder()
///     .max_retries(10)
///     .initial_backoff(Duration::from_millis(500))
///     .build();
/// ```
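///
/// A second sketch (values are illustrative) that disables the idle-timeout
/// reconnect and caps the exponential backoff:
///
/// ```ignore
/// let config = ManagedWsConfig::builder()
///     .idle_timeout(Duration::ZERO)          // never force a reconnect on silence
///     .max_backoff(Duration::from_secs(10))  // cap backoff growth at 10 seconds
///     .build();
/// ```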
#[derive(bon::Builder, Clone, Debug)]
pub struct ManagedWsConfig {
    /// Initial delay before the first reconnect attempt.
    ///
    /// Default: 1 second
    #[builder(default = Duration::from_secs(1))]
    pub initial_backoff: Duration,

    /// Maximum delay between reconnect attempts.
    ///
    /// Default: 30 seconds
    #[builder(default = Duration::from_secs(30))]
    pub max_backoff: Duration,

    /// Maximum number of consecutive reconnect attempts before giving up.
    /// `None` means retry forever.
    ///
    /// Default: `None` (infinite retries)
    pub max_retries: Option<u32>,

    /// Event channel buffer size. When the buffer is full and the consumer
    /// isn't keeping up, new events are dropped to keep the WebSocket
    /// connection alive. A warning is logged when this happens.
    ///
    /// Default: 10_000
    #[builder(default = 10_000)]
    pub channel_capacity: usize,

    /// Underlying WebSocket connection config (e.g. handshake timeout).
    pub ws_config: Option<WebsocketConfig>,

    /// Force a reconnect if no server message arrives within this window.
    ///
    /// Protects against zombie connections — TCP keepalives and WebSocket
    /// ping/pong keep the socket nominally alive, but the server can stop
    /// sending data without closing. Without this, the handle sits on a dead
    /// stream indefinitely.
    ///
    /// `Duration::ZERO` disables the timer. Cmd-path acks (subscribe, order
    /// responses) DO count as server-pushed messages and reset the clock.
    ///
    /// Default: 60 seconds
    #[builder(default = Duration::from_secs(60))]
    pub idle_timeout: Duration,

    /// How long a connection must stay up before the backoff state is
    /// considered "stable" and the next disconnect starts from
    /// [`initial_backoff`](Self::initial_backoff) again.
    ///
    /// Without this, a zombie that accepts connections and immediately drops
    /// would be hammered at `initial_backoff` forever — the server never gets
    /// the exponential-backoff relief.
    ///
    /// Default: 30 seconds
    #[builder(default = Duration::from_secs(30))]
    pub backoff_reset_after: Duration,
}

impl Default for ManagedWsConfig {
    fn default() -> Self {
        Self::builder().build()
    }
}

/// Command sent from the user handle to the background task.
///
/// Subscribe/unsubscribe commands carry already-serialized topic strings so
/// the background task doesn't need to re-serialize, and callers that already
/// have string topics (notably the WASM bindings) don't need to round-trip
/// through a typed [`Topic`].
enum WsCommand {
    Subscribe(Vec<String>, Option<RequestId>),
    Unsubscribe(Vec<String>, Option<RequestId>),
    Send(ClientMessage),
}

/// Auto-reconnecting WebSocket handle.
///
/// `Send + Sync` — safe to share across async tasks without a `Mutex`.
///
/// Subscribe/unsubscribe/order sends are fire-and-forget: the call queues a
/// command to the background task and returns. Server acknowledgements arrive
/// as [`WsEvent::Message`] on the event stream, matching standard CEX WS
/// conventions.
///
/// Dropping the handle (or calling [`stop`](Self::stop)) terminates the
/// background task immediately — even if it is mid-reconnect — via a separate
/// cancellation signal that bypasses the command queue.
pub struct ManagedWebsocket {
    event_rx: mpsc::Receiver<WsEvent>,
    cmd_tx: mpsc::Sender<WsCommand>,
    /// Held, never sent on. Dropping signals shutdown to the background task.
    _shutdown_tx: oneshot::Sender<()>,
}

impl ManagedWebsocket {
    /// Open a managed (auto-reconnecting) WebSocket connection for the given
    /// [`Client`].
    ///
    /// The returned handle manages reconnection with exponential backoff and
    /// replays subscriptions automatically.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mut ws = ManagedWebsocket::connect(&client).await?;
    /// ws.subscribe([Topic::agg_trade("BTC-USD")], None)?;
    /// ```
    #[cfg_attr(not(target_arch = "wasm32"), doc = "Uses [`tokio::spawn`] on native targets.")]
    #[cfg_attr(
        target_arch = "wasm32",
        doc = "Uses [`wasm_bindgen_futures::spawn_local`] on wasm targets."
    )]
    pub async fn connect(client: &Client) -> Result<ManagedWebsocket, WSErrors> {
        Self::connect_with(client, ManagedWsConfig::default()).await
    }

    /// Like [`connect`](Self::connect) but takes an explicit [`ManagedWsConfig`].
    pub async fn connect_with(
        client: &Client,
        config: ManagedWsConfig,
    ) -> Result<ManagedWebsocket, WSErrors> {
        let ws = client.connect_ws().maybe_config(config.ws_config.clone()).call().await?;

        let (event_tx, event_rx) = mpsc::channel(config.channel_capacity);
        let (cmd_tx, cmd_rx) = mpsc::channel(CMD_CHANNEL_CAPACITY);
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let inner = ManagedWsClient::from_client(client);

        spawn(async move {
            run_managed_ws(inner, ws, config, event_tx, cmd_rx, shutdown_rx).await;
        });

        Ok(ManagedWebsocket { event_rx, cmd_tx, _shutdown_tx: shutdown_tx })
    }

    /// Receive the next event from the WebSocket.
    ///
    /// Returns `None` when the background task has stopped (after permanent
    /// disconnection or [`stop`](Self::stop)).
    pub async fn recv(&mut self) -> Option<WsEvent> {
        self.event_rx.next().await
    }

    /// Subscribe to topics. The subscription is tracked and replayed on reconnect.
    ///
    /// This is fire-and-forget — it queues the command to the background task.
    /// The server's subscribe acknowledgement arrives as a [`WsEvent::Message`].
    pub fn subscribe(
        &self,
        topics: impl IntoIterator<Item = Topic>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        let params: Vec<String> = topics.into_iter().map(|t| t.to_string()).collect();
        self.try_send_cmd(WsCommand::Subscribe(params, id))
    }

    /// Subscribe using pre-serialized topic strings (e.g. `"BTC-USD@aggTrade"`).
    ///
    /// Prefer [`subscribe`](Self::subscribe) with typed [`Topic`] values from
    /// native Rust — this overload exists for binding layers (WASM/JS) that
    /// already hold string topics.
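    ///
    /// A minimal sketch using the topic string above:
    ///
    /// ```ignore
    /// ws.subscribe_raw(["BTC-USD@aggTrade".to_string()], None)?;
    /// ```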
    pub fn subscribe_raw(
        &self,
        topics: impl IntoIterator<Item = String>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        let params: Vec<String> = topics.into_iter().collect();
        self.try_send_cmd(WsCommand::Subscribe(params, id))
    }

    /// Unsubscribe from topics. Removes them from the replay list.
    pub fn unsubscribe(
        &self,
        topics: impl IntoIterator<Item = Topic>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        let params: Vec<String> = topics.into_iter().map(|t| t.to_string()).collect();
        self.try_send_cmd(WsCommand::Unsubscribe(params, id))
    }

    /// Raw-string counterpart of [`unsubscribe`](Self::unsubscribe).
    pub fn unsubscribe_raw(
        &self,
        topics: impl IntoIterator<Item = String>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        let params: Vec<String> = topics.into_iter().collect();
        self.try_send_cmd(WsCommand::Unsubscribe(params, id))
    }

    /// Place an order via WebSocket.
    pub fn order_place(
        &self,
        tx: impl Into<String>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        self.try_send_cmd(WsCommand::Send(ClientMessage::OrderPlace {
            id,
            params: OrderParams { tx: tx.into() },
        }))
    }

    /// Cancel an order via WebSocket.
    pub fn order_cancel(
        &self,
        tx: impl Into<String>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        self.try_send_cmd(WsCommand::Send(ClientMessage::OrderCancel {
            id,
            params: OrderParams { tx: tx.into() },
        }))
    }

    /// Place an order using a signed [`Transaction`]. Base64-encodes internally.
    ///
    /// Returns [`WSErrors`] rather than [`ManagedWsError`] because encoding can
    /// fail independently of the channel state.
    ///
    /// [`Transaction`]: bullet_exchange_interface::transaction::Transaction
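    ///
    /// A minimal sketch (`signed_tx` stands in for a transaction you have
    /// already built and signed; construction is not shown):
    ///
    /// ```ignore
    /// ws.place_order(&signed_tx, None)?;
    /// ```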
    pub fn place_order(
        &self,
        signed: &bullet_exchange_interface::transaction::Transaction,
        id: Option<RequestId>,
    ) -> Result<(), WSErrors> {
        let base64 =
            crate::Transaction::to_base64(signed).map_err(|e| WSErrors::WsError(e.to_string()))?;
        self.order_place(base64, id).map_err(|e| WSErrors::WsError(e.to_string()))
    }

    /// Cancel an order using a signed [`Transaction`]. Base64-encodes internally.
    ///
    /// [`Transaction`]: bullet_exchange_interface::transaction::Transaction
    pub fn cancel_order(
        &self,
        signed: &bullet_exchange_interface::transaction::Transaction,
        id: Option<RequestId>,
    ) -> Result<(), WSErrors> {
        let base64 =
            crate::Transaction::to_base64(signed).map_err(|e| WSErrors::WsError(e.to_string()))?;
        self.order_cancel(base64, id).map_err(|e| WSErrors::WsError(e.to_string()))
    }

    /// Stop the managed WebSocket and its background task.
    ///
    /// After this returns, the background task has been signaled; it will
    /// terminate at its next await point without draining pending commands.
    /// The event stream will end (`recv()` returns `None`) shortly after.
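    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let ws = ManagedWebsocket::connect(&client).await?;
    /// // ... use ws ...
    /// ws.stop(); // background task exits at its next await point
    /// ```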
    pub fn stop(self) {
        // Drop self — `_shutdown_tx` is dropped, closing the oneshot. Task sees
        // the signal and exits without going through the cmd queue.
    }

    fn try_send_cmd(&self, cmd: WsCommand) -> Result<(), ManagedWsError> {
        // `Sender::clone` is just an `Arc` bump; cloning gives us the
        // `&mut Sender` that `try_send` needs without requiring `&mut self`
        // on the public API.
        let mut tx = self.cmd_tx.clone();
        tx.try_send(cmd)
            .map_err(|e| if e.is_full() { ManagedWsError::Busy } else { ManagedWsError::Stopped })
    }
}

// `mpsc::Sender`/`Receiver` are `Send`, and `oneshot::Sender<()>` is `Send + Sync`.
// The handle is explicitly `Send + Sync` on all targets so callers can share it
// across async tasks without a `Mutex`.

/// Convenience wrapper on [`Client`] that forwards to [`ManagedWebsocket::connect`].
///
/// Kept as a thin helper so the common case (`client.connect_ws_managed()`) is
/// discoverable; the real dependency still flows `managed → client`.
#[bon]
impl Client {
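    /// Managed counterpart of `connect_ws`: builds a [`ManagedWebsocket`] with
    /// an optional [`ManagedWsConfig`].
    ///
    /// A minimal sketch (the builder finishes with `.call()`, as elsewhere in
    /// this crate's `bon`-generated APIs):
    ///
    /// ```ignore
    /// // Default reconnection behavior.
    /// let ws = client.connect_ws_managed().call().await?;
    ///
    /// // Explicit config.
    /// let ws = client
    ///     .connect_ws_managed()
    ///     .config(ManagedWsConfig::builder().max_retries(5).build())
    ///     .call()
    ///     .await?;
    /// ```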
    #[builder]
    pub async fn connect_ws_managed(
        &self,
        config: Option<ManagedWsConfig>,
    ) -> Result<ManagedWebsocket, WSErrors> {
        match config {
            Some(c) => ManagedWebsocket::connect_with(self, c).await,
            None => ManagedWebsocket::connect(self).await,
        }
    }
}

/// Minimal client data needed by the background task for reconnection.
///
/// Constructed via [`ManagedWsClient::from_client`] so `Client` has no
/// compile-time dependency on this struct.
struct ManagedWsClient {
    ws_client: reqwest::Client,
    ws_url: String,
}

impl ManagedWsClient {
    fn from_client(client: &Client) -> Self {
        Self { ws_client: client.ws_client.clone(), ws_url: client.ws_url().to_string() }
    }

    async fn connect(
        &self,
        ws_config: &Option<WebsocketConfig>,
    ) -> Result<WebsocketHandle, WSErrors> {
        let timeout = ws_config
            .as_ref()
            .map(|c| c.connection_timeout)
            .unwrap_or(web_time::Duration::from_secs(10));
        WebsocketHandle::connect(&self.ws_client, &self.ws_url, timeout).await
    }
}

/// Spawn a background future on the target's executor.
///
/// Native uses [`tokio::spawn`] (requires `Send`); wasm uses
/// [`wasm_bindgen_futures::spawn_local`] (no `Send` required since JS is
/// single-threaded).
#[cfg(not(target_arch = "wasm32"))]
fn spawn<F>(fut: F)
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    tokio::spawn(fut);
}

#[cfg(target_arch = "wasm32")]
fn spawn<F>(fut: F)
where
    F: std::future::Future<Output = ()> + 'static,
{
    wasm_bindgen_futures::spawn_local(fut);
}

/// Persistent state carried across reconnect cycles.
///
/// The backoff duration persists across disconnect cycles so a zombie that
/// accepts connections and immediately drops gets proper exponential relief
/// instead of being hammered at `initial_backoff` forever. It resets to
/// `initial_backoff` only after a connection has stayed up for
/// `backoff_reset_after`.
struct ReconnectState {
    backoff: Duration,
    /// When the *current* connection was established. `None` while a reconnect
    /// is in progress; set on entry to [`run_managed_ws`] and again after each
    /// successful reconnect.
    connected_since: Option<Instant>,
}

/// Background task that manages the WebSocket lifecycle.
async fn run_managed_ws(
    client: ManagedWsClient,
    mut ws: WebsocketHandle,
    config: ManagedWsConfig,
    mut event_tx: mpsc::Sender<WsEvent>,
    mut cmd_rx: mpsc::Receiver<WsCommand>,
    mut shutdown_rx: oneshot::Receiver<()>,
) {
    let mut active_topics: HashSet<String> = HashSet::new();
    let mut last_msg = Instant::now();
    let mut state = ReconnectState {
        backoff: config.initial_backoff.max(MIN_BACKOFF),
        connected_since: Some(Instant::now()),
    };

    /// One completed branch of the per-iteration select.
    ///
    /// `Recv` boxes the server message so this enum stays small — `ServerMessage`
    /// is ~340 bytes and sits on the stack every iteration otherwise.
    enum Branch {
        Shutdown,
        Recv(Result<Box<ServerMessage>, WSErrors>),
        Cmd(Option<WsCommand>),
        Idle,
    }

    loop {
        // Idle-timeout future: fires if no server-pushed message arrives within
        // the window. `Duration::ZERO` disables the timer.
        let idle_remaining = if config.idle_timeout.is_zero() {
            None
        } else {
            Some(config.idle_timeout.saturating_sub(last_msg.elapsed()))
        };

        // Run the select in its own scope so the fused recv/cmd futures (which
        // hold `&mut ws` / `&mut cmd_rx`) are dropped before we touch those
        // receivers again in the match arms below.
        let branch = {
            let recv_fut = ws.recv().fuse();
            let cmd_fut = cmd_rx.next().fuse();
            let idle_fut = match idle_remaining {
                Some(d) => Either::Left(Delay::new(d)),
                None => Either::Right(pending::<()>()),
            }
            .fuse();
            futures::pin_mut!(recv_fut, cmd_fut, idle_fut);

            futures::select! {
                _ = (&mut shutdown_rx).fuse() => Branch::Shutdown,
                r = recv_fut => Branch::Recv(r.map(Box::new)),
                c = cmd_fut => Branch::Cmd(c),
                _ = idle_fut => Branch::Idle,
            }
        };

        match branch {
            Branch::Shutdown => {
                debug!("shutdown signaled, stopping managed ws");
                return;
            }
            Branch::Recv(Ok(msg)) => {
                // Server proved it's alive — reset the idle timer.
                last_msg = Instant::now();
                match event_tx.try_send(WsEvent::Message(msg)) {
                    Ok(()) => {}
                    Err(e) if e.is_full() => {
                        warn!("event channel full, dropping message — consumer too slow");
                    }
                    Err(_) => {
                        debug!("event receiver dropped, stopping managed ws");
                        return;
                    }
                }
            }
            Branch::Recv(Err(e)) => {
                match &e {
                    WSErrors::WsClosed { code, reason } => {
                        warn!(?code, %reason, "WebSocket disconnected, reconnecting");
                    }
                    WSErrors::WsStreamEnded => {
                        warn!("WebSocket stream ended, reconnecting");
                    }
                    _ => {
                        warn!(?e, "WebSocket error, reconnecting");
                    }
                }
                if do_reconnect(
                    &client,
                    &config,
                    &active_topics,
                    &mut event_tx,
                    &mut ws,
                    &mut shutdown_rx,
                    &mut state,
                )
                .await
                {
                    return;
                }
                last_msg = Instant::now();
            }
            Branch::Idle => {
                let elapsed = last_msg.elapsed();
                warn!(?elapsed, "no server messages within idle timeout, forcing reconnect");
                if do_reconnect(
                    &client,
                    &config,
                    &active_topics,
                    &mut event_tx,
                    &mut ws,
                    &mut shutdown_rx,
                    &mut state,
                )
                .await
                {
                    return;
                }
                last_msg = Instant::now();
            }
            Branch::Cmd(Some(WsCommand::Subscribe(params, id))) => {
                // Dedup: only send for topics we aren't already subscribed to.
                // The server may reject duplicates with an unhelpful error, and
                // the topic set is the source of truth for replay.
                let new_params: Vec<String> =
                    params.into_iter().filter(|p| active_topics.insert(p.clone())).collect();
                if new_params.is_empty() {
                    debug!("subscribe: all topics already active, skipping wire send");
                } else if let Err(e) =
                    ws.send(ClientMessage::Subscribe { id, params: new_params }).await
                {
                    debug!(?e, "subscribe send failed, will replay after reconnect");
                }
            }
            Branch::Cmd(Some(WsCommand::Unsubscribe(params, id))) => {
                // Dedup: only send for topics we're actually subscribed to.
                let to_send: Vec<String> =
                    params.into_iter().filter(|p| active_topics.remove(p)).collect();
                if to_send.is_empty() {
                    debug!("unsubscribe: no matching active topics, skipping wire send");
                } else if let Err(e) =
                    ws.send(ClientMessage::Unsubscribe { id, params: to_send }).await
                {
                    debug!(?e, "unsubscribe send failed");
                }
            }
            Branch::Cmd(Some(WsCommand::Send(msg))) => {
                if let Err(e) = ws.send(msg.clone()).await {
                    warn!(?e, "failed to send order message, reconnecting");
                    if do_reconnect(
                        &client,
                        &config,
                        &active_topics,
                        &mut event_tx,
                        &mut ws,
                        &mut shutdown_rx,
                        &mut state,
                    )
                    .await
                    {
                        return;
                    }
                    // Retry the message once on the new connection. If it fails
                    // again the connection is still broken and do_reconnect will
                    // run again on the next loop iteration.
                    if let Err(e) = ws.send(msg).await {
                        warn!(?e, "retry after reconnect also failed");
                    }
                    last_msg = Instant::now();
                }
            }
            Branch::Cmd(None) => {
                debug!("command channel closed, stopping managed ws");
                return;
            }
        }
    }
}

/// Handle reconnection. Returns `true` if the task should stop.
///
/// Emits `WsEvent::Reconnecting`, reuses/resets backoff per
/// [`ReconnectState`], and on success updates `state.connected_since` and
/// replaces `*ws` with the new handle.
async fn do_reconnect(
    client: &ManagedWsClient,
    config: &ManagedWsConfig,
    active_topics: &HashSet<String>,
    event_tx: &mut mpsc::Sender<WsEvent>,
    ws: &mut WebsocketHandle,
    shutdown_rx: &mut oneshot::Receiver<()>,
    state: &mut ReconnectState,
) -> bool {
    match event_tx.try_send(WsEvent::Reconnecting) {
        Ok(()) => {}
        Err(e) if e.is_full() => {}
        Err(_) => return true,
    }

    // If the previous connection was stable for long enough, reset the
    // exponential backoff. Otherwise carry it forward so we don't hammer a
    // zombie that accepts-then-drops at `initial_backoff` forever.
    if let Some(t) = state.connected_since
        && t.elapsed() >= config.backoff_reset_after
    {
        debug!(
            uptime = ?t.elapsed(),
            "previous connection was stable; resetting backoff"
        );
        state.backoff = config.initial_backoff.max(MIN_BACKOFF);
    }
    state.connected_since = None;

    match reconnect(client, config, active_topics, event_tx, shutdown_rx, state).await {
        Ok(new_ws) => {
            *ws = new_ws;
            state.connected_since = Some(Instant::now());
            info!("reconnected successfully");
            false
        }
        Err(ReconnectError::HandleDropped) => true,
        Err(err) => {
            let _ = event_tx.try_send(WsEvent::Disconnected(err.to_string()));
            true
        }
    }
}

/// Reconnect with exponential backoff + jitter and replay subscriptions.
///
/// Observes `shutdown_rx` during every sleep and between connect attempts so
/// dropping the handle terminates the loop promptly. `state.backoff` is
/// mutated in place so growth persists across calls (see [`do_reconnect`]).
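///
/// With the default config (1s initial backoff, 30s cap), the base delay grows
/// roughly 1s, 2s, 4s, 8s, 16s, 30s, 30s, ... with 0..50% jitter added to each
/// attempt (illustrative; exact values depend on the configured bounds).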
async fn reconnect(
    client: &ManagedWsClient,
    config: &ManagedWsConfig,
    active_topics: &HashSet<String>,
    event_tx: &mpsc::Sender<WsEvent>,
    shutdown_rx: &mut oneshot::Receiver<()>,
    state: &mut ReconnectState,
) -> Result<WebsocketHandle, ReconnectError> {
    let max_backoff = config.max_backoff.max(MIN_BACKOFF);
    let mut attempts = 0u32;

    loop {
        if shutdown_observed(shutdown_rx) || event_tx.is_closed() {
            return Err(ReconnectError::HandleDropped);
        }

        attempts += 1;
        if let Some(max) = config.max_retries
            && attempts > max
        {
            return Err(ReconnectError::RetriesExhausted(max));
        }

        // Jitter: add 0..50% of backoff to avoid thundering herd.
        let jitter_ms = rand::random::<u64>() % (state.backoff.as_millis() as u64 / 2 + 1);
        let delay = state.backoff + Duration::from_millis(jitter_ms);

        info!(attempt = attempts, delay = ?delay, backoff = ?state.backoff, "attempting reconnect");

        match future::select(Delay::new(delay), &mut *shutdown_rx).await {
            Either::Left(_) => {}
            Either::Right(_) => return Err(ReconnectError::HandleDropped),
        }

        let connect_fut = client.connect(&config.ws_config);
        let connect_result = match future::select(Box::pin(connect_fut), &mut *shutdown_rx).await {
            Either::Left((r, _)) => r,
            Either::Right(_) => return Err(ReconnectError::HandleDropped),
        };

        match connect_result {
            Ok(mut ws) => {
                if !active_topics.is_empty() {
                    let params: Vec<String> = active_topics.iter().cloned().collect();
                    debug!(count = params.len(), "replaying subscriptions");
                    if let Err(e) = ws.send(ClientMessage::Subscribe { id: None, params }).await {
                        // Distinguish protocol errors (bad topic, oversize) from
                        // transport errors (connection died mid-replay).
                        if matches!(&e, WSErrors::WsStreamEnded | WSErrors::WsClosed { .. }) {
                            warn!(?e, "replay send lost connection, retrying");
                            state.backoff = (state.backoff * 2).min(max_backoff);
                            continue;
                        }
                        return Err(ReconnectError::ReplayFailed(e));
                    }
                }
                return Ok(ws);
            }
            Err(e) => {
                warn!(?e, attempt = attempts, "reconnect failed");
                state.backoff = (state.backoff * 2).min(max_backoff);
            }
        }
    }
}

/// Returns `true` if shutdown has been signaled (sender sent, dropped, or the
/// receiver is otherwise resolved).
fn shutdown_observed(rx: &mut oneshot::Receiver<()>) -> bool {
    !matches!(rx.try_recv(), Ok(None))
}