Skip to main content

simulator_client/
client.rs

1use std::time::Duration;
2
3use bon::Builder;
4use chrono::{DateTime, Utc};
5use simulator_api::{AvailableRange, BacktestResponse, usage::UsageReport};
6use tokio_tungstenite::{
7    connect_async,
8    tungstenite::{client::IntoClientRequest, http::HeaderValue},
9};
10
11use crate::{
12    BacktestClientError, BacktestClientResult, BacktestSession, CreateSession,
13    session::CreateRequestResult, urls::http_base_from_ws_url,
14};
15
16const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
17
18/// Backtest WebSocket client configured with a base URL and API key.
19///
20/// Build with [`BacktestClient::builder`], which supports optional timeouts and
21/// raw message logging.
22#[derive(Debug, Clone, Builder)]
23#[builder(on(String, into))]
24pub struct BacktestClient {
25    /// WebSocket endpoint, e.g. `wss://.../backtest`.
26    url: String,
27    /// API key injected as the `X-API-Key` header.
28    #[builder(default)]
29    api_key: String,
30
31    /// Timeout for the initial connect handshake.
32    #[builder(default = DEFAULT_CONNECT_TIMEOUT)]
33    connect_timeout: Duration,
34
35    /// Default timeout for request/response operations.
36    request_timeout: Option<Duration>,
37
38    /// Log raw inbound responses at debug level.
39    #[builder(default)]
40    log_raw: bool,
41}
42
43impl BacktestClient {
44    async fn connect(&self) -> BacktestClientResult<BacktestSession> {
45        let mut request = self.url.clone().into_client_request().map_err(|source| {
46            BacktestClientError::BuildRequest {
47                url: self.url.clone(),
48                source: Box::new(source),
49            }
50        })?;
51
52        request
53            .headers_mut()
54            .insert("X-API-Key", HeaderValue::from_str(&self.api_key)?);
55
56        let (stream, _) = tokio::time::timeout(self.connect_timeout, connect_async(request))
57            .await
58            .map_err(|_| BacktestClientError::Timeout {
59                action: "connecting",
60                duration: self.connect_timeout,
61            })?
62            .map_err(|source| BacktestClientError::Connect {
63                url: self.url.clone(),
64                source: Box::new(source),
65            })?;
66        Ok(BacktestSession::new(
67            stream,
68            self.request_timeout,
69            self.log_raw,
70        ))
71    }
72
73    /// Create a backtest session by connecting and sending a `CreateBacktestSession` request.
74    pub async fn create_session(
75        &self,
76        create: CreateSession,
77    ) -> BacktestClientResult<BacktestSession> {
78        let request = create.into_request()?;
79        let rpc_base_url = http_base_from_ws_url(&self.url);
80        let mut session = self.connect().await?;
81        match session
82            .create_with_request(request, rpc_base_url, None)
83            .await?
84        {
85            CreateRequestResult::Single { .. } => {}
86            CreateRequestResult::Parallel { session_ids, .. } => {
87                return Err(BacktestClientError::UnexpectedResponse {
88                    context: "creating single session",
89                    response: Box::new(BacktestResponse::SessionsCreated { session_ids }),
90                });
91            }
92        }
93        Ok(session)
94    }
95
96    /// Create one or many sessions and return the resulting session IDs.
97    ///
98    /// When `CreateSession.parallel` is true, this may return many IDs.
99    pub async fn create_sessions(
100        &self,
101        create: CreateSession,
102    ) -> BacktestClientResult<Vec<String>> {
103        self.create_sessions_with_progress(create, |_| {}).await
104    }
105
106    /// Create one or many sessions and stream each successfully created session ID.
107    ///
108    /// `on_session_created` is called for each streamed `SessionCreated` response.
109    pub async fn create_sessions_with_progress(
110        &self,
111        create: CreateSession,
112        mut on_session_created: impl FnMut(String) + Send,
113    ) -> BacktestClientResult<Vec<String>> {
114        let request = create.into_request()?;
115        let rpc_base_url = http_base_from_ws_url(&self.url);
116        let mut session = self.connect().await?;
117        match session
118            .create_with_request(request, rpc_base_url, Some(&mut on_session_created))
119            .await?
120        {
121            CreateRequestResult::Single { session_id, .. } => Ok(vec![session_id]),
122            CreateRequestResult::Parallel { session_ids, .. } => Ok(session_ids),
123        }
124    }
125
126    /// Like [`Self::create_sessions`], but additionally returns the opaque
127    /// per-session `task_id` reported by the server (if any).
128    pub async fn create_sessions_with_task_ids(
129        &self,
130        create: CreateSession,
131    ) -> BacktestClientResult<Vec<(String, Option<String>)>> {
132        let request = create.into_request()?;
133        let rpc_base_url = http_base_from_ws_url(&self.url);
134        let mut session = self.connect().await?;
135        match session
136            .create_with_request(request, rpc_base_url, None)
137            .await?
138        {
139            CreateRequestResult::Single {
140                session_id,
141                task_id,
142            } => Ok(vec![(session_id, task_id)]),
143            CreateRequestResult::Parallel {
144                session_ids,
145                task_ids,
146            } => Ok(session_ids.into_iter().zip(task_ids).collect()),
147        }
148    }
149
150    /// Fetch the available slot ranges from the server's `/available-ranges` endpoint.
151    pub async fn available_ranges(&self) -> BacktestClientResult<Vec<AvailableRange>> {
152        let base = http_base_from_ws_url(&self.url);
153        let url = format!("{base}/available-ranges");
154        let ranges = reqwest::Client::new()
155            .get(&url)
156            .send()
157            .await
158            .map_err(|e| BacktestClientError::Http {
159                url: url.clone(),
160                source: Box::new(e),
161            })?
162            .error_for_status()
163            .map_err(|e| BacktestClientError::Http {
164                url: url.clone(),
165                source: Box::new(e),
166            })?
167            .json::<Vec<AvailableRange>>()
168            .await
169            .map_err(|e| BacktestClientError::Http {
170                url: url.clone(),
171                source: Box::new(e),
172            })?;
173        Ok(ranges)
174    }
175
176    /// Fetch per-API-key usage metrics from the server's `/usage` endpoint.
177    ///
178    /// `since` and `until` narrow the reporting window; both default to the
179    /// server's configured window when omitted.
180    pub async fn usage(
181        &self,
182        since: Option<DateTime<Utc>>,
183        until: Option<DateTime<Utc>>,
184    ) -> BacktestClientResult<UsageReport> {
185        let base = http_base_from_ws_url(&self.url);
186        let url = format!("{base}/usage");
187        let mut req = reqwest::Client::new()
188            .get(&url)
189            .header("X-API-Key", &self.api_key);
190        if let Some(t) = since {
191            req = req.query(&[(
192                "since",
193                t.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
194            )]);
195        }
196        if let Some(t) = until {
197            req = req.query(&[(
198                "until",
199                t.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
200            )]);
201        }
202        let resp = req.send().await.map_err(|e| BacktestClientError::Http {
203            url: url.clone(),
204            source: Box::new(e),
205        })?;
206        let status = resp.status();
207        if !status.is_success() {
208            let body = resp.text().await.unwrap_or_default();
209            return Err(BacktestClientError::HttpStatus { url, status, body });
210        }
211        resp.json::<UsageReport>()
212            .await
213            .map_err(|e| BacktestClientError::Http {
214                url: url.clone(),
215                source: Box::new(e),
216            })
217    }
218
219    /// Attach to an existing backtest session over the control websocket.
220    pub async fn attach_session(
221        &self,
222        session_id: impl Into<String>,
223        last_sequence: Option<u64>,
224    ) -> BacktestClientResult<BacktestSession> {
225        let rpc_base_url = http_base_from_ws_url(&self.url);
226        let mut session = self.connect().await?;
227        session
228            .attach(session_id.into(), last_sequence, rpc_base_url)
229            .await?;
230        Ok(session)
231    }
232}