rithmic_rs/
ws.rs

1use async_trait::async_trait;
2use std::time::Duration;
3use tracing::{info, warn};
4
5use tokio::{
6    net::TcpStream,
7    time::{Instant, Interval, interval_at, sleep, timeout},
8};
9
10use tokio_tungstenite::{
11    MaybeTlsStream, WebSocketStream, connect_async_with_config,
12    tungstenite::{Error, Message},
13};
14
/// Default heartbeat period in seconds.
///
/// `get_heartbeat_interval` falls back to this value when no override is given.
pub(crate) const HEARTBEAT_SECS: u64 = 60;

/// Default period in seconds between WebSocket ping frames (dead-connection detection).
///
/// `get_ping_interval` falls back to this value when no override is given.
pub(crate) const PING_INTERVAL_SECS: u64 = 60;

/// How long, in seconds, to wait for a WebSocket pong before giving up on it.
pub(crate) const PING_TIMEOUT_SECS: u64 = 50;

/// Per-attempt connection timeout in seconds, applied by the retrying connect functions.
const CONNECT_TIMEOUT_SECS: u64 = 2;

/// Backoff step in milliseconds; the delay after attempt `n` is `n * BACKOFF_MS_BASE`.
const BACKOFF_MS_BASE: u64 = 500;

/// Upper bound in seconds on the delay between retries (rate limit for login attempts).
const MAX_BACKOFF_SECS: u64 = 60;
32
/// Connection strategy for connecting to Rithmic servers.
///
/// The retrying strategies wait between attempts with a backoff that grows
/// linearly with the attempt number and is capped at 60 seconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectStrategy {
    /// Single connection attempt. Recommended for most users.
    Simple,
    /// Retry the same URL indefinitely with linearly increasing backoff (capped at 60s).
    Retry,
    /// Alternate between the primary and beta URLs indefinitely, with the same
    /// capped linear backoff. Useful when the main server has issues.
    AlternateWithRetry,
}
43
44#[async_trait]
45pub(crate) trait PlantActor {
46    type Command;
47
48    async fn run(&mut self);
49    async fn handle_command(&mut self, command: Self::Command);
50    async fn handle_rithmic_message(&mut self, message: Result<Message, Error>)
51    -> Result<bool, ()>;
52}
53
54pub(crate) fn get_heartbeat_interval(override_secs: Option<u64>) -> Interval {
55    let secs = override_secs.unwrap_or(HEARTBEAT_SECS);
56    let heartbeat_interval = Duration::from_secs(secs);
57    let start_offset = Instant::now() + heartbeat_interval;
58
59    interval_at(start_offset, heartbeat_interval)
60}
61
62/// Creates an interval for sending WebSocket pings.
63///
64/// Returns an interval starting after the first ping period elapses.
65pub(crate) fn get_ping_interval(override_secs: Option<u64>) -> Interval {
66    let secs = override_secs.unwrap_or(PING_INTERVAL_SECS);
67    let ping_interval = Duration::from_secs(secs);
68    let start_offset = Instant::now() + ping_interval;
69
70    interval_at(start_offset, ping_interval)
71}
72
73/// Connect to a single URL without retry.
74///
75/// # Arguments
76/// * `url` - WebSocket URL to connect to
77///
78/// # Returns
79/// WebSocketStream on success, error on failure.
80async fn connect(url: &str) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, Error> {
81    info!("Connecting to {}", url);
82
83    let (ws_stream, _) = connect_async_with_config(url, None, true).await?;
84
85    info!("Successfully connected to {}", url);
86
87    Ok(ws_stream)
88}
89
90/// Connect to a single URL with indefinite retry and exponential backoff.
91///
92/// Retries indefinitely with exponential backoff capped at 60 seconds.
93/// This ensures at most one connection attempt per minute after initial ramp-up.
94///
95/// # Arguments
96/// * `url` - WebSocket URL to connect to
97///
98/// # Returns
99/// WebSocketStream on success (never returns error as it retries indefinitely).
100async fn connect_with_retry_single_url(
101    url: &str,
102) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, Error> {
103    let mut attempt: u64 = 1;
104
105    loop {
106        info!("Attempt {}: connecting to {}", attempt, url);
107
108        match timeout(
109            Duration::from_secs(CONNECT_TIMEOUT_SECS),
110            connect_async_with_config(url, None, true),
111        )
112        .await
113        {
114            Ok(Ok((ws_stream, _))) => {
115                info!("Successfully connected to {}", url);
116                return Ok(ws_stream);
117            }
118            Ok(Err(e)) => warn!("connect_async failed for {}: {:?}", url, e),
119            Err(e) => warn!("connect_async to {} timed out: {:?}", url, e),
120        }
121
122        let backoff_ms: u64 = BACKOFF_MS_BASE * attempt;
123        let backoff_duration =
124            Duration::from_millis(backoff_ms).min(Duration::from_secs(MAX_BACKOFF_SECS));
125
126        info!("Backing off for {:?} before retry", backoff_duration);
127
128        sleep(backoff_duration).await;
129        attempt += 1;
130    }
131}
132
133/// Alternate between primary and beta URLs with indefinite retry.
134///
135/// Retries indefinitely, alternating between primary and beta URLs.
136/// Use when main server has issues. Exponential backoff capped at 60 seconds.
137///
138/// # Arguments
139/// * `primary_url` - Primary WebSocket URL
140/// * `secondary_url` - Beta WebSocket URL (used after first failure)
141///
142/// # Returns
143/// WebSocketStream on success (never returns error as it retries indefinitely).
144async fn connect_with_retry(
145    primary_url: &str,
146    secondary_url: &str,
147) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, Error> {
148    let mut attempt: u64 = 1;
149
150    loop {
151        let selected_url = if attempt == 1 {
152            primary_url
153        } else if attempt.is_multiple_of(2) {
154            secondary_url
155        } else {
156            primary_url
157        };
158
159        info!("Attempt {}: connecting to {}", attempt, selected_url);
160
161        match timeout(
162            Duration::from_secs(CONNECT_TIMEOUT_SECS),
163            connect_async_with_config(selected_url, None, true),
164        )
165        .await
166        {
167            Ok(Ok((ws_stream, _))) => return Ok(ws_stream),
168            Ok(Err(e)) => warn!("connect_async failed for {}: {:?}", selected_url, e),
169            Err(e) => warn!("connect_async to {} timed out: {:?}", selected_url, e),
170        }
171
172        let backoff_ms: u64 = BACKOFF_MS_BASE * attempt;
173        let backoff_duration =
174            Duration::from_millis(backoff_ms).min(Duration::from_secs(MAX_BACKOFF_SECS));
175
176        info!("Backing off for {:?} before retry", backoff_duration);
177
178        sleep(backoff_duration).await;
179        attempt += 1;
180    }
181}
182
183/// Connect using the specified strategy.
184///
185/// # Arguments
186/// * `primary_url` - Primary WebSocket URL
187/// * `beta_url` - Beta WebSocket URL (only used for AlternateWithRetry)
188/// * `strategy` - Connection strategy to use
189///
190/// # Returns
191/// WebSocketStream on success, error on failure.
192pub(crate) async fn connect_with_strategy(
193    primary_url: &str,
194    beta_url: &str,
195    strategy: ConnectStrategy,
196) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>, Error> {
197    match strategy {
198        ConnectStrategy::Simple => connect(primary_url).await,
199        ConnectStrategy::Retry => connect_with_retry_single_url(primary_url).await,
200        ConnectStrategy::AlternateWithRetry => connect_with_retry(primary_url, beta_url).await,
201    }
202}