use reqwest::{Client, Response, StatusCode};
use serde::Serialize;
use std::collections::HashMap;
use std::time::Duration;
use crate::error::{ErrorBody, FlowError, FlowResult};
const MAX_RETRIES: u32 = 3;
const VERSION: &str = "0.3.2";
pub(crate) struct HttpClient {
base_url: String,
api_key: String,
api_secret: Option<String>,
client: Client,
}
impl HttpClient {
pub fn new(api_key: &str, base_url: &str, timeout: Duration) -> Self {
Self {
base_url: base_url.to_string(),
api_key: api_key.to_string(),
api_secret: None,
client: Client::builder()
.timeout(timeout)
.build()
.expect("failed to build HTTP client"),
}
}
pub fn with_secret(mut self, secret: &str) -> Self {
self.api_secret = Some(secret.to_string());
self
}
pub async fn get(
&self,
path: &str,
params: Option<&HashMap<String, String>>,
) -> FlowResult<Vec<u8>> {
let url = format!("{}{}", self.base_url, path);
let mut last_err = None;
for attempt in 0..MAX_RETRIES {
let mut req = self
.client
.get(&url)
.header("X-API-Key", &self.api_key)
.header("User-Agent", format!("flow-rust/{}", VERSION));
if let Some(ref s) = self.api_secret {
req = req.header("X-API-Secret", s);
}
if let Some(p) = params {
for (k, v) in p {
if !v.is_empty() {
req = req.query(&[(k.as_str(), v.as_str())]);
}
}
}
match req.send().await {
Ok(resp) => {
return self.handle_response(resp, attempt).await;
}
Err(e) => {
last_err = Some(e);
if attempt < MAX_RETRIES - 1 {
tokio::time::sleep(backoff(attempt)).await;
}
}
}
}
Err(FlowError::Network(last_err.unwrap()))
}
pub async fn post<T: Serialize>(
&self,
path: &str,
body: Option<&T>,
) -> FlowResult<Vec<u8>> {
let url = format!("{}{}", self.base_url, path);
let mut last_err = None;
for attempt in 0..MAX_RETRIES {
let mut req = self
.client
.post(&url)
.header("X-API-Key", &self.api_key)
.header("Content-Type", "application/json")
.header("User-Agent", format!("flow-rust/{}", VERSION));
if let Some(ref s) = self.api_secret {
req = req.header("X-API-Secret", s);
}
if let Some(b) = body {
req = req.json(b);
}
match req.send().await {
Ok(resp) => {
return self.handle_response(resp, attempt).await;
}
Err(e) => {
last_err = Some(e);
if attempt < MAX_RETRIES - 1 {
tokio::time::sleep(backoff(attempt)).await;
}
}
}
}
Err(FlowError::Network(last_err.unwrap()))
}
pub async fn patch<T: Serialize>(
&self,
path: &str,
body: Option<&T>,
) -> FlowResult<Vec<u8>> {
let url = format!("{}{}", self.base_url, path);
let mut req = self
.client
.patch(&url)
.header("X-API-Key", &self.api_key)
.header("Content-Type", "application/json")
.header("User-Agent", format!("flow-rust/{}", VERSION));
if let Some(ref s) = self.api_secret {
req = req.header("X-API-Secret", s);
}
if let Some(b) = body {
req = req.json(b);
}
match req.send().await {
Ok(resp) => self.handle_response(resp, 0).await,
Err(e) => Err(FlowError::Network(e)),
}
}
pub async fn delete(&self, path: &str) -> FlowResult<Vec<u8>> {
let url = format!("{}{}", self.base_url, path);
let mut req = self
.client
.delete(&url)
.header("X-API-Key", &self.api_key)
.header("User-Agent", format!("flow-rust/{}", VERSION));
if let Some(ref s) = self.api_secret {
req = req.header("X-API-Secret", s);
}
match req.send().await {
Ok(resp) => self.handle_response(resp, 0).await,
Err(e) => Err(FlowError::Network(e)),
}
}
async fn handle_response(
&self,
resp: Response,
attempt: u32,
) -> FlowResult<Vec<u8>> {
let status = resp.status();
let retry_after = resp
.headers()
.get("Retry-After")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0);
if should_retry(status) && attempt < MAX_RETRIES - 1 {
let wait = if status == StatusCode::TOO_MANY_REQUESTS && retry_after > 0.0 {
Duration::from_secs_f64(retry_after)
} else {
backoff(attempt)
};
tokio::time::sleep(wait).await;
return Err(FlowError::Other("retry".into()));
}
if status == StatusCode::NO_CONTENT {
return Ok(Vec::new());
}
let body = resp.bytes().await.map_err(FlowError::Network)?;
if status.is_client_error() || status.is_server_error() {
return Err(parse_error(status.as_u16(), &body, retry_after));
}
Ok(body.to_vec())
}
}
fn backoff(attempt: u32) -> Duration {
Duration::from_millis(500 * (1 << attempt))
}
fn should_retry(status: StatusCode) -> bool {
matches!(
status,
StatusCode::TOO_MANY_REQUESTS
| StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT
)
}
fn parse_error(status: u16, body: &[u8], retry_after: f64) -> FlowError {
let err_body: ErrorBody = serde_json::from_slice(body).unwrap_or_default();
let msg = err_body.msg();
let code = err_body.code.unwrap_or_default();
match status {
401 => FlowError::Authentication {
message: msg,
code,
},
404 => FlowError::NotFound {
message: msg,
code,
},
422 => FlowError::Validation {
message: msg,
code,
errors: err_body.errors,
},
429 => FlowError::RateLimit {
message: msg,
code,
retry_after,
},
500..=599 => FlowError::Server {
message: msg,
code,
},
_ => FlowError::Api {
message: msg,
code,
status,
},
}
}