simulator-client 0.8.0

Async WebSocket client for the Solana simulator backtest API
Documentation
//! Connection-managed wrappers over the backtest WebSocket protocol.
//!
//! Most callers should use [`ManagedBacktestSession`]. It owns the control and
//! subscription manager tasks, waits for session creation, gates `Continue`
//! sends on live connections, and provides one shutdown path.
//!
//! The control and subscription WebSockets are each owned by a dedicated task
//! that handles its own lifecycle: connect, handshake, keepalive, reconnect.
//! Workload code interacts via channels and a status watcher, never with the
//! WebSocket directly.

use std::time::Duration;

use rand::Rng;
use tokio_util::sync::CancellationToken;

mod control;
mod session;
mod subscription;

pub use control::{ControlEvent, ControlHandle, spawn_control_manager};
pub use session::{ManagedBacktestSession, ManagedEvent, ManagedSessionError};
pub use subscription::{
    SubscriptionHandle, SubscriptionNotification, spawn_account_diff_subscription_manager,
    spawn_transaction_subscription_manager,
};

/// Upper bound on establishing a WebSocket connection: TCP connect, TLS
/// handshake, and HTTP upgrade together.
pub const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

/// Upper bound on waiting for any one reply during a handshake (Create,
/// Attach, Resume, or a subscribe ack). Individual reads on an already
/// established session are not bounded by this; ping/pong liveness covers
/// them instead.
///
/// Deliberately generous: the management service may hold subscribe upgrades
/// open while the session is starting (observed at roughly 100s on cold
/// runtimes). A tighter bound expires before management gets to forward the
/// subscribe message, and then surfaces as a spurious "subscribe ack timeout"
/// on a perfectly healthy session. The real fix belongs on the server (reject
/// parked upgrades cleanly); until then this keeps the client from giving up
/// too early.
pub const HANDSHAKE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(2 * 60);

/// Interval between WebSocket pings on an established connection.
pub const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(15);

/// Span of inbound silence after which the connection is declared dead:
/// three keepalive intervals, i.e. roughly three missed pings.
pub const KEEPALIVE_MISS_DEADLINE: Duration =
    Duration::from_secs(3 * KEEPALIVE_INTERVAL.as_secs());

/// Upper bound on the graceful close handshake when a session ends.
pub const GRACEFUL_CLOSE_TIMEOUT: Duration = Duration::from_secs(5);

/// Backoff handed out for the first reconnect attempt, before jitter.
pub const RECONNECT_INITIAL_BACKOFF: Duration = Duration::from_secs(1);
/// Ceiling on the un-jittered reconnect backoff.
pub const RECONNECT_MAX_BACKOFF: Duration = Duration::from_secs(30);
/// Factor the backoff is multiplied by after each reconnect attempt.
pub const RECONNECT_BACKOFF_MULTIPLIER: f64 = 2.0;
/// Maximum relative jitter applied to each backoff (`0.2` means ±20%).
pub const RECONNECT_JITTER: f64 = 0.2;
/// Wall-clock limit after which a `ReconnectBudget` stops granting attempts.
pub const RECONNECT_MAX_TOTAL: Duration = Duration::from_secs(300);
/// Number of reconnect attempts a `ReconnectBudget` grants before giving up.
pub const RECONNECT_MAX_ATTEMPTS: u32 = 20;

/// Once a connection has remained `Up` for this long, the reconnect counter
/// is reset.
pub const RECONNECT_UPTIME_RESET: Duration = Duration::from_secs(30);

/// Connection state as seen by code running outside the manager task.
///
/// `Up` and `Down` may alternate while the manager retries. `Failed` is the
/// terminal state, reached once the retry budget is exhausted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStatus {
    /// Currently connected.
    Up,
    /// Transiently disconnected; the manager is still retrying.
    Down,
    /// Terminal: the retry budget was exhausted. The payload is a message
    /// supplied by the manager (presumably the failure reason — confirm in
    /// the manager modules).
    Failed(String),
}

/// Identifying details of a backtest session once it has been created.
#[derive(Debug, Clone)]
pub struct SessionInfo {
    /// Identifier of the backtest session.
    pub session_id: String,
    /// RPC endpoint associated with the session (presumably a URL — confirm
    /// against the server's Create response).
    pub rpc_endpoint: String,
    /// Opaque `task_id` the server reported for this session, when it sent one.
    pub task_id: Option<String>,
}

/// Tracks reconnect attempts and enforces the reconnect policy.
///
/// A streak of reconnect attempts is bounded by both limits below; whichever
/// is hit first exhausts the budget:
/// - at most [`RECONNECT_MAX_ATTEMPTS`] attempts;
/// - at most [`RECONNECT_MAX_TOTAL`] of wall-clock time.
///
/// The wall-clock window opens at the first [`next_backoff`] call of the
/// streak (the first failure), not when the budget is created or reset. Time
/// spent healthily connected is therefore never charged against the
/// reconnect budget.
///
/// [`next_backoff`]: ReconnectBudget::next_backoff
pub(crate) struct ReconnectBudget {
    /// Attempts recorded since construction or the last `reset()`.
    attempts: u32,
    /// When the current streak's first attempt was recorded; `None` until the
    /// first `next_backoff()` after construction or `reset()`.
    started_at: Option<std::time::Instant>,
    /// Un-jittered backoff to hand out for the next attempt.
    current_backoff: Duration,
}

impl ReconnectBudget {
    /// Create a full budget: no attempts recorded, the initial backoff, and a
    /// wall-clock window that has not started yet.
    pub fn new() -> Self {
        Self {
            attempts: 0,
            started_at: None,
            current_backoff: RECONNECT_INITIAL_BACKOFF,
        }
    }

    /// Restore the full budget, exactly as if freshly constructed.
    pub fn reset(&mut self) {
        *self = Self::new();
    }

    /// Number of attempts recorded since construction or the last `reset()`.
    pub fn attempt(&self) -> u32 {
        self.attempts
    }

    /// Record an attempt and return its jittered backoff.
    ///
    /// Returns `None` if the budget is exhausted. That happens when the
    /// attempt cap has been reached, or when `RECONNECT_MAX_TOTAL` has
    /// elapsed since this streak's first attempt. An exhausted call does not
    /// increment the attempt counter.
    pub fn next_backoff(&mut self) -> Option<Duration> {
        // Open the wall-clock window lazily so it measures time spent
        // reconnecting, not time since the last (possibly long-ago) reset.
        let started_at = *self
            .started_at
            .get_or_insert_with(std::time::Instant::now);
        if self.attempts >= RECONNECT_MAX_ATTEMPTS
            || started_at.elapsed() >= RECONNECT_MAX_TOTAL
        {
            return None;
        }
        self.attempts += 1;
        let backoff = with_jitter(self.current_backoff);
        // Grow the base exponentially, capped before jitter is applied.
        self.current_backoff = std::cmp::min(
            RECONNECT_MAX_BACKOFF,
            self.current_backoff.mul_f64(RECONNECT_BACKOFF_MULTIPLIER),
        );
        Some(backoff)
    }
}

/// Scale `d` by a random factor drawn uniformly from
/// `[1 - RECONNECT_JITTER, 1 + RECONNECT_JITTER)`, clamping the result at zero.
fn with_jitter(d: Duration) -> Duration {
    let mut rng = rand::rng();
    let offset = rng.random_range(-RECONNECT_JITTER..RECONNECT_JITTER);
    let scaled = d.as_secs_f64() * (1.0 + offset);
    Duration::from_secs_f64(f64::max(scaled, 0.0))
}

/// Sleep for `delay`, returning early if `cancel` fires.
///
/// Returns `true` if the full delay elapsed, `false` if cancelled.
///
/// Cancellation is polled before the timer (`biased;`). A token that is
/// already cancelled, or that fires in the same poll in which the timer
/// expires, therefore always yields `false`. This lets callers shut down
/// deterministically instead of occasionally starting another attempt.
pub(crate) async fn cancellable_sleep(delay: Duration, cancel: &CancellationToken) -> bool {
    tokio::select! {
        biased;
        _ = cancel.cancelled() => false,
        _ = tokio::time::sleep(delay) => true,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn budget_exhausts_after_max_attempts() {
        let mut b = ReconnectBudget::new();
        for _ in 0..RECONNECT_MAX_ATTEMPTS {
            assert!(b.next_backoff().is_some());
        }
        assert!(b.next_backoff().is_none());
        // Exhaustion is sticky and refused calls do not inflate the counter.
        assert!(b.next_backoff().is_none());
        assert_eq!(b.attempt(), RECONNECT_MAX_ATTEMPTS);
    }

    #[test]
    fn budget_counts_attempts() {
        let mut b = ReconnectBudget::new();
        assert_eq!(b.attempt(), 0);
        assert!(b.next_backoff().is_some());
        assert!(b.next_backoff().is_some());
        assert_eq!(b.attempt(), 2);
    }

    #[test]
    fn budget_reset_restores_full_budget() {
        let mut b = ReconnectBudget::new();
        // Drain the budget completely so the reset has something to restore.
        while b.next_backoff().is_some() {}
        b.reset();
        assert_eq!(b.attempt(), 0);
        // A genuinely full budget grants the entire allowance again.
        for _ in 0..RECONNECT_MAX_ATTEMPTS {
            assert!(b.next_backoff().is_some());
        }
        assert!(b.next_backoff().is_none());
    }
}