use std::time::Duration;
use anyhow::{Context, anyhow};
use crate::{
Result,
protocol::{ApiEndpoint, ApiGeneralResponse},
retry::{RetryOptions, RetryState, is_retryable_request_error},
transport::{MilkyTransport, MilkyTransportKind, spawn_sse_transport, spawn_websocket_transport},
url::{build_api_url, build_event_url, websocket_request},
};
#[derive(Debug, Clone, PartialEq)]
pub struct MilkyClientConfig {
pub event_reconnect: RetryOptions,
pub request_retry: RetryOptions,
}
impl Default for MilkyClientConfig {
fn default() -> Self {
Self {
event_reconnect: RetryOptions::Exponential {
initial_delay: Duration::from_millis(300),
factor: 2.0,
max_delay: Some(Duration::from_secs(5)),
max_retries: None,
},
request_retry: RetryOptions::Disabled,
}
}
}
#[derive(Debug, Clone)]
pub struct MilkyClient {
base_url: reqwest::Url,
client: reqwest::Client,
authorization: Option<reqwest::header::HeaderValue>,
config: MilkyClientConfig,
}
impl MilkyClient {
pub fn new(base_url: &str, token: Option<&str>) -> Result<Self> {
Self::with_config(base_url, token, MilkyClientConfig::default())
}
pub fn with_config(
base_url: &str,
token: Option<&str>,
config: MilkyClientConfig,
) -> Result<Self> {
let base_url = reqwest::Url::parse(base_url).context("Failed to parse base URL")?;
let mut client = reqwest::Client::builder();
let authorization = token
.map(|token| reqwest::header::HeaderValue::from_str(&format!("Bearer {token}")))
.transpose()
.context("Failed to parse authorization header")?;
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::CONTENT_TYPE,
reqwest::header::HeaderValue::from_str("application/json")
.context("Failed to parse content type header")?,
);
if let Some(authorization) = authorization.as_ref() {
headers.insert(reqwest::header::AUTHORIZATION, authorization.clone());
}
client = client.default_headers(headers);
let client = client.build().context("Failed to build client")?;
Ok(Self {
base_url,
client,
authorization,
config,
})
}
pub async fn request<T>(&self, input: T) -> Result<T::Output>
where
T: ApiEndpoint,
{
self.send_request_with_retry(input, None).await
}
pub async fn send_request_with_retry<T>(
&self,
input: T,
retry: Option<RetryOptions>,
) -> Result<T::Output>
where
T: ApiEndpoint,
{
let url = build_api_url::<T>(&self.base_url)?;
let response = self
.send_request_response_with_retry(
url,
&input,
retry.unwrap_or_else(|| self.config.request_retry.clone()),
)
.await?;
let output = response.json::<ApiGeneralResponse<T::Output>>().await?;
if output.retcode != 0 {
Err(anyhow!(
"milky error: code={}, message={:?}",
output.retcode,
output.message
))
} else {
output.data.ok_or(anyhow!("milky error: data not found"))
}
}
pub async fn event_sse(&self) -> Result<MilkyTransport> {
let url = build_event_url(&self.base_url)?;
Ok(spawn_sse_transport(
self.client.clone(),
url,
self.config.event_reconnect.clone(),
))
}
pub async fn event_ws(&self) -> Result<MilkyTransport> {
let event_url = build_event_url(&self.base_url)?;
let request = websocket_request(&event_url, self.authorization.as_ref())?;
Ok(spawn_websocket_transport(
request,
self.config.event_reconnect.clone(),
))
}
pub async fn event(&self, kind: MilkyTransportKind) -> Result<MilkyTransport> {
match kind {
MilkyTransportKind::WebSocket => self.event_ws().await,
MilkyTransportKind::Sse => self.event_sse().await,
}
}
async fn send_request_response_with_retry<T>(
&self,
url: reqwest::Url,
input: &T,
retry: RetryOptions,
) -> Result<reqwest::Response>
where
T: ApiEndpoint,
{
let mut retry_state = RetryState::new(retry);
loop {
match self.client.post(url.clone()).json(input).send().await {
Ok(response) => return Ok(response),
Err(error) if is_retryable_request_error(&error) => {
let Some(decision) = retry_state.next() else {
return Err(error.into());
};
tokio::time::sleep(decision.delay).await;
}
Err(error) => return Err(error.into()),
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_client_config_matches_transport_defaults() {
let config = MilkyClientConfig::default();
assert_eq!(
config.event_reconnect,
RetryOptions::Exponential {
initial_delay: Duration::from_millis(300),
factor: 2.0,
max_delay: Some(Duration::from_secs(5)),
max_retries: None,
}
);
assert_eq!(config.request_retry, RetryOptions::Disabled);
}
}