use crate::raw::{BuildRawClient, DefaultRawClient, RawBody, Service};
use crate::service::gzip::{DecodedBody, GzipLayer};
use crate::service::http_error::HttpErrorLayer;
use crate::service::map_error::MapErrorLayer;
use crate::service::metrics::MetricsLayer;
use crate::service::node::{NodeMetricsLayer, NodeSelectorLayer, NodeUriLayer};
use crate::service::proxy::{ProxyConfig, ProxyLayer};
use crate::service::response_body::ResponseBodyLayer;
use crate::service::retry::RetryLayer;
use crate::service::root_span::RootSpanLayer;
use crate::service::trace_propagation::TracePropagationLayer;
use crate::service::user_agent::UserAgentLayer;
use crate::service::wait_for_spans::{WaitForSpansBody, WaitForSpansLayer};
use crate::service::{Identity, Layer, ServiceBuilder, Stack};
use crate::{BodyWriter, Builder, ResponseBody};
use arc_swap::ArcSwap;
use async_trait::async_trait;
use bytes::Bytes;
use conjure_error::Error;
use conjure_http::client::{AsyncClient, AsyncRequestBody, AsyncService};
use conjure_runtime_config::ServiceConfig;
use http::{Request, Response};
use refreshable::Subscription;
use std::error;
use std::sync::Arc;
macro_rules! layers {
() => { Identity };
($layer:ty, $($rem:tt)*) => { Stack<$layer, layers!($($rem)*)> };
}
type BaseLayer = layers!(
ResponseBodyLayer,
MetricsLayer,
RootSpanLayer,
RetryLayer,
HttpErrorLayer,
WaitForSpansLayer,
NodeSelectorLayer,
NodeUriLayer,
NodeMetricsLayer,
ProxyLayer,
TracePropagationLayer,
UserAgentLayer,
GzipLayer,
MapErrorLayer,
);
type BaseService<T> = <BaseLayer as Layer<T>>::Service;
pub(crate) type BaseBody<B> = WaitForSpansBody<DecodedBody<B>>;
pub(crate) struct ClientState<T> {
service: BaseService<T>,
}
impl<T> ClientState<T> {
pub(crate) fn new<U>(builder: &Builder<U>) -> Result<ClientState<T>, Error>
where
U: BuildRawClient<RawClient = T>,
{
let service = builder.get_service().expect("service not set");
let client = builder.get_raw_client_builder().build_raw_client(builder)?;
let proxy = ProxyConfig::from_config(builder.get_proxy())?;
let service = ServiceBuilder::new()
.layer(ResponseBodyLayer)
.layer(MetricsLayer::new(service, builder))
.layer(RootSpanLayer)
.layer(RetryLayer::new(builder))
.layer(HttpErrorLayer::new(builder))
.layer(WaitForSpansLayer)
.layer(NodeSelectorLayer::new(service, builder)?)
.layer(NodeUriLayer)
.layer(NodeMetricsLayer)
.layer(ProxyLayer::new(&proxy))
.layer(TracePropagationLayer)
.layer(UserAgentLayer::new(builder))
.layer(GzipLayer)
.layer(MapErrorLayer)
.service(client);
Ok(ClientState { service })
}
}
pub struct Client<T = DefaultRawClient> {
state: Arc<ArcSwap<ClientState<T>>>,
subscription: Option<Arc<Subscription<ServiceConfig, Error>>>,
}
impl<T> Clone for Client<T> {
fn clone(&self) -> Self {
Client {
state: self.state.clone(),
subscription: self.subscription.clone(),
}
}
}
impl Client {
pub fn builder() -> Builder {
Builder::new()
}
}
impl<T> Client<T> {
pub(crate) fn new(
state: Arc<ArcSwap<ClientState<T>>>,
subscription: Option<Subscription<ServiceConfig, Error>>,
) -> Client<T> {
Client {
state,
subscription: subscription.map(Arc::new),
}
}
}
impl<T> AsyncService<Client<T>> for Client<T> {
fn new(client: Client<T>) -> Self {
client
}
}
#[async_trait]
impl<T, B> AsyncClient for Client<T>
where
T: Service<http::Request<RawBody>, Response = http::Response<B>> + 'static + Sync + Send,
T::Error: Into<Box<dyn error::Error + Sync + Send>>,
T::Future: Send,
B: http_body::Body<Data = Bytes> + 'static + Send,
B::Error: Into<Box<dyn error::Error + Sync + Send>>,
{
type BodyWriter = BodyWriter;
type ResponseBody = ResponseBody<B>;
async fn send(
&self,
request: Request<AsyncRequestBody<'_, Self::BodyWriter>>,
) -> Result<Response<Self::ResponseBody>, Error> {
let future = self.state.load().service.call(request);
future.await
}
}