use std::marker::PhantomData;
use ntex_bytes::ByteString;
use ntex_error::Error;
use ntex_http::uri::Scheme;
use ntex_io::IoBoxed;
use ntex_net::connect::{Address, Connect, ConnectError, Connector2 as DefaultConnector};
use ntex_service::cfg::{Cfg, SharedCfg};
use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
use ntex_util::{channel::pool, time::timeout_checked};
use crate::client::{SimpleClient, stream::InflightStorage};
use crate::{client::ClientError, config::ServiceConfig};
#[derive(Debug)]
pub struct Connector<A: Address, T> {
svc: T,
scheme: Scheme,
pool: pool::Pool<()>,
_t: PhantomData<A>,
}
impl<A, T> Connector<A, T>
where
A: Address,
T: ServiceFactory<Connect<A>, SharedCfg, Error = Error<ConnectError>>,
IoBoxed: From<T::Response>,
{
pub fn new<F>(svc: F) -> Connector<A, T>
where
F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
{
Connector {
svc: svc.into_factory(),
scheme: Scheme::HTTP,
pool: pool::new(),
_t: PhantomData,
}
}
}
impl<A> Default for Connector<A, DefaultConnector<A>>
where
A: Address,
{
fn default() -> Self {
Self::new(DefaultConnector::default())
}
}
impl<A, T> Connector<A, T>
where
A: Address,
{
#[inline]
pub fn scheme(&mut self, scheme: Scheme) -> &mut Self {
self.scheme = scheme;
self
}
pub fn connector<U, F>(&self, svc: F) -> Connector<A, U>
where
F: IntoServiceFactory<U, Connect<A>, SharedCfg>,
U: ServiceFactory<Connect<A>, SharedCfg, Error = Error<ConnectError>>,
IoBoxed: From<U::Response>,
{
Connector {
svc: svc.into_factory(),
scheme: self.scheme.clone(),
pool: self.pool.clone(),
_t: PhantomData,
}
}
}
impl<A, T> ServiceFactory<A, SharedCfg> for Connector<A, T>
where
A: Address,
T: ServiceFactory<Connect<A>, SharedCfg, Error = Error<ConnectError>>,
IoBoxed: From<T::Response>,
{
type Response = SimpleClient;
type Error = Error<ClientError>;
type InitError = T::InitError;
type Service = ConnectorService<A, T::Service>;
async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
let config = cfg.get();
let svc = self.svc.create(cfg).await?;
Ok(ConnectorService {
svc,
config,
scheme: self.scheme.clone(),
pool: self.pool.clone(),
_t: PhantomData,
})
}
}
#[derive(Debug)]
pub struct ConnectorService<A, T> {
svc: T,
scheme: Scheme,
config: Cfg<ServiceConfig>,
pool: pool::Pool<()>,
_t: PhantomData<A>,
}
impl<A, T> Service<A> for ConnectorService<A, T>
where
A: Address,
T: Service<Connect<A>, Error = Error<ConnectError>>,
IoBoxed: From<T::Response>,
{
type Response = SimpleClient;
type Error = Error<ClientError>;
async fn call(&self, req: A, ctx: ServiceCtx<'_, Self>) -> Result<SimpleClient, Self::Error> {
let authority = ByteString::from(req.host());
let fut = async {
let io = ctx
.call(&self.svc, Connect::new(req))
.await
.map_err(|e| e.map(ClientError::from))?;
Ok::<_, Error<ClientError>>(SimpleClient::with_params(
io.into(),
self.config.clone(),
&self.scheme,
authority,
false,
InflightStorage::default(),
self.pool.clone(),
))
};
timeout_checked(self.config.handshake_timeout, fut)
.await
.map_err(|()| {
Error::from(ClientError::HandshakeTimeout).set_service(self.config.service())
})
.and_then(|item| item)
}
ntex_service::forward_ready!(svc, |e| e.map(ClientError::from));
ntex_service::forward_poll!(svc, |e| e.map(ClientError::from));
ntex_service::forward_shutdown!(svc);
}