1use async_trait::async_trait;
2use std::time::Duration;
3use tracing::{info, warn};
4
5use tokio::{
6 net::TcpStream,
7 time::{Instant, Interval, interval_at, sleep, timeout},
8};
9
10use tokio_tungstenite::{
11 MaybeTlsStream, WebSocketStream, connect_async_with_config,
12 tungstenite::{Error, Message},
13};
14
/// Default heartbeat period, in seconds, used by `get_heartbeat_interval`
/// when the caller supplies no override.
pub(crate) const HEARTBEAT_SECS: u64 = 60;

/// Default ping period, in seconds, used by `get_ping_interval` when the
/// caller supplies no override.
pub(crate) const PING_INTERVAL_SECS: u64 = 60;

/// Ping timeout, in seconds.
///
/// NOTE(review): not referenced in this part of the file; presumably the
/// maximum wait for a ping reply — confirm against the call sites.
pub(crate) const PING_TIMEOUT_SECS: u64 = 50;

/// Upper bound, in seconds, on a single connection attempt made by the
/// retrying connect helpers.
const CONNECT_TIMEOUT_SECS: u64 = 2;

/// Backoff step in milliseconds; the retry helpers multiply it by the
/// attempt number to obtain the delay before the next attempt.
const BACKOFF_MS_BASE: u64 = 500;

/// Cap, in seconds, applied to the computed backoff delay.
const MAX_BACKOFF_SECS: u64 = 60;
32
/// Selects how `connect_with_strategy` establishes the WebSocket connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectStrategy {
    /// One attempt against the primary URL; any error is returned to the caller.
    Simple,
    /// Keep retrying the primary URL, backing off between attempts.
    Retry,
    /// Keep retrying, switching between the primary and the beta URL on
    /// successive attempts, backing off between attempts.
    AlternateWithRetry,
}
43
44#[async_trait]
45pub(crate) trait PlantActor {
46 type Command;
47
48 async fn run(&mut self);
49 async fn handle_command(&mut self, command: Self::Command);
50 async fn handle_rithmic_message(&mut self, message: Result<Message, Error>)
51 -> Result<bool, ()>;
52}
53
54pub(crate) fn get_heartbeat_interval(override_secs: Option<u64>) -> Interval {
55 let secs = override_secs.unwrap_or(HEARTBEAT_SECS);
56 let heartbeat_interval = Duration::from_secs(secs);
57 let start_offset = Instant::now() + heartbeat_interval;
58
59 interval_at(start_offset, heartbeat_interval)
60}
61
62pub(crate) fn get_ping_interval(override_secs: Option<u64>) -> Interval {
66 let secs = override_secs.unwrap_or(PING_INTERVAL_SECS);
67 let ping_interval = Duration::from_secs(secs);
68 let start_offset = Instant::now() + ping_interval;
69
70 interval_at(start_offset, ping_interval)
71}
72
73async fn connect(url: &str) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, Error> {
81 info!("Connecting to {}", url);
82
83 let (ws_stream, _) = connect_async_with_config(url, None, true).await?;
84
85 info!("Successfully connected to {}", url);
86
87 Ok(ws_stream)
88}
89
90async fn connect_with_retry_single_url(
101 url: &str,
102) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, Error> {
103 let mut attempt: u64 = 1;
104
105 loop {
106 info!("Attempt {}: connecting to {}", attempt, url);
107
108 match timeout(
109 Duration::from_secs(CONNECT_TIMEOUT_SECS),
110 connect_async_with_config(url, None, true),
111 )
112 .await
113 {
114 Ok(Ok((ws_stream, _))) => {
115 info!("Successfully connected to {}", url);
116 return Ok(ws_stream);
117 }
118 Ok(Err(e)) => warn!("connect_async failed for {}: {:?}", url, e),
119 Err(e) => warn!("connect_async to {} timed out: {:?}", url, e),
120 }
121
122 let backoff_ms: u64 = BACKOFF_MS_BASE * attempt;
123 let backoff_duration =
124 Duration::from_millis(backoff_ms).min(Duration::from_secs(MAX_BACKOFF_SECS));
125
126 info!("Backing off for {:?} before retry", backoff_duration);
127
128 sleep(backoff_duration).await;
129 attempt += 1;
130 }
131}
132
133async fn connect_with_retry(
145 primary_url: &str,
146 secondary_url: &str,
147) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, Error> {
148 let mut attempt: u64 = 1;
149
150 loop {
151 let selected_url = if attempt == 1 {
152 primary_url
153 } else if attempt.is_multiple_of(2) {
154 secondary_url
155 } else {
156 primary_url
157 };
158
159 info!("Attempt {}: connecting to {}", attempt, selected_url);
160
161 match timeout(
162 Duration::from_secs(CONNECT_TIMEOUT_SECS),
163 connect_async_with_config(selected_url, None, true),
164 )
165 .await
166 {
167 Ok(Ok((ws_stream, _))) => return Ok(ws_stream),
168 Ok(Err(e)) => warn!("connect_async failed for {}: {:?}", selected_url, e),
169 Err(e) => warn!("connect_async to {} timed out: {:?}", selected_url, e),
170 }
171
172 let backoff_ms: u64 = BACKOFF_MS_BASE * attempt;
173 let backoff_duration =
174 Duration::from_millis(backoff_ms).min(Duration::from_secs(MAX_BACKOFF_SECS));
175
176 info!("Backing off for {:?} before retry", backoff_duration);
177
178 sleep(backoff_duration).await;
179 attempt += 1;
180 }
181}
182
183pub(crate) async fn connect_with_strategy(
193 primary_url: &str,
194 beta_url: &str,
195 strategy: ConnectStrategy,
196) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, Error> {
197 match strategy {
198 ConnectStrategy::Simple => connect(primary_url).await,
199 ConnectStrategy::Retry => connect_with_retry_single_url(primary_url).await,
200 ConnectStrategy::AlternateWithRetry => connect_with_retry(primary_url, beta_url).await,
201 }
202}