Skip to main content

simulator_client/
client.rs

1use std::time::Duration;
2
3use bon::Builder;
4use chrono::{DateTime, Utc};
5use simulator_api::{AvailableRange, BacktestResponse, usage::UsageReport};
6use tokio_tungstenite::{
7    connect_async,
8    tungstenite::{client::IntoClientRequest, http::HeaderValue},
9};
10
11use crate::{
12    BacktestClientError, BacktestClientResult, BacktestSession, CreateSession,
13    session::CreateRequestResult,
14    urls::{backtest_ws_url, http_base_from_ws_url},
15};
16
/// Connect-handshake timeout (10 seconds) used as the builder default for
/// `BacktestClient`'s `connect_timeout` field when none is supplied.
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
18
/// Backtest WebSocket client configured with a base URL and API key.
///
/// Build with [`BacktestClient::builder`], which supports optional timeouts and
/// raw message logging. Builder setters for `String` fields accept any
/// `impl Into<String>`.
///
/// NOTE(review): `Debug` is derived, so formatting a client with `{:?}` prints
/// `api_key` verbatim — confirm that is acceptable wherever clients are logged.
#[derive(Debug, Clone, Builder)]
#[builder(on(String, into))]
pub struct BacktestClient {
    /// WebSocket endpoint. Accepts a bare hostname (normalized to
    /// `wss://{host}/backtest`) or an explicit `ws(s)://` URL, whose
    /// `/backtest` path is appended only if missing.
    ///
    /// The value passed to the builder is run through `backtest_ws_url`
    /// before being stored, so this field always holds the normalized form.
    #[builder(with = |url: impl Into<String>| backtest_ws_url(&url.into()))]
    url: String,
    /// API key injected as the `X-API-Key` header.
    ///
    /// Sent on the websocket handshake and on the `/usage` HTTP request.
    /// Defaults to an empty string when not set on the builder.
    #[builder(default)]
    api_key: String,

    /// Timeout for the initial connect handshake.
    ///
    /// Defaults to `DEFAULT_CONNECT_TIMEOUT` (10 seconds).
    #[builder(default = DEFAULT_CONNECT_TIMEOUT)]
    connect_timeout: Duration,

    /// Default timeout for request/response operations.
    ///
    /// Passed unchanged to every [`BacktestSession`] this client opens; the
    /// setter is optional on the builder, leaving this `None` when omitted.
    request_timeout: Option<Duration>,

    /// Log raw inbound responses at debug level.
    ///
    /// Defaults to `false`; passed unchanged to every [`BacktestSession`]
    /// this client opens.
    #[builder(default)]
    log_raw: bool,
}
46
47impl BacktestClient {
48    async fn connect(&self) -> BacktestClientResult<BacktestSession> {
49        let mut request = self.url.clone().into_client_request().map_err(|source| {
50            BacktestClientError::BuildRequest {
51                url: self.url.clone(),
52                source: Box::new(source),
53            }
54        })?;
55
56        request
57            .headers_mut()
58            .insert("X-API-Key", HeaderValue::from_str(&self.api_key)?);
59
60        let (stream, _) = tokio::time::timeout(self.connect_timeout, connect_async(request))
61            .await
62            .map_err(|_| BacktestClientError::Timeout {
63                action: "connecting",
64                duration: self.connect_timeout,
65            })?
66            .map_err(|source| BacktestClientError::Connect {
67                url: self.url.clone(),
68                source: Box::new(source),
69            })?;
70        Ok(BacktestSession::new(
71            stream,
72            self.request_timeout,
73            self.log_raw,
74        ))
75    }
76
77    /// Create a backtest session by connecting and sending a `CreateBacktestSession` request.
78    pub async fn create_session(
79        &self,
80        create: CreateSession,
81    ) -> BacktestClientResult<BacktestSession> {
82        let request = create.into_request()?;
83        let rpc_base_url = http_base_from_ws_url(&self.url);
84        let mut session = self.connect().await?;
85        match session
86            .create_with_request(request, rpc_base_url, None)
87            .await?
88        {
89            CreateRequestResult::Single { .. } => {}
90            CreateRequestResult::Parallel { session_ids, .. } => {
91                return Err(BacktestClientError::UnexpectedResponse {
92                    context: "creating single session",
93                    response: Box::new(BacktestResponse::SessionsCreated { session_ids }),
94                });
95            }
96        }
97        Ok(session)
98    }
99
100    /// Create one or many sessions and return the resulting session IDs.
101    ///
102    /// When `CreateSession.parallel` is true, this may return many IDs.
103    pub async fn create_sessions(
104        &self,
105        create: CreateSession,
106    ) -> BacktestClientResult<Vec<String>> {
107        self.create_sessions_with_progress(create, |_| {}).await
108    }
109
110    /// Create one or many sessions and stream each successfully created session ID.
111    ///
112    /// `on_session_created` is called for each streamed `SessionCreated` response.
113    pub async fn create_sessions_with_progress(
114        &self,
115        create: CreateSession,
116        mut on_session_created: impl FnMut(String) + Send,
117    ) -> BacktestClientResult<Vec<String>> {
118        let request = create.into_request()?;
119        let rpc_base_url = http_base_from_ws_url(&self.url);
120        let mut session = self.connect().await?;
121        match session
122            .create_with_request(request, rpc_base_url, Some(&mut on_session_created))
123            .await?
124        {
125            CreateRequestResult::Single { session_id, .. } => Ok(vec![session_id]),
126            CreateRequestResult::Parallel { session_ids, .. } => Ok(session_ids),
127        }
128    }
129
130    /// Like [`Self::create_sessions`], but additionally returns the opaque
131    /// per-session `task_id` reported by the server (if any).
132    pub async fn create_sessions_with_task_ids(
133        &self,
134        create: CreateSession,
135    ) -> BacktestClientResult<Vec<(String, Option<String>)>> {
136        let request = create.into_request()?;
137        let rpc_base_url = http_base_from_ws_url(&self.url);
138        let mut session = self.connect().await?;
139        match session
140            .create_with_request(request, rpc_base_url, None)
141            .await?
142        {
143            CreateRequestResult::Single {
144                session_id,
145                task_id,
146            } => Ok(vec![(session_id, task_id)]),
147            CreateRequestResult::Parallel {
148                session_ids,
149                task_ids,
150            } => Ok(session_ids.into_iter().zip(task_ids).collect()),
151        }
152    }
153
154    /// Fetch the available slot ranges from the server's `/available-ranges` endpoint.
155    pub async fn available_ranges(&self) -> BacktestClientResult<Vec<AvailableRange>> {
156        let base = http_base_from_ws_url(&self.url);
157        let url = format!("{base}/available-ranges");
158        let ranges = reqwest::Client::new()
159            .get(&url)
160            .send()
161            .await
162            .map_err(|e| BacktestClientError::Http {
163                url: url.clone(),
164                source: Box::new(e),
165            })?
166            .error_for_status()
167            .map_err(|e| BacktestClientError::Http {
168                url: url.clone(),
169                source: Box::new(e),
170            })?
171            .json::<Vec<AvailableRange>>()
172            .await
173            .map_err(|e| BacktestClientError::Http {
174                url: url.clone(),
175                source: Box::new(e),
176            })?;
177        Ok(ranges)
178    }
179
180    /// Fetch per-API-key usage metrics from the server's `/usage` endpoint.
181    ///
182    /// `since` and `until` narrow the reporting window; both default to the
183    /// server's configured window when omitted.
184    pub async fn usage(
185        &self,
186        since: Option<DateTime<Utc>>,
187        until: Option<DateTime<Utc>>,
188    ) -> BacktestClientResult<UsageReport> {
189        let base = http_base_from_ws_url(&self.url);
190        let url = format!("{base}/usage");
191        let mut req = reqwest::Client::new()
192            .get(&url)
193            .header("X-API-Key", &self.api_key);
194        if let Some(t) = since {
195            req = req.query(&[(
196                "since",
197                t.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
198            )]);
199        }
200        if let Some(t) = until {
201            req = req.query(&[(
202                "until",
203                t.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
204            )]);
205        }
206        let resp = req.send().await.map_err(|e| BacktestClientError::Http {
207            url: url.clone(),
208            source: Box::new(e),
209        })?;
210        let status = resp.status();
211        if !status.is_success() {
212            let body = resp.text().await.unwrap_or_default();
213            return Err(BacktestClientError::HttpStatus { url, status, body });
214        }
215        resp.json::<UsageReport>()
216            .await
217            .map_err(|e| BacktestClientError::Http {
218                url: url.clone(),
219                source: Box::new(e),
220            })
221    }
222
223    /// Attach to an existing backtest session over the control websocket.
224    pub async fn attach_session(
225        &self,
226        session_id: impl Into<String>,
227        last_sequence: Option<u64>,
228    ) -> BacktestClientResult<BacktestSession> {
229        let rpc_base_url = http_base_from_ws_url(&self.url);
230        let mut session = self.connect().await?;
231        session
232            .attach(session_id.into(), last_sequence, rpc_base_url)
233            .await?;
234        Ok(session)
235    }
236}
237
#[cfg(test)]
mod tests {
    use super::*;

    /// A bare hostname is expanded into a `wss://…/backtest` endpoint, while
    /// an explicit `ws://` URL that already ends in `/backtest` is stored as
    /// given.
    #[test]
    fn builder_normalizes_bare_hosts_and_keeps_full_urls() {
        // Bare host, supplied as `&str`.
        let normalized = BacktestClient::builder()
            .url("simulator.termina.technology")
            .build()
            .url;
        assert_eq!(normalized, "wss://simulator.termina.technology/backtest");

        // Fully qualified URL, supplied as an owned `String`.
        let explicit = "ws://localhost:8900/backtest".to_string();
        let preserved = BacktestClient::builder().url(explicit).build().url;
        assert_eq!(preserved, "ws://localhost:8900/backtest");
    }
}