use std::error::Error;
use std::time::Duration;
use http::Request;
use http::Response;
use http_body_util::Full;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::rt::TokioExecutor;
/// HTTP client wrapper around the hyper-util legacy client.
///
/// On top of the underlying client, `send` adds a default `User-Agent` header,
/// an optional per-attempt timeout, and retries with a growing backoff.
/// See `V2Client::builder` for construction.
pub struct V2Client {
// Underlying hyper-util client: plain `HttpConnector`, `Full<Bytes>` request bodies.
inner: HyperClient<HttpConnector, Full<bytes::Bytes>>,
// Timeout wrapped around each individual attempt in `send`; `None` means no timeout is applied.
default_timeout: Option<Duration>,
// Number of additional attempts `send` may make after the first one.
max_retries: u32,
// Base delay before a retry; `send` multiplies it by a power of two that grows per attempt.
retry_backoff: Duration,
// Inserted as the `User-Agent` header when a request does not already carry one.
user_agent: Option<String>,
// When `true`, `send` retries only GET, HEAD, PUT, DELETE, OPTIONS and TRACE requests.
retry_only_idempotent: bool,
}
/// Builder collecting the settings used to construct a `V2Client`.
///
/// The pool settings are forwarded to the hyper-util client builder in `build`;
/// the remaining fields are copied into the resulting `V2Client` unchanged.
pub struct V2ClientBuilder {
// Forwarded to the hyper-util builder's `pool_idle_timeout` when `Some`.
pool_idle_timeout: Option<Duration>,
// Forwarded to the hyper-util builder's `pool_max_idle_per_host` when `Some`.
pool_max_idle_per_host: Option<usize>,
// Becomes `V2Client::default_timeout` (per-attempt timeout).
default_timeout: Option<Duration>,
// Becomes `V2Client::max_retries`; `new()` starts it at 0 (no retries).
max_retries: u32,
// Becomes `V2Client::retry_backoff`; `new()` starts it at 100 ms.
retry_backoff: Duration,
// Becomes `V2Client::user_agent`; `new()` starts it at "tako/<crate version>".
user_agent: Option<String>,
// Becomes `V2Client::retry_only_idempotent`; `new()` starts it at `true`.
retry_only_idempotent: bool,
}
impl V2ClientBuilder {
    /// Creates a builder holding the default configuration:
    /// 90 s pool idle timeout, 8 idle connections per host, 30 s request
    /// timeout, no retries, 100 ms base backoff, a `tako/<version>` user
    /// agent, and retries restricted to idempotent methods.
    fn new() -> Self {
        let default_agent = format!("tako/{}", env!("CARGO_PKG_VERSION"));
        Self {
            retry_only_idempotent: true,
            user_agent: Some(default_agent),
            retry_backoff: Duration::from_millis(100),
            max_retries: 0,
            default_timeout: Some(Duration::from_secs(30)),
            pool_max_idle_per_host: Some(8),
            pool_idle_timeout: Some(Duration::from_secs(90)),
        }
    }

    /// Sets the timeout applied to each request attempt.
    pub fn timeout(self, d: Duration) -> Self {
        Self {
            default_timeout: Some(d),
            ..self
        }
    }

    /// Sets how many additional attempts may follow a failed first attempt.
    pub fn max_retries(self, n: u32) -> Self {
        Self {
            max_retries: n,
            ..self
        }
    }

    /// Sets the base delay used to compute the wait before each retry.
    pub fn retry_backoff(self, d: Duration) -> Self {
        Self {
            retry_backoff: d,
            ..self
        }
    }

    /// Chooses whether non-idempotent methods may be retried.
    ///
    /// Passing `true` lifts the idempotent-only restriction; passing `false`
    /// restores it. The value is stored inverted as `retry_only_idempotent`.
    pub fn retry_non_idempotent(self, allow: bool) -> Self {
        let idempotent_only = !allow;
        Self {
            retry_only_idempotent: idempotent_only,
            ..self
        }
    }

    /// Replaces the default `User-Agent` value.
    pub fn user_agent(self, ua: impl Into<String>) -> Self {
        Self {
            user_agent: Some(ua.into()),
            ..self
        }
    }

    /// Sets the idle timeout forwarded to the connection pool.
    pub fn pool_idle_timeout(self, d: Duration) -> Self {
        Self {
            pool_idle_timeout: Some(d),
            ..self
        }
    }

    /// Sets the per-host idle connection limit forwarded to the connection pool.
    pub fn pool_max_idle_per_host(self, n: usize) -> Self {
        Self {
            pool_max_idle_per_host: Some(n),
            ..self
        }
    }

    /// Consumes the builder and produces a configured [`V2Client`].
    pub fn build(self) -> V2Client {
        let Self {
            pool_idle_timeout,
            pool_max_idle_per_host,
            default_timeout,
            max_retries,
            retry_backoff,
            user_agent,
            retry_only_idempotent,
        } = self;

        // Turn off the connector's own http-scheme enforcement.
        let mut connector = HttpConnector::new();
        connector.enforce_http(false);

        // Only touch a pool knob when a value is present, so an absent value
        // leaves the underlying builder's own default in place.
        let mut pool_cfg = HyperClient::builder(TokioExecutor::new());
        if let Some(idle) = pool_idle_timeout {
            pool_cfg.pool_idle_timeout(idle);
        }
        if let Some(limit) = pool_max_idle_per_host {
            pool_cfg.pool_max_idle_per_host(limit);
        }

        V2Client {
            inner: pool_cfg.build(connector),
            default_timeout,
            max_retries,
            retry_backoff,
            user_agent,
            retry_only_idempotent,
        }
    }
}
impl V2Client {
    /// Returns a [`V2ClientBuilder`] initialised with the default settings.
    pub fn builder() -> V2ClientBuilder {
        V2ClientBuilder::new()
    }

    /// Computes the wait before retry `attempt` (1 for the first retry).
    ///
    /// The base backoff is doubled for every retry already made, then
    /// `attempt` milliseconds are added; every step saturates rather than
    /// overflowing.
    fn retry_delay(&self, attempt: u32) -> Duration {
        let doublings = attempt.saturating_sub(1);
        let multiplier = 1u32.checked_shl(doublings).unwrap_or(u32::MAX);
        let extra = Duration::from_millis(u64::from(attempt));
        self.retry_backoff
            .saturating_mul(multiplier)
            .saturating_add(extra)
    }

    /// Sends `req`, retrying on transport errors, timeouts and 5xx responses.
    ///
    /// A configured user agent is added when the request has no `User-Agent`
    /// header and the configured value is a valid header value. Each attempt
    /// is sent from a fresh clone of the request and, if a timeout is
    /// configured, is bounded by it individually.
    ///
    /// # Errors
    ///
    /// Returns the error of the last attempt (transport failure or
    /// `"request timed out"`), or a clone-failure error if the request could
    /// not be duplicated. A 5xx response on the final attempt is returned as
    /// `Ok`, not as an error.
    pub async fn send(
        &self,
        mut req: Request<Full<bytes::Bytes>>,
    ) -> Result<Response<hyper::body::Incoming>, Box<dyn Error + Send + Sync>> {
        // Never override a User-Agent the caller set explicitly.
        if !req.headers().contains_key(http::header::USER_AGENT) {
            let configured = self
                .user_agent
                .as_deref()
                .and_then(|ua| http::HeaderValue::from_str(ua).ok());
            if let Some(value) = configured {
                req.headers_mut().insert(http::header::USER_AGENT, value);
            }
        }

        let idempotent = [
            http::Method::GET,
            http::Method::HEAD,
            http::Method::PUT,
            http::Method::DELETE,
            http::Method::OPTIONS,
            http::Method::TRACE,
        ]
        .contains(req.method());

        // Non-idempotent requests get a single attempt unless retries were
        // explicitly allowed for them.
        let single_shot = self.retry_only_idempotent && !idempotent;
        let total_attempts = if single_shot {
            1
        } else {
            self.max_retries.saturating_add(1)
        };

        let mut last_err: Option<Box<dyn Error + Send + Sync>> = None;
        for attempt in 0..total_attempts {
            let is_final = attempt + 1 == total_attempts;

            let outgoing = match clone_request_full(&req) {
                Some(copy) => copy,
                None => return Err("failed to clone request for retry".into()),
            };

            if attempt != 0 {
                tokio::time::sleep(self.retry_delay(attempt)).await;
            }

            let pending = self.inner.request(outgoing);
            let outcome = match self.default_timeout {
                Some(limit) => match tokio::time::timeout(limit, pending).await {
                    Ok(finished) => finished.map_err(<Box<dyn Error + Send + Sync>>::from),
                    Err(_) => Err("request timed out".into()),
                },
                None => pending
                    .await
                    .map_err(<Box<dyn Error + Send + Sync>>::from),
            };

            match outcome {
                Ok(resp) => {
                    // Hand back anything that is not a retryable 5xx; on the
                    // final attempt even a 5xx goes to the caller.
                    if is_final || !resp.status().is_server_error() {
                        return Ok(resp);
                    }
                    last_err = Some(format!("server error {}", resp.status()).into());
                }
                // On the final attempt the loop ends here and the error is
                // reported below.
                Err(err) => last_err = Some(err),
            }
        }

        Err(last_err.unwrap_or_else(|| "client failed without error detail".into()))
    }
}
/// Produces an independent copy of `req` so the same request can be sent on
/// more than one attempt.
///
/// The copy is made with `Request`'s own `Clone` implementation, so method,
/// URI, version, headers, body and request extensions are all carried over.
/// The previous hand-rolled copy rebuilt the request field by field and
/// dropped the extensions.
///
/// The `Option` return type is kept for existing callers; this implementation
/// always returns `Some`.
fn clone_request_full(req: &Request<Full<bytes::Bytes>>) -> Option<Request<Full<bytes::Bytes>>> {
    // `Full<Bytes>` is a cheap clone, so no body data is copied here.
    Some(req.clone())
}