use std::{sync::Arc, time::Duration};
use reqwest::{Client, Method, Response, StatusCode};
use serde::{de::DeserializeOwned, Serialize};
use url::Url;
use crate::{
error::{ApiError, ErrorEnvelope, Result},
retry::{is_idempotent_method, RetryConfig},
};
#[derive(Debug, Clone)]
pub struct HttpInner {
pub(crate) base_url: Arc<Url>,
pub(crate) client: Client,
pub(crate) retry: RetryConfig,
}
impl HttpInner {
pub(crate) fn new(base_url: Url, client: Client, retry: RetryConfig) -> Self {
Self {
base_url: Arc::new(base_url),
client,
retry,
}
}
pub fn reqwest_client(&self) -> &Client {
&self.client
}
pub fn base_url(&self) -> &Url {
&self.base_url
}
pub fn url(&self, path: &str) -> Result<Url> {
let joined = self
.base_url
.join(path.trim_start_matches('/'))
.map_err(ApiError::from)?;
Ok(joined)
}
pub async fn execute<T: DeserializeOwned>(&self, method: Method, url: Url) -> Result<T> {
self.execute_inner::<T, ()>(method, url, None).await
}
pub async fn execute_with_body<T, B>(&self, method: Method, url: Url, body: &B) -> Result<T>
where
T: DeserializeOwned,
B: Serialize + ?Sized,
{
self.execute_inner::<T, B>(method, url, Some(body)).await
}
async fn execute_inner<T, B>(&self, method: Method, url: Url, body: Option<&B>) -> Result<T>
where
T: DeserializeOwned,
B: Serialize + ?Sized,
{
let max_attempts = if is_idempotent_method(&method) {
self.retry.max_attempts.max(1)
} else {
1
};
let mut last_err: Option<ApiError> = None;
for attempt in 1..=max_attempts {
#[cfg(feature = "tracing")]
let started = std::time::Instant::now();
let mut req = self.client.request(method.clone(), url.clone());
if let Some(b) = body {
req = req.json(b);
}
let send_result = req.send().await;
let response = match send_result {
Ok(r) => r,
Err(e) => {
let err = ApiError::from(e);
if attempt < max_attempts && err.is_retryable() {
let delay = self.retry.backoff_for(attempt);
#[cfg(feature = "tracing")]
tracing::warn!(
attempt,
next_delay_ms = delay.as_millis() as u64,
error = %err,
"transport error, retrying"
);
tokio::time::sleep(delay).await;
last_err = Some(err);
continue;
}
return Err(err);
}
};
let status = response.status();
#[cfg(feature = "tracing")]
tracing::debug!(
method = %method,
path = %url.path(),
status = status.as_u16(),
elapsed_ms = started.elapsed().as_millis() as u64,
"cnb api call"
);
if status.is_success() {
return decode_response::<T>(response, status).await;
}
let retry_after = parse_retry_after(&response);
let api_err = build_http_error(response, status).await;
if attempt < max_attempts && api_err.is_retryable() {
let delay = retry_after
.unwrap_or_else(|| self.retry.backoff_for(attempt))
.min(self.retry.max_delay.max(Duration::from_secs(60)));
#[cfg(feature = "tracing")]
tracing::warn!(
attempt,
status = status.as_u16(),
next_delay_ms = delay.as_millis() as u64,
"retryable HTTP error, backing off"
);
tokio::time::sleep(delay).await;
last_err = Some(api_err);
continue;
}
return Err(api_err);
}
Err(last_err.unwrap_or_else(|| ApiError::EnvVar("retry loop exited unexpectedly".into())))
}
}
async fn decode_response<T: DeserializeOwned>(response: Response, status: StatusCode) -> Result<T> {
let bytes = response.bytes().await?;
if bytes.is_empty() {
return serde_json::from_slice::<T>(b"null").map_err(|e| {
let _ = status;
ApiError::from(e)
});
}
serde_json::from_slice::<T>(&bytes).map_err(ApiError::from)
}
async fn build_http_error(response: Response, status: StatusCode) -> ApiError {
let header_request_id = response
.headers()
.get("x-request-id")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
let body: String = response.text().await.unwrap_or_default();
let envelope = if body.is_empty() {
ErrorEnvelope::default()
} else {
ErrorEnvelope::parse(&body)
};
let message = envelope
.message
.clone()
.unwrap_or_else(|| status.canonical_reason().unwrap_or("error").to_string());
let request_id = envelope.request_id.clone().or(header_request_id);
ApiError::Http {
status: status.as_u16(),
body,
code: envelope.code,
message,
request_id,
}
}
fn parse_retry_after(response: &Response) -> Option<Duration> {
response
.headers()
.get(reqwest::header::RETRY_AFTER)
.and_then(|v| v.to_str().ok())
.and_then(|s| s.trim().parse::<u64>().ok())
.map(Duration::from_secs)
}