Skip to main content

simulator_client/managed/
mod.rs

1//! Connection-managed wrappers over the backtest WebSocket protocol.
2//!
3//! Most callers should use [`ManagedBacktestSession`]. It owns the control and
4//! subscription manager tasks, waits for session creation, gates `Continue`
5//! sends on live connections, and provides one shutdown path.
6//!
7//! The control and subscription WebSockets are each owned by a dedicated task
8//! that handles its own lifecycle: connect, handshake, keepalive, reconnect.
9//! Workload code interacts via channels and a status watcher, never with the
10//! WebSocket directly.
11
12use std::time::Duration;
13
14use rand::Rng;
15use tokio_util::sync::CancellationToken;
16
17mod control;
18mod session;
19mod subscription;
20
21pub use control::{ControlEvent, ControlHandle, spawn_control_manager};
22pub use session::{ManagedBacktestSession, ManagedEvent, ManagedSessionError};
23pub use subscription::{
24    SubscriptionHandle, SubscriptionNotification, spawn_account_diff_subscription_manager,
25    spawn_transaction_subscription_manager,
26};
27
/// Timeout for the initial WebSocket connect (TCP + TLS + HTTP upgrade).
pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Timeout for any single response during a handshake (Create, Attach, Resume,
/// subscribe ack). Does not apply to per-message reads during an established
/// session — those are bounded by ping/pong liveness instead.
///
/// Set generously because the management service can park subscribe upgrades
/// for the duration of session startup (observed ~100s on cold runtimes); a
/// shorter timeout fires before management can forward the subscribe message
/// and gets reported as "subscribe ack timeout" even though the session is
/// fine. The proper fix is server-side (reject parked upgrades cleanly), but
/// this keeps the client from giving up early in the meantime.
pub const HANDSHAKE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120);

/// How often to send a WebSocket ping during an established connection.
pub const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(15);

/// How long without inbound traffic before declaring the connection dead.
///
/// Defined as three [`KEEPALIVE_INTERVAL`]s (roughly three missed pings) so the
/// two values cannot drift apart. Assumes the interval is a whole number of
/// seconds: `as_secs` drops any fractional part.
pub const KEEPALIVE_MISS_DEADLINE: Duration = Duration::from_secs(3 * KEEPALIVE_INTERVAL.as_secs());

/// Timeout for the graceful close handshake at end-of-session.
pub const GRACEFUL_CLOSE_TIMEOUT: Duration = Duration::from_secs(5);

/// Base delay before the first reconnect attempt (before jitter is applied).
pub const RECONNECT_INITIAL_BACKOFF: Duration = Duration::from_secs(1);

/// Upper bound on the base reconnect delay. Jitter is applied on top of the
/// capped value, so an individual delay can exceed this by up to
/// [`RECONNECT_JITTER`].
pub const RECONNECT_MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Factor by which the base reconnect delay grows after each attempt.
pub const RECONNECT_BACKOFF_MULTIPLIER: f64 = 2.0;

/// Fractional random spread applied to each reconnect delay: a base delay `d`
/// becomes a value in about `d * (1 ± RECONNECT_JITTER)`.
pub const RECONNECT_JITTER: f64 = 0.2;

/// Wall-clock window, measured from the start (or last reset) of a reconnect
/// budget, after which no further attempts are granted.
pub const RECONNECT_MAX_TOTAL: Duration = Duration::from_secs(5 * 60);

/// Maximum number of reconnect attempts granted per budget.
pub const RECONNECT_MAX_ATTEMPTS: u32 = 20;

/// A connection that stays `Up` for this long resets the reconnect counter.
pub const RECONNECT_UPTIME_RESET: Duration = Duration::from_secs(30);
62
/// Connection state as seen from outside the manager task.
///
/// `Up` and `Down` describe the live connection while the manager keeps
/// working; `Failed` is terminal and means the retry budget ran out.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// The connection is established.
    Up,
    /// Transiently disconnected; the manager is still retrying.
    Down,
    /// Terminal: the retry budget was exhausted. The payload is a message
    /// supplied by the manager (presumably the failure reason — confirm
    /// against the manager tasks).
    Failed(String),
}
73
/// Identity of a backtest session after it has been created.
#[derive(Debug, Clone)]
pub struct SessionInfo {
    /// Identifier of the created session.
    pub session_id: String,
    /// RPC endpoint associated with the session.
    pub rpc_endpoint: String,
    /// Opaque `task_id` the server reported for this session; `None` when the
    /// server did not report one.
    pub task_id: Option<String>,
}
82
83/// Tracks reconnect attempts and enforces the reconnect policy.
84pub(crate) struct ReconnectBudget {
85    attempts: u32,
86    started_at: std::time::Instant,
87    current_backoff: Duration,
88}
89
90impl ReconnectBudget {
91    pub fn new() -> Self {
92        Self {
93            attempts: 0,
94            started_at: std::time::Instant::now(),
95            current_backoff: RECONNECT_INITIAL_BACKOFF,
96        }
97    }
98
99    pub fn reset(&mut self) {
100        self.attempts = 0;
101        self.started_at = std::time::Instant::now();
102        self.current_backoff = RECONNECT_INITIAL_BACKOFF;
103    }
104
105    pub fn attempt(&self) -> u32 {
106        self.attempts
107    }
108
109    /// Record an attempt and return its backoff, or `None` if the budget is
110    /// exhausted.
111    pub fn next_backoff(&mut self) -> Option<Duration> {
112        if self.attempts >= RECONNECT_MAX_ATTEMPTS
113            || self.started_at.elapsed() >= RECONNECT_MAX_TOTAL
114        {
115            return None;
116        }
117        self.attempts += 1;
118        let backoff = with_jitter(self.current_backoff);
119        self.current_backoff = std::cmp::min(
120            RECONNECT_MAX_BACKOFF,
121            Duration::from_secs_f64(
122                self.current_backoff.as_secs_f64() * RECONNECT_BACKOFF_MULTIPLIER,
123            ),
124        );
125        Some(backoff)
126    }
127}
128
129fn with_jitter(d: Duration) -> Duration {
130    let jitter = rand::rng().random_range(-RECONNECT_JITTER..RECONNECT_JITTER);
131    let secs = (d.as_secs_f64() * (1.0 + jitter)).max(0.0);
132    Duration::from_secs_f64(secs)
133}
134
135/// Sleep for `delay`, returning early on cancellation. Returns `true` if the
136/// sleep completed, `false` if cancelled.
137pub(crate) async fn cancellable_sleep(delay: Duration, cancel: &CancellationToken) -> bool {
138    tokio::select! {
139        _ = tokio::time::sleep(delay) => true,
140        _ = cancel.cancelled() => false,
141    }
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// Slack for comparisons that round-trip through `f64` seconds.
    const EPS_SECS: f64 = 1e-6;

    /// Asserts that `d` lies within the jitter envelope around `base`.
    fn assert_within_jitter(d: Duration, base: Duration) {
        let lo = base.as_secs_f64() * (1.0 - RECONNECT_JITTER) - EPS_SECS;
        let hi = base.as_secs_f64() * (1.0 + RECONNECT_JITTER) + EPS_SECS;
        let got = d.as_secs_f64();
        assert!(
            (lo..=hi).contains(&got),
            "{got}s outside [{lo}s, {hi}s] for base {base:?}"
        );
    }

    #[test]
    fn budget_exhausts_after_max_attempts() {
        let mut b = ReconnectBudget::new();
        for _ in 0..RECONNECT_MAX_ATTEMPTS {
            assert!(b.next_backoff().is_some());
        }
        assert!(b.next_backoff().is_none());
        // A refused attempt is not recorded.
        assert_eq!(b.attempt(), RECONNECT_MAX_ATTEMPTS);
    }

    #[test]
    fn budget_reset_restores_full_budget() {
        let mut b = ReconnectBudget::new();
        while b.next_backoff().is_some() {}
        b.reset();
        assert_eq!(b.attempt(), 0);
        // The whole attempt budget is available again, not just the counter.
        for _ in 0..RECONNECT_MAX_ATTEMPTS {
            assert!(b.next_backoff().is_some());
        }
        assert!(b.next_backoff().is_none());
    }

    #[test]
    fn budget_reset_restores_initial_backoff() {
        let mut b = ReconnectBudget::new();
        // Grow the base backoff well past the initial value (1, 2, 4, 8, 16 -> 30).
        for _ in 0..5 {
            assert!(b.next_backoff().is_some());
        }
        b.reset();
        let first = b.next_backoff().expect("a fresh budget grants an attempt");
        assert_within_jitter(first, RECONNECT_INITIAL_BACKOFF);
    }

    #[test]
    fn backoff_grows_and_is_capped() {
        let mut b = ReconnectBudget::new();
        let mut base = RECONNECT_INITIAL_BACKOFF;
        while let Some(d) = b.next_backoff() {
            assert_within_jitter(d, base);
            base = Duration::from_secs_f64(base.as_secs_f64() * RECONNECT_BACKOFF_MULTIPLIER)
                .min(RECONNECT_MAX_BACKOFF);
        }
    }

    #[test]
    fn jitter_stays_within_bounds() {
        let base = Duration::from_secs(10);
        for _ in 0..1_000 {
            assert_within_jitter(with_jitter(base), base);
        }
    }
}