Skip to main content

simulator_client/
client.rs

1use std::time::Duration;
2
3use bon::Builder;
4use simulator_api::{AvailableRange, BacktestResponse};
5use tokio_tungstenite::{
6    connect_async,
7    tungstenite::{client::IntoClientRequest, http::HeaderValue},
8};
9
10use crate::{
11    BacktestClientError, BacktestClientResult, BacktestSession, CreateSession,
12    session::CreateRequestResult, urls::http_base_from_ws_url,
13};
14
15const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
16
17/// Backtest WebSocket client configured with a base URL and API key.
18///
19/// Build with [`BacktestClient::builder`], which supports optional timeouts and
20/// raw message logging.
21#[derive(Debug, Clone, Builder)]
22#[builder(on(String, into))]
23pub struct BacktestClient {
24    /// WebSocket endpoint, e.g. `wss://.../backtest`.
25    url: String,
26    /// API key injected as the `X-API-Key` header.
27    #[builder(default)]
28    api_key: String,
29
30    /// Timeout for the initial connect handshake.
31    #[builder(default = DEFAULT_CONNECT_TIMEOUT)]
32    connect_timeout: Duration,
33
34    /// Default timeout for request/response operations.
35    request_timeout: Option<Duration>,
36
37    /// Log raw inbound responses at debug level.
38    #[builder(default)]
39    log_raw: bool,
40}
41
42impl BacktestClient {
43    async fn connect(&self) -> BacktestClientResult<BacktestSession> {
44        let mut request = self.url.clone().into_client_request().map_err(|source| {
45            BacktestClientError::BuildRequest {
46                url: self.url.clone(),
47                source: Box::new(source),
48            }
49        })?;
50
51        request
52            .headers_mut()
53            .insert("X-API-Key", HeaderValue::from_str(&self.api_key)?);
54
55        let (stream, _) = tokio::time::timeout(self.connect_timeout, connect_async(request))
56            .await
57            .map_err(|_| BacktestClientError::Timeout {
58                action: "connecting",
59                duration: self.connect_timeout,
60            })?
61            .map_err(|source| BacktestClientError::Connect {
62                url: self.url.clone(),
63                source: Box::new(source),
64            })?;
65        Ok(BacktestSession::new(
66            stream,
67            self.request_timeout,
68            self.log_raw,
69        ))
70    }
71
72    /// Create a backtest session by connecting and sending a `CreateBacktestSession` request.
73    pub async fn create_session(
74        &self,
75        create: CreateSession,
76    ) -> BacktestClientResult<BacktestSession> {
77        let request = create.into_request()?;
78        let rpc_base_url = http_base_from_ws_url(&self.url);
79        let mut session = self.connect().await?;
80        match session
81            .create_with_request(request, rpc_base_url, None)
82            .await?
83        {
84            CreateRequestResult::Single { .. } => {}
85            CreateRequestResult::Parallel { session_ids, .. } => {
86                return Err(BacktestClientError::UnexpectedResponse {
87                    context: "creating single session",
88                    response: Box::new(BacktestResponse::SessionsCreated { session_ids }),
89                });
90            }
91        }
92        Ok(session)
93    }
94
95    /// Create one or many sessions and return the resulting session IDs.
96    ///
97    /// When `CreateSession.parallel` is true, this may return many IDs.
98    pub async fn create_sessions(
99        &self,
100        create: CreateSession,
101    ) -> BacktestClientResult<Vec<String>> {
102        self.create_sessions_with_progress(create, |_| {}).await
103    }
104
105    /// Create one or many sessions and stream each successfully created session ID.
106    ///
107    /// `on_session_created` is called for each streamed `SessionCreated` response.
108    pub async fn create_sessions_with_progress(
109        &self,
110        create: CreateSession,
111        mut on_session_created: impl FnMut(String) + Send,
112    ) -> BacktestClientResult<Vec<String>> {
113        let request = create.into_request()?;
114        let rpc_base_url = http_base_from_ws_url(&self.url);
115        let mut session = self.connect().await?;
116        match session
117            .create_with_request(request, rpc_base_url, Some(&mut on_session_created))
118            .await?
119        {
120            CreateRequestResult::Single { session_id, .. } => Ok(vec![session_id]),
121            CreateRequestResult::Parallel { session_ids, .. } => Ok(session_ids),
122        }
123    }
124
125    /// Like [`Self::create_sessions`], but additionally returns the opaque
126    /// per-session `task_id` reported by the server (if any).
127    pub async fn create_sessions_with_task_ids(
128        &self,
129        create: CreateSession,
130    ) -> BacktestClientResult<Vec<(String, Option<String>)>> {
131        let request = create.into_request()?;
132        let rpc_base_url = http_base_from_ws_url(&self.url);
133        let mut session = self.connect().await?;
134        match session
135            .create_with_request(request, rpc_base_url, None)
136            .await?
137        {
138            CreateRequestResult::Single {
139                session_id,
140                task_id,
141            } => Ok(vec![(session_id, task_id)]),
142            CreateRequestResult::Parallel {
143                session_ids,
144                task_ids,
145            } => Ok(session_ids.into_iter().zip(task_ids).collect()),
146        }
147    }
148
149    /// Fetch the available slot ranges from the server's `/available-ranges` endpoint.
150    pub async fn available_ranges(&self) -> BacktestClientResult<Vec<AvailableRange>> {
151        let base = http_base_from_ws_url(&self.url);
152        let url = format!("{base}/available-ranges");
153        let ranges = reqwest::Client::new()
154            .get(&url)
155            .send()
156            .await
157            .map_err(|e| BacktestClientError::Http {
158                url: url.clone(),
159                source: Box::new(e),
160            })?
161            .error_for_status()
162            .map_err(|e| BacktestClientError::Http {
163                url: url.clone(),
164                source: Box::new(e),
165            })?
166            .json::<Vec<AvailableRange>>()
167            .await
168            .map_err(|e| BacktestClientError::Http {
169                url: url.clone(),
170                source: Box::new(e),
171            })?;
172        Ok(ranges)
173    }
174
175    /// Attach to an existing backtest session over the control websocket.
176    pub async fn attach_session(
177        &self,
178        session_id: impl Into<String>,
179        last_sequence: Option<u64>,
180    ) -> BacktestClientResult<BacktestSession> {
181        let rpc_base_url = http_base_from_ws_url(&self.url);
182        let mut session = self.connect().await?;
183        session
184            .attach(session_id.into(), last_sequence, rpc_base_url)
185            .await?;
186        Ok(session)
187    }
188}