ws_reconnect_client/
websocket.rs1use backon::{ExponentialBuilder, Retryable};
2use futures_util::StreamExt;
3use std::time::Duration;
4use tokio_tungstenite::connect_async;
5
6use crate::{Result, WebSocketError, WsConnectionConfig, WsWriter, WsReader};
7
8pub async fn connect_with_retry(config: &WsConnectionConfig) -> Result<(WsWriter, WsReader)> {
10 let backoff = ExponentialBuilder::default()
11 .with_min_delay(Duration::from_millis(config.initial_backoff_ms))
12 .with_max_delay(Duration::from_millis(config.max_backoff_ms))
13 .with_max_times(config.max_retries);
14
15 let connect_fn = || async {
16 connect_async(&config.url)
17 .await
18 .map(|(ws_stream, _)| ws_stream)
19 .map_err(|e| WebSocketError::ConnectionFailed(e.to_string()))
20 };
21
22 let ws_stream = connect_fn
23 .retry(backoff)
24 .await
25 .map_err(|_| WebSocketError::RetriesExhausted)?;
26
27 let (writer, reader) = ws_stream.split();
28 Ok((writer, reader))
29}
30
31
32pub struct WebSocket {
34 config: WsConnectionConfig,
35 writer: Option<WsWriter>,
36 reader: Option<WsReader>,
37}
38
39impl WebSocket {
40 pub fn new(config: WsConnectionConfig) -> Self {
41 Self {
42 config,
43 writer: None,
44 reader: None,
45 }
46 }
47
48 pub async fn connect(&mut self) -> Result<()> {
50 let (writer, reader) = connect_with_retry(&self.config).await?;
51 self.writer = Some(writer);
52 self.reader = Some(reader);
53 Ok(())
54 }
55
56 pub fn writer(&mut self) -> Result<&mut WsWriter> {
58 self.writer
59 .as_mut()
60 .ok_or(WebSocketError::NotConnected)
61 }
62
63 pub fn reader(&mut self) -> Result<&mut WsReader> {
65 self.reader
66 .as_mut()
67 .ok_or(WebSocketError::NotConnected)
68 }
69
70 pub fn split(mut self) -> Result<(WsWriter, WsReader)> {
72 let writer = self
73 .writer
74 .take()
75 .ok_or(WebSocketError::WriterNotAvailable)?;
76 let reader = self
77 .reader
78 .take()
79 .ok_or(WebSocketError::ReaderNotAvailable)?;
80 Ok((writer, reader))
81 }
82}