use std::{cell::Cell, cell::RefCell, collections::VecDeque, fmt, rc::Rc, time::Duration};
use nanorand::{Rng, WyRand};
use ntex_bytes::{ByteString, PoolId, PoolRef};
use ntex_connect::{self as connect, Address, Connect, Connector as DefaultConnector};
use ntex_http::{uri::Scheme, HeaderMap, Method};
use ntex_io::IoBoxed;
use ntex_service::{IntoService, Pipeline, Service};
use ntex_util::time::{timeout_checked, Millis, Seconds};
use ntex_util::{channel::oneshot, future::BoxFuture};
use super::stream::{InflightStorage, RecvStream, SendStream};
use super::{simple::SimpleClient, ClientError};
type Fut = BoxFuture<'static, Result<IoBoxed, connect::ConnectError>>;
type Connector = Box<dyn Fn() -> BoxFuture<'static, Result<IoBoxed, connect::ConnectError>>>;
#[derive(Clone)]
pub struct Client {
inner: Rc<Inner>,
waiters: Rc<RefCell<VecDeque<oneshot::Sender<()>>>>,
}
fn notify(waiters: &mut VecDeque<oneshot::Sender<()>>) {
log::debug!("Notify waiter, total {:?}", waiters.len());
while let Some(waiter) = waiters.pop_front() {
if waiter.send(()).is_ok() {
break;
}
}
}
impl Client {
#[inline]
pub fn build<A, U, T, F>(addr: U, connector: F) -> ClientBuilder
where
A: Address + Clone,
F: IntoService<T, Connect<A>>,
T: Service<Connect<A>, Error = connect::ConnectError> + 'static,
IoBoxed: From<T::Response>,
Connect<A>: From<U>,
{
ClientBuilder::new(addr, connector)
}
#[inline]
pub fn with_default<A, U>(addr: U) -> ClientBuilder
where
A: Address + Clone,
Connect<A>: From<U>,
{
ClientBuilder::with_default(addr)
}
#[inline]
pub async fn send(
&self,
method: Method,
path: ByteString,
headers: HeaderMap,
eof: bool,
) -> Result<(SendStream, RecvStream), ClientError> {
loop {
let (client, num) = {
let mut connections = self.inner.connections.borrow_mut();
let mut idx = 0;
while idx < connections.len() {
if connections[idx].is_closed() {
connections.remove(idx);
} else {
idx += 1;
}
}
let available = connections.iter().filter(|item| item.is_ready()).count();
let client = if available > 0 {
let idx = WyRand::new().generate_range(0_usize..available);
connections
.iter()
.filter(|item| item.is_ready())
.nth(idx)
.cloned()
} else {
None
};
(client, connections.len())
};
if let Some(client) = client {
return client
.send(method, path, headers, eof)
.await
.map_err(From::from);
}
if !self.inner.connecting.get() && num < self.inner.maxconn {
self.inner.connecting.set(true);
let (tx, rx) = oneshot::channel();
let inner = self.inner.clone();
let waiters = self.waiters.clone();
ntex_rt::spawn(async move {
let res = match timeout_checked(inner.conn_timeout, (*inner.connector)()).await
{
Ok(Ok(io)) => {
let waiters2 = waiters.clone();
let storage = InflightStorage::new(move |_| {
notify(&mut waiters2.borrow_mut());
});
io.set_memory_pool(inner.pool);
let client = SimpleClient::with_params(
io,
inner.config.clone(),
inner.scheme.clone(),
inner.authority.clone(),
storage,
);
inner.connections.borrow_mut().push(client.clone());
Ok(client)
}
Ok(Err(err)) => Err(ClientError::from(err)),
Err(_) => Err(ClientError::HandshakeTimeout),
};
inner.connecting.set(false);
for waiter in waiters.borrow_mut().drain(..) {
let _ = waiter.send(());
}
let _ = tx.send(res);
});
return rx
.await??
.send(method, path, headers, eof)
.await
.map_err(From::from);
} else {
log::debug!(
"New connection is being established {:?} or number of existing cons {} greater than allowed {}",
self.inner.connecting.get(), num, self.inner.maxconn);
let (tx, rx) = oneshot::channel();
self.waiters.borrow_mut().push_back(tx);
let _ = rx.await;
}
}
}
#[inline]
pub fn is_ready(&self) -> bool {
let connections = self.inner.connections.borrow();
for client in &*connections {
if client.is_ready() {
return true;
}
}
!self.inner.connecting.get() && connections.len() < self.inner.maxconn
}
#[inline]
pub async fn ready(&self) {
loop {
if !self.is_ready() {
let (tx, rx) = oneshot::channel();
self.waiters.borrow_mut().push_back(tx);
let _ = rx.await;
'inner: while let Some(tx) = self.waiters.borrow_mut().pop_front() {
if tx.send(()).is_ok() {
break 'inner;
}
}
} else {
break;
}
}
}
}
pub struct ClientBuilder(Inner);
struct Inner {
maxconn: usize,
conn_timeout: Millis,
conn_lifetime: Duration,
disconnect_timeout: Millis,
limit: usize,
scheme: Scheme,
config: crate::Config,
authority: ByteString,
connector: Connector,
pool: PoolRef,
connecting: Cell<bool>,
connections: RefCell<Vec<SimpleClient>>,
}
impl ClientBuilder {
fn new<A, U, T, F>(addr: U, connector: F) -> Self
where
A: Address + Clone,
F: IntoService<T, Connect<A>>,
T: Service<Connect<A>, Error = connect::ConnectError> + 'static,
IoBoxed: From<T::Response>,
Connect<A>: From<U>,
{
let connect = Connect::from(addr);
let authority = ByteString::from(connect.host());
let connector = Pipeline::new(connector.into_service());
let connector = Box::new(move || {
log::trace!("Opening http/2 connection to {}", connect.host());
let connect = connect.clone();
let svc = connector.clone();
let f: Fut = Box::pin(async move { svc.call(connect).await.map(IoBoxed::from) });
f
});
ClientBuilder(Inner {
authority,
connector,
conn_timeout: Millis(1_000),
conn_lifetime: Duration::from_secs(0),
disconnect_timeout: Millis(3_000),
limit: 100,
maxconn: 16,
scheme: Scheme::HTTP,
config: crate::Config::client(),
connecting: Cell::new(false),
connections: Default::default(),
pool: PoolId::P5.pool_ref(),
})
}
pub fn with_default<A, U>(addr: U) -> Self
where
A: Address + Clone,
Connect<A>: From<U>,
{
Self::new(addr, DefaultConnector::default())
}
}
impl ClientBuilder {
#[inline]
pub fn scheme(mut self, scheme: Scheme) -> Self {
self.0.scheme = scheme;
self
}
pub fn memory_pool(mut self, id: PoolId) -> Self {
self.0.pool = id.pool_ref();
self
}
pub fn timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
self.0.conn_timeout = timeout.into();
self
}
pub fn limit(mut self, limit: usize) -> Self {
self.0.limit = limit;
self
}
pub fn lifetime(mut self, dur: Seconds) -> Self {
self.0.conn_lifetime = dur.into();
self
}
pub fn maxconn(mut self, num: usize) -> Self {
self.0.maxconn = num;
self
}
pub fn disconnect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
self.0.disconnect_timeout = timeout.into();
self
}
pub fn configure<O, R>(self, f: O) -> Self
where
O: FnOnce(&crate::Config) -> R,
{
let _ = f(&self.0.config);
self
}
pub fn config(&self) -> &crate::Config {
&self.0.config
}
pub fn connector<A, U, T, F>(mut self, addr: U, connector: F) -> Self
where
A: Address + Clone,
F: IntoService<T, Connect<A>>,
T: Service<Connect<A>, Error = connect::ConnectError> + 'static,
IoBoxed: From<T::Response>,
Connect<A>: From<U>,
{
let connect = Connect::from(addr);
let authority = ByteString::from(connect.host());
let connector = Pipeline::new(connector.into_service());
let connector = Box::new(move || {
let connect = connect.clone();
let svc = connector.clone();
let f: Fut = Box::pin(async move { svc.call(connect).await.map(IoBoxed::from) });
f
});
self.0.authority = authority;
self.0.connector = connector;
self
}
pub fn finish(self) -> Client {
Client {
inner: Rc::new(self.0),
waiters: Default::default(),
}
}
}
impl fmt::Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Client")
.field("scheme", &self.inner.scheme)
.field("authority", &self.inner.authority)
.field("conn_timeout", &self.inner.conn_timeout)
.field("conn_lifetime", &self.inner.conn_lifetime)
.field("disconnect_timeout", &self.inner.disconnect_timeout)
.field("maxconn", &self.inner.maxconn)
.field("limit", &self.inner.limit)
.field("pool", &self.inner.pool)
.field("config", &self.inner.config)
.finish()
}
}
impl fmt::Debug for ClientBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ClientBuilder")
.field("scheme", &self.0.scheme)
.field("authority", &self.0.authority)
.field("conn_timeout", &self.0.conn_timeout)
.field("conn_lifetime", &self.0.conn_lifetime)
.field("disconnect_timeout", &self.0.disconnect_timeout)
.field("maxconn", &self.0.maxconn)
.field("limit", &self.0.limit)
.field("pool", &self.0.pool)
.field("config", &self.0.config)
.finish()
}
}