use std::{collections::VecDeque, io, marker::PhantomData, net::SocketAddr};
use ntex_error::Error;
use ntex_io::{Io, IoConfig, types};
use ntex_service::cfg::{Cfg, SharedCfg};
use ntex_service::{Service, ServiceCtx, ServiceFactory};
use ntex_util::{future::Either, time::timeout_checked};
use super::{Address, Connect, ConnectError, ConnectServiceError, resolve};
/// TCP connector factory, generic over the address type `T`.
///
/// The type stores no data; `T` is only recorded through `PhantomData`.
#[derive(Clone, Copy, Debug)]
pub struct Connector<T>(PhantomData<T>);
/// TCP connector service, generic over the address type `T`.
#[derive(Debug, Clone)]
pub struct ConnectorService<T> {
    /// I/O configuration handle obtained from the shared configuration.
    cfg: Cfg<IoConfig>,
    /// Shared configuration the service was built with.
    shared: SharedCfg,
    /// Marker tying the service to the address type without storing one.
    _t: PhantomData<T>,
}
impl<T> Connector<T> {
pub fn new() -> Self {
Connector(PhantomData)
}
}
impl<T> Default for Connector<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> ConnectorService<T> {
#[inline]
pub fn new() -> Self {
ConnectorService::with(SharedCfg::default())
}
#[inline]
pub fn with(cfg: SharedCfg) -> Self {
ConnectorService {
cfg: cfg.get(),
shared: cfg,
_t: PhantomData,
}
}
}
impl<T> Default for ConnectorService<T> {
fn default() -> Self {
ConnectorService::new()
}
}
impl<T: Address> ConnectorService<T> {
pub async fn connect<U>(&self, message: U) -> Result<Io, ConnectError>
where
Connect<T>: From<U>,
{
timeout_checked(self.cfg.connect_timeout(), async {
let msg = resolve::lookup(message.into(), self.shared.tag())
.await
.map_err(Error::into_error)?;
let port = msg.port();
let Connect { req, addr, .. } = msg;
if let Some(addr) = addr {
connect(req, port, addr, self.shared.clone())
.await
.map_err(Error::into_error)
} else if let Some(addr) = req.addr() {
connect(req, addr.port(), Either::Left(addr), self.shared.clone())
.await
.map_err(Error::into_error)
} else {
log::error!("{}: TCP connector: got unresolved address", self.cfg.tag());
Err(ConnectError::Unresolved)
}
})
.await
.map_err(|()| {
ConnectError::Io(io::Error::new(io::ErrorKind::TimedOut, "Connect timeout"))
})
.and_then(|item| item)
}
}
impl<T> ServiceFactory<Connect<T>, SharedCfg> for Connector<T>
where
    T: Address,
{
    type Response = Io;
    type Error = ConnectError;
    type Service = ConnectorService<T>;
    type InitError = ConnectServiceError;

    /// Build a [`ConnectorService`] from the supplied shared configuration.
    ///
    /// This implementation always returns `Ok`.
    async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
        let service = ConnectorService::with(cfg);
        Ok(service)
    }
}
impl<T> Service<Connect<T>> for ConnectorService<T>
where
    T: Address,
{
    type Response = Io;
    type Error = ConnectError;

    /// Handle a connect request by delegating to the inherent `connect`
    /// method; the service context is not used.
    async fn call(
        &self,
        req: Connect<T>,
        _ctx: ServiceCtx<'_, Self>,
    ) -> Result<Self::Response, Self::Error> {
        Self::connect(self, req).await
    }
}
/// TCP connector factory whose services report failures as
/// `Error<ConnectError>`, generic over the address type `T`.
///
/// The type stores no data; `T` is only recorded through `PhantomData`.
#[derive(Clone, Copy, Debug)]
pub struct Connector2<T>(PhantomData<T>);
/// TCP connector service that reports failures as `Error<ConnectError>`,
/// generic over the address type `T`.
#[derive(Debug, Clone)]
pub struct ConnectorService2<T> {
    /// I/O configuration handle obtained from the shared configuration.
    cfg: Cfg<IoConfig>,
    /// Shared configuration the service was built with.
    shared: SharedCfg,
    /// Marker tying the service to the address type without storing one.
    _t: PhantomData<T>,
}
impl<T> Connector2<T> {
pub fn new() -> Self {
Connector2(PhantomData)
}
}
impl<T> Default for Connector2<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> ConnectorService2<T> {
#[inline]
pub fn new() -> Self {
ConnectorService2::with(SharedCfg::default())
}
#[inline]
pub fn with(cfg: SharedCfg) -> Self {
ConnectorService2 {
cfg: cfg.get(),
shared: cfg,
_t: PhantomData,
}
}
}
impl<T> Default for ConnectorService2<T> {
fn default() -> Self {
ConnectorService2::new()
}
}
impl<T: Address> ConnectorService2<T> {
pub async fn connect<U>(&self, message: U) -> Result<Io, Error<ConnectError>>
where
Connect<T>: From<U>,
{
timeout_checked(self.cfg.connect_timeout(), async {
let msg = resolve::lookup(message.into(), self.shared.tag()).await?;
let port = msg.port();
let Connect { req, addr, .. } = msg;
if let Some(addr) = addr {
connect(req, port, addr, self.shared.clone()).await
} else if let Some(addr) = req.addr() {
connect(req, addr.port(), Either::Left(addr), self.shared.clone()).await
} else {
Err(Error::from(ConnectError::Unresolved))
}
})
.await
.map_err(|()| {
Error::from(ConnectError::Io(io::Error::new(
io::ErrorKind::TimedOut,
"Connect timeout",
)))
})
.and_then(|item| item)
.map_err(|e| e.set_service(self.shared.service()))
}
}
impl<T> ServiceFactory<Connect<T>, SharedCfg> for Connector2<T>
where
    T: Address,
{
    type Response = Io;
    type Error = Error<ConnectError>;
    type Service = ConnectorService2<T>;
    type InitError = ConnectServiceError;

    /// Build a [`ConnectorService2`] from the supplied shared configuration.
    ///
    /// This implementation always returns `Ok`.
    async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
        let service = ConnectorService2::with(cfg);
        Ok(service)
    }
}
impl<T> Service<Connect<T>> for ConnectorService2<T>
where
    T: Address,
{
    type Response = Io;
    type Error = Error<ConnectError>;

    /// Handle a connect request by delegating to the inherent `connect`
    /// method; the service context is not used.
    async fn call(
        &self,
        req: Connect<T>,
        _ctx: ServiceCtx<'_, Self>,
    ) -> Result<Self::Response, Self::Error> {
        Self::connect(self, req).await
    }
}
async fn connect<T: Address>(
req: T,
port: u16,
addr: Either<SocketAddr, VecDeque<SocketAddr>>,
cfg: SharedCfg,
) -> Result<Io, Error<ConnectError>> {
log::trace!(
"{}: TCP connector - connecting to {:?} addr:{addr:?} port:{port}",
cfg.tag(),
req.host(),
);
let io = match addr {
Either::Left(addr) => crate::tcp_connect(addr, cfg.clone())
.await
.map_err(ConnectError::from)?,
Either::Right(mut addrs) => loop {
let addr = addrs.pop_front().unwrap();
match crate::tcp_connect(addr, cfg.clone()).await {
Ok(io) => break io,
Err(err) => {
log::trace!(
"{}: TCP connector - failed to connect to {:?} port: {port} err: {err:?}",
cfg.tag(),
req.host(),
);
if addrs.is_empty() {
return Err(ConnectError::from(err).into());
}
}
}
},
};
log::trace!(
"{}: TCP connector - successfully connected to {:?} - {:?}",
cfg.tag(),
req.host(),
io.query::<types::PeerAddr>().get()
);
Ok(io)
}