bullet_rust_sdk/ws/managed.rs

//! Auto-reconnecting WebSocket client.
//!
//! [`ManagedWebsocket`] wraps the raw [`WebsocketHandle`](super::client::WebsocketHandle)
//! in a background task that handles reconnection with exponential backoff and
//! replays subscriptions after each reconnect.
//!
//! Unlike `WebsocketHandle`, `ManagedWebsocket` is `Send + Sync` — it communicates
//! with the background task via channels, so it can be shared across async tasks
//! without a `Mutex`.
//!
//! This module is portable across native and wasm32 targets: the background
//! task is spawned via [`tokio::spawn`] on native and
//! [`wasm_bindgen_futures::spawn_local`] on wasm, and the time and channel
//! primitives come from portable crates (`futures`, `futures_timer`, `web_time`).
//!
//! # Example
//!
//! ```ignore
//! use bullet_rust_sdk::{Client, Topic, OrderbookDepth};
//! use bullet_rust_sdk::ws::managed::{ManagedWebsocket, WsEvent};
//!
//! let client = Client::mainnet().await?;
//! let mut ws = ManagedWebsocket::connect(&client).await?;
//!
//! ws.subscribe([Topic::depth("BTC-USD", OrderbookDepth::D20)], None)?;
//!
//! while let Some(event) = ws.recv().await {
//!     match event {
//!         WsEvent::Message(msg) => { /* process msg */ }
//!         WsEvent::Reconnecting => { /* log reconnect */ }
//!         WsEvent::Disconnected(err) => { /* permanent failure */ break; }
//!     }
//! }
//! ```

use std::collections::HashSet;
use std::time::Duration;

use bon::bon;
use futures::channel::{mpsc, oneshot};
use futures::future::{self, Either, pending};
use futures::{FutureExt, StreamExt};
use futures_timer::Delay;
use thiserror::Error;
use tracing::{debug, info, warn};
use web_time::Instant;

use super::client::{WebsocketConfig, WebsocketHandle};
use super::models::ServerMessage;
use super::topics::Topic;
use crate::Client;
use crate::errors::WSErrors;
use crate::types::{ClientMessage, OrderParams, RequestId};

/// Errors from [`ManagedWebsocket`] operations.
#[derive(Debug, Error)]
pub enum ManagedWsError {
    /// The background task has stopped (disconnected or the handle was dropped).
    #[error("managed websocket is stopped")]
    Stopped,
    /// The command channel is full — the background task is not draining fast
    /// enough. Indicates a stuck task or a pathological caller; treat as
    /// backpressure.
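    ///
    /// A minimal way a caller might treat this as backpressure (a sketch;
    /// `topic` and the retry policy are the caller's choice):
    ///
    /// ```ignore
    /// match ws.subscribe([topic], None) {
    ///     Err(ManagedWsError::Busy) => { /* back off and retry later */ }
    ///     other => other?,
    /// }
    /// ```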
    #[error("managed websocket command channel is full")]
    Busy,
}

/// Why a reconnect attempt gave up.
#[derive(Debug, Error)]
enum ReconnectError {
    /// The user-facing handle was dropped while reconnecting.
    #[error("managed websocket handle dropped")]
    HandleDropped,
    /// Ran out of retry attempts.
    #[error("exhausted {0} reconnect attempts")]
    RetriesExhausted(u32),
    /// Subscription replay failed after reconnect with a non-transport error.
    /// The underlying [`WSErrors`] is preserved so callers can distinguish
    /// transient network failures from protocol-level problems (bad topic,
    /// too many topics).
    #[error("subscription replay failed: {0}")]
    ReplayFailed(#[source] WSErrors),
}

/// Events delivered to the user from the managed WebSocket.
#[derive(Debug)]
pub enum WsEvent {
    /// A message from the server.
    Message(Box<ServerMessage>),
    /// The connection was lost and a reconnect is in progress.
    /// Subscriptions will be replayed automatically.
    Reconnecting,
    /// The connection was permanently lost after exhausting retries.
    Disconnected(String),
}

/// Minimum backoff floor. A zero `initial_backoff` would otherwise make
/// `backoff * 2` stay zero forever, producing a tight reconnect spin loop.
const MIN_BACKOFF: Duration = Duration::from_millis(10);

/// Default command channel capacity. Commands (subscribe, unsubscribe, order
/// send) are intrinsically low-rate; if you're queueing more than this, the
/// background task is stuck and the right answer is to surface
/// [`ManagedWsError::Busy`] rather than silently buffer.
const CMD_CHANNEL_CAPACITY: usize = 256;

/// Configuration for managed WebSocket reconnection behavior.
///
/// # Example
///
/// ```ignore
/// use bullet_rust_sdk::ManagedWsConfig;
/// use std::time::Duration;
///
/// let config = ManagedWsConfig::builder()
///     .max_retries(10)
///     .initial_backoff(Duration::from_millis(500))
///     .build();
/// ```
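///
/// Reconnect behavior can be tuned further; the values below are purely
/// illustrative, not recommendations:
///
/// ```ignore
/// let config = ManagedWsConfig::builder()
///     .idle_timeout(Duration::from_secs(30))        // force a reconnect after 30s of silence
///     .backoff_reset_after(Duration::from_secs(60)) // require 60s of uptime before backoff resets
///     .build();
/// ```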
#[derive(bon::Builder, Clone, Debug)]
pub struct ManagedWsConfig {
    /// Initial delay before the first reconnect attempt.
    ///
    /// Default: 1 second
    #[builder(default = Duration::from_secs(1))]
    pub initial_backoff: Duration,

    /// Maximum delay between reconnect attempts.
    ///
    /// Default: 30 seconds
    #[builder(default = Duration::from_secs(30))]
    pub max_backoff: Duration,

    /// Maximum number of consecutive reconnect attempts before giving up.
    /// `None` means retry forever.
    ///
    /// Default: `None` (infinite retries)
    pub max_retries: Option<u32>,

    /// Event channel buffer size. When the buffer is full and the consumer
    /// isn't keeping up, new events are dropped to keep the WebSocket
    /// connection alive. A warning is logged when this happens.
    ///
    /// Default: 10_000
    #[builder(default = 10_000)]
    pub channel_capacity: usize,

    /// Underlying WebSocket connection config (e.g. handshake timeout).
    pub ws_config: Option<WebsocketConfig>,

    /// Force a reconnect if no server message arrives within this window.
    ///
    /// Protects against zombie connections — TCP keepalives and WebSocket
    /// ping/pong keep the socket nominally alive, but the server can stop
    /// sending data without closing. Without this, the handle sits on a dead
    /// stream indefinitely.
    ///
    /// `Duration::ZERO` disables the timer. Cmd-path acks (subscribe, order
    /// responses) DO count as server-pushed messages and reset the clock.
    ///
    /// Default: 60 seconds
    #[builder(default = Duration::from_secs(60))]
    pub idle_timeout: Duration,

    /// How long a connection must stay up before the backoff state is
    /// considered "stable" and the next disconnect starts from
    /// [`initial_backoff`](Self::initial_backoff) again.
    ///
    /// Without this, a zombie that accepts connections and immediately drops
    /// would be hammered at `initial_backoff` forever — the server never gets
    /// the exponential-backoff relief.
    ///
    /// Default: 30 seconds
    #[builder(default = Duration::from_secs(30))]
    pub backoff_reset_after: Duration,
}

impl Default for ManagedWsConfig {
    fn default() -> Self {
        Self::builder().build()
    }
}

/// Command sent from the user handle to the background task.
///
/// Subscribe/unsubscribe commands carry already-serialized topic strings so
/// the background task doesn't need to re-serialize, and callers that already
/// have string topics (notably the WASM bindings) don't need to round-trip
/// through a typed [`Topic`].
enum WsCommand {
    Subscribe(Vec<String>, Option<RequestId>),
    Unsubscribe(Vec<String>, Option<RequestId>),
    Send(ClientMessage),
}

/// Auto-reconnecting WebSocket handle.
///
/// `Send + Sync` — safe to share across async tasks without a `Mutex`.
///
/// Subscribe/unsubscribe/order sends are fire-and-forget: the call queues a
/// command to the background task and returns. Server acknowledgements arrive
/// as [`WsEvent::Message`] on the event stream, matching standard CEX WS
/// conventions.
///
/// Dropping the handle (or calling [`stop`](Self::stop)) terminates the
/// background task immediately — even if it is mid-reconnect — via a separate
/// cancellation signal that bypasses the command queue.
pub struct ManagedWebsocket {
    event_rx: mpsc::Receiver<WsEvent>,
    cmd_tx: mpsc::Sender<WsCommand>,
    /// Held, never sent on. Dropping signals shutdown to the background task.
    _shutdown_tx: oneshot::Sender<()>,
}

impl ManagedWebsocket {
    /// Open a managed (auto-reconnecting) WebSocket connection for the given
    /// [`Client`].
    ///
    /// The returned handle manages reconnection with exponential backoff and
    /// replays subscriptions automatically.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let mut ws = ManagedWebsocket::connect(&client).await?;
    /// ws.subscribe([Topic::agg_trade("BTC-USD")], None)?;
    /// ```
    #[cfg_attr(not(target_arch = "wasm32"), doc = "Uses [`tokio::spawn`] on native targets.")]
    #[cfg_attr(
        target_arch = "wasm32",
        doc = "Uses [`wasm_bindgen_futures::spawn_local`] on wasm targets."
    )]
    pub async fn connect(client: &Client) -> Result<ManagedWebsocket, WSErrors> {
        Self::connect_with(client, ManagedWsConfig::default()).await
    }

    /// Like [`connect`](Self::connect) but takes an explicit [`ManagedWsConfig`].
    pub async fn connect_with(
        client: &Client,
        config: ManagedWsConfig,
    ) -> Result<ManagedWebsocket, WSErrors> {
        let ws = client.connect_ws().maybe_config(config.ws_config.clone()).call().await?;

        let (event_tx, event_rx) = mpsc::channel(config.channel_capacity);
        let (cmd_tx, cmd_rx) = mpsc::channel(CMD_CHANNEL_CAPACITY);
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let inner = ManagedWsClient::from_client(client);

        spawn(async move {
            run_managed_ws(inner, ws, config, event_tx, cmd_rx, shutdown_rx).await;
        });

        Ok(ManagedWebsocket { event_rx, cmd_tx, _shutdown_tx: shutdown_tx })
    }

    /// Receive the next event from the WebSocket.
    ///
    /// Returns `None` when the background task has stopped (after permanent
    /// disconnection or [`stop`](Self::stop)).
    pub async fn recv(&mut self) -> Option<WsEvent> {
        self.event_rx.next().await
    }

    /// Subscribe to topics. The subscription is tracked and replayed on reconnect.
    ///
    /// This is fire-and-forget — it queues the command to the background task.
    /// The server's subscribe acknowledgement arrives as a [`WsEvent::Message`].
    pub fn subscribe(
        &self,
        topics: impl IntoIterator<Item = Topic>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        let params: Vec<String> = topics.into_iter().map(|t| t.to_string()).collect();
        self.try_send_cmd(WsCommand::Subscribe(params, id))
    }

    /// Subscribe using pre-serialized topic strings (e.g. `"BTC-USD@aggTrade"`).
    ///
    /// Prefer [`subscribe`](Self::subscribe) with typed [`Topic`] values from
    /// native Rust — this overload exists for binding layers (WASM/JS) that
    /// already hold string topics.
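    ///
    /// For example (the raw-string equivalent of a typed `Topic` subscription):
    ///
    /// ```ignore
    /// ws.subscribe_raw(["BTC-USD@aggTrade".to_string()], None)?;
    /// ```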
    pub fn subscribe_raw(
        &self,
        topics: impl IntoIterator<Item = String>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        let params: Vec<String> = topics.into_iter().collect();
        self.try_send_cmd(WsCommand::Subscribe(params, id))
    }

    /// Unsubscribe from topics. Removes them from the replay list.
    pub fn unsubscribe(
        &self,
        topics: impl IntoIterator<Item = Topic>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        let params: Vec<String> = topics.into_iter().map(|t| t.to_string()).collect();
        self.try_send_cmd(WsCommand::Unsubscribe(params, id))
    }

    /// Raw-string counterpart of [`unsubscribe`](Self::unsubscribe).
    pub fn unsubscribe_raw(
        &self,
        topics: impl IntoIterator<Item = String>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        let params: Vec<String> = topics.into_iter().collect();
        self.try_send_cmd(WsCommand::Unsubscribe(params, id))
    }

    /// Place an order via WebSocket.
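    ///
    /// `tx` is an already base64-encoded signed transaction; see
    /// [`place_order`](Self::place_order) if you hold a typed transaction and
    /// want the encoding done for you.
    ///
    /// ```ignore
    /// // `tx_base64` here stands in for a pre-encoded transaction string.
    /// ws.order_place(tx_base64, None)?;
    /// ```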
    pub fn order_place(
        &self,
        tx: impl Into<String>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        self.try_send_cmd(WsCommand::Send(ClientMessage::OrderPlace {
            id,
            params: OrderParams { tx: tx.into() },
        }))
    }

    /// Cancel an order via WebSocket.
    pub fn order_cancel(
        &self,
        tx: impl Into<String>,
        id: Option<RequestId>,
    ) -> Result<(), ManagedWsError> {
        self.try_send_cmd(WsCommand::Send(ClientMessage::OrderCancel {
            id,
            params: OrderParams { tx: tx.into() },
        }))
    }

    /// Place an order using a signed [`Transaction`]. Base64-encodes internally.
    ///
    /// Returns [`WSErrors`] rather than [`ManagedWsError`] because encoding
    /// can fail independently of the channel state.
    ///
    /// [`Transaction`]: bullet_exchange_interface::transaction::Transaction
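    ///
    /// For example, given a transaction that was already built and signed
    /// elsewhere (construction not shown here):
    ///
    /// ```ignore
    /// ws.place_order(&signed_tx, None)?;
    /// ```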
    pub fn place_order(
        &self,
        signed: &bullet_exchange_interface::transaction::Transaction,
        id: Option<RequestId>,
    ) -> Result<(), WSErrors> {
        let base64 =
            crate::Transaction::to_base64(signed).map_err(|e| WSErrors::WsError(e.to_string()))?;
        self.order_place(base64, id).map_err(|e| WSErrors::WsError(e.to_string()))
    }

    /// Cancel an order using a signed [`Transaction`]. Base64-encodes internally.
    ///
    /// [`Transaction`]: bullet_exchange_interface::transaction::Transaction
    pub fn cancel_order(
        &self,
        signed: &bullet_exchange_interface::transaction::Transaction,
        id: Option<RequestId>,
    ) -> Result<(), WSErrors> {
        let base64 =
            crate::Transaction::to_base64(signed).map_err(|e| WSErrors::WsError(e.to_string()))?;
        self.order_cancel(base64, id).map_err(|e| WSErrors::WsError(e.to_string()))
    }

    /// Stop the managed WebSocket and its background task.
    ///
    /// After this returns, the background task has been signaled; it will
    /// terminate at its next await point without draining pending commands.
    /// The event stream will end (`recv()` returns `None`) shortly after.
    pub fn stop(self) {
        // Drop self — `_shutdown_tx` is dropped, closing the oneshot. Task sees
        // the signal and exits without going through the cmd queue.
    }

    fn try_send_cmd(&self, cmd: WsCommand) -> Result<(), ManagedWsError> {
        // `Sender::clone` is an Arc bump, so clone a local sender for
        // `try_send` (which needs `&mut`) instead of requiring `&mut self`
        // on the public API.
        let mut tx = self.cmd_tx.clone();
        tx.try_send(cmd)
            .map_err(|e| if e.is_full() { ManagedWsError::Busy } else { ManagedWsError::Stopped })
    }
}

// `mpsc::Sender`/`Receiver` are `Send`, and `oneshot::Sender<()>` is `Send + Sync`.
// The handle is explicitly `Send + Sync` on all targets so callers can share it
// across async tasks without a `Mutex`.

/// Convenience wrapper on [`Client`] that forwards to [`ManagedWebsocket::connect`].
///
/// Kept as a thin helper so the common case (`client.connect_ws_managed()`) is
/// discoverable; the real dependency still flows `managed → client`.
#[bon]
impl Client {
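    /// Connect a managed (auto-reconnecting) WebSocket using this client.
    ///
    /// A minimal usage sketch of the generated builder (same call style as
    /// `connect_ws`):
    ///
    /// ```ignore
    /// let mut ws = client.connect_ws_managed().call().await?;
    /// // or, with an explicit config:
    /// let mut ws = client
    ///     .connect_ws_managed()
    ///     .config(ManagedWsConfig::default())
    ///     .call()
    ///     .await?;
    /// ```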
    #[builder]
    pub async fn connect_ws_managed(
        &self,
        config: Option<ManagedWsConfig>,
    ) -> Result<ManagedWebsocket, WSErrors> {
        match config {
            Some(c) => ManagedWebsocket::connect_with(self, c).await,
            None => ManagedWebsocket::connect(self).await,
        }
    }
}

/// Minimal client data needed by the background task for reconnection.
///
/// Constructed via [`ManagedWsClient::from_client`] so `Client` has no
/// compile-time dependency on this struct.
struct ManagedWsClient {
    ws_client: reqwest::Client,
    ws_url: String,
}

impl ManagedWsClient {
    fn from_client(client: &Client) -> Self {
        Self { ws_client: client.ws_client.clone(), ws_url: client.ws_url().to_string() }
    }

    async fn connect(
        &self,
        ws_config: &Option<WebsocketConfig>,
    ) -> Result<WebsocketHandle, WSErrors> {
        let timeout = ws_config
            .as_ref()
            .map(|c| c.connection_timeout)
            .unwrap_or(web_time::Duration::from_secs(10));
        WebsocketHandle::connect(&self.ws_client, &self.ws_url, timeout).await
    }
}

/// Spawn a background future on the target's executor.
///
/// Native uses [`tokio::spawn`] (requires `Send`); wasm uses
/// [`wasm_bindgen_futures::spawn_local`] (no `Send` required since JS is
/// single-threaded).
#[cfg(not(target_arch = "wasm32"))]
fn spawn<F>(fut: F)
where
    F: std::future::Future<Output = ()> + Send + 'static,
{
    tokio::spawn(fut);
}

#[cfg(target_arch = "wasm32")]
fn spawn<F>(fut: F)
where
    F: std::future::Future<Output = ()> + 'static,
{
    wasm_bindgen_futures::spawn_local(fut);
}

/// Persistent state carried across reconnect cycles.
///
/// The backoff duration persists across disconnect cycles so a zombie that
/// accepts connections and immediately drops gets proper exponential relief
/// instead of being hammered at `initial_backoff` forever. It resets to
/// `initial_backoff` only after a connection has stayed up for
/// `backoff_reset_after`.
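///
/// As a concrete illustration (assuming the defaults of 1 s initial and 30 s
/// max backoff, and ignoring jitter), consecutive failed attempts wait
/// 1, 2, 4, 8, 16, 30, 30, ... seconds.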
struct ReconnectState {
    backoff: Duration,
    /// When the *current* connection was established. `None` only while a
    /// reconnect is in progress: set on entry to [`run_managed_ws`], cleared
    /// by [`do_reconnect`] before retrying, and restored once the new
    /// connection is up.
    connected_since: Option<Instant>,
}

/// Background task that manages the WebSocket lifecycle.
async fn run_managed_ws(
    client: ManagedWsClient,
    mut ws: WebsocketHandle,
    config: ManagedWsConfig,
    mut event_tx: mpsc::Sender<WsEvent>,
    mut cmd_rx: mpsc::Receiver<WsCommand>,
    mut shutdown_rx: oneshot::Receiver<()>,
) {
    let mut active_topics: HashSet<String> = HashSet::new();
    let mut last_msg = Instant::now();
    let mut state = ReconnectState {
        backoff: config.initial_backoff.max(MIN_BACKOFF),
        connected_since: Some(Instant::now()),
    };

    /// One completed branch of the per-iteration select.
    ///
    /// `Recv` boxes the server message so this enum stays small — `ServerMessage`
    /// is ~340 bytes and sits on the stack every iteration otherwise.
    enum Branch {
        Shutdown,
        Recv(Result<Box<ServerMessage>, WSErrors>),
        Cmd(Option<WsCommand>),
        Idle,
    }

    loop {
        // Idle-timeout future: fires if no server-pushed message arrives within
        // the window. `Duration::ZERO` disables the timer.
        let idle_remaining = if config.idle_timeout.is_zero() {
            None
        } else {
            Some(config.idle_timeout.saturating_sub(last_msg.elapsed()))
        };

        // Run the select in its own scope so the fused recv/cmd futures (which
        // hold `&mut ws` / `&mut cmd_rx`) are dropped before we touch those
        // receivers again in the match arms below.
        let branch = {
            let recv_fut = ws.recv().fuse();
            let cmd_fut = cmd_rx.next().fuse();
            let idle_fut = match idle_remaining {
                Some(d) => Either::Left(Delay::new(d)),
                None => Either::Right(pending::<()>()),
            }
            .fuse();
            futures::pin_mut!(recv_fut, cmd_fut, idle_fut);

            futures::select! {
                _ = (&mut shutdown_rx).fuse() => Branch::Shutdown,
                r = recv_fut => Branch::Recv(r.map(Box::new)),
                c = cmd_fut => Branch::Cmd(c),
                _ = idle_fut => Branch::Idle,
            }
        };

        match branch {
            Branch::Shutdown => {
                debug!("shutdown signaled, stopping managed ws");
                return;
            }
            Branch::Recv(Ok(msg)) => {
                // Server proved it's alive — reset the idle timer.
                last_msg = Instant::now();
                match event_tx.try_send(WsEvent::Message(msg)) {
                    Ok(()) => {}
                    Err(e) if e.is_full() => {
                        warn!("event channel full, dropping message — consumer too slow");
                    }
                    Err(_) => {
                        debug!("event receiver dropped, stopping managed ws");
                        return;
                    }
                }
            }
            Branch::Recv(Err(e)) => {
                match &e {
                    WSErrors::WsClosed { code, reason } => {
                        warn!(?code, %reason, "WebSocket disconnected, reconnecting");
                    }
                    WSErrors::WsStreamEnded => {
                        warn!("WebSocket stream ended, reconnecting");
                    }
                    _ => {
                        warn!(?e, "WebSocket error, reconnecting");
                    }
                }
                if do_reconnect(
                    &client,
                    &config,
                    &active_topics,
                    &mut event_tx,
                    &mut ws,
                    &mut shutdown_rx,
                    &mut state,
                )
                .await
                {
                    return;
                }
                last_msg = Instant::now();
            }
            Branch::Idle => {
                let elapsed = last_msg.elapsed();
                warn!(?elapsed, "no server messages within idle timeout, forcing reconnect");
                if do_reconnect(
                    &client,
                    &config,
                    &active_topics,
                    &mut event_tx,
                    &mut ws,
                    &mut shutdown_rx,
                    &mut state,
                )
                .await
                {
                    return;
                }
                last_msg = Instant::now();
            }
            Branch::Cmd(Some(WsCommand::Subscribe(params, id))) => {
                // Dedup: only send for topics we aren't already subscribed to.
                // The server may reject duplicates with an unhelpful error, and
                // the topic set is the source of truth for replay.
                let new_params: Vec<String> =
                    params.into_iter().filter(|p| active_topics.insert(p.clone())).collect();
                if new_params.is_empty() {
                    debug!("subscribe: all topics already active, skipping wire send");
                } else if let Err(e) =
                    ws.send(ClientMessage::Subscribe { id, params: new_params }).await
                {
                    debug!(?e, "subscribe send failed, will replay after reconnect");
                }
            }
            Branch::Cmd(Some(WsCommand::Unsubscribe(params, id))) => {
                // Dedup: only send for topics we're actually subscribed to.
                let to_send: Vec<String> =
                    params.into_iter().filter(|p| active_topics.remove(p)).collect();
                if to_send.is_empty() {
                    debug!("unsubscribe: no matching active topics, skipping wire send");
                } else if let Err(e) =
                    ws.send(ClientMessage::Unsubscribe { id, params: to_send }).await
                {
                    debug!(?e, "unsubscribe send failed");
                }
            }
            Branch::Cmd(Some(WsCommand::Send(msg))) => {
                if let Err(e) = ws.send(msg.clone()).await {
                    warn!(?e, "failed to send order message, reconnecting");
                    if do_reconnect(
                        &client,
                        &config,
                        &active_topics,
                        &mut event_tx,
                        &mut ws,
                        &mut shutdown_rx,
                        &mut state,
                    )
                    .await
                    {
                        return;
                    }
                    // Retry the message once on the new connection. If it fails
                    // again the connection is still broken and do_reconnect will
                    // run again on the next loop iteration.
                    if let Err(e) = ws.send(msg).await {
                        warn!(?e, "retry after reconnect also failed");
                    }
                    last_msg = Instant::now();
                }
            }
            Branch::Cmd(None) => {
                debug!("command channel closed, stopping managed ws");
                return;
            }
        }
    }
}

/// Handle reconnection. Returns `true` if the task should stop.
///
/// Emits `WsEvent::Reconnecting`, reuses/resets backoff per
/// [`ReconnectState`], and on success updates `state.connected_since` and
/// replaces `*ws` with the new handle.
async fn do_reconnect(
    client: &ManagedWsClient,
    config: &ManagedWsConfig,
    active_topics: &HashSet<String>,
    event_tx: &mut mpsc::Sender<WsEvent>,
    ws: &mut WebsocketHandle,
    shutdown_rx: &mut oneshot::Receiver<()>,
    state: &mut ReconnectState,
) -> bool {
    match event_tx.try_send(WsEvent::Reconnecting) {
        Ok(()) => {}
        Err(e) if e.is_full() => {}
        Err(_) => return true,
    }

    // If the previous connection was stable for long enough, reset the
    // exponential backoff. Otherwise carry it forward so we don't hammer a
    // zombie that accepts-then-drops at `initial_backoff` forever.
    if let Some(t) = state.connected_since
        && t.elapsed() >= config.backoff_reset_after
    {
        debug!(
            uptime = ?t.elapsed(),
            "previous connection was stable; resetting backoff"
        );
        state.backoff = config.initial_backoff.max(MIN_BACKOFF);
    }
    state.connected_since = None;

    match reconnect(client, config, active_topics, event_tx, shutdown_rx, state).await {
        Ok(new_ws) => {
            *ws = new_ws;
            state.connected_since = Some(Instant::now());
            info!("reconnected successfully");
            false
        }
        Err(ReconnectError::HandleDropped) => true,
        Err(err) => {
            let _ = event_tx.try_send(WsEvent::Disconnected(err.to_string()));
            true
        }
    }
}

/// Reconnect with exponential backoff + jitter and replay subscriptions.
///
/// Observes `shutdown_rx` during every sleep and between connect attempts so
/// dropping the handle terminates the loop promptly. `state.backoff` is
/// mutated in place so growth persists across calls (see [`do_reconnect`]).
async fn reconnect(
    client: &ManagedWsClient,
    config: &ManagedWsConfig,
    active_topics: &HashSet<String>,
    event_tx: &mpsc::Sender<WsEvent>,
    shutdown_rx: &mut oneshot::Receiver<()>,
    state: &mut ReconnectState,
) -> Result<WebsocketHandle, ReconnectError> {
    let max_backoff = config.max_backoff.max(MIN_BACKOFF);
    let mut attempts = 0u32;

    loop {
        if shutdown_observed(shutdown_rx) || event_tx.is_closed() {
            return Err(ReconnectError::HandleDropped);
        }

        attempts += 1;
        if let Some(max) = config.max_retries
            && attempts > max
        {
            return Err(ReconnectError::RetriesExhausted(max));
        }

        // Jitter: add 0..50% of backoff to avoid thundering herd.
        let jitter_ms = rand::random::<u64>() % (state.backoff.as_millis() as u64 / 2 + 1);
        let delay = state.backoff + Duration::from_millis(jitter_ms);

        info!(attempt = attempts, delay = ?delay, backoff = ?state.backoff, "attempting reconnect");

        match future::select(Delay::new(delay), &mut *shutdown_rx).await {
            Either::Left(_) => {}
            Either::Right(_) => return Err(ReconnectError::HandleDropped),
        }

        let connect_fut = client.connect(&config.ws_config);
        let connect_result = match future::select(Box::pin(connect_fut), &mut *shutdown_rx).await {
            Either::Left((r, _)) => r,
            Either::Right(_) => return Err(ReconnectError::HandleDropped),
        };

        match connect_result {
            Ok(mut ws) => {
                if !active_topics.is_empty() {
                    let params: Vec<String> = active_topics.iter().cloned().collect();
                    debug!(count = params.len(), "replaying subscriptions");
                    if let Err(e) = ws.send(ClientMessage::Subscribe { id: None, params }).await {
                        // Distinguish protocol errors (bad topic, oversize) from
                        // transport errors (connection died mid-replay).
                        if matches!(&e, WSErrors::WsStreamEnded | WSErrors::WsClosed { .. }) {
                            warn!(?e, "replay send lost connection, retrying");
                            state.backoff = (state.backoff * 2).min(max_backoff);
                            continue;
                        }
                        return Err(ReconnectError::ReplayFailed(e));
                    }
                }
                return Ok(ws);
            }
            Err(e) => {
                warn!(?e, attempt = attempts, "reconnect failed");
                state.backoff = (state.backoff * 2).min(max_backoff);
            }
        }
    }
}

/// Returns `true` if shutdown has been signaled (sender sent, dropped, or the
/// receiver is otherwise resolved).
fn shutdown_observed(rx: &mut oneshot::Receiver<()>) -> bool {
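    // The oneshot's `try_recv()` yields `Ok(None)` while the channel is still
    // pending; `Ok(Some(()))` (signal sent) or `Err(Canceled)` (sender dropped)
    // both count as shutdown.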
    !matches!(rx.try_recv(), Ok(None))
}