1use std::collections::VecDeque;
2use std::{cell::Cell, cell::RefCell, fmt, marker::PhantomData, rc::Rc, time::Duration};
3
4use nanorand::{Rng, WyRand};
5use ntex_bytes::ByteString;
6use ntex_http::{HeaderMap, Method, uri::Scheme};
7use ntex_io::IoBoxed;
8use ntex_net::connect::{Address, Connect, ConnectError, Connector as DefaultConnector};
9use ntex_service::cfg::{Cfg, SharedCfg};
10use ntex_service::{IntoServiceFactory, Pipeline, ServiceFactory};
11use ntex_util::time::{Millis, Seconds, timeout_checked};
12use ntex_util::{channel::oneshot, channel::pool, future::BoxFuture};
13
14use super::stream::{InflightStorage, RecvStream, SendStream};
15use super::{ClientError, simple::SimpleClient};
16use crate::ServiceConfig;
17
18type Fut = BoxFuture<'static, Result<IoBoxed, ConnectError>>;
19type Connector = Box<dyn Fn() -> BoxFuture<'static, Result<IoBoxed, ConnectError>>>;
20
21#[derive(Clone)]
22pub struct Client {
24 inner: Rc<Inner>,
25 waiters: Rc<RefCell<VecDeque<pool::Sender<()>>>>,
26}
27
28struct Inner {
29 cfg: Cfg<ServiceConfig>,
30 config: InnerConfig,
31 connector: Connector,
32}
33
34fn notify(waiters: &mut VecDeque<pool::Sender<()>>) {
36 log::debug!("Notify waiter, total {:?}", waiters.len());
37 while let Some(waiter) = waiters.pop_front() {
38 if waiter.send(()).is_ok() {
39 break;
40 }
41 }
42}
43
44impl Client {
45 #[inline]
46 pub fn build<A, U, T, F>(addr: U, connector: F) -> ClientBuilder<A, T>
48 where
49 A: Address + Clone,
50 F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
51 T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
52 IoBoxed: From<T::Response>,
53 Connect<A>: From<U>,
54 {
55 ClientBuilder::new(addr, connector)
56 }
57
58 #[inline]
59 pub fn with_default<A, U>(addr: U) -> ClientBuilder<A, DefaultConnector<A>>
61 where
62 A: Address + Clone,
63 Connect<A>: From<U>,
64 {
65 ClientBuilder::with_default(addr)
66 }
67
68 pub async fn send(
70 &self,
71 method: Method,
72 path: ByteString,
73 headers: HeaderMap,
74 eof: bool,
75 ) -> Result<(SendStream, RecvStream), ClientError> {
76 self.client()
77 .await?
78 .send(method, path, headers, eof)
79 .await
80 .map_err(From::from)
81 }
82
83 pub async fn client(&self) -> Result<SimpleClient, ClientError> {
85 loop {
86 let (client, num) = self.get_client();
87
88 if let Some(client) = client {
89 return Ok(client);
90 } else {
91 self.connect(num).await?;
92 }
93 }
94 }
95
96 async fn connect(&self, num: usize) -> Result<(), ClientError> {
97 let cfg = &self.inner.config;
98
99 if !cfg.connecting.get() && (num < cfg.maxconn || (cfg.minconn > 0 && num < cfg.minconn)) {
101 cfg.connecting.set(true);
103
104 self.create_connection().await?;
105 } else {
106 log::debug!(
107 "New connection is being established {:?} or number of existing cons {} greater than allowed {}",
108 cfg.connecting.get(),
109 num,
110 cfg.maxconn
111 );
112
113 let (tx, rx) = cfg.pool.channel();
115 self.waiters.borrow_mut().push_back(tx);
116 rx.await?;
117 }
118 Ok(())
119 }
120
121 fn get_client(&self) -> (Option<SimpleClient>, usize) {
122 let cfg = &self.inner.config;
123 let mut connections = cfg.connections.borrow_mut();
124
125 let mut idx = 0;
127 while idx < connections.len() {
128 if connections[idx].is_closed() {
129 connections.remove(idx);
130 } else if connections[idx].is_disconnecting() {
131 let con = connections.remove(idx);
132 let timeout = cfg.disconnect_timeout;
133 let _ = ntex_util::spawn(async move {
134 let _ = con.disconnect().disconnect_timeout(timeout).await;
135 });
136 } else {
137 idx += 1;
138 }
139 }
140 let num = connections.len();
141 if cfg.minconn > 0 && num < cfg.minconn {
142 (None, num)
144 } else {
145 let client = connections.iter().find(|item| {
147 let cap = item.max_streams().unwrap_or(cfg.max_streams) >> 1;
148 item.active_streams() <= cap
149 });
150 if let Some(client) = client {
151 (Some(client.clone()), num)
152 } else {
153 let available = connections.iter().filter(|item| item.is_ready()).count();
155 let client = if available > 0 {
156 let idx = WyRand::new().generate_range(0_usize..available);
157 connections
158 .iter()
159 .filter(|item| item.is_ready())
160 .nth(idx)
161 .cloned()
162 } else {
163 None
164 };
165
166 (client, num)
167 }
168 }
169 }
170
171 async fn create_connection(&self) -> Result<(), ClientError> {
172 let (tx, rx) = oneshot::channel();
173
174 let inner = self.inner.clone();
175 let waiters = self.waiters.clone();
176
177 let _ = ntex_util::spawn(async move {
178 let res = match timeout_checked(inner.config.conn_timeout, (*inner.connector)()).await {
179 Ok(Ok(io)) => {
180 let waiters2 = waiters.clone();
182 let storage = InflightStorage::new(move |_| {
183 notify(&mut waiters2.borrow_mut());
184 });
185 let client = SimpleClient::with_params(
187 io,
188 inner.cfg,
189 inner.config.scheme.clone(),
190 inner.config.authority.clone(),
191 inner.config.skip_unknown_streams,
192 storage,
193 inner.config.pool.clone(),
194 );
195 inner.config.connections.borrow_mut().push(client);
196 inner
197 .config
198 .total_connections
199 .set(inner.config.total_connections.get() + 1);
200 Ok(())
201 }
202 Ok(Err(err)) => Err(ClientError::from(err)),
203 Err(_) => Err(ClientError::HandshakeTimeout),
204 };
205 inner.config.connecting.set(false);
206 for waiter in waiters.borrow_mut().drain(..) {
207 let _ = waiter.send(());
208 }
209
210 if res.is_err() {
211 inner
212 .config
213 .connect_errors
214 .set(inner.config.connect_errors.get() + 1);
215 }
216 let _ = tx.send(res);
217 });
218
219 rx.await?
220 }
221
222 #[inline]
223 pub fn is_ready(&self) -> bool {
227 let connections = self.inner.config.connections.borrow();
228 for client in &*connections {
229 if client.is_ready() {
230 return true;
231 }
232 }
233
234 !self.inner.config.connecting.get() && connections.len() < self.inner.config.maxconn
235 }
236
237 #[inline]
238 pub async fn ready(&self) {
242 loop {
243 if !self.is_ready() {
244 let (tx, rx) = self.inner.config.pool.channel();
246 self.waiters.borrow_mut().push_back(tx);
247 let _ = rx.await;
248 'inner: while let Some(tx) = self.waiters.borrow_mut().pop_front() {
249 if tx.send(()).is_ok() {
250 break 'inner;
251 }
252 }
253 } else {
254 break;
255 }
256 }
257 }
258}
259
260#[doc(hidden)]
261impl Client {
262 pub fn stat_active_connections(&self) -> usize {
263 self.inner.config.connections.borrow().len()
264 }
265
266 pub fn stat_total_connections(&self) -> usize {
267 self.inner.config.total_connections.get()
268 }
269
270 pub fn stat_connect_errors(&self) -> usize {
271 self.inner.config.connect_errors.get()
272 }
273
274 pub fn stat_connections<F, R>(&self, f: F) -> R
275 where
276 F: FnOnce(&[SimpleClient]) -> R,
277 {
278 f(&self.inner.config.connections.borrow())
279 }
280}
281
282pub struct ClientBuilder<A, T> {
287 connect: Connect<A>,
288 inner: InnerConfig,
289 connector: T,
290 _t: PhantomData<A>,
291}
292
293struct InnerConfig {
294 minconn: usize,
295 maxconn: usize,
296 conn_timeout: Millis,
297 conn_lifetime: Duration,
298 disconnect_timeout: Millis,
299 max_streams: u32,
300 skip_unknown_streams: bool,
301 scheme: Scheme,
302 authority: ByteString,
303 connecting: Cell<bool>,
304 connections: RefCell<Vec<SimpleClient>>,
305 total_connections: Cell<usize>,
306 connect_errors: Cell<usize>,
307 pool: pool::Pool<()>,
308}
309
310impl<A, T> ClientBuilder<A, T>
311where
312 A: Address + Clone,
313 T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError>,
314 IoBoxed: From<T::Response>,
315{
316 fn new<U, F>(addr: U, connector: F) -> Self
317 where
318 Connect<A>: From<U>,
319 F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
320 {
321 let connect = Connect::from(addr);
322 let authority = ByteString::from(connect.host());
323 let connector = connector.into_factory();
324
325 ClientBuilder {
326 connect,
327 connector,
328 inner: InnerConfig {
329 authority,
330 conn_timeout: Millis(1_000),
331 conn_lifetime: Duration::from_secs(0),
332 disconnect_timeout: Millis(15_000),
333 max_streams: 100,
334 skip_unknown_streams: false,
335 minconn: 1,
336 maxconn: 16,
337 scheme: Scheme::HTTP,
338 connecting: Cell::new(false),
339 connections: Default::default(),
340 total_connections: Cell::new(0),
341 connect_errors: Cell::new(0),
342 pool: pool::new(),
343 },
344 _t: PhantomData,
345 }
346 }
347}
348
349impl<A> ClientBuilder<A, DefaultConnector<A>>
350where
351 A: Address + Clone,
352{
353 pub fn with_default<U>(addr: U) -> Self
354 where
355 Connect<A>: From<U>,
356 {
357 Self::new(addr, DefaultConnector::default())
358 }
359}
360
361impl<A, T> ClientBuilder<A, T>
362where
363 A: Address + Clone,
364{
365 #[inline]
366 pub fn scheme(mut self, scheme: Scheme) -> Self {
368 self.inner.scheme = scheme;
369 self
370 }
371
372 pub fn max_streams(mut self, limit: u32) -> Self {
378 self.inner.max_streams = limit;
379 self
380 }
381
382 pub fn skip_unknown_streams(mut self) -> Self {
386 self.inner.skip_unknown_streams = true;
387 self
388 }
389
390 pub fn lifetime(mut self, dur: Seconds) -> Self {
397 self.inner.conn_lifetime = dur.into();
398 self
399 }
400
401 pub fn minconn(mut self, num: usize) -> Self {
405 self.inner.minconn = num;
406 self
407 }
408
409 pub fn maxconn(mut self, num: usize) -> Self {
413 self.inner.maxconn = num;
414 self
415 }
416
417 pub fn connector<U, F>(self, connector: F) -> ClientBuilder<A, U>
419 where
420 F: IntoServiceFactory<U, Connect<A>, SharedCfg>,
421 U: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
422 IoBoxed: From<U::Response>,
423 {
424 ClientBuilder {
425 connect: self.connect,
426 connector: connector.into_factory(),
427 inner: self.inner,
428 _t: PhantomData,
429 }
430 }
431}
432
433impl<A, T> ClientBuilder<A, T>
434where
435 A: Address + Clone,
436 T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
437 IoBoxed: From<T::Response>,
438{
439 pub async fn finish(self, cfg: SharedCfg) -> Result<Client, T::InitError> {
441 let connect = self.connect;
442 let svc = Pipeline::new(self.connector.create(cfg).await?);
443
444 let connector = Box::new(move || {
445 log::trace!(
446 "{}: Opening http/2 connection to {}",
447 cfg.tag(),
448 connect.host()
449 );
450 let fut = svc.call_static(connect.clone());
451 Box::pin(async move { fut.await.map(IoBoxed::from) }) as Fut
452 });
453
454 Ok(Client {
455 inner: Rc::new(Inner {
456 connector,
457 cfg: cfg.get(),
458 config: self.inner,
459 }),
460 waiters: Default::default(),
461 })
462 }
463}
464
465impl fmt::Debug for Client {
466 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
467 f.debug_struct("Client")
468 .field("scheme", &self.inner.config.scheme)
469 .field("authority", &self.inner.config.authority)
470 .field("conn_timeout", &self.inner.config.conn_timeout)
471 .field("conn_lifetime", &self.inner.config.conn_lifetime)
472 .field("disconnect_timeout", &self.inner.config.disconnect_timeout)
473 .field("minconn", &self.inner.config.minconn)
474 .field("maxconn", &self.inner.config.maxconn)
475 .field("max-streams", &self.inner.config.max_streams)
476 .finish()
477 }
478}
479
480impl<A, T> fmt::Debug for ClientBuilder<A, T> {
481 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
482 f.debug_struct("ClientBuilder")
483 .field("scheme", &self.inner.scheme)
484 .field("authority", &self.inner.authority)
485 .field("conn_timeout", &self.inner.conn_timeout)
486 .field("conn_lifetime", &self.inner.conn_lifetime)
487 .field("disconnect_timeout", &self.inner.disconnect_timeout)
488 .field("minconn", &self.inner.minconn)
489 .field("maxconn", &self.inner.maxconn)
490 .field("max-streams", &self.inner.max_streams)
491 .finish()
492 }
493}