simulator_client/managed/
mod.rs1use std::time::Duration;
13
14use rand::Rng;
15use tokio_util::sync::CancellationToken;
16
17mod control;
18mod session;
19mod subscription;
20
21pub use control::{ControlEvent, ControlHandle, spawn_control_manager};
22pub use session::{ManagedBacktestSession, ManagedEvent, ManagedSessionError};
23pub use subscription::{
24 SubscriptionHandle, SubscriptionNotification, spawn_account_diff_subscription_manager,
25 spawn_transaction_subscription_manager,
26};
27
28pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
30
31pub const HANDSHAKE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120);
42
43pub const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(15);
45
46pub const KEEPALIVE_MISS_DEADLINE: Duration = Duration::from_secs(45);
49
50pub const GRACEFUL_CLOSE_TIMEOUT: Duration = Duration::from_secs(5);
52
53pub const RECONNECT_INITIAL_BACKOFF: Duration = Duration::from_secs(1);
54pub const RECONNECT_MAX_BACKOFF: Duration = Duration::from_secs(30);
55pub const RECONNECT_BACKOFF_MULTIPLIER: f64 = 2.0;
56pub const RECONNECT_JITTER: f64 = 0.2;
57pub const RECONNECT_MAX_TOTAL: Duration = Duration::from_secs(5 * 60);
58pub const RECONNECT_MAX_ATTEMPTS: u32 = 20;
59
60pub const RECONNECT_UPTIME_RESET: Duration = Duration::from_secs(30);
62
63#[derive(Clone, Debug, PartialEq, Eq)]
68pub enum ConnectionStatus {
69 Up,
70 Down,
71 Failed(String),
72}
73
74#[derive(Clone, Debug)]
76pub struct SessionInfo {
77 pub session_id: String,
78 pub rpc_endpoint: String,
79 pub task_id: Option<String>,
81}
82
83pub(crate) struct ReconnectBudget {
85 attempts: u32,
86 started_at: std::time::Instant,
87 current_backoff: Duration,
88}
89
90impl ReconnectBudget {
91 pub fn new() -> Self {
92 Self {
93 attempts: 0,
94 started_at: std::time::Instant::now(),
95 current_backoff: RECONNECT_INITIAL_BACKOFF,
96 }
97 }
98
99 pub fn reset(&mut self) {
100 self.attempts = 0;
101 self.started_at = std::time::Instant::now();
102 self.current_backoff = RECONNECT_INITIAL_BACKOFF;
103 }
104
105 pub fn attempt(&self) -> u32 {
106 self.attempts
107 }
108
109 pub fn next_backoff(&mut self) -> Option<Duration> {
112 if self.attempts >= RECONNECT_MAX_ATTEMPTS
113 || self.started_at.elapsed() >= RECONNECT_MAX_TOTAL
114 {
115 return None;
116 }
117 self.attempts += 1;
118 let backoff = with_jitter(self.current_backoff);
119 self.current_backoff = std::cmp::min(
120 RECONNECT_MAX_BACKOFF,
121 Duration::from_secs_f64(
122 self.current_backoff.as_secs_f64() * RECONNECT_BACKOFF_MULTIPLIER,
123 ),
124 );
125 Some(backoff)
126 }
127}
128
129fn with_jitter(d: Duration) -> Duration {
130 let jitter = rand::rng().random_range(-RECONNECT_JITTER..RECONNECT_JITTER);
131 let secs = (d.as_secs_f64() * (1.0 + jitter)).max(0.0);
132 Duration::from_secs_f64(secs)
133}
134
135pub(crate) async fn cancellable_sleep(delay: Duration, cancel: &CancellationToken) -> bool {
138 tokio::select! {
139 _ = tokio::time::sleep(delay) => true,
140 _ = cancel.cancelled() => false,
141 }
142}
143
144#[cfg(test)]
145mod tests {
146 use super::*;
147
148 #[test]
149 fn budget_exhausts_after_max_attempts() {
150 let mut b = ReconnectBudget::new();
151 for _ in 0..RECONNECT_MAX_ATTEMPTS {
152 assert!(b.next_backoff().is_some());
153 }
154 assert!(b.next_backoff().is_none());
155 }
156
157 #[test]
158 fn budget_reset_restores_full_budget() {
159 let mut b = ReconnectBudget::new();
160 b.next_backoff();
161 b.next_backoff();
162 b.reset();
163 assert_eq!(b.attempt(), 0);
164 }
165}