simulator-client 0.7.0

Async WebSocket client for the Solana simulator backtest API
Documentation
use std::time::Duration;

use bon::Builder;
use simulator_api::{AvailableRange, BacktestResponse};
use tokio_tungstenite::{
    connect_async,
    tungstenite::{client::IntoClientRequest, http::HeaderValue},
};

use crate::{
    BacktestClientError, BacktestClientResult, BacktestSession, CreateSession,
    session::CreateRequestResult, urls::http_base_from_ws_url,
};

/// Connect-handshake timeout (10 seconds) used when the builder is not given
/// an explicit `connect_timeout`.
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// Backtest WebSocket client configured with a base URL and API key.
///
/// Build with [`BacktestClient::builder`], which supports optional timeouts and
/// raw message logging.
///
/// `Debug` is implemented by hand (rather than derived) so that the API key is
/// never written out when the client is debug-formatted, e.g. in logs.
#[derive(Clone, Builder)]
#[builder(on(String, into))]
pub struct BacktestClient {
    /// WebSocket endpoint, e.g. `wss://.../backtest`.
    url: String,
    /// API key injected as the `X-API-Key` header. Defaults to an empty string.
    #[builder(default)]
    api_key: String,

    /// Timeout for the initial connect handshake. Defaults to 10 seconds.
    #[builder(default = DEFAULT_CONNECT_TIMEOUT)]
    connect_timeout: Duration,

    /// Default timeout for request/response operations.
    ///
    /// The value is handed unchanged to every session this client opens.
    request_timeout: Option<Duration>,

    /// Log raw inbound responses at debug level. Defaults to `false`.
    #[builder(default)]
    log_raw: bool,
}

impl std::fmt::Debug for BacktestClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only reveal whether a key is configured, never the key itself.
        let api_key = if self.api_key.is_empty() {
            "<empty>"
        } else {
            "<redacted>"
        };
        f.debug_struct("BacktestClient")
            .field("url", &self.url)
            .field("api_key", &api_key)
            .field("connect_timeout", &self.connect_timeout)
            .field("request_timeout", &self.request_timeout)
            .field("log_raw", &self.log_raw)
            .finish()
    }
}

impl BacktestClient {
    async fn connect(&self) -> BacktestClientResult<BacktestSession> {
        let mut request = self.url.clone().into_client_request().map_err(|source| {
            BacktestClientError::BuildRequest {
                url: self.url.clone(),
                source: Box::new(source),
            }
        })?;

        request
            .headers_mut()
            .insert("X-API-Key", HeaderValue::from_str(&self.api_key)?);

        let (stream, _) = tokio::time::timeout(self.connect_timeout, connect_async(request))
            .await
            .map_err(|_| BacktestClientError::Timeout {
                action: "connecting",
                duration: self.connect_timeout,
            })?
            .map_err(|source| BacktestClientError::Connect {
                url: self.url.clone(),
                source: Box::new(source),
            })?;
        Ok(BacktestSession::new(
            stream,
            self.request_timeout,
            self.log_raw,
        ))
    }

    /// Create a backtest session by connecting and sending a `CreateBacktestSession` request.
    pub async fn create_session(
        &self,
        create: CreateSession,
    ) -> BacktestClientResult<BacktestSession> {
        let request = create.into_request()?;
        let rpc_base_url = http_base_from_ws_url(&self.url);
        let mut session = self.connect().await?;
        match session
            .create_with_request(request, rpc_base_url, None)
            .await?
        {
            CreateRequestResult::Single { .. } => {}
            CreateRequestResult::Parallel { session_ids } => {
                return Err(BacktestClientError::UnexpectedResponse {
                    context: "creating single session",
                    response: Box::new(BacktestResponse::SessionsCreated { session_ids }),
                });
            }
        }
        Ok(session)
    }

    /// Create one or many sessions and return the resulting session IDs.
    ///
    /// When `CreateSession.parallel` is true, this may return many IDs.
    pub async fn create_sessions(
        &self,
        create: CreateSession,
    ) -> BacktestClientResult<Vec<String>> {
        self.create_sessions_with_progress(create, |_| {}).await
    }

    /// Create one or many sessions and stream each successfully created session ID.
    ///
    /// `on_session_created` is called for each streamed `SessionCreated` response.
    pub async fn create_sessions_with_progress(
        &self,
        create: CreateSession,
        mut on_session_created: impl FnMut(String) + Send,
    ) -> BacktestClientResult<Vec<String>> {
        let request = create.into_request()?;
        let rpc_base_url = http_base_from_ws_url(&self.url);
        let mut session = self.connect().await?;
        match session
            .create_with_request(request, rpc_base_url, Some(&mut on_session_created))
            .await?
        {
            CreateRequestResult::Single { session_id } => Ok(vec![session_id]),
            CreateRequestResult::Parallel { session_ids } => Ok(session_ids),
        }
    }

    /// Fetch the available slot ranges from the server's `/available-ranges` endpoint.
    pub async fn available_ranges(&self) -> BacktestClientResult<Vec<AvailableRange>> {
        let base = http_base_from_ws_url(&self.url);
        let url = format!("{base}/available-ranges");
        let ranges = reqwest::Client::new()
            .get(&url)
            .send()
            .await
            .map_err(|e| BacktestClientError::Http {
                url: url.clone(),
                source: Box::new(e),
            })?
            .error_for_status()
            .map_err(|e| BacktestClientError::Http {
                url: url.clone(),
                source: Box::new(e),
            })?
            .json::<Vec<AvailableRange>>()
            .await
            .map_err(|e| BacktestClientError::Http {
                url: url.clone(),
                source: Box::new(e),
            })?;
        Ok(ranges)
    }

    /// Attach to an existing backtest session over the control websocket.
    pub async fn attach_session(
        &self,
        session_id: impl Into<String>,
        last_sequence: Option<u64>,
    ) -> BacktestClientResult<BacktestSession> {
        let rpc_base_url = http_base_from_ws_url(&self.url);
        let mut session = self.connect().await?;
        session
            .attach(session_id.into(), last_sequence, rpc_base_url)
            .await?;
        Ok(session)
    }
}