ntex_h2/client/
pool.rs

1use std::{cell::Cell, cell::RefCell, collections::VecDeque, fmt, rc::Rc, time::Duration};
2
3use nanorand::{Rng, WyRand};
4use ntex_bytes::ByteString;
5use ntex_http::{uri::Scheme, HeaderMap, Method};
6use ntex_io::IoBoxed;
7use ntex_net::connect::{self as connect, Address, Connect, Connector as DefaultConnector};
8use ntex_service::{IntoService, Pipeline, Service};
9use ntex_util::time::{timeout_checked, Millis, Seconds};
10use ntex_util::{channel::oneshot, future::BoxFuture};
11
12use super::stream::{InflightStorage, RecvStream, SendStream};
13use super::{simple::SimpleClient, ClientError};
14
15type Fut = BoxFuture<'static, Result<IoBoxed, connect::ConnectError>>;
16type Connector = Box<dyn Fn() -> BoxFuture<'static, Result<IoBoxed, connect::ConnectError>>>;
17
/// Manages http client network connectivity.
///
/// Cloning is cheap: both fields are `Rc` handles, so every clone shares the
/// same settings, connections and waiter queue.
#[derive(Clone)]
pub struct Client {
    // Settings, connector and live connections shared by all clones.
    inner: Rc<Inner>,
    // Senders for tasks parked until a connection attempt finishes or a
    // stream ends; each one is woken by sending `()`.
    waiters: Rc<RefCell<VecDeque<oneshot::Sender<()>>>>,
}
24
25/// Notify one active waiter
26fn notify(waiters: &mut VecDeque<oneshot::Sender<()>>) {
27    log::debug!("Notify waiter, total {:?}", waiters.len());
28    while let Some(waiter) = waiters.pop_front() {
29        if waiter.send(()).is_ok() {
30            break;
31        }
32    }
33}
34
35impl Client {
36    #[inline]
37    /// Configure and build client
38    pub fn build<A, U, T, F>(addr: U, connector: F) -> ClientBuilder
39    where
40        A: Address + Clone,
41        F: IntoService<T, Connect<A>>,
42        T: Service<Connect<A>, Error = connect::ConnectError> + 'static,
43        IoBoxed: From<T::Response>,
44        Connect<A>: From<U>,
45    {
46        ClientBuilder::new(addr, connector)
47    }
48
49    #[inline]
50    /// Configure and build client
51    pub fn with_default<A, U>(addr: U) -> ClientBuilder
52    where
53        A: Address + Clone,
54        Connect<A>: From<U>,
55    {
56        ClientBuilder::with_default(addr)
57    }
58
59    /// Send request to the peer
60    pub async fn send(
61        &self,
62        method: Method,
63        path: ByteString,
64        headers: HeaderMap,
65        eof: bool,
66    ) -> Result<(SendStream, RecvStream), ClientError> {
67        self.client()
68            .await?
69            .send(method, path, headers, eof)
70            .await
71            .map_err(From::from)
72    }
73
74    /// Get client from the pool
75    pub async fn client(&self) -> Result<SimpleClient, ClientError> {
76        loop {
77            let (client, num) = self.get_client();
78
79            if let Some(client) = client {
80                return Ok(client);
81            } else {
82                self.connect(num).await?;
83            }
84        }
85    }
86
87    async fn connect(&self, num: usize) -> Result<(), ClientError> {
88        // can create new connection
89        if !self.inner.connecting.get()
90            && (num < self.inner.maxconn || (self.inner.minconn > 0 && num < self.inner.minconn))
91        {
92            // create new connection
93            self.inner.connecting.set(true);
94
95            self.create_connection().await?;
96        } else {
97            log::debug!(
98                "New connection is being established {:?} or number of existing cons {} greater than allowed {}",
99                self.inner.connecting.get(), num, self.inner.maxconn);
100
101            // wait for available connection
102            let (tx, rx) = oneshot::channel();
103            self.waiters.borrow_mut().push_back(tx);
104            rx.await?;
105        }
106        Ok(())
107    }
108
109    fn get_client(&self) -> (Option<SimpleClient>, usize) {
110        let mut connections = self.inner.connections.borrow_mut();
111
112        // cleanup connections
113        let mut idx = 0;
114        while idx < connections.len() {
115            if connections[idx].is_closed() {
116                connections.remove(idx);
117            } else if connections[idx].is_disconnecting() {
118                let con = connections.remove(idx);
119                let timeout = self.inner.disconnect_timeout;
120                let _ = ntex_util::spawn(async move {
121                    let _ = con.disconnect().disconnect_timeout(timeout).await;
122                });
123            } else {
124                idx += 1;
125            }
126        }
127        let num = connections.len();
128        if self.inner.minconn > 0 && num < self.inner.minconn {
129            // create new connection
130            (None, num)
131        } else {
132            // first search for connections with less than 50% capacity usage
133            let client = connections.iter().find(|item| {
134                let cap = item.max_streams().unwrap_or(self.inner.max_streams) >> 1;
135                item.active_streams() <= cap
136            });
137            if let Some(client) = client {
138                (Some(client.clone()), num)
139            } else {
140                // check existing connections
141                let available = connections.iter().filter(|item| item.is_ready()).count();
142                let client = if available > 0 {
143                    let idx = WyRand::new().generate_range(0_usize..available);
144                    connections
145                        .iter()
146                        .filter(|item| item.is_ready())
147                        .nth(idx)
148                        .cloned()
149                } else {
150                    None
151                };
152
153                (client, num)
154            }
155        }
156    }
157
158    async fn create_connection(&self) -> Result<(), ClientError> {
159        let (tx, rx) = oneshot::channel();
160
161        let inner = self.inner.clone();
162        let waiters = self.waiters.clone();
163
164        let _ = ntex_util::spawn(async move {
165            let res = match timeout_checked(inner.conn_timeout, (*inner.connector)()).await {
166                Ok(Ok(io)) => {
167                    // callbacks for end of stream
168                    let waiters2 = waiters.clone();
169                    let storage = InflightStorage::new(move |_| {
170                        notify(&mut waiters2.borrow_mut());
171                    });
172                    // construct client
173                    let client = SimpleClient::with_params(
174                        io,
175                        inner.config.clone(),
176                        inner.scheme.clone(),
177                        inner.authority.clone(),
178                        inner.skip_unknown_streams,
179                        storage,
180                    );
181                    inner.connections.borrow_mut().push(client);
182                    inner
183                        .total_connections
184                        .set(inner.total_connections.get() + 1);
185                    Ok(())
186                }
187                Ok(Err(err)) => Err(ClientError::from(err)),
188                Err(_) => Err(ClientError::HandshakeTimeout),
189            };
190            inner.connecting.set(false);
191            for waiter in waiters.borrow_mut().drain(..) {
192                let _ = waiter.send(());
193            }
194
195            if res.is_err() {
196                inner.connect_errors.set(inner.connect_errors.get() + 1);
197            }
198            let _ = tx.send(res);
199        });
200
201        rx.await?
202    }
203
204    #[inline]
205    /// Check if client is allowed to send new request
206    ///
207    /// Readiness depends on number of opened streams and max concurrency setting
208    pub fn is_ready(&self) -> bool {
209        let connections = self.inner.connections.borrow();
210        for client in &*connections {
211            if client.is_ready() {
212                return true;
213            }
214        }
215
216        !self.inner.connecting.get() && connections.len() < self.inner.maxconn
217    }
218
219    #[inline]
220    /// Check client readiness
221    ///
222    /// Client is ready when it is possible to start new stream
223    pub async fn ready(&self) {
224        loop {
225            if !self.is_ready() {
226                // add waiter
227                let (tx, rx) = oneshot::channel();
228                self.waiters.borrow_mut().push_back(tx);
229                let _ = rx.await;
230                'inner: while let Some(tx) = self.waiters.borrow_mut().pop_front() {
231                    if tx.send(()).is_ok() {
232                        break 'inner;
233                    }
234                }
235            } else {
236                break;
237            }
238        }
239    }
240}
241
// Statistics accessors; hidden from the generated documentation.
#[doc(hidden)]
impl Client {
    /// Number of connections currently held in the pool.
    pub fn stat_active_connections(&self) -> usize {
        self.inner.connections.borrow().len()
    }

    /// Number of connections successfully created since the pool was built.
    pub fn stat_total_connections(&self) -> usize {
        self.inner.total_connections.get()
    }

    /// Number of connection attempts that ended with an error or a timeout.
    pub fn stat_connect_errors(&self) -> usize {
        self.inner.connect_errors.get()
    }

    /// Run `f` over the pooled connections and return its result.
    ///
    /// The pool is borrowed for the duration of the call.
    pub fn stat_connections<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&[SimpleClient]) -> R,
    {
        f(&self.inner.connections.borrow())
    }
}
263
/// Builder for the http/2 client connection pool.
///
/// The `ClientBuilder` type uses a builder-like combinator pattern for service
/// construction that finishes by calling the `.finish()` method.
pub struct ClientBuilder(Inner);
269
/// Pool settings and runtime state shared by all `Client` clones.
struct Inner {
    // While the pool holds fewer connections than this, no client is handed
    // out and a new connection is requested instead; 0 disables the check.
    minconn: usize,
    // Upper bound compared against the pool size before opening a connection.
    maxconn: usize,
    // Time limit applied to a single connector call.
    conn_timeout: Millis,
    // Configured connection lifetime. NOTE(review): in the code visible here
    // it is only stored and printed by `Debug` - confirm where it is enforced.
    conn_lifetime: Duration,
    // Timeout passed to the shutdown of connections that report
    // `is_disconnecting()`.
    disconnect_timeout: Millis,
    // Fallback stream limit for the load check, used when a connection's
    // `max_streams()` returns `None`.
    max_streams: u32,
    // Forwarded to `SimpleClient::with_params`.
    skip_unknown_streams: bool,
    // Forwarded to `SimpleClient::with_params`.
    scheme: Scheme,
    // Http/2 settings, cloned into every new connection.
    config: crate::Config,
    // Host taken from the connect address, forwarded to every new connection.
    authority: ByteString,
    // Factory that starts a new connection attempt.
    connector: Connector,
    // True while a connection attempt is in flight.
    connecting: Cell<bool>,
    // Connections currently held by the pool.
    connections: RefCell<Vec<SimpleClient>>,
    // Count of successfully created connections.
    total_connections: Cell<usize>,
    // Count of failed connection attempts.
    connect_errors: Cell<usize>,
}
287
288impl ClientBuilder {
289    fn new<A, U, T, F>(addr: U, connector: F) -> Self
290    where
291        A: Address + Clone,
292        F: IntoService<T, Connect<A>>,
293        T: Service<Connect<A>, Error = connect::ConnectError> + 'static,
294        IoBoxed: From<T::Response>,
295        Connect<A>: From<U>,
296    {
297        let connect = Connect::from(addr);
298        let authority = ByteString::from(connect.host());
299        let connector = Pipeline::new(connector.into_service());
300
301        let connector = Box::new(move || {
302            log::trace!("Opening http/2 connection to {}", connect.host());
303            let connect = connect.clone();
304            let svc = connector.clone();
305            let f: Fut = Box::pin(async move { svc.call(connect).await.map(IoBoxed::from) });
306            f
307        });
308
309        ClientBuilder(Inner {
310            authority,
311            connector,
312            conn_timeout: Millis(1_000),
313            conn_lifetime: Duration::from_secs(0),
314            disconnect_timeout: Millis(15_000),
315            max_streams: 100,
316            skip_unknown_streams: false,
317            minconn: 1,
318            maxconn: 16,
319            scheme: Scheme::HTTP,
320            config: crate::Config::client(),
321            connecting: Cell::new(false),
322            connections: Default::default(),
323            total_connections: Cell::new(0),
324            connect_errors: Cell::new(0),
325        })
326    }
327
328    pub fn with_default<A, U>(addr: U) -> Self
329    where
330        A: Address + Clone,
331        Connect<A>: From<U>,
332    {
333        Self::new(addr, DefaultConnector::default())
334    }
335}
336
337impl ClientBuilder {
338    #[inline]
339    /// Set client's connection scheme
340    pub fn scheme(mut self, scheme: Scheme) -> Self {
341        self.0.scheme = scheme;
342        self
343    }
344
345    /// Connection timeout.
346    ///
347    /// i.e. max time to connect to remote host including dns name resolution.
348    /// Set to 1 second by default.
349    pub fn timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
350        self.0.conn_timeout = timeout.into();
351        self
352    }
353
354    /// Set total number of simultaneous streams per connection.
355    ///
356    /// If limit is 0, the connector uses "MAX_CONCURRENT_STREAMS" config from connection
357    /// settings.
358    /// The default limit size is 100.
359    pub fn max_streams(mut self, limit: u32) -> Self {
360        self.0.max_streams = limit;
361        self
362    }
363
364    /// Do not return error for frames for unknown streams.
365    ///
366    /// This includes pending resets, data and window update frames.
367    pub fn skip_unknown_streams(mut self) -> Self {
368        self.0.skip_unknown_streams = true;
369        self
370    }
371
372    /// Set max lifetime period for connection.
373    ///
374    /// Connection lifetime is max lifetime of any opened connection
375    /// until it is closed regardless of keep-alive period.
376    ///
377    /// Default lifetime period is not set.
378    pub fn lifetime(mut self, dur: Seconds) -> Self {
379        self.0.conn_lifetime = dur.into();
380        self
381    }
382
383    /// Sets the minimum concurrent connections.
384    ///
385    /// By default min connections is set to a 1.
386    pub fn minconn(mut self, num: usize) -> Self {
387        self.0.minconn = num;
388        self
389    }
390
391    /// Sets the maximum concurrent connections.
392    ///
393    /// By default max connections is set to a 16.
394    pub fn maxconn(mut self, num: usize) -> Self {
395        self.0.maxconn = num;
396        self
397    }
398
399    /// Set client connection disconnect timeout.
400    ///
401    /// Defines a timeout for disconnect connection. Disconnecting connection
402    /// involes closing all active streams. If a disconnect procedure does not complete
403    /// within this time, the socket get dropped.
404    ///
405    /// To disable timeout set value to 0.
406    ///
407    /// By default disconnect timeout is set to 15 seconds.
408    pub fn disconnect_timeout<T: Into<Millis>>(mut self, timeout: T) -> Self {
409        self.0.disconnect_timeout = timeout.into();
410        self
411    }
412
413    /// Configure http2 connection settings
414    pub fn configure<O, R>(self, f: O) -> Self
415    where
416        O: FnOnce(&crate::Config) -> R,
417    {
418        let _ = f(&self.0.config);
419        self
420    }
421
422    /// Http/2 connection settings
423    pub fn config(&self) -> &crate::Config {
424        &self.0.config
425    }
426
427    /// Use custom connector
428    pub fn connector<A, U, T, F>(mut self, addr: U, connector: F) -> Self
429    where
430        A: Address + Clone,
431        F: IntoService<T, Connect<A>>,
432        T: Service<Connect<A>, Error = connect::ConnectError> + 'static,
433        IoBoxed: From<T::Response>,
434        Connect<A>: From<U>,
435    {
436        let connect = Connect::from(addr);
437        let authority = ByteString::from(connect.host());
438        let connector = Pipeline::new(connector.into_service());
439
440        let connector = Box::new(move || {
441            let connect = connect.clone();
442            let svc = connector.clone();
443            let f: Fut = Box::pin(async move { svc.call(connect).await.map(IoBoxed::from) });
444            f
445        });
446
447        self.0.authority = authority;
448        self.0.connector = connector;
449        self
450    }
451
452    /// Finish configuration process and create connections pool.
453    pub fn finish(self) -> Client {
454        Client {
455            inner: Rc::new(self.0),
456            waiters: Default::default(),
457        }
458    }
459}
460
461impl fmt::Debug for Client {
462    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463        f.debug_struct("Client")
464            .field("scheme", &self.inner.scheme)
465            .field("authority", &self.inner.authority)
466            .field("conn_timeout", &self.inner.conn_timeout)
467            .field("conn_lifetime", &self.inner.conn_lifetime)
468            .field("disconnect_timeout", &self.inner.disconnect_timeout)
469            .field("minconn", &self.inner.minconn)
470            .field("maxconn", &self.inner.maxconn)
471            .field("max-streams", &self.inner.max_streams)
472            .field("config", &self.inner.config)
473            .finish()
474    }
475}
476
477impl fmt::Debug for ClientBuilder {
478    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
479        f.debug_struct("ClientBuilder")
480            .field("scheme", &self.0.scheme)
481            .field("authority", &self.0.authority)
482            .field("conn_timeout", &self.0.conn_timeout)
483            .field("conn_lifetime", &self.0.conn_lifetime)
484            .field("disconnect_timeout", &self.0.disconnect_timeout)
485            .field("minconn", &self.0.minconn)
486            .field("maxconn", &self.0.maxconn)
487            .field("max-streams", &self.0.max_streams)
488            .field("config", &self.0.config)
489            .finish()
490    }
491}