use std::time::Duration;
use serde::de::DeserializeOwned;
use serde::Serialize;
use crate::client::Client;
use crate::error::{map_error, Error, ErrorEnvelope, Result};
/// Per-request options accepted by the transport helpers in this module.
///
/// The type is `Copy`, so the same value can be handed to every retry attempt.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct RequestOpts<'a> {
    /// Optional idempotency key. When present it is sent as the
    /// `Idempotency-Key` header and makes the request eligible for retries
    /// even when the HTTP method is not `GET`/`HEAD`.
    pub idempotency_key: Option<&'a str>,
}
/// Issues a request and decodes the successful response body as JSON into `T`.
///
/// The call is repeated while both conditions hold: the number of attempts
/// made so far does not exceed the budget returned by `retry_budget`, and
/// `is_retriable` accepts the error. Between attempts the task sleeps via
/// `sleep_with_backoff`, passing along any retry hint carried by the error.
///
/// # Errors
///
/// Returns the error of the last attempt, whether it was non-retriable or the
/// retry budget was exhausted.
pub(crate) async fn send<T, B>(
    client: &Client,
    method: reqwest::Method,
    path: &str,
    query: Option<&[(&str, String)]>,
    body: Option<&B>,
    opts: RequestOpts<'_>,
) -> Result<T>
where
    T: DeserializeOwned,
    B: Serialize + ?Sized,
{
    let max_retries = retry_budget(client.inner.max_retries, &method, opts.idempotency_key);
    // 1-based number of the attempt currently being made.
    let mut attempt_no: u32 = 1;
    loop {
        let outcome = send_once::<T, B>(client, method.clone(), path, query, body, opts).await;
        let failure = match outcome {
            Ok(decoded) => return Ok(decoded),
            Err(failure) => failure,
        };
        // Give up once the budget is spent or the failure is not worth retrying.
        if attempt_no > max_retries || !is_retriable(&failure) {
            return Err(failure);
        }
        sleep_with_backoff(attempt_no, retry_after_from(&failure)).await;
        attempt_no += 1;
    }
}
/// Issues a request whose successful response body is ignored.
///
/// Retry behaviour mirrors the decoding variant: the budget comes from
/// `retry_budget`, only errors accepted by `is_retriable` are retried, and
/// each retry is preceded by `sleep_with_backoff`.
///
/// # Errors
///
/// Returns the error of the final attempt.
pub(crate) async fn send_empty<B>(
    client: &Client,
    method: reqwest::Method,
    path: &str,
    query: Option<&[(&str, String)]>,
    body: Option<&B>,
    opts: RequestOpts<'_>,
) -> Result<()>
where
    B: Serialize + ?Sized,
{
    let max_retries = retry_budget(client.inner.max_retries, &method, opts.idempotency_key);
    // 1-based number of the attempt currently being made.
    let mut attempt_no: u32 = 1;
    loop {
        let failure =
            match send_empty_once::<B>(client, method.clone(), path, query, body, opts).await {
                Ok(()) => return Ok(()),
                Err(failure) => failure,
            };
        let may_retry = attempt_no <= max_retries && is_retriable(&failure);
        if !may_retry {
            return Err(failure);
        }
        sleep_with_backoff(attempt_no, retry_after_from(&failure)).await;
        attempt_no += 1;
    }
}
/// Returns how many retries a request is allowed.
///
/// The full `max` is granted when the method is `GET` or `HEAD`, or when an
/// idempotency key accompanies the request; every other request gets `0`.
fn retry_budget(max: u32, method: &reqwest::Method, idem_key: Option<&str>) -> u32 {
    let replay_allowed = match *method {
        reqwest::Method::GET | reqwest::Method::HEAD => true,
        // Other methods may only be replayed when the caller supplied a key.
        _ => idem_key.is_some(),
    };
    if replay_allowed {
        max
    } else {
        0
    }
}
fn is_retriable(err: &Error) -> bool {
matches!(err, Error::Network(_) | Error::RateLimited { .. })
|| matches!(err, Error::Api { status, .. } if *status >= 500)
}
fn retry_after_from(err: &Error) -> Option<u64> {
match err {
Error::RateLimited { retry_after, .. } => *retry_after,
_ => None,
}
}
/// Sleeps before the next retry.
///
/// The backoff base is `2^(attempt - 1)` seconds, capped at 60, and is passed
/// through `jitter`. When `retry_after` is given, it is interpreted as whole
/// seconds and acts as a lower bound on the delay; the hint itself is not
/// capped here.
async fn sleep_with_backoff(attempt: u32, retry_after: Option<u64>) {
    let exponent = attempt.saturating_sub(1);
    let backoff_secs = std::cmp::min(2_u64.saturating_pow(exponent), 60);
    let server_floor = retry_after.map_or(Duration::ZERO, Duration::from_secs);
    // Never wake up earlier than the server asked for.
    let wait = jitter(backoff_secs).max(server_floor);
    tokio::time::sleep(wait).await;
}
/// Spreads a backoff of `base_secs` seconds over the range
/// `[0.75 * base, 1.25 * base)`, capped at 65 seconds.
///
/// The offset is derived from the sub-second nanoseconds of the system clock,
/// so it is cheap but not a statistically strong random source. If the clock
/// reads earlier than the Unix epoch the offset is `0`.
///
/// A `base_secs` of `0` yields a zero delay, and very large bases saturate
/// instead of overflowing, so this function never panics.
fn jitter(base_secs: u64) -> Duration {
    use std::time::{SystemTime, UNIX_EPOCH};

    const CAP: Duration = Duration::from_secs(65);

    // Width of the jitter window in milliseconds (half of the base delay).
    let spread_ms = base_secs.saturating_mul(500);
    if spread_ms == 0 {
        // Avoids a modulo-by-zero below; a zero base has nothing to spread.
        return Duration::ZERO;
    }

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::from(elapsed.subsec_nanos()))
        .unwrap_or(0);
    let offset_ms = nanos % spread_ms;

    // Lower edge of the window: base minus a quarter of the base.
    let floor_ms = base_secs.saturating_mul(750);
    Duration::from_millis(floor_ms.saturating_add(offset_ms)).min(CAP)
}
async fn send_once<T, B>(
client: &Client,
method: reqwest::Method,
path: &str,
query: Option<&[(&str, String)]>,
body: Option<&B>,
opts: RequestOpts<'_>,
) -> Result<T>
where
T: DeserializeOwned,
B: Serialize + ?Sized,
{
let (status, request_id, retry_after, bytes, ok) =
do_request(client, method, path, query, body, opts).await?;
if ok {
return serde_json::from_slice(&bytes).map_err(Error::Decode);
}
Err(parse_error(status, request_id, retry_after, &bytes))
}
/// Performs a single attempt and discards the body of a successful response.
///
/// # Errors
///
/// Propagates any error from `do_request`; a non-success response is turned
/// into an error by `parse_error`.
async fn send_empty_once<B>(
    client: &Client,
    method: reqwest::Method,
    path: &str,
    query: Option<&[(&str, String)]>,
    body: Option<&B>,
    opts: RequestOpts<'_>,
) -> Result<()>
where
    B: Serialize + ?Sized,
{
    match do_request(client, method, path, query, body, opts).await? {
        // Success: nothing in the response is of interest to the caller.
        (_, _, _, _, true) => Ok(()),
        (status, request_id, retry_after, payload, false) => {
            Err(parse_error(status, request_id, retry_after, &payload))
        }
    }
}
async fn do_request<B>(
client: &Client,
method: reqwest::Method,
path: &str,
query: Option<&[(&str, String)]>,
body: Option<&B>,
opts: RequestOpts<'_>,
) -> Result<(u16, Option<String>, Option<u64>, bytes::Bytes, bool)>
where
B: Serialize + ?Sized,
{
let url = format!("{}{}", client.inner.base_url, path);
let mut req = client
.inner
.http
.request(method, &url)
.header("X-Api-Key", client.inner.api_key_header.clone());
if let Some(q) = query {
req = req.query(q);
}
if let Some(b) = body {
req = req.json(b);
}
if let Some(k) = opts.idempotency_key {
req = req.header("Idempotency-Key", k);
}
let response = req.send().await.map_err(Error::Network)?;
let status = response.status().as_u16();
let ok = response.status().is_success();
let request_id = response
.headers()
.get("X-Request-Id")
.and_then(|v| v.to_str().ok())
.map(String::from);
let retry_after = response
.headers()
.get(reqwest::header::RETRY_AFTER)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok());
let bytes = response.bytes().await.map_err(Error::Network)?;
Ok((status, request_id, retry_after, bytes, ok))
}
fn parse_error(
status: u16,
request_id: Option<String>,
retry_after: Option<u64>,
bytes: &[u8],
) -> Error {
match serde_json::from_slice::<ErrorEnvelope>(bytes) {
Ok(env) => map_error(status, env.error, request_id, retry_after),
Err(_) => Error::Api {
code: "unknown".into(),
message: String::from_utf8_lossy(bytes).to_string(),
status,
request_id,
},
}
}