use std::sync::Arc;
use std::time::Duration;
use chateau::client::conn::service::ClientExecutorService;
#[cfg(feature = "tls")]
use crate::client::conn::Stream;
use http::HeaderValue;
use http_body::Body;
#[cfg(feature = "tls")]
use rustls::ClientConfig;
#[cfg(feature = "tls")]
use tokio::io::{AsyncRead, AsyncWrite};
use tower::ServiceBuilder;
use tower::layer::util::{Identity, Stack};
use tower_http::follow_redirect::FollowRedirectLayer;
use tower_http::follow_redirect::policy;
use tower_http::set_header::SetRequestHeaderLayer;
use super::conn::dns::GaiResolver;
use super::conn::protocol::auto;
use crate::BoxError;
use crate::client::UriKey;
#[cfg(feature = "tls")]
use crate::client::conn::{AutoTlsTransport, tls::future::TransportBraidFuture};
#[cfg(feature = "tls")]
use crate::client::default_tls_config;
use crate::client::{Client, conn::protocol::auto::AlpnHttpConnectionBuilder};
use crate::service::IncomingResponseLayer;
use crate::service::OptionLayerExt;
use crate::service::TimeoutLayer;
use crate::service::{Http1ChecksLayer, Http2ChecksLayer, HttpConnectionInfo, SetHostHeaderLayer};
use chateau::client::ConnectionPoolLayer;
use chateau::client::conn::Connection;
use chateau::client::conn::Protocol;
use chateau::client::conn::Transport;
#[cfg(feature = "tls")]
use chateau::client::conn::transport::TlsConnectionError;
use chateau::client::conn::transport::tcp::{TcpTransport, TcpTransportConfig};
use chateau::client::pool::{PoolableConnection, PoolableStream};
use chateau::info::HasConnectionInfo;
use chateau::services::SharedService;
#[derive(Debug)]
pub struct Builder<T, P, RP = policy::Standard, S = Identity, BIn = crate::Body, BOut = crate::Body>
{
transport: T,
protocol: P,
builder: ServiceBuilder<S>,
user_agent: Option<String>,
redirect: Option<RP>,
timeout: Option<Duration>,
#[cfg(feature = "tls")]
tls: Option<ClientConfig>,
pool: Option<chateau::client::PoolConfig>,
body: std::marker::PhantomData<fn(BIn) -> BOut>,
}
impl Builder<(), (), policy::Standard> {
pub fn new() -> Self {
Self {
transport: (),
protocol: (),
builder: ServiceBuilder::new(),
user_agent: None,
redirect: None,
timeout: None,
#[cfg(feature = "tls")]
tls: None,
pool: None,
body: std::marker::PhantomData,
}
}
}
impl Default
for Builder<
TcpTransport<GaiResolver>,
AlpnHttpConnectionBuilder<crate::Body>,
policy::Standard,
Identity,
crate::Body,
crate::Body,
>
{
fn default() -> Self {
Self {
transport: Default::default(),
protocol: Default::default(),
builder: ServiceBuilder::new(),
user_agent: None,
redirect: Some(policy::Standard::default()),
timeout: Some(Duration::from_secs(30)),
#[cfg(feature = "tls")]
tls: Some(default_tls_config()),
pool: Some(Default::default()),
body: std::marker::PhantomData,
}
}
}
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut> {
pub fn with_tcp(
self,
config: TcpTransportConfig,
) -> Builder<TcpTransport<GaiResolver>, P, RP, S, BIn, BOut> {
Builder {
transport: TcpTransport::new(GaiResolver::new(), Arc::new(config)),
protocol: self.protocol,
builder: self.builder,
user_agent: self.user_agent,
redirect: self.redirect,
timeout: self.timeout,
#[cfg(feature = "tls")]
tls: self.tls,
pool: self.pool,
body: self.body,
}
}
pub fn with_transport<T2>(self, transport: T2) -> Builder<T2, P, RP, S, BIn, BOut> {
Builder {
transport,
protocol: self.protocol,
builder: self.builder,
user_agent: self.user_agent,
redirect: self.redirect,
timeout: self.timeout,
#[cfg(feature = "tls")]
tls: self.tls,
pool: self.pool,
body: self.body,
}
}
pub fn transport(&mut self) -> &mut T {
&mut self.transport
}
}
#[cfg(feature = "tls")]
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut> {
pub fn without_tls(mut self) -> Self {
self.tls = None;
self
}
pub fn with_tls(mut self, config: ClientConfig) -> Self {
self.tls = Some(config);
self
}
pub fn with_default_tls(mut self) -> Self {
self.tls = Some(default_tls_config());
self
}
pub fn tls(&mut self) -> &mut Option<ClientConfig> {
&mut self.tls
}
}
#[cfg(not(feature = "tls"))]
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut> {
pub fn without_tls(self) -> Self {
self
}
}
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut> {
pub fn pool(&mut self) -> Option<&mut chateau::client::PoolConfig> {
self.pool.as_mut()
}
pub fn with_pool(mut self, pool: chateau::client::PoolConfig) -> Self {
self.pool = Some(pool);
self
}
pub fn with_default_pool(mut self) -> Self {
self.pool = Some(Default::default());
self
}
pub fn without_pool(mut self) -> Self {
self.pool = None;
self
}
}
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut> {
pub fn with_auto_http(
self,
) -> Builder<T, auto::AlpnHttpConnectionBuilder<BIn>, RP, S, BIn, BOut> {
Builder {
transport: self.transport,
protocol: auto::AlpnHttpConnectionBuilder::default(),
builder: self.builder,
user_agent: self.user_agent,
redirect: self.redirect,
timeout: self.timeout,
#[cfg(feature = "tls")]
tls: self.tls,
pool: self.pool,
body: self.body,
}
}
pub fn with_protocol<P2>(self, protocol: P2) -> Builder<T, P2, RP, S, BIn, BOut> {
Builder {
transport: self.transport,
protocol,
builder: self.builder,
user_agent: self.user_agent,
redirect: self.redirect,
timeout: self.timeout,
#[cfg(feature = "tls")]
tls: self.tls,
pool: self.pool,
body: self.body,
}
}
pub fn protocol(&mut self) -> &mut P {
&mut self.protocol
}
}
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut> {
pub fn with_user_agent(mut self, user_agent: String) -> Self {
self.user_agent = Some(user_agent);
self
}
pub fn user_agent(&self) -> Option<&str> {
self.user_agent.as_deref()
}
}
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut> {
pub fn with_redirect_policy<RP2>(self, policy: RP2) -> Builder<T, P, RP2, S, BIn, BOut> {
Builder {
transport: self.transport,
protocol: self.protocol,
builder: self.builder,
user_agent: self.user_agent,
redirect: Some(policy),
timeout: self.timeout,
#[cfg(feature = "tls")]
tls: self.tls,
pool: self.pool,
body: self.body,
}
}
pub fn without_redirects(self) -> Builder<T, P, policy::Standard, S, BIn, BOut> {
Builder {
transport: self.transport,
protocol: self.protocol,
user_agent: self.user_agent,
builder: self.builder,
redirect: None,
timeout: self.timeout,
#[cfg(feature = "tls")]
tls: self.tls,
pool: self.pool,
body: self.body,
}
}
pub fn with_standard_redirect_policy(self) -> Builder<T, P, policy::Standard, S, BIn, BOut> {
Builder {
transport: self.transport,
protocol: self.protocol,
builder: self.builder,
user_agent: self.user_agent,
redirect: Some(policy::Standard::default()),
timeout: self.timeout,
#[cfg(feature = "tls")]
tls: self.tls,
pool: self.pool,
body: self.body,
}
}
pub fn redirect_policy(&mut self) -> Option<&mut RP> {
self.redirect.as_mut()
}
}
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut> {
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn timeout(&self) -> Option<Duration> {
self.timeout
}
pub fn without_timeout(mut self) -> Self {
self.timeout = None;
self
}
pub fn with_optional_timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
}
}
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut> {
pub fn with_body<B2In, B2Out>(self) -> Builder<T, P, RP, S, B2In, B2Out>
where
B2In: Default + Body + Unpin + Send + 'static,
<B2In as Body>::Data: Send,
<B2In as Body>::Error: Into<BoxError>,
B2Out: From<hyper::body::Incoming> + Body + Unpin + Send + 'static,
{
Builder {
transport: self.transport,
protocol: self.protocol,
builder: self.builder,
user_agent: self.user_agent,
redirect: self.redirect,
timeout: self.timeout,
#[cfg(feature = "tls")]
tls: self.tls,
pool: self.pool,
body: std::marker::PhantomData,
}
}
}
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut> {
pub fn layer<L>(self, layer: L) -> Builder<T, P, RP, Stack<L, S>, BIn, BOut> {
Builder {
transport: self.transport,
protocol: self.protocol,
builder: self.builder.layer(layer),
user_agent: self.user_agent,
redirect: self.redirect,
timeout: self.timeout,
#[cfg(feature = "tls")]
tls: self.tls,
pool: self.pool,
body: self.body,
}
}
}
#[cfg(not(feature = "tls"))]
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut>
where
T: Transport<http::Request<BIn>> + Clone + Send + Sync + 'static,
T::Error: Into<BoxError>,
<T as Transport<http::Request<BIn>>>::IO:
PoolableStream + tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
<<T as Transport<http::Request<BIn>>>::IO as HasConnectionInfo>::Addr: Unpin + Clone + Send,
P: Protocol<<T as Transport<http::Request<BIn>>>::IO, http::Request<BIn>>
+ Clone
+ Send
+ Sync
+ 'static,
<P as Protocol<<T as Transport<http::Request<BIn>>>::IO, http::Request<BIn>>>::Error:
Into<BoxError>,
<P as Protocol<<T as Transport<http::Request<BIn>>>::IO, http::Request<BIn>>>::Connection:
Connection<http::Request<BIn>, Response = http::Response<hyper::body::Incoming>>
+ HttpConnectionInfo<BIn>
+ PoolableConnection<http::Request<BIn>>,
<<P as chateau::client::conn::Protocol<
<T as Transport<http::Request<BIn>>>::IO,
http::Request<BIn>,
>>::Connection as Connection<http::Request<BIn>>>::Error: Into<BoxError>,
RP: policy::Policy<BIn, super::Error> + Clone + Send + Sync + 'static,
S: tower::Layer<SharedService<http::Request<BIn>, http::Response<BOut>, super::Error>>,
S::Service: tower::Service<http::Request<BIn>, Response = http::Response<BOut>, Error = super::Error>
+ Clone
+ Send
+ Sync
+ 'static,
<S::Service as tower::Service<http::Request<BIn>>>::Future: Send + 'static,
BIn: Default + Body + Unpin + Send + 'static,
<BIn as Body>::Data: Send,
<BIn as Body>::Error: Into<BoxError>,
BOut: From<hyper::body::Incoming> + Body + Unpin + Send + 'static,
{
pub fn build_service(
self,
) -> SharedService<http::Request<BIn>, http::Response<BOut>, super::Error> {
let user_agent = if let Some(ua) = self.user_agent {
HeaderValue::from_str(&ua).expect("user-agent should be a valid http header")
} else {
HeaderValue::from_static(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION")
))
};
let executor = ServiceBuilder::new()
.layer(SetHostHeaderLayer::new())
.layer(Http2ChecksLayer::new())
.layer(Http1ChecksLayer::new())
.service(ClientExecutorService::new());
let middleware = self
.builder
.layer(SharedService::layer())
.check_service::<_, _, _, super::Error>()
.optional(
self.timeout
.map(|d| TimeoutLayer::new(|| super::Error::RequestTimeout, d)),
)
.optional(self.redirect.map(FollowRedirectLayer::with_policy))
.layer(SetRequestHeaderLayer::if_not_present(
http::header::USER_AGENT,
user_agent,
))
.layer(IncomingResponseLayer::new());
let service = SharedService::new(
middleware
.map_err(super::Error::from)
.layer(
ConnectionPoolLayer::<_, _, http::Request<BIn>, UriKey>::new(
self.transport,
self.protocol,
)
.with_optional_pool(self.pool.clone()),
)
.service(executor),
);
service
}
}
#[cfg(feature = "tls")]
impl<T, P, RP, S, BIn, BOut> Builder<T, P, RP, S, BIn, BOut>
where
T: Transport<http::Request<BIn>> + Clone + Send + Sync + 'static,
T::Error: Into<BoxError>,
<T as Transport<http::Request<BIn>>>::IO: PoolableStream + HasConnectionInfo + AsyncRead + AsyncWrite + Unpin,
<<T as Transport<http::Request<BIn>>>::IO as HasConnectionInfo>::Addr: Clone + Send + Unpin,
P: Protocol<Stream<<T as Transport<http::Request<BIn>>>::IO>, http::Request<BIn>> + Clone + Send + Sync + 'static,
<P as Protocol<Stream<<T as Transport<http::Request<BIn>>>::IO>, http::Request<BIn>>>::Error: Into<BoxError>,
<P as Protocol<Stream<<T as Transport<http::Request<BIn>>>::IO>, http::Request<BIn>>>::Connection: Connection<http::Request<BIn>, Response = http::Response<hyper::body::Incoming>>
+ HttpConnectionInfo<BIn>
+ PoolableConnection<http::Request<BIn>>,
<<P as chateau::client::conn::Protocol<Stream<<T as Transport<http::Request<BIn>>>::IO>, http::Request<BIn>>>::Connection as Connection<http::Request<BIn>>>::Error: Into<BoxError>,
RP: policy::Policy<BIn, super::Error> + Clone + Send + Sync + 'static,
S: tower::Layer<SharedService<http::Request<BIn>, http::Response<BOut>, super::Error>>,
S::Service: tower::Service<http::Request<BIn>, Response = http::Response<BOut>, Error = super::Error>
+ Clone
+ Send
+ Sync
+ 'static,
<S::Service as tower::Service<http::Request<BIn>>>::Future: Send + 'static,
BIn: Default + Body + Unpin + Send + 'static,
<BIn as Body>::Data: Send,
BOut: From<hyper::body::Incoming> + Body + Unpin + Send + 'static,
{
pub fn build_service(
self,
) -> SharedService<http::Request<BIn>, http::Response<BOut>, super::Error> {
let user_agent = if let Some(ua) = self.user_agent {
HeaderValue::from_str(&ua).expect("user-agent should be a valid http header")
} else {
HeaderValue::from_static(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION")
))
};
let middleware = self
.builder
.layer(SharedService::layer())
.optional(
self.timeout
.map(|d| TimeoutLayer::new(|| super::Error::RequestTimeout, d)),
)
.optional(self.redirect.map(FollowRedirectLayer::with_policy))
.layer(SetRequestHeaderLayer::if_not_present(
http::header::USER_AGENT,
user_agent,
))
.layer(IncomingResponseLayer::new());
let executor = ServiceBuilder::new()
.layer(SetHostHeaderLayer::new())
.layer(Http2ChecksLayer::new())
.layer(Http1ChecksLayer::new())
.service(ClientExecutorService::new());
SharedService::new(
middleware
.check_service::<_, http::Request<BIn>, http::Response<BOut>, super::Error>()
.map_err(super::Error::from)
.layer(
ConnectionPoolLayer::< _, _, http::Request<BIn>, UriKey>::new(
AutoTlsTransport::new(self.transport).with_optional_tls(self.tls.map(Arc::new)),
self.protocol,
)
.with_optional_pool(self.pool.clone()),
)
.service(executor),
)
}
}
#[cfg(not(feature = "tls"))]
impl< T, P, RP, S> Builder<T, P, RP, S, crate::Body, crate::Body>
where
T: Transport<http::Request<crate::Body>> + Clone + Send + Sync + 'static,
T::Error: Into<BoxError>,
<T as Transport<http::Request<crate::Body>>>::IO: PoolableStream + tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
<<T as Transport<http::Request<crate::Body>>>::IO as HasConnectionInfo>::Addr: Unpin + Clone + Send,
P: Protocol<<T as Transport<http::Request<crate::Body>>>::IO, http::Request<crate::Body>> + Clone + Send + Sync + 'static,
<P as Protocol<<T as Transport<http::Request<crate::Body>>>::IO, http::Request<crate::Body>>>::Error: Into<BoxError>,
<P as Protocol<<T as Transport<http::Request<crate::Body>>>::IO, http::Request<crate::Body>>>::Connection: Connection<http::Request<crate::Body>, Response = http::Response<hyper::body::Incoming>>
+ HttpConnectionInfo<crate::Body>
+ PoolableConnection<http::Request<crate::Body>>,
<<P as chateau::client::conn::Protocol<<T as Transport<http::Request<crate::Body>>>::IO, http::Request<crate::Body>>>::Connection as Connection<http::Request<crate::Body>>>::Error: Into<BoxError>,
RP: policy::Policy<crate::Body, super::Error> + Clone + Send + Sync + 'static,
S: tower::Layer<SharedService<http::Request<crate::Body>, http::Response<crate::Body>, super::Error>>,
S::Service: tower::Service<http::Request<crate::Body>, Response = http::Response<crate::Body>, Error=super::Error>
+ Clone
+ Send
+ Sync
+ 'static,
<S::Service as tower::Service<http::Request<crate::Body>>>::Future: Send + 'static,
{
pub fn build(self) -> Client {
Client::new_from_service(self.build_service())
}
}
#[cfg(feature = "tls")]
impl<T, P, RP, S> Builder<T, P, RP, S, crate::Body, crate::Body>
where
T: Transport<http::Request<crate::Body>> + Clone + Send + Sync + 'static,
T::Error: Into<BoxError>,
AutoTlsTransport<T>: Transport<
http::Request<crate::Body>,
Error = TlsConnectionError<<T as Transport<http::Request<crate::Body>>>::Error>,
Future = TransportBraidFuture<T, http::Request<crate::Body>>,
IO = Stream<<T as Transport<http::Request<crate::Body>>>::IO>,
>,
<T as Transport<http::Request<crate::Body>>>::IO:
PoolableStream + HasConnectionInfo + AsyncRead + AsyncWrite + Unpin,
<<T as Transport<http::Request<crate::Body>>>::IO as HasConnectionInfo>::Addr:
Clone + Send + Unpin,
P: Protocol<
Stream<<T as Transport<http::Request<crate::Body>>>::IO>,
http::Request<crate::Body>,
> + Clone
+ Send
+ Sync
+ 'static,
<P as Protocol<
Stream<<T as Transport<http::Request<crate::Body>>>::IO>,
http::Request<crate::Body>,
>>::Error: Into<BoxError>,
<P as Protocol<
Stream<<T as Transport<http::Request<crate::Body>>>::IO>,
http::Request<crate::Body>,
>>::Connection: Connection<http::Request<crate::Body>, Response = http::Response<hyper::body::Incoming>>
+ HttpConnectionInfo<crate::Body>
+ PoolableConnection<http::Request<crate::Body>>,
<<P as chateau::client::conn::Protocol<
Stream<<T as Transport<http::Request<crate::Body>>>::IO>,
http::Request<crate::Body>,
>>::Connection as Connection<http::Request<crate::Body>>>::Error: Into<BoxError>,
RP: policy::Policy<crate::Body, super::Error> + Clone + Send + Sync + 'static,
S: tower::Layer<
SharedService<http::Request<crate::Body>, http::Response<crate::Body>, super::Error>,
>,
S::Service: tower::Service<
http::Request<crate::Body>,
Response = http::Response<crate::Body>,
Error = super::Error,
> + Clone
+ Send
+ Sync
+ 'static,
<S::Service as tower::Service<http::Request<crate::Body>>>::Future: Send + 'static,
{
pub fn build(self) -> Client {
Client::new_from_service(self.build_service())
}
}
#[cfg(test)]
mod tests {
use super::Builder;
#[test]
fn build_default_compiles() {
#[cfg(feature = "tls")]
{
crate::fixtures::tls_install_default();
}
let _ = Builder::default().build();
}
}