use std::time::Duration;
use bon::Builder;
use chrono::{DateTime, Utc};
use simulator_api::{AvailableRange, BacktestResponse, usage::UsageReport};
use tokio_tungstenite::{
connect_async,
tungstenite::{client::IntoClientRequest, http::HeaderValue},
};
use crate::{
BacktestClientError, BacktestClientResult, BacktestSession, CreateSession,
session::CreateRequestResult, urls::http_base_from_ws_url,
};
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug, Clone, Builder)]
#[builder(on(String, into))]
pub struct BacktestClient {
url: String,
#[builder(default)]
api_key: String,
#[builder(default = DEFAULT_CONNECT_TIMEOUT)]
connect_timeout: Duration,
request_timeout: Option<Duration>,
#[builder(default)]
log_raw: bool,
}
impl BacktestClient {
async fn connect(&self) -> BacktestClientResult<BacktestSession> {
let mut request = self.url.clone().into_client_request().map_err(|source| {
BacktestClientError::BuildRequest {
url: self.url.clone(),
source: Box::new(source),
}
})?;
request
.headers_mut()
.insert("X-API-Key", HeaderValue::from_str(&self.api_key)?);
let (stream, _) = tokio::time::timeout(self.connect_timeout, connect_async(request))
.await
.map_err(|_| BacktestClientError::Timeout {
action: "connecting",
duration: self.connect_timeout,
})?
.map_err(|source| BacktestClientError::Connect {
url: self.url.clone(),
source: Box::new(source),
})?;
Ok(BacktestSession::new(
stream,
self.request_timeout,
self.log_raw,
))
}
pub async fn create_session(
&self,
create: CreateSession,
) -> BacktestClientResult<BacktestSession> {
let request = create.into_request()?;
let rpc_base_url = http_base_from_ws_url(&self.url);
let mut session = self.connect().await?;
match session
.create_with_request(request, rpc_base_url, None)
.await?
{
CreateRequestResult::Single { .. } => {}
CreateRequestResult::Parallel { session_ids, .. } => {
return Err(BacktestClientError::UnexpectedResponse {
context: "creating single session",
response: Box::new(BacktestResponse::SessionsCreated { session_ids }),
});
}
}
Ok(session)
}
pub async fn create_sessions(
&self,
create: CreateSession,
) -> BacktestClientResult<Vec<String>> {
self.create_sessions_with_progress(create, |_| {}).await
}
pub async fn create_sessions_with_progress(
&self,
create: CreateSession,
mut on_session_created: impl FnMut(String) + Send,
) -> BacktestClientResult<Vec<String>> {
let request = create.into_request()?;
let rpc_base_url = http_base_from_ws_url(&self.url);
let mut session = self.connect().await?;
match session
.create_with_request(request, rpc_base_url, Some(&mut on_session_created))
.await?
{
CreateRequestResult::Single { session_id, .. } => Ok(vec![session_id]),
CreateRequestResult::Parallel { session_ids, .. } => Ok(session_ids),
}
}
pub async fn create_sessions_with_task_ids(
&self,
create: CreateSession,
) -> BacktestClientResult<Vec<(String, Option<String>)>> {
let request = create.into_request()?;
let rpc_base_url = http_base_from_ws_url(&self.url);
let mut session = self.connect().await?;
match session
.create_with_request(request, rpc_base_url, None)
.await?
{
CreateRequestResult::Single {
session_id,
task_id,
} => Ok(vec![(session_id, task_id)]),
CreateRequestResult::Parallel {
session_ids,
task_ids,
} => Ok(session_ids.into_iter().zip(task_ids).collect()),
}
}
pub async fn available_ranges(&self) -> BacktestClientResult<Vec<AvailableRange>> {
let base = http_base_from_ws_url(&self.url);
let url = format!("{base}/available-ranges");
let ranges = reqwest::Client::new()
.get(&url)
.send()
.await
.map_err(|e| BacktestClientError::Http {
url: url.clone(),
source: Box::new(e),
})?
.error_for_status()
.map_err(|e| BacktestClientError::Http {
url: url.clone(),
source: Box::new(e),
})?
.json::<Vec<AvailableRange>>()
.await
.map_err(|e| BacktestClientError::Http {
url: url.clone(),
source: Box::new(e),
})?;
Ok(ranges)
}
pub async fn usage(
&self,
since: Option<DateTime<Utc>>,
until: Option<DateTime<Utc>>,
) -> BacktestClientResult<UsageReport> {
let base = http_base_from_ws_url(&self.url);
let url = format!("{base}/usage");
let mut req = reqwest::Client::new()
.get(&url)
.header("X-API-Key", &self.api_key);
if let Some(t) = since {
req = req.query(&[(
"since",
t.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
)]);
}
if let Some(t) = until {
req = req.query(&[(
"until",
t.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
)]);
}
let resp = req.send().await.map_err(|e| BacktestClientError::Http {
url: url.clone(),
source: Box::new(e),
})?;
let status = resp.status();
if !status.is_success() {
let body = resp.text().await.unwrap_or_default();
return Err(BacktestClientError::HttpStatus { url, status, body });
}
resp.json::<UsageReport>()
.await
.map_err(|e| BacktestClientError::Http {
url: url.clone(),
source: Box::new(e),
})
}
pub async fn attach_session(
&self,
session_id: impl Into<String>,
last_sequence: Option<u64>,
) -> BacktestClientResult<BacktestSession> {
let rpc_base_url = http_base_from_ws_url(&self.url);
let mut session = self.connect().await?;
session
.attach(session_id.into(), last_sequence, rpc_base_url)
.await?;
Ok(session)
}
}