1use std::{cell::Cell, cell::RefCell, collections::VecDeque, fmt, rc::Rc, time::Duration};
2
3use nanorand::{Rng, WyRand};
4use ntex_bytes::ByteString;
5use ntex_http::{uri::Scheme, HeaderMap, Method};
6use ntex_io::IoBoxed;
7use ntex_net::connect::{self as connect, Address, Connect, Connector as DefaultConnector};
8use ntex_service::{IntoService, Pipeline, Service};
9use ntex_util::time::{timeout_checked, Millis, Seconds};
10use ntex_util::{channel::oneshot, future::BoxFuture};
11
12use super::stream::{InflightStorage, RecvStream, SendStream};
13use super::{simple::SimpleClient, ClientError};
14
15type Fut = BoxFuture<'static, Result<IoBoxed, connect::ConnectError>>;
16type Connector = Box<dyn Fn() -> BoxFuture<'static, Result<IoBoxed, connect::ConnectError>>>;
17
#[derive(Clone)]
/// Pooled http/2 client.
///
/// Both fields are reference-counted pointers, so every clone shares the same
/// pool state and the same queue of waiters.
pub struct Client {
    /// Shared pool configuration, counters and the list of open connections.
    inner: Rc<Inner>,
    /// Parked callers; woken when a connection attempt finishes or when the
    /// callback registered with `InflightStorage` fires.
    waiters: Rc<RefCell<VecDeque<oneshot::Sender<()>>>>,
}
24
25fn notify(waiters: &mut VecDeque<oneshot::Sender<()>>) {
27 log::debug!("Notify waiter, total {:?}", waiters.len());
28 while let Some(waiter) = waiters.pop_front() {
29 if waiter.send(()).is_ok() {
30 break;
31 }
32 }
33}
34
35impl Client {
    #[inline]
    /// Start configuring a client pool for `addr` that connects through the
    /// supplied `connector` service.
    ///
    /// Delegates to `ClientBuilder::new`; call `finish()` on the returned
    /// builder to obtain the [`Client`].
    pub fn build<A, U, T, F>(addr: U, connector: F) -> ClientBuilder
    where
        A: Address + Clone,
        F: IntoService<T, Connect<A>>,
        T: Service<Connect<A>, Error = connect::ConnectError> + 'static,
        IoBoxed: From<T::Response>,
        Connect<A>: From<U>,
    {
        ClientBuilder::new(addr, connector)
    }
48
    #[inline]
    /// Start configuring a client pool for `addr` using the default connector.
    ///
    /// Delegates to `ClientBuilder::with_default`.
    pub fn with_default<A, U>(addr: U) -> ClientBuilder
    where
        A: Address + Clone,
        Connect<A>: From<U>,
    {
        ClientBuilder::with_default(addr)
    }
58
59 pub async fn send(
61 &self,
62 method: Method,
63 path: ByteString,
64 headers: HeaderMap,
65 eof: bool,
66 ) -> Result<(SendStream, RecvStream), ClientError> {
67 self.client()
68 .await?
69 .send(method, path, headers, eof)
70 .await
71 .map_err(From::from)
72 }
73
74 pub async fn client(&self) -> Result<SimpleClient, ClientError> {
76 loop {
77 let (client, num) = self.get_client();
78
79 if let Some(client) = client {
80 return Ok(client);
81 } else {
82 self.connect(num).await?;
83 }
84 }
85 }
86
87 async fn connect(&self, num: usize) -> Result<(), ClientError> {
88 if !self.inner.connecting.get()
90 && (num < self.inner.maxconn || (self.inner.minconn > 0 && num < self.inner.minconn))
91 {
92 self.inner.connecting.set(true);
94
95 self.create_connection().await?;
96 } else {
97 log::debug!(
98 "New connection is being established {:?} or number of existing cons {} greater than allowed {}",
99 self.inner.connecting.get(), num, self.inner.maxconn);
100
101 let (tx, rx) = oneshot::channel();
103 self.waiters.borrow_mut().push_back(tx);
104 rx.await?;
105 }
106 Ok(())
107 }
108
109 fn get_client(&self) -> (Option<SimpleClient>, usize) {
110 let mut connections = self.inner.connections.borrow_mut();
111
112 let mut idx = 0;
114 while idx < connections.len() {
115 if connections[idx].is_closed() {
116 connections.remove(idx);
117 } else if connections[idx].is_disconnecting() {
118 let con = connections.remove(idx);
119 let timeout = self.inner.disconnect_timeout;
120 let _ = ntex_util::spawn(async move {
121 let _ = con.disconnect().disconnect_timeout(timeout).await;
122 });
123 } else {
124 idx += 1;
125 }
126 }
127 let num = connections.len();
128 if self.inner.minconn > 0 && num < self.inner.minconn {
129 (None, num)
131 } else {
132 let client = connections.iter().find(|item| {
134 let cap = item.max_streams().unwrap_or(self.inner.max_streams) >> 1;
135 item.active_streams() <= cap
136 });
137 if let Some(client) = client {
138 (Some(client.clone()), num)
139 } else {
140 let available = connections.iter().filter(|item| item.is_ready()).count();
142 let client = if available > 0 {
143 let idx = WyRand::new().generate_range(0_usize..available);
144 connections
145 .iter()
146 .filter(|item| item.is_ready())
147 .nth(idx)
148 .cloned()
149 } else {
150 None
151 };
152
153 (client, num)
154 }
155 }
156 }
157
158 async fn create_connection(&self) -> Result<(), ClientError> {
159 let (tx, rx) = oneshot::channel();
160
161 let inner = self.inner.clone();
162 let waiters = self.waiters.clone();
163
164 let _ = ntex_util::spawn(async move {
165 let res = match timeout_checked(inner.conn_timeout, (*inner.connector)()).await {
166 Ok(Ok(io)) => {
167 let waiters2 = waiters.clone();
169 let storage = InflightStorage::new(move |_| {
170 notify(&mut waiters2.borrow_mut());
171 });
172 let client = SimpleClient::with_params(
174 io,
175 inner.config.clone(),
176 inner.scheme.clone(),
177 inner.authority.clone(),
178 inner.skip_unknown_streams,
179 storage,
180 );
181 inner.connections.borrow_mut().push(client);
182 inner
183 .total_connections
184 .set(inner.total_connections.get() + 1);
185 Ok(())
186 }
187 Ok(Err(err)) => Err(ClientError::from(err)),
188 Err(_) => Err(ClientError::HandshakeTimeout),
189 };
190 inner.connecting.set(false);
191 for waiter in waiters.borrow_mut().drain(..) {
192 let _ = waiter.send(());
193 }
194
195 if res.is_err() {
196 inner.connect_errors.set(inner.connect_errors.get() + 1);
197 }
198 let _ = tx.send(res);
199 });
200
201 rx.await?
202 }
203
204 #[inline]
205 pub fn is_ready(&self) -> bool {
209 let connections = self.inner.connections.borrow();
210 for client in &*connections {
211 if client.is_ready() {
212 return true;
213 }
214 }
215
216 !self.inner.connecting.get() && connections.len() < self.inner.maxconn
217 }
218
219 #[inline]
220 pub async fn ready(&self) {
224 loop {
225 if !self.is_ready() {
226 let (tx, rx) = oneshot::channel();
228 self.waiters.borrow_mut().push_back(tx);
229 let _ = rx.await;
230 'inner: while let Some(tx) = self.waiters.borrow_mut().pop_front() {
231 if tx.send(()).is_ok() {
232 break 'inner;
233 }
234 }
235 } else {
236 break;
237 }
238 }
239 }
240}
241
#[doc(hidden)]
// Statistics accessors; hidden from the generated documentation.
impl Client {
    /// Number of connections currently stored in the pool.
    ///
    /// Closed connections are only removed by `get_client`, so the value may
    /// include connections that have closed since the last sweep.
    pub fn stat_active_connections(&self) -> usize {
        self.inner.connections.borrow().len()
    }

    /// Number of connections successfully established since the pool was created.
    pub fn stat_total_connections(&self) -> usize {
        self.inner.total_connections.get()
    }

    /// Number of connection attempts that ended in an error (timeouts included).
    pub fn stat_connect_errors(&self) -> usize {
        self.inner.connect_errors.get()
    }

    /// Run `f` with a view of the pooled connections and return its result.
    ///
    /// The connection list stays immutably borrowed while `f` runs, so `f`
    /// must not call pool methods that borrow it mutably.
    pub fn stat_connections<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&[SimpleClient]) -> R,
    {
        f(&self.inner.connections.borrow())
    }
}
263
264pub struct ClientBuilder(Inner);
269
/// Pool configuration and mutable pool state shared by all `Client` clones.
struct Inner {
    /// Minimum pool size; while fewer connections are open (and the value is
    /// non-zero) `get_client` returns no connection so that a new one is opened.
    minconn: usize,
    /// Upper bound on pooled connections, checked before starting a new attempt.
    maxconn: usize,
    /// Time limit applied to a single connector call.
    conn_timeout: Millis,
    /// Connection lifetime set through `ClientBuilder::lifetime`.
    /// NOTE(review): not read anywhere in this file — confirm where it is enforced.
    conn_lifetime: Duration,
    /// Timeout applied when a disconnecting connection is retired from the pool.
    disconnect_timeout: Millis,
    /// Fallback stream limit, used when a connection's `max_streams()` is `None`.
    max_streams: u32,
    /// Forwarded to `SimpleClient::with_params` for every new connection.
    skip_unknown_streams: bool,
    /// Scheme forwarded to `SimpleClient::with_params`.
    scheme: Scheme,
    /// Connection config; cloned into every new `SimpleClient`.
    config: crate::Config,
    /// Host part of the connect address; forwarded to `SimpleClient::with_params`.
    authority: ByteString,
    /// Factory invoked once per connection attempt.
    connector: Connector,
    /// `true` while a connection attempt is in flight.
    connecting: Cell<bool>,
    /// Connections currently held by the pool.
    connections: RefCell<Vec<SimpleClient>>,
    /// Count of successfully established connections.
    total_connections: Cell<usize>,
    /// Count of failed connection attempts.
    connect_errors: Cell<usize>,
}
287
288impl ClientBuilder {
289 fn new<A, U, T, F>(addr: U, connector: F) -> Self
290 where
291 A: Address + Clone,
292 F: IntoService<T, Connect<A>>,
293 T: Service<Connect<A>, Error = connect::ConnectError> + 'static,
294 IoBoxed: From<T::Response>,
295 Connect<A>: From<U>,
296 {
297 let connect = Connect::from(addr);
298 let authority = ByteString::from(connect.host());
299 let connector = Pipeline::new(connector.into_service());
300
301 let connector = Box::new(move || {
302 log::trace!("Opening http/2 connection to {}", connect.host());
303 let connect = connect.clone();
304 let svc = connector.clone();
305 let f: Fut = Box::pin(async move { svc.call(connect).await.map(IoBoxed::from) });
306 f
307 });
308
309 ClientBuilder(Inner {
310 authority,
311 connector,
312 conn_timeout: Millis(1_000),
313 conn_lifetime: Duration::from_secs(0),
314 disconnect_timeout: Millis(15_000),
315 max_streams: 100,
316 skip_unknown_streams: false,
317 minconn: 1,
318 maxconn: 16,
319 scheme: Scheme::HTTP,
320 config: crate::Config::client(),
321 connecting: Cell::new(false),
322 connections: Default::default(),
323 total_connections: Cell::new(0),
324 connect_errors: Cell::new(0),
325 })
326 }
327
328 pub fn with_default<A, U>(addr: U) -> Self
329 where
330 A: Address + Clone,
331 Connect<A>: From<U>,
332 {
333 Self::new(addr, DefaultConnector::default())
334 }
335}
336
337impl ClientBuilder {
338 #[inline]
339 pub fn scheme(mut self, scheme: Scheme) -> Self {
341 self.0.scheme = scheme;
342 self
343 }
344
345 pub fn timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
350 self.0.conn_timeout = timeout.into();
351 self
352 }
353
354 pub fn max_streams(mut self, limit: u32) -> Self {
360 self.0.max_streams = limit;
361 self
362 }
363
364 pub fn skip_unknown_streams(mut self) -> Self {
368 self.0.skip_unknown_streams = true;
369 self
370 }
371
372 pub fn lifetime(mut self, dur: Seconds) -> Self {
379 self.0.conn_lifetime = dur.into();
380 self
381 }
382
383 pub fn minconn(mut self, num: usize) -> Self {
387 self.0.minconn = num;
388 self
389 }
390
391 pub fn maxconn(mut self, num: usize) -> Self {
395 self.0.maxconn = num;
396 self
397 }
398
399 pub fn disconnect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
409 self.0.disconnect_timeout = timeout.into();
410 self
411 }
412
413 pub fn configure<O, R>(self, f: O) -> Self
415 where
416 O: FnOnce(&crate::Config) -> R,
417 {
418 let _ = f(&self.0.config);
419 self
420 }
421
422 pub fn config(&self) -> &crate::Config {
424 &self.0.config
425 }
426
427 pub fn connector<A, U, T, F>(mut self, addr: U, connector: F) -> Self
429 where
430 A: Address + Clone,
431 F: IntoService<T, Connect<A>>,
432 T: Service<Connect<A>, Error = connect::ConnectError> + 'static,
433 IoBoxed: From<T::Response>,
434 Connect<A>: From<U>,
435 {
436 let connect = Connect::from(addr);
437 let authority = ByteString::from(connect.host());
438 let connector = Pipeline::new(connector.into_service());
439
440 let connector = Box::new(move || {
441 let connect = connect.clone();
442 let svc = connector.clone();
443 let f: Fut = Box::pin(async move { svc.call(connect).await.map(IoBoxed::from) });
444 f
445 });
446
447 self.0.authority = authority;
448 self.0.connector = connector;
449 self
450 }
451
452 pub fn finish(self) -> Client {
454 Client {
455 inner: Rc::new(self.0),
456 waiters: Default::default(),
457 }
458 }
459}
460
461impl fmt::Debug for Client {
462 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463 f.debug_struct("Client")
464 .field("scheme", &self.inner.scheme)
465 .field("authority", &self.inner.authority)
466 .field("conn_timeout", &self.inner.conn_timeout)
467 .field("conn_lifetime", &self.inner.conn_lifetime)
468 .field("disconnect_timeout", &self.inner.disconnect_timeout)
469 .field("minconn", &self.inner.minconn)
470 .field("maxconn", &self.inner.maxconn)
471 .field("max-streams", &self.inner.max_streams)
472 .field("config", &self.inner.config)
473 .finish()
474 }
475}
476
477impl fmt::Debug for ClientBuilder {
478 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
479 f.debug_struct("ClientBuilder")
480 .field("scheme", &self.0.scheme)
481 .field("authority", &self.0.authority)
482 .field("conn_timeout", &self.0.conn_timeout)
483 .field("conn_lifetime", &self.0.conn_lifetime)
484 .field("disconnect_timeout", &self.0.disconnect_timeout)
485 .field("minconn", &self.0.minconn)
486 .field("maxconn", &self.0.maxconn)
487 .field("max-streams", &self.0.max_streams)
488 .field("config", &self.0.config)
489 .finish()
490 }
491}