1use std::time::Duration;
2
3use bon::Builder;
4use chrono::{DateTime, Utc};
5use simulator_api::{AvailableRange, BacktestResponse, usage::UsageReport};
6use tokio_tungstenite::{
7 connect_async,
8 tungstenite::{client::IntoClientRequest, http::HeaderValue},
9};
10
11use crate::{
12 BacktestClientError, BacktestClientResult, BacktestSession, CreateSession,
13 session::CreateRequestResult, urls::http_base_from_ws_url,
14};
15
/// Default limit (10 seconds) on establishing the WebSocket connection; used
/// as the builder default for `BacktestClient::connect_timeout`.
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
17
/// Client configuration for a backtest server reached over WebSocket, with a
/// few plain-HTTP endpoints derived from the same URL.
///
/// Constructed through the `bon`-generated builder; `String` fields accept any
/// `Into<String>` value.
///
/// NOTE(review): the derived `Debug` implementation prints `api_key` verbatim —
/// confirm this value never reaches logs.
#[derive(Debug, Clone, Builder)]
#[builder(on(String, into))]
pub struct BacktestClient {
    /// URL the WebSocket connection is opened against. The HTTP base URL is
    /// derived from it with `http_base_from_ws_url`.
    url: String,
    /// Value sent in the `X-API-Key` header on the WebSocket handshake and on
    /// the usage request. Defaults to an empty string.
    #[builder(default)]
    api_key: String,

    /// Maximum time allowed for establishing the WebSocket connection.
    /// Defaults to `DEFAULT_CONNECT_TIMEOUT`.
    #[builder(default = DEFAULT_CONNECT_TIMEOUT)]
    connect_timeout: Duration,

    /// Optional timeout handed to every `BacktestSession` this client creates.
    /// Presumably bounds individual session requests — confirm against
    /// `BacktestSession`.
    request_timeout: Option<Duration>,

    /// Flag handed to every `BacktestSession` this client creates. Presumably
    /// enables logging of raw messages — confirm against `BacktestSession`.
    /// Defaults to `false`.
    #[builder(default)]
    log_raw: bool,
}
42
43impl BacktestClient {
44 async fn connect(&self) -> BacktestClientResult<BacktestSession> {
45 let mut request = self.url.clone().into_client_request().map_err(|source| {
46 BacktestClientError::BuildRequest {
47 url: self.url.clone(),
48 source: Box::new(source),
49 }
50 })?;
51
52 request
53 .headers_mut()
54 .insert("X-API-Key", HeaderValue::from_str(&self.api_key)?);
55
56 let (stream, _) = tokio::time::timeout(self.connect_timeout, connect_async(request))
57 .await
58 .map_err(|_| BacktestClientError::Timeout {
59 action: "connecting",
60 duration: self.connect_timeout,
61 })?
62 .map_err(|source| BacktestClientError::Connect {
63 url: self.url.clone(),
64 source: Box::new(source),
65 })?;
66 Ok(BacktestSession::new(
67 stream,
68 self.request_timeout,
69 self.log_raw,
70 ))
71 }
72
73 pub async fn create_session(
75 &self,
76 create: CreateSession,
77 ) -> BacktestClientResult<BacktestSession> {
78 let request = create.into_request()?;
79 let rpc_base_url = http_base_from_ws_url(&self.url);
80 let mut session = self.connect().await?;
81 match session
82 .create_with_request(request, rpc_base_url, None)
83 .await?
84 {
85 CreateRequestResult::Single { .. } => {}
86 CreateRequestResult::Parallel { session_ids, .. } => {
87 return Err(BacktestClientError::UnexpectedResponse {
88 context: "creating single session",
89 response: Box::new(BacktestResponse::SessionsCreated { session_ids }),
90 });
91 }
92 }
93 Ok(session)
94 }
95
96 pub async fn create_sessions(
100 &self,
101 create: CreateSession,
102 ) -> BacktestClientResult<Vec<String>> {
103 self.create_sessions_with_progress(create, |_| {}).await
104 }
105
106 pub async fn create_sessions_with_progress(
110 &self,
111 create: CreateSession,
112 mut on_session_created: impl FnMut(String) + Send,
113 ) -> BacktestClientResult<Vec<String>> {
114 let request = create.into_request()?;
115 let rpc_base_url = http_base_from_ws_url(&self.url);
116 let mut session = self.connect().await?;
117 match session
118 .create_with_request(request, rpc_base_url, Some(&mut on_session_created))
119 .await?
120 {
121 CreateRequestResult::Single { session_id, .. } => Ok(vec![session_id]),
122 CreateRequestResult::Parallel { session_ids, .. } => Ok(session_ids),
123 }
124 }
125
126 pub async fn create_sessions_with_task_ids(
129 &self,
130 create: CreateSession,
131 ) -> BacktestClientResult<Vec<(String, Option<String>)>> {
132 let request = create.into_request()?;
133 let rpc_base_url = http_base_from_ws_url(&self.url);
134 let mut session = self.connect().await?;
135 match session
136 .create_with_request(request, rpc_base_url, None)
137 .await?
138 {
139 CreateRequestResult::Single {
140 session_id,
141 task_id,
142 } => Ok(vec![(session_id, task_id)]),
143 CreateRequestResult::Parallel {
144 session_ids,
145 task_ids,
146 } => Ok(session_ids.into_iter().zip(task_ids).collect()),
147 }
148 }
149
150 pub async fn available_ranges(&self) -> BacktestClientResult<Vec<AvailableRange>> {
152 let base = http_base_from_ws_url(&self.url);
153 let url = format!("{base}/available-ranges");
154 let ranges = reqwest::Client::new()
155 .get(&url)
156 .send()
157 .await
158 .map_err(|e| BacktestClientError::Http {
159 url: url.clone(),
160 source: Box::new(e),
161 })?
162 .error_for_status()
163 .map_err(|e| BacktestClientError::Http {
164 url: url.clone(),
165 source: Box::new(e),
166 })?
167 .json::<Vec<AvailableRange>>()
168 .await
169 .map_err(|e| BacktestClientError::Http {
170 url: url.clone(),
171 source: Box::new(e),
172 })?;
173 Ok(ranges)
174 }
175
176 pub async fn usage(
181 &self,
182 since: Option<DateTime<Utc>>,
183 until: Option<DateTime<Utc>>,
184 ) -> BacktestClientResult<UsageReport> {
185 let base = http_base_from_ws_url(&self.url);
186 let url = format!("{base}/usage");
187 let mut req = reqwest::Client::new()
188 .get(&url)
189 .header("X-API-Key", &self.api_key);
190 if let Some(t) = since {
191 req = req.query(&[(
192 "since",
193 t.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
194 )]);
195 }
196 if let Some(t) = until {
197 req = req.query(&[(
198 "until",
199 t.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
200 )]);
201 }
202 let resp = req.send().await.map_err(|e| BacktestClientError::Http {
203 url: url.clone(),
204 source: Box::new(e),
205 })?;
206 let status = resp.status();
207 if !status.is_success() {
208 let body = resp.text().await.unwrap_or_default();
209 return Err(BacktestClientError::HttpStatus { url, status, body });
210 }
211 resp.json::<UsageReport>()
212 .await
213 .map_err(|e| BacktestClientError::Http {
214 url: url.clone(),
215 source: Box::new(e),
216 })
217 }
218
219 pub async fn attach_session(
221 &self,
222 session_id: impl Into<String>,
223 last_sequence: Option<u64>,
224 ) -> BacktestClientResult<BacktestSession> {
225 let rpc_base_url = http_base_from_ws_url(&self.url);
226 let mut session = self.connect().await?;
227 session
228 .attach(session_id.into(), last_sequence, rpc_base_url)
229 .await?;
230 Ok(session)
231 }
232}