simulator_client/
client.rs1use std::time::Duration;
2
3use bon::Builder;
4use simulator_api::{AvailableRange, BacktestResponse};
5use tokio_tungstenite::{
6 connect_async,
7 tungstenite::{client::IntoClientRequest, http::HeaderValue},
8};
9
10use crate::{
11 BacktestClientError, BacktestClientResult, BacktestSession, CreateSession,
12 session::CreateRequestResult, urls::http_base_from_ws_url,
13};
14
15const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
16
17#[derive(Debug, Clone, Builder)]
22#[builder(on(String, into))]
23pub struct BacktestClient {
24 url: String,
26 #[builder(default)]
28 api_key: String,
29
30 #[builder(default = DEFAULT_CONNECT_TIMEOUT)]
32 connect_timeout: Duration,
33
34 request_timeout: Option<Duration>,
36
37 #[builder(default)]
39 log_raw: bool,
40}
41
42impl BacktestClient {
43 async fn connect(&self) -> BacktestClientResult<BacktestSession> {
44 let mut request = self.url.clone().into_client_request().map_err(|source| {
45 BacktestClientError::BuildRequest {
46 url: self.url.clone(),
47 source: Box::new(source),
48 }
49 })?;
50
51 request
52 .headers_mut()
53 .insert("X-API-Key", HeaderValue::from_str(&self.api_key)?);
54
55 let (stream, _) = tokio::time::timeout(self.connect_timeout, connect_async(request))
56 .await
57 .map_err(|_| BacktestClientError::Timeout {
58 action: "connecting",
59 duration: self.connect_timeout,
60 })?
61 .map_err(|source| BacktestClientError::Connect {
62 url: self.url.clone(),
63 source: Box::new(source),
64 })?;
65 Ok(BacktestSession::new(
66 stream,
67 self.request_timeout,
68 self.log_raw,
69 ))
70 }
71
72 pub async fn create_session(
74 &self,
75 create: CreateSession,
76 ) -> BacktestClientResult<BacktestSession> {
77 let request = create.into_request()?;
78 let rpc_base_url = http_base_from_ws_url(&self.url);
79 let mut session = self.connect().await?;
80 match session
81 .create_with_request(request, rpc_base_url, None)
82 .await?
83 {
84 CreateRequestResult::Single { .. } => {}
85 CreateRequestResult::Parallel { session_ids, .. } => {
86 return Err(BacktestClientError::UnexpectedResponse {
87 context: "creating single session",
88 response: Box::new(BacktestResponse::SessionsCreated { session_ids }),
89 });
90 }
91 }
92 Ok(session)
93 }
94
95 pub async fn create_sessions(
99 &self,
100 create: CreateSession,
101 ) -> BacktestClientResult<Vec<String>> {
102 self.create_sessions_with_progress(create, |_| {}).await
103 }
104
105 pub async fn create_sessions_with_progress(
109 &self,
110 create: CreateSession,
111 mut on_session_created: impl FnMut(String) + Send,
112 ) -> BacktestClientResult<Vec<String>> {
113 let request = create.into_request()?;
114 let rpc_base_url = http_base_from_ws_url(&self.url);
115 let mut session = self.connect().await?;
116 match session
117 .create_with_request(request, rpc_base_url, Some(&mut on_session_created))
118 .await?
119 {
120 CreateRequestResult::Single { session_id, .. } => Ok(vec![session_id]),
121 CreateRequestResult::Parallel { session_ids, .. } => Ok(session_ids),
122 }
123 }
124
125 pub async fn create_sessions_with_task_ids(
128 &self,
129 create: CreateSession,
130 ) -> BacktestClientResult<Vec<(String, Option<String>)>> {
131 let request = create.into_request()?;
132 let rpc_base_url = http_base_from_ws_url(&self.url);
133 let mut session = self.connect().await?;
134 match session
135 .create_with_request(request, rpc_base_url, None)
136 .await?
137 {
138 CreateRequestResult::Single {
139 session_id,
140 task_id,
141 } => Ok(vec![(session_id, task_id)]),
142 CreateRequestResult::Parallel {
143 session_ids,
144 task_ids,
145 } => Ok(session_ids.into_iter().zip(task_ids).collect()),
146 }
147 }
148
149 pub async fn available_ranges(&self) -> BacktestClientResult<Vec<AvailableRange>> {
151 let base = http_base_from_ws_url(&self.url);
152 let url = format!("{base}/available-ranges");
153 let ranges = reqwest::Client::new()
154 .get(&url)
155 .send()
156 .await
157 .map_err(|e| BacktestClientError::Http {
158 url: url.clone(),
159 source: Box::new(e),
160 })?
161 .error_for_status()
162 .map_err(|e| BacktestClientError::Http {
163 url: url.clone(),
164 source: Box::new(e),
165 })?
166 .json::<Vec<AvailableRange>>()
167 .await
168 .map_err(|e| BacktestClientError::Http {
169 url: url.clone(),
170 source: Box::new(e),
171 })?;
172 Ok(ranges)
173 }
174
175 pub async fn attach_session(
177 &self,
178 session_id: impl Into<String>,
179 last_sequence: Option<u64>,
180 ) -> BacktestClientResult<BacktestSession> {
181 let rpc_base_url = http_base_from_ws_url(&self.url);
182 let mut session = self.connect().await?;
183 session
184 .attach(session_id.into(), last_sequence, rpc_base_url)
185 .await?;
186 Ok(session)
187 }
188}