#![cfg_attr(docsrs, doc(cfg(feature = "client")))]
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
use http::Request;
use http::Response;
use http_body::Body;
use http_body_util::BodyExt;
use http_body_util::Full;
use hyper::client::conn::http1::SendRequest;
use hyper::client::{self};
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::rt::TokioExecutor;
use hyper_util::rt::TokioIo;
use rustls::ClientConfig;
use rustls::RootCertStore;
use rustls::pki_types::ServerName;
use tokio::net::TcpStream;
use tokio::task::JoinHandle;
use tokio_rustls::TlsConnector;
#[cfg(not(feature = "native-certs"))]
use webpki_roots::TLS_SERVER_ROOTS;
/// Populates `store` with trust anchors for server certificate verification.
///
/// With the `native-certs` feature enabled the certificates come from
/// `rustls_native_certs::load_native_certs()`; otherwise the bundled
/// `webpki_roots::TLS_SERVER_ROOTS` set is used.
///
/// Loading is best-effort: loader errors and individual certificates that
/// `store.add` rejects are logged and skipped rather than treated as fatal.
fn load_root_certs(store: &mut RootCertStore) {
    #[cfg(feature = "native-certs")]
    {
        let result = rustls_native_certs::load_native_certs();
        for err in &result.errors {
            tracing::warn!(error = %err, "rustls-native-certs partial failure");
        }

        let mut accepted = 0usize;
        let mut rejected = 0usize;
        for cert in result.certs {
            match store.add(cert) {
                Ok(()) => accepted += 1,
                Err(err) => {
                    // Kept at debug level so a handful of unusable entries in
                    // the platform store does not flood the logs.
                    rejected += 1;
                    tracing::debug!(error = %err, "skipping unusable native root certificate");
                }
            }
        }

        // Surface the case where nothing usable was loaded, instead of leaving
        // the caller to discover it through failing handshakes.
        if accepted == 0 {
            tracing::warn!(
                rejected,
                "no usable native root certificates were loaded"
            );
        }
    }
    #[cfg(not(feature = "native-certs"))]
    {
        store.extend(TLS_SERVER_ROOTS.iter().cloned());
    }
}
/// Pooled HTTP client with optional per-attempt timeout and retry support.
///
/// Instances are configured through [`V2ClientBuilder`], obtained from
/// `V2Client::builder()`.
pub struct V2Client {
    /// Underlying hyper-util pooled client.
    inner: HyperClient<HttpConnector, Full<bytes::Bytes>>,
    /// `User-Agent` value injected when a request does not carry one.
    user_agent: Option<String>,
    /// Timeout applied to each individual attempt; `None` means no timeout.
    default_timeout: Option<Duration>,
    /// Number of additional attempts made after the first one.
    max_retries: u32,
    /// Base delay between attempts; doubled for every further retry.
    retry_backoff: Duration,
    /// When `true`, only requests with an idempotent method are retried.
    retry_only_idempotent: bool,
}
/// Builder collecting the settings used to construct a [`V2Client`].
pub struct V2ClientBuilder {
    /// `User-Agent` value the built client injects when a request has none.
    user_agent: Option<String>,
    /// Timeout applied to each individual attempt; `None` means no timeout.
    default_timeout: Option<Duration>,
    /// Number of additional attempts made after the first one.
    max_retries: u32,
    /// Base delay between attempts.
    retry_backoff: Duration,
    /// When `true`, only requests with an idempotent method are retried.
    retry_only_idempotent: bool,
    /// Idle timeout forwarded to the connection pool when set.
    pool_idle_timeout: Option<Duration>,
    /// Per-host idle connection cap forwarded to the connection pool when set.
    pool_max_idle_per_host: Option<usize>,
}
impl V2ClientBuilder {
    /// Creates a builder with the defaults: 90 s pool idle timeout, 8 idle
    /// connections per host, 30 s request timeout, no retries, 100 ms backoff,
    /// a `tako/<crate version>` user agent, and retries limited to idempotent
    /// methods.
    fn new() -> Self {
        let default_user_agent = format!("tako/{}", env!("CARGO_PKG_VERSION"));
        Self {
            user_agent: Some(default_user_agent),
            default_timeout: Some(Duration::from_secs(30)),
            max_retries: 0,
            retry_backoff: Duration::from_millis(100),
            retry_only_idempotent: true,
            pool_idle_timeout: Some(Duration::from_secs(90)),
            pool_max_idle_per_host: Some(8),
        }
    }

    /// Sets the timeout applied to each individual attempt.
    pub fn timeout(self, d: Duration) -> Self {
        Self {
            default_timeout: Some(d),
            ..self
        }
    }

    /// Sets how many additional attempts are made after the first one.
    pub fn max_retries(self, n: u32) -> Self {
        Self {
            max_retries: n,
            ..self
        }
    }

    /// Sets the base delay between attempts.
    pub fn retry_backoff(self, d: Duration) -> Self {
        Self {
            retry_backoff: d,
            ..self
        }
    }

    /// Allows (`true`) or forbids (`false`) retrying non-idempotent methods.
    pub fn retry_non_idempotent(self, allow: bool) -> Self {
        // The stored flag has the opposite polarity of the public option.
        Self {
            retry_only_idempotent: !allow,
            ..self
        }
    }

    /// Sets the `User-Agent` value used when a request does not carry one.
    pub fn user_agent(self, ua: impl Into<String>) -> Self {
        Self {
            user_agent: Some(ua.into()),
            ..self
        }
    }

    /// Sets the idle timeout forwarded to the connection pool.
    pub fn pool_idle_timeout(self, d: Duration) -> Self {
        Self {
            pool_idle_timeout: Some(d),
            ..self
        }
    }

    /// Sets the per-host idle connection cap forwarded to the connection pool.
    pub fn pool_max_idle_per_host(self, n: usize) -> Self {
        Self {
            pool_max_idle_per_host: Some(n),
            ..self
        }
    }

    /// Consumes the builder and constructs the configured [`V2Client`].
    pub fn build(self) -> V2Client {
        let Self {
            user_agent,
            default_timeout,
            max_retries,
            retry_backoff,
            retry_only_idempotent,
            pool_idle_timeout,
            pool_max_idle_per_host,
        } = self;

        let mut connector = HttpConnector::new();
        connector.enforce_http(false);

        let mut pool = HyperClient::builder(TokioExecutor::new());
        // Pool settings are forwarded only when present, so an unset option
        // leaves the hyper-util default untouched.
        if let Some(idle) = pool_idle_timeout {
            pool.pool_idle_timeout(idle);
        }
        if let Some(cap) = pool_max_idle_per_host {
            pool.pool_max_idle_per_host(cap);
        }

        V2Client {
            inner: pool.build(connector),
            default_timeout,
            max_retries,
            retry_backoff,
            user_agent,
            retry_only_idempotent,
        }
    }
}
impl V2Client {
    /// Returns a [`V2ClientBuilder`] pre-populated with the default settings.
    pub fn builder() -> V2ClientBuilder {
        V2ClientBuilder::new()
    }

    /// Sends `req`, retrying according to the client's configuration.
    ///
    /// * The configured user agent is inserted when the request has no
    ///   `User-Agent` header and the configured value is a valid header value.
    /// * When retries are restricted to idempotent methods, only GET, HEAD,
    ///   PUT, DELETE, OPTIONS and TRACE requests get more than one attempt.
    /// * Transport errors, timeouts and 5xx responses trigger another attempt
    ///   while attempts remain. A 5xx response on the final attempt is
    ///   returned as `Ok`.
    /// * Before attempt `k > 0` the task sleeps
    ///   `retry_backoff * 2^(k-1) + k ms` (saturating).
    ///
    /// # Errors
    ///
    /// Returns the error of the last failed attempt, `"request timed out"`
    /// when that attempt exceeded the timeout, or a clone failure message.
    pub async fn send(
        &self,
        mut req: Request<Full<bytes::Bytes>>,
    ) -> Result<Response<hyper::body::Incoming>, Box<dyn Error + Send + Sync>> {
        if !req.headers().contains_key(http::header::USER_AGENT) {
            let default_ua = self
                .user_agent
                .as_deref()
                .and_then(|ua| http::HeaderValue::from_str(ua).ok());
            if let Some(value) = default_ua {
                req.headers_mut().insert(http::header::USER_AGENT, value);
            }
        }

        let is_idempotent = [
            http::Method::GET,
            http::Method::HEAD,
            http::Method::PUT,
            http::Method::DELETE,
            http::Method::OPTIONS,
            http::Method::TRACE,
        ]
        .contains(req.method());

        let total_attempts = if self.retry_only_idempotent && !is_idempotent {
            1
        } else {
            self.max_retries.saturating_add(1)
        };

        let mut failure: Option<Box<dyn Error + Send + Sync>> = None;
        for attempt in 0..total_attempts {
            let is_last = attempt + 1 == total_attempts;

            // Each attempt works on a copy so the original stays available
            // for later attempts.
            let attempt_req = match clone_request_full(&req) {
                Some(copy) => copy,
                None => {
                    failure = Some("failed to clone request for retry".into());
                    break;
                }
            };

            if attempt > 0 {
                // Exponential backoff; the shift saturates to `u32::MAX` once
                // it would overflow.
                let multiplier = 1u32.checked_shl(attempt - 1).unwrap_or(u32::MAX);
                let extra = Duration::from_millis(u64::from(attempt));
                let delay = self
                    .retry_backoff
                    .saturating_mul(multiplier)
                    .saturating_add(extra);
                tokio::time::sleep(delay).await;
            }

            let in_flight = self.inner.request(attempt_req);
            let outcome: Result<_, Box<dyn Error + Send + Sync>> = match self.default_timeout {
                Some(limit) => match tokio::time::timeout(limit, in_flight).await {
                    Ok(finished) => finished.map_err(Box::<dyn Error + Send + Sync>::from),
                    Err(_) => Err("request timed out".into()),
                },
                None => in_flight
                    .await
                    .map_err(Box::<dyn Error + Send + Sync>::from),
            };

            match outcome {
                Ok(resp) => {
                    if is_last || !resp.status().is_server_error() {
                        return Ok(resp);
                    }
                    failure = Some(format!("server error {}", resp.status()).into());
                }
                Err(err) => failure = Some(err),
            }
        }

        Err(failure.unwrap_or_else(|| "client failed without error detail".into()))
    }
}
/// Produces an independent copy of `req` for a single send attempt.
///
/// The copy carries the method, URI, version, all headers, the body and the
/// request extensions. Rebuilding the request by hand through
/// `Request::builder()` dropped the extensions, so per-request extension data
/// never reached the underlying client.
///
/// Always returns `Some`; the `Option` return type is kept so existing
/// callers that handle a failed clone keep compiling.
fn clone_request_full(req: &Request<Full<bytes::Bytes>>) -> Option<Request<Full<bytes::Bytes>>> {
    // `http::Request<T>` is `Clone` when `T: Clone` (http 1.x).
    Some(req.clone())
}
/// HTTP/1 client bound to a single TLS connection.
///
/// Holds the request sender half of a hyper HTTP/1 connection together with
/// the handle of the background task that drives that connection.
pub struct TakoTlsClient<B>
where
    B: Body + Send + 'static,
    B::Data: Send + 'static,
    B::Error: Into<Box<dyn Error + Send + Sync>>,
{
    /// Handle used to submit requests on the connection.
    sender: SendRequest<B>,
    /// Background task polling the connection; aborted when the client drops.
    conn_handle: JoinHandle<Result<(), hyper::Error>>,
}
impl<B> TakoTlsClient<B>
where
    B: Body + Send + 'static,
    B::Data: Send + 'static,
    B::Error: Into<Box<dyn Error + Send + Sync>>,
{
    /// Opens a TLS connection to `host` and performs the HTTP/1 handshake.
    ///
    /// `port` defaults to 443 when `None`. Trust anchors come from
    /// `load_root_certs`, and no client certificate is presented. The
    /// connection is driven by a spawned Tokio task, so this must be called
    /// from within a Tokio runtime.
    ///
    /// `host` may be any borrowed string. The TLS server name is built from an
    /// owned copy, so the caller is not required to pass a `&'static str`.
    ///
    /// # Errors
    ///
    /// Fails when the TCP connection cannot be established, `host` is not a
    /// valid server name, or the TLS or HTTP/1 handshake fails.
    pub async fn new(host: &str, port: Option<u16>) -> Result<Self, Box<dyn Error>> {
        let port = port.unwrap_or(443);
        let addr = format!("{host}:{port}");
        let tcp_stream = TcpStream::connect(addr).await?;

        let mut root_cert_store = RootCertStore::empty();
        load_root_certs(&mut root_cert_store);
        let tls_config = ClientConfig::builder()
            .with_root_certificates(root_cert_store)
            .with_no_client_auth();
        let connector = TlsConnector::from(Arc::new(tls_config));

        // An owned `ServerName<'static>` satisfies the connector without
        // forcing a `'static` lifetime onto the caller's `host`.
        let server_name = ServerName::try_from(host.to_owned())?;
        let tls_stream = connector.connect(server_name, tcp_stream).await?;

        let io = TokioIo::new(tls_stream);
        let (sender, conn) = client::conn::http1::handshake::<_, B>(io).await?;

        // The connection future must be polled for requests to make progress.
        // Its error is logged here; the task itself always yields `Ok(())`.
        let conn_handle = tokio::spawn(async move {
            if let Err(err) = conn.await {
                tracing::error!("Connection error: {}", err);
            }
            Ok(())
        });

        Ok(Self {
            sender,
            conn_handle,
        })
    }

    /// Sends `req` on the connection and buffers the whole response body.
    ///
    /// Only data frames are collected; any other frame kind is ignored.
    ///
    /// NOTE(review): hyper's low-level connection API presumably does not add
    /// a `Host` header on its own; confirm that callers set one.
    ///
    /// # Errors
    ///
    /// Fails when the connection is closed, the request cannot be sent, or
    /// reading a body frame fails.
    pub async fn request(&mut self, req: Request<B>) -> Result<Response<Vec<u8>>, Box<dyn Error>> {
        // Wait until the dispatcher can accept a request; without this a
        // request issued right after the previous response may be rejected
        // because the connection task has not signalled readiness yet.
        self.sender.ready().await?;

        let response = self.sender.send_request(req).await?;
        let (head, mut body) = response.into_parts();

        let mut body_bytes = Vec::new();
        while let Some(frame) = body.frame().await {
            let frame = frame?;
            if let Some(chunk) = frame.data_ref() {
                body_bytes.extend_from_slice(chunk);
            }
        }

        Ok(Response::from_parts(head, body_bytes))
    }
}
impl<B> Drop for TakoTlsClient<B>
where
    B: Body + Send + 'static,
    B::Data: Send + 'static,
    B::Error: Into<Box<dyn Error + Send + Sync>>,
{
    /// Aborts the background connection task when the client is dropped.
    fn drop(&mut self) {
        let Self { conn_handle, .. } = self;
        conn_handle.abort();
    }
}
/// HTTP/1 client bound to a single plain TCP connection.
///
/// Holds the request sender half of a hyper HTTP/1 connection together with
/// the handle of the background task that drives that connection.
pub struct TakoClient<B>
where
    B: Body + Send + 'static,
    B::Data: Send + 'static,
    B::Error: Into<Box<dyn Error + Send + Sync>>,
{
    /// Handle used to submit requests on the connection.
    sender: SendRequest<B>,
    /// Background task polling the connection; aborted when the client drops.
    conn_handle: JoinHandle<Result<(), hyper::Error>>,
}
impl<B> TakoClient<B>
where
    B: Body + Send + 'static,
    B::Data: Send + 'static,
    B::Error: Into<Box<dyn Error + Send + Sync>>,
{
    /// Opens a plain TCP connection to `host` and performs the HTTP/1
    /// handshake.
    ///
    /// `port` defaults to 80 when `None`. The connection is driven by a
    /// spawned Tokio task, so this must be called from within a Tokio runtime.
    ///
    /// `host` may be any borrowed string; it is only used to format the
    /// address, so no `'static` lifetime is required.
    ///
    /// # Errors
    ///
    /// Fails when the TCP connection cannot be established or the HTTP/1
    /// handshake fails.
    pub async fn new(host: &str, port: Option<u16>) -> Result<Self, Box<dyn Error>> {
        let port = port.unwrap_or(80);
        let addr = format!("{host}:{port}");
        let tcp_stream = TcpStream::connect(addr).await?;

        let io = TokioIo::new(tcp_stream);
        let (sender, conn) = client::conn::http1::handshake::<_, B>(io).await?;

        // The connection future must be polled for requests to make progress.
        // Its error is logged here; the task itself always yields `Ok(())`.
        let conn_handle = tokio::spawn(async move {
            if let Err(err) = conn.await {
                tracing::error!("Connection error: {}", err);
            }
            Ok(())
        });

        Ok(Self {
            sender,
            conn_handle,
        })
    }

    /// Sends `req` on the connection and buffers the whole response body.
    ///
    /// Only data frames are collected; any other frame kind is ignored.
    ///
    /// NOTE(review): hyper's low-level connection API presumably does not add
    /// a `Host` header on its own; confirm that callers set one.
    ///
    /// # Errors
    ///
    /// Fails when the connection is closed, the request cannot be sent, or
    /// reading a body frame fails.
    pub async fn request(&mut self, req: Request<B>) -> Result<Response<Vec<u8>>, Box<dyn Error>> {
        // Wait until the dispatcher can accept a request; without this a
        // request issued right after the previous response may be rejected
        // because the connection task has not signalled readiness yet.
        self.sender.ready().await?;

        let response = self.sender.send_request(req).await?;
        let (head, mut body) = response.into_parts();

        let mut body_bytes = Vec::new();
        while let Some(frame) = body.frame().await {
            let frame = frame?;
            if let Some(chunk) = frame.data_ref() {
                body_bytes.extend_from_slice(chunk);
            }
        }

        Ok(Response::from_parts(head, body_bytes))
    }
}
impl<B> Drop for TakoClient<B>
where
    B: Body + Send + 'static,
    B::Data: Send + 'static,
    B::Error: Into<Box<dyn Error + Send + Sync>>,
{
    /// Aborts the background connection task when the client is dropped.
    fn drop(&mut self) {
        let Self { conn_handle, .. } = self;
        conn_handle.abort();
    }
}