use std::time::Duration;
use rand::Rng;
use tokio_util::sync::CancellationToken;
mod control;
mod session;
mod subscription;
pub use control::{ControlEvent, ControlHandle, spawn_control_manager};
pub use session::{ManagedBacktestSession, ManagedEvent, ManagedSessionError};
pub use subscription::{
SubscriptionHandle, SubscriptionNotification, spawn_account_diff_subscription_manager,
spawn_transaction_subscription_manager,
};
pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
pub const HANDSHAKE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120);
pub const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(15);
pub const KEEPALIVE_MISS_DEADLINE: Duration = Duration::from_secs(45);
pub const GRACEFUL_CLOSE_TIMEOUT: Duration = Duration::from_secs(5);
pub const RECONNECT_INITIAL_BACKOFF: Duration = Duration::from_secs(1);
pub const RECONNECT_MAX_BACKOFF: Duration = Duration::from_secs(30);
pub const RECONNECT_BACKOFF_MULTIPLIER: f64 = 2.0;
pub const RECONNECT_JITTER: f64 = 0.2;
pub const RECONNECT_MAX_TOTAL: Duration = Duration::from_secs(5 * 60);
pub const RECONNECT_MAX_ATTEMPTS: u32 = 20;
pub const RECONNECT_UPTIME_RESET: Duration = Duration::from_secs(30);
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ConnectionStatus {
Up,
Down,
Failed(String),
}
#[derive(Clone, Debug)]
pub struct SessionInfo {
pub session_id: String,
pub rpc_endpoint: String,
pub task_id: Option<String>,
}
pub(crate) struct ReconnectBudget {
attempts: u32,
started_at: std::time::Instant,
current_backoff: Duration,
}
impl ReconnectBudget {
pub fn new() -> Self {
Self {
attempts: 0,
started_at: std::time::Instant::now(),
current_backoff: RECONNECT_INITIAL_BACKOFF,
}
}
pub fn reset(&mut self) {
self.attempts = 0;
self.started_at = std::time::Instant::now();
self.current_backoff = RECONNECT_INITIAL_BACKOFF;
}
pub fn attempt(&self) -> u32 {
self.attempts
}
pub fn next_backoff(&mut self) -> Option<Duration> {
if self.attempts >= RECONNECT_MAX_ATTEMPTS
|| self.started_at.elapsed() >= RECONNECT_MAX_TOTAL
{
return None;
}
self.attempts += 1;
let backoff = with_jitter(self.current_backoff);
self.current_backoff = std::cmp::min(
RECONNECT_MAX_BACKOFF,
Duration::from_secs_f64(
self.current_backoff.as_secs_f64() * RECONNECT_BACKOFF_MULTIPLIER,
),
);
Some(backoff)
}
}
fn with_jitter(d: Duration) -> Duration {
let jitter = rand::rng().random_range(-RECONNECT_JITTER..RECONNECT_JITTER);
let secs = (d.as_secs_f64() * (1.0 + jitter)).max(0.0);
Duration::from_secs_f64(secs)
}
pub(crate) async fn cancellable_sleep(delay: Duration, cancel: &CancellationToken) -> bool {
tokio::select! {
_ = tokio::time::sleep(delay) => true,
_ = cancel.cancelled() => false,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn budget_exhausts_after_max_attempts() {
let mut b = ReconnectBudget::new();
for _ in 0..RECONNECT_MAX_ATTEMPTS {
assert!(b.next_backoff().is_some());
}
assert!(b.next_backoff().is_none());
}
#[test]
fn budget_reset_restores_full_budget() {
let mut b = ReconnectBudget::new();
b.next_backoff();
b.next_backoff();
b.reset();
assert_eq!(b.attempt(), 0);
}
}