1use std::collections::VecDeque;
2use std::{cell::Cell, cell::RefCell, fmt, marker::PhantomData, rc::Rc, time::Duration};
3
4use nanorand::{Rng, WyRand};
5use ntex_bytes::ByteString;
6use ntex_http::{HeaderMap, Method, uri::Scheme};
7use ntex_io::IoBoxed;
8use ntex_net::connect::{Address, Connect, ConnectError, Connector as DefaultConnector};
9use ntex_service::cfg::{Cfg, SharedCfg};
10use ntex_service::{IntoServiceFactory, Pipeline, ServiceFactory};
11use ntex_util::time::{Millis, Seconds, timeout_checked};
12use ntex_util::{channel::oneshot, channel::pool, future::BoxFuture};
13
14use super::stream::{InflightStorage, RecvStream, SendStream};
15use super::{ClientError, simple::SimpleClient};
16use crate::ServiceConfig;
17
18type Fut = BoxFuture<'static, Result<IoBoxed, ConnectError>>;
19type Connector = Box<dyn Fn() -> BoxFuture<'static, Result<IoBoxed, ConnectError>>>;
20
21#[derive(Clone)]
22pub struct Client {
24 inner: Rc<Inner>,
25 waiters: Rc<RefCell<VecDeque<pool::Sender<()>>>>,
26}
27
28struct Inner {
29 cfg: Cfg<ServiceConfig>,
30 config: InnerConfig,
31 connector: Connector,
32}
33
34fn notify(waiters: &mut VecDeque<pool::Sender<()>>) {
36 log::debug!("Notify waiter, total {:?}", waiters.len());
37 while let Some(waiter) = waiters.pop_front() {
38 if waiter.send(()).is_ok() {
39 break;
40 }
41 }
42}
43
44impl Client {
45 #[inline]
46 pub fn builder<A, U, T, F>(addr: U, connector: F) -> ClientBuilder<A, T>
48 where
49 A: Address + Clone,
50 F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
51 T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
52 IoBoxed: From<T::Response>,
53 Connect<A>: From<U>,
54 {
55 ClientBuilder::new(addr, connector)
56 }
57
58 pub async fn send(
60 &self,
61 method: Method,
62 path: ByteString,
63 headers: HeaderMap,
64 eof: bool,
65 ) -> Result<(SendStream, RecvStream), ClientError> {
66 self.client()
67 .await?
68 .send(method, path, headers, eof)
69 .await
70 .map_err(From::from)
71 }
72
73 pub async fn client(&self) -> Result<SimpleClient, ClientError> {
75 loop {
76 let (client, num) = self.get_client();
77
78 if let Some(client) = client {
79 return Ok(client);
80 } else {
81 self.connect(num).await?;
82 }
83 }
84 }
85
86 async fn connect(&self, num: usize) -> Result<(), ClientError> {
87 let cfg = &self.inner.config;
88
89 if !cfg.connecting.get() && (num < cfg.maxconn || (cfg.minconn > 0 && num < cfg.minconn)) {
91 cfg.connecting.set(true);
93
94 self.create_connection().await?;
95 } else {
96 log::debug!(
97 "New connection is being established {:?} or number of existing cons {} greater than allowed {}",
98 cfg.connecting.get(),
99 num,
100 cfg.maxconn
101 );
102
103 let (tx, rx) = cfg.pool.channel();
105 self.waiters.borrow_mut().push_back(tx);
106 rx.await?;
107 }
108 Ok(())
109 }
110
111 fn get_client(&self) -> (Option<SimpleClient>, usize) {
112 let cfg = &self.inner.config;
113 let mut connections = cfg.connections.borrow_mut();
114
115 let mut idx = 0;
117 while idx < connections.len() {
118 if connections[idx].is_closed() {
119 connections.remove(idx);
120 } else if connections[idx].is_disconnecting() {
121 let con = connections.remove(idx);
122 let timeout = cfg.disconnect_timeout;
123 let _ = ntex_util::spawn(async move {
124 let _ = con.disconnect().disconnect_timeout(timeout).await;
125 });
126 } else {
127 idx += 1;
128 }
129 }
130 let num = connections.len();
131 if cfg.minconn > 0 && num < cfg.minconn {
132 (None, num)
134 } else {
135 let client = connections.iter().find(|item| {
137 let cap = item.max_streams().unwrap_or(cfg.max_streams) >> 1;
138 item.active_streams() <= cap
139 });
140 if let Some(client) = client {
141 (Some(client.clone()), num)
142 } else {
143 let available = connections.iter().filter(|item| item.is_ready()).count();
145 let client = if available > 0 {
146 let idx = WyRand::new().generate_range(0_usize..available);
147 connections
148 .iter()
149 .filter(|item| item.is_ready())
150 .nth(idx)
151 .cloned()
152 } else {
153 None
154 };
155
156 (client, num)
157 }
158 }
159 }
160
161 async fn create_connection(&self) -> Result<(), ClientError> {
162 let (tx, rx) = oneshot::channel();
163
164 let inner = self.inner.clone();
165 let waiters = self.waiters.clone();
166
167 let _ = ntex_util::spawn(async move {
168 let res = match timeout_checked(inner.config.conn_timeout, (*inner.connector)()).await {
169 Ok(Ok(io)) => {
170 let waiters2 = waiters.clone();
172 let storage = InflightStorage::new(move |_| {
173 notify(&mut waiters2.borrow_mut());
174 });
175 let client = SimpleClient::with_params(
177 io,
178 inner.cfg,
179 inner.config.scheme.clone(),
180 inner.config.authority.clone(),
181 inner.config.skip_unknown_streams,
182 storage,
183 inner.config.pool.clone(),
184 );
185 inner.config.connections.borrow_mut().push(client);
186 inner
187 .config
188 .total_connections
189 .set(inner.config.total_connections.get() + 1);
190 Ok(())
191 }
192 Ok(Err(err)) => Err(ClientError::from(err)),
193 Err(_) => Err(ClientError::HandshakeTimeout),
194 };
195 inner.config.connecting.set(false);
196 for waiter in waiters.borrow_mut().drain(..) {
197 let _ = waiter.send(());
198 }
199
200 if res.is_err() {
201 inner
202 .config
203 .connect_errors
204 .set(inner.config.connect_errors.get() + 1);
205 }
206 let _ = tx.send(res);
207 });
208
209 rx.await?
210 }
211
212 #[inline]
213 pub fn is_ready(&self) -> bool {
217 let connections = self.inner.config.connections.borrow();
218 for client in &*connections {
219 if client.is_ready() {
220 return true;
221 }
222 }
223
224 !self.inner.config.connecting.get() && connections.len() < self.inner.config.maxconn
225 }
226
227 #[inline]
228 pub async fn ready(&self) {
232 loop {
233 if !self.is_ready() {
234 let (tx, rx) = self.inner.config.pool.channel();
236 self.waiters.borrow_mut().push_back(tx);
237 let _ = rx.await;
238 'inner: while let Some(tx) = self.waiters.borrow_mut().pop_front() {
239 if tx.send(()).is_ok() {
240 break 'inner;
241 }
242 }
243 } else {
244 break;
245 }
246 }
247 }
248}
249
250#[doc(hidden)]
251impl Client {
252 pub fn stat_active_connections(&self) -> usize {
253 self.inner.config.connections.borrow().len()
254 }
255
256 pub fn stat_total_connections(&self) -> usize {
257 self.inner.config.total_connections.get()
258 }
259
260 pub fn stat_connect_errors(&self) -> usize {
261 self.inner.config.connect_errors.get()
262 }
263
264 pub fn stat_connections<F, R>(&self, f: F) -> R
265 where
266 F: FnOnce(&[SimpleClient]) -> R,
267 {
268 f(&self.inner.config.connections.borrow())
269 }
270}
271
272pub struct ClientBuilder<A, T> {
277 connect: Connect<A>,
278 inner: InnerConfig,
279 connector: T,
280 _t: PhantomData<A>,
281}
282
283struct InnerConfig {
284 minconn: usize,
285 maxconn: usize,
286 conn_timeout: Millis,
287 conn_lifetime: Duration,
288 disconnect_timeout: Millis,
289 max_streams: u32,
290 skip_unknown_streams: bool,
291 scheme: Scheme,
292 authority: ByteString,
293 connecting: Cell<bool>,
294 connections: RefCell<Vec<SimpleClient>>,
295 total_connections: Cell<usize>,
296 connect_errors: Cell<usize>,
297 pool: pool::Pool<()>,
298}
299
300impl<A, T> ClientBuilder<A, T>
301where
302 A: Address + Clone,
303 T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError>,
304 IoBoxed: From<T::Response>,
305{
306 fn new<U, F>(addr: U, connector: F) -> Self
307 where
308 Connect<A>: From<U>,
309 F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
310 {
311 let connect = Connect::from(addr);
312 let authority = ByteString::from(connect.host());
313 let connector = connector.into_factory();
314
315 ClientBuilder {
316 connect,
317 connector,
318 inner: InnerConfig {
319 authority,
320 conn_timeout: Millis(1_000),
321 conn_lifetime: Duration::from_secs(0),
322 disconnect_timeout: Millis(15_000),
323 max_streams: 100,
324 skip_unknown_streams: false,
325 minconn: 1,
326 maxconn: 16,
327 scheme: Scheme::HTTP,
328 connecting: Cell::new(false),
329 connections: Default::default(),
330 total_connections: Cell::new(0),
331 connect_errors: Cell::new(0),
332 pool: pool::new(),
333 },
334 _t: PhantomData,
335 }
336 }
337}
338
339impl<A> ClientBuilder<A, DefaultConnector<A>>
340where
341 A: Address + Clone,
342{
343 pub fn with_default<U>(addr: U) -> Self
344 where
345 Connect<A>: From<U>,
346 {
347 Self::new(addr, DefaultConnector::default())
348 }
349}
350
351impl<A, T> ClientBuilder<A, T>
352where
353 A: Address + Clone,
354{
355 #[inline]
356 pub fn scheme(mut self, scheme: Scheme) -> Self {
358 self.inner.scheme = scheme;
359 self
360 }
361
362 pub fn max_streams(mut self, limit: u32) -> Self {
368 self.inner.max_streams = limit;
369 self
370 }
371
372 pub fn skip_unknown_streams(mut self) -> Self {
376 self.inner.skip_unknown_streams = true;
377 self
378 }
379
380 pub fn lifetime(mut self, dur: Seconds) -> Self {
387 self.inner.conn_lifetime = dur.into();
388 self
389 }
390
391 pub fn minconn(mut self, num: usize) -> Self {
395 self.inner.minconn = num;
396 self
397 }
398
399 pub fn maxconn(mut self, num: usize) -> Self {
403 self.inner.maxconn = num;
404 self
405 }
406
407 pub fn connector<U, F>(self, connector: F) -> ClientBuilder<A, U>
409 where
410 F: IntoServiceFactory<U, Connect<A>, SharedCfg>,
411 U: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
412 IoBoxed: From<U::Response>,
413 {
414 ClientBuilder {
415 connect: self.connect,
416 connector: connector.into_factory(),
417 inner: self.inner,
418 _t: PhantomData,
419 }
420 }
421}
422
423impl<A, T> ClientBuilder<A, T>
424where
425 A: Address + Clone,
426 T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
427 IoBoxed: From<T::Response>,
428{
429 pub async fn build(self, cfg: SharedCfg) -> Result<Client, T::InitError> {
431 let connect = self.connect;
432 let svc = Pipeline::new(self.connector.create(cfg).await?);
433
434 let connector = Box::new(move || {
435 log::trace!(
436 "{}: Opening http/2 connection to {}",
437 cfg.tag(),
438 connect.host()
439 );
440 let fut = svc.call_static(connect.clone());
441 Box::pin(async move { fut.await.map(IoBoxed::from) }) as Fut
442 });
443
444 Ok(Client {
445 inner: Rc::new(Inner {
446 connector,
447 cfg: cfg.get(),
448 config: self.inner,
449 }),
450 waiters: Default::default(),
451 })
452 }
453}
454
455impl fmt::Debug for Client {
456 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
457 f.debug_struct("Client")
458 .field("scheme", &self.inner.config.scheme)
459 .field("authority", &self.inner.config.authority)
460 .field("conn_timeout", &self.inner.config.conn_timeout)
461 .field("conn_lifetime", &self.inner.config.conn_lifetime)
462 .field("disconnect_timeout", &self.inner.config.disconnect_timeout)
463 .field("minconn", &self.inner.config.minconn)
464 .field("maxconn", &self.inner.config.maxconn)
465 .field("max-streams", &self.inner.config.max_streams)
466 .finish()
467 }
468}
469
470impl<A, T> fmt::Debug for ClientBuilder<A, T> {
471 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
472 f.debug_struct("ClientBuilder")
473 .field("scheme", &self.inner.scheme)
474 .field("authority", &self.inner.authority)
475 .field("conn_timeout", &self.inner.conn_timeout)
476 .field("conn_lifetime", &self.inner.conn_lifetime)
477 .field("disconnect_timeout", &self.inner.disconnect_timeout)
478 .field("minconn", &self.inner.minconn)
479 .field("maxconn", &self.inner.maxconn)
480 .field("max-streams", &self.inner.max_streams)
481 .finish()
482 }
483}