Skip to main content

ws_reconnect_client/
websocket.rs

1use backon::{ExponentialBuilder, Retryable};
2use futures_util::StreamExt;
3use std::time::Duration;
4use tokio_tungstenite::connect_async;
5
6use crate::{Result, WebSocketError, WsConnectionConfig, WsWriter, WsReader};
7
8/// Establishes a websocket connection with exponential backoff retry
9pub async fn connect_with_retry(config: &WsConnectionConfig) -> Result<(WsWriter, WsReader)> {
10    let backoff = ExponentialBuilder::default()
11        .with_min_delay(Duration::from_millis(config.initial_backoff_ms))
12        .with_max_delay(Duration::from_millis(config.max_backoff_ms))
13        .with_max_times(config.max_retries);
14
15    let connect_fn = || async {
16        connect_async(&config.url)
17            .await
18            .map(|(ws_stream, _)| ws_stream)
19            .map_err(|e| WebSocketError::ConnectionFailed(e.to_string()))
20    };
21
22    let ws_stream = connect_fn
23        .retry(backoff)
24        .await
25        .map_err(|_| WebSocketError::RetriesExhausted)?;
26
27    let (writer, reader) = ws_stream.split();
28    Ok((writer, reader))
29}
30
31
32/// WebSocket that automatically retries on connection failure
33pub struct WebSocket {
34    config: WsConnectionConfig,
35    writer: Option<WsWriter>,
36    reader: Option<WsReader>,
37}
38
39impl WebSocket {
40    pub fn new(config: WsConnectionConfig) -> Self {
41        Self {
42            config,
43            writer: None,
44            reader: None,
45        }
46    }
47
48    /// Connect or reconnect to the websocket
49    pub async fn connect(&mut self) -> Result<()> {
50        let (writer, reader) = connect_with_retry(&self.config).await?;
51        self.writer = Some(writer);
52        self.reader = Some(reader);
53        Ok(())
54    }
55
56    /// Get mutable reference to the writer
57    pub fn writer(&mut self) -> Result<&mut WsWriter> {
58        self.writer
59            .as_mut()
60            .ok_or(WebSocketError::NotConnected)
61    }
62
63    /// Get mutable reference to the reader
64    pub fn reader(&mut self) -> Result<&mut WsReader> {
65        self.reader
66            .as_mut()
67            .ok_or(WebSocketError::NotConnected)
68    }
69
70    /// Take ownership of writer and reader, consuming the connection
71    pub fn split(mut self) -> Result<(WsWriter, WsReader)> {
72        let writer = self
73            .writer
74            .take()
75            .ok_or(WebSocketError::WriterNotAvailable)?;
76        let reader = self
77            .reader
78            .take()
79            .ok_or(WebSocketError::ReaderNotAvailable)?;
80        Ok((writer, reader))
81    }
82}