ntex_h2/client/
pool.rs

1use std::collections::VecDeque;
2use std::{cell::Cell, cell::RefCell, fmt, marker::PhantomData, rc::Rc, time::Duration};
3
4use nanorand::{Rng, WyRand};
5use ntex_bytes::ByteString;
6use ntex_http::{HeaderMap, Method, uri::Scheme};
7use ntex_io::IoBoxed;
8use ntex_net::connect::{Address, Connect, ConnectError, Connector as DefaultConnector};
9use ntex_service::cfg::{Cfg, SharedCfg};
10use ntex_service::{IntoServiceFactory, Pipeline, ServiceFactory};
11use ntex_util::time::{Millis, Seconds, timeout_checked};
12use ntex_util::{channel::oneshot, channel::pool, future::BoxFuture};
13
14use super::stream::{InflightStorage, RecvStream, SendStream};
15use super::{ClientError, simple::SimpleClient};
16use crate::ServiceConfig;
17
18type Fut = BoxFuture<'static, Result<IoBoxed, ConnectError>>;
19type Connector = Box<dyn Fn() -> BoxFuture<'static, Result<IoBoxed, ConnectError>>>;
20
/// Manages http client network connectivity.
///
/// Both fields are reference counted, so every clone of a `Client` shares the
/// same configuration, connection list and waiter queue.
#[derive(Clone)]
pub struct Client {
    // shared settings, pool state and the connector closure
    inner: Rc<Inner>,
    // senders for tasks parked in `connect()` / `ready()`; pushed at the back,
    // popped from the front by `notify()`
    waiters: Rc<RefCell<VecDeque<pool::Sender<()>>>>,
}
27
/// State shared by all clones of a [`Client`].
struct Inner {
    // service configuration handed to every new `SimpleClient`
    cfg: Cfg<ServiceConfig>,
    // pool limits, timeouts and mutable pool state
    config: InnerConfig,
    // closure that starts a new connection attempt each time it is called
    connector: Connector,
}
33
34/// Notify one active waiter
35fn notify(waiters: &mut VecDeque<pool::Sender<()>>) {
36    log::debug!("Notify waiter, total {:?}", waiters.len());
37    while let Some(waiter) = waiters.pop_front() {
38        if waiter.send(()).is_ok() {
39            break;
40        }
41    }
42}
43
44impl Client {
    /// Configure and build client
    ///
    /// Starts a [`ClientBuilder`] for `addr` that opens connections through the
    /// supplied `connector` service factory. This is a thin wrapper around
    /// `ClientBuilder::new`.
    #[inline]
    pub fn build<A, U, T, F>(addr: U, connector: F) -> ClientBuilder<A, T>
    where
        A: Address + Clone,
        F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
        T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
        IoBoxed: From<T::Response>,
        Connect<A>: From<U>,
    {
        ClientBuilder::new(addr, connector)
    }
57
    /// Configure and build client using the default connector
    ///
    /// Same as [`Client::build`], except that the returned builder is typed with
    /// `DefaultConnector<A>` instead of a caller-supplied connector.
    #[inline]
    pub fn with_default<A, U>(addr: U) -> ClientBuilder<A, DefaultConnector<A>>
    where
        A: Address + Clone,
        Connect<A>: From<U>,
    {
        ClientBuilder::with_default(addr)
    }
67
68    /// Send request to the peer
69    pub async fn send(
70        &self,
71        method: Method,
72        path: ByteString,
73        headers: HeaderMap,
74        eof: bool,
75    ) -> Result<(SendStream, RecvStream), ClientError> {
76        self.client()
77            .await?
78            .send(method, path, headers, eof)
79            .await
80            .map_err(From::from)
81    }
82
83    /// Get client from the pool
84    pub async fn client(&self) -> Result<SimpleClient, ClientError> {
85        loop {
86            let (client, num) = self.get_client();
87
88            if let Some(client) = client {
89                return Ok(client);
90            } else {
91                self.connect(num).await?;
92            }
93        }
94    }
95
96    async fn connect(&self, num: usize) -> Result<(), ClientError> {
97        let cfg = &self.inner.config;
98
99        // can create new connection
100        if !cfg.connecting.get() && (num < cfg.maxconn || (cfg.minconn > 0 && num < cfg.minconn)) {
101            // create new connection
102            cfg.connecting.set(true);
103
104            self.create_connection().await?;
105        } else {
106            log::debug!(
107                "New connection is being established {:?} or number of existing cons {} greater than allowed {}",
108                cfg.connecting.get(),
109                num,
110                cfg.maxconn
111            );
112
113            // wait for available connection
114            let (tx, rx) = cfg.pool.channel();
115            self.waiters.borrow_mut().push_back(tx);
116            rx.await?;
117        }
118        Ok(())
119    }
120
121    fn get_client(&self) -> (Option<SimpleClient>, usize) {
122        let cfg = &self.inner.config;
123        let mut connections = cfg.connections.borrow_mut();
124
125        // cleanup connections
126        let mut idx = 0;
127        while idx < connections.len() {
128            if connections[idx].is_closed() {
129                connections.remove(idx);
130            } else if connections[idx].is_disconnecting() {
131                let con = connections.remove(idx);
132                let timeout = cfg.disconnect_timeout;
133                let _ = ntex_util::spawn(async move {
134                    let _ = con.disconnect().disconnect_timeout(timeout).await;
135                });
136            } else {
137                idx += 1;
138            }
139        }
140        let num = connections.len();
141        if cfg.minconn > 0 && num < cfg.minconn {
142            // create new connection
143            (None, num)
144        } else {
145            // first search for connections with less than 50% capacity usage
146            let client = connections.iter().find(|item| {
147                let cap = item.max_streams().unwrap_or(cfg.max_streams) >> 1;
148                item.active_streams() <= cap
149            });
150            if let Some(client) = client {
151                (Some(client.clone()), num)
152            } else {
153                // check existing connections
154                let available = connections.iter().filter(|item| item.is_ready()).count();
155                let client = if available > 0 {
156                    let idx = WyRand::new().generate_range(0_usize..available);
157                    connections
158                        .iter()
159                        .filter(|item| item.is_ready())
160                        .nth(idx)
161                        .cloned()
162                } else {
163                    None
164                };
165
166                (client, num)
167            }
168        }
169    }
170
    /// Establish one new connection and add it to the pool.
    ///
    /// The attempt runs in a spawned task; this method only awaits the result
    /// delivered through a oneshot channel. Whatever the outcome, the task
    /// clears the `connecting` flag and wakes every queued waiter.
    ///
    /// Returns the connector's error converted to `ClientError`, or
    /// `ClientError::HandshakeTimeout` when `timeout_checked` reports an error.
    async fn create_connection(&self) -> Result<(), ClientError> {
        let (tx, rx) = oneshot::channel();

        let inner = self.inner.clone();
        let waiters = self.waiters.clone();

        // NOTE(review): presumably spawned so the attempt runs to completion
        // even if the caller's future is dropped - confirm
        let _ = ntex_util::spawn(async move {
            // run the connector, bounded by the configured connect timeout
            let res = match timeout_checked(inner.config.conn_timeout, (*inner.connector)()).await {
                Ok(Ok(io)) => {
                    // the storage callback wakes one waiter each time it fires
                    // NOTE(review): presumably invoked when a stream ends - confirm
                    let waiters2 = waiters.clone();
                    let storage = InflightStorage::new(move |_| {
                        notify(&mut waiters2.borrow_mut());
                    });
                    // construct client
                    let client = SimpleClient::with_params(
                        io,
                        inner.cfg,
                        inner.config.scheme.clone(),
                        inner.config.authority.clone(),
                        inner.config.skip_unknown_streams,
                        storage,
                        inner.config.pool.clone(),
                    );
                    // publish the connection and count it
                    inner.config.connections.borrow_mut().push(client);
                    inner
                        .config
                        .total_connections
                        .set(inner.config.total_connections.get() + 1);
                    Ok(())
                }
                // connector failed
                Ok(Err(err)) => Err(ClientError::from(err)),
                // error reported by `timeout_checked` itself
                Err(_) => Err(ClientError::HandshakeTimeout),
            };
            // allow the next connection attempt before anyone is woken
            inner.config.connecting.set(false);
            // wake all waiters, on success as well as on failure, so each of
            // them re-evaluates the pool
            for waiter in waiters.borrow_mut().drain(..) {
                let _ = waiter.send(());
            }

            if res.is_err() {
                inner
                    .config
                    .connect_errors
                    .set(inner.config.connect_errors.get() + 1);
            }
            // the receiver may already be gone; that is not an error here
            let _ = tx.send(res);
        });

        // outer `?` handles a dropped sender, the inner result is returned as is
        rx.await?
    }
221
222    #[inline]
223    /// Check if client is allowed to send new request
224    ///
225    /// Readiness depends on number of opened streams and max concurrency setting
226    pub fn is_ready(&self) -> bool {
227        let connections = self.inner.config.connections.borrow();
228        for client in &*connections {
229            if client.is_ready() {
230                return true;
231            }
232        }
233
234        !self.inner.config.connecting.get() && connections.len() < self.inner.config.maxconn
235    }
236
237    #[inline]
238    /// Check client readiness
239    ///
240    /// Client is ready when it is possible to start new stream
241    pub async fn ready(&self) {
242        loop {
243            if !self.is_ready() {
244                // add waiter
245                let (tx, rx) = self.inner.config.pool.channel();
246                self.waiters.borrow_mut().push_back(tx);
247                let _ = rx.await;
248                'inner: while let Some(tx) = self.waiters.borrow_mut().pop_front() {
249                    if tx.send(()).is_ok() {
250                        break 'inner;
251                    }
252                }
253            } else {
254                break;
255            }
256        }
257    }
258}
259
260#[doc(hidden)]
261impl Client {
262    pub fn stat_active_connections(&self) -> usize {
263        self.inner.config.connections.borrow().len()
264    }
265
266    pub fn stat_total_connections(&self) -> usize {
267        self.inner.config.total_connections.get()
268    }
269
270    pub fn stat_connect_errors(&self) -> usize {
271        self.inner.config.connect_errors.get()
272    }
273
274    pub fn stat_connections<F, R>(&self, f: F) -> R
275    where
276        F: FnOnce(&[SimpleClient]) -> R,
277    {
278        f(&self.inner.config.connections.borrow())
279    }
280}
281
/// Manages http client network connectivity.
///
/// The `ClientBuilder` type uses a builder-like combinator pattern for service
/// construction that finishes by calling the `.finish()` method.
pub struct ClientBuilder<A, T> {
    // connect request built from the address given to the builder
    connect: Connect<A>,
    // pool settings collected so far; moved into the `Client` by `finish()`
    inner: InnerConfig,
    // service factory used by `finish()` to create the connector service
    connector: T,
    // marker for the address type parameter
    _t: PhantomData<A>,
}
292
/// Pool settings together with the pool's mutable runtime state.
struct InnerConfig {
    // while the pool holds fewer connections than this, `get_client` returns
    // no connection so that a new one gets opened; 0 disables the check
    minconn: usize,
    // limit compared against the pool size before another connection is opened
    maxconn: usize,
    // time allowed for the connector to produce a connection
    conn_timeout: Millis,
    // set by `ClientBuilder::lifetime`
    // NOTE(review): only stored and printed in the code visible here - confirm where it is enforced
    conn_lifetime: Duration,
    // timeout applied when a disconnecting connection is shut down
    disconnect_timeout: Millis,
    // stream limit used when a connection does not report its own `max_streams()`
    max_streams: u32,
    // forwarded to `SimpleClient::with_params`
    skip_unknown_streams: bool,
    // forwarded to `SimpleClient::with_params`
    scheme: Scheme,
    // host part of the connect address, forwarded to `SimpleClient::with_params`
    authority: ByteString,
    // true while a connection attempt is in flight
    connecting: Cell<bool>,
    // connections currently held by the pool
    connections: RefCell<Vec<SimpleClient>>,
    // counter of successfully established connections
    total_connections: Cell<usize>,
    // counter of failed connection attempts
    connect_errors: Cell<usize>,
    // channel pool used for waiter notifications; also handed to every `SimpleClient`
    pool: pool::Pool<()>,
}
309
310impl<A, T> ClientBuilder<A, T>
311where
312    A: Address + Clone,
313    T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError>,
314    IoBoxed: From<T::Response>,
315{
316    fn new<U, F>(addr: U, connector: F) -> Self
317    where
318        Connect<A>: From<U>,
319        F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
320    {
321        let connect = Connect::from(addr);
322        let authority = ByteString::from(connect.host());
323        let connector = connector.into_factory();
324
325        ClientBuilder {
326            connect,
327            connector,
328            inner: InnerConfig {
329                authority,
330                conn_timeout: Millis(1_000),
331                conn_lifetime: Duration::from_secs(0),
332                disconnect_timeout: Millis(15_000),
333                max_streams: 100,
334                skip_unknown_streams: false,
335                minconn: 1,
336                maxconn: 16,
337                scheme: Scheme::HTTP,
338                connecting: Cell::new(false),
339                connections: Default::default(),
340                total_connections: Cell::new(0),
341                connect_errors: Cell::new(0),
342                pool: pool::new(),
343            },
344            _t: PhantomData,
345        }
346    }
347}
348
impl<A> ClientBuilder<A, DefaultConnector<A>>
where
    A: Address + Clone,
{
    /// Create a builder for `addr` that uses `DefaultConnector::default()` as
    /// its connector.
    pub fn with_default<U>(addr: U) -> Self
    where
        Connect<A>: From<U>,
    {
        Self::new(addr, DefaultConnector::default())
    }
}
360
361impl<A, T> ClientBuilder<A, T>
362where
363    A: Address + Clone,
364{
365    #[inline]
366    /// Set client's connection scheme
367    pub fn scheme(mut self, scheme: Scheme) -> Self {
368        self.inner.scheme = scheme;
369        self
370    }
371
372    /// Set total number of simultaneous streams per connection.
373    ///
374    /// If limit is 0, the connector uses "MAX_CONCURRENT_STREAMS" config from connection
375    /// settings.
376    /// The default limit size is 100.
377    pub fn max_streams(mut self, limit: u32) -> Self {
378        self.inner.max_streams = limit;
379        self
380    }
381
382    /// Do not return error for frames for unknown streams.
383    ///
384    /// This includes pending resets, data and window update frames.
385    pub fn skip_unknown_streams(mut self) -> Self {
386        self.inner.skip_unknown_streams = true;
387        self
388    }
389
390    /// Set max lifetime period for connection.
391    ///
392    /// Connection lifetime is max lifetime of any opened connection
393    /// until it is closed regardless of keep-alive period.
394    ///
395    /// Default lifetime period is not set.
396    pub fn lifetime(mut self, dur: Seconds) -> Self {
397        self.inner.conn_lifetime = dur.into();
398        self
399    }
400
401    /// Sets the minimum concurrent connections.
402    ///
403    /// By default min connections is set to a 1.
404    pub fn minconn(mut self, num: usize) -> Self {
405        self.inner.minconn = num;
406        self
407    }
408
409    /// Sets the maximum concurrent connections.
410    ///
411    /// By default max connections is set to a 16.
412    pub fn maxconn(mut self, num: usize) -> Self {
413        self.inner.maxconn = num;
414        self
415    }
416
417    /// Use custom connector
418    pub fn connector<U, F>(self, connector: F) -> ClientBuilder<A, U>
419    where
420        F: IntoServiceFactory<U, Connect<A>, SharedCfg>,
421        U: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
422        IoBoxed: From<U::Response>,
423    {
424        ClientBuilder {
425            connect: self.connect,
426            connector: connector.into_factory(),
427            inner: self.inner,
428            _t: PhantomData,
429        }
430    }
431}
432
433impl<A, T> ClientBuilder<A, T>
434where
435    A: Address + Clone,
436    T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
437    IoBoxed: From<T::Response>,
438{
439    /// Finish configuration process and create connections pool.
440    pub async fn finish(self, cfg: SharedCfg) -> Result<Client, T::InitError> {
441        let connect = self.connect;
442        let svc = Pipeline::new(self.connector.create(cfg).await?);
443
444        let connector = Box::new(move || {
445            log::trace!(
446                "{}: Opening http/2 connection to {}",
447                cfg.tag(),
448                connect.host()
449            );
450            let fut = svc.call_static(connect.clone());
451            Box::pin(async move { fut.await.map(IoBoxed::from) }) as Fut
452        });
453
454        Ok(Client {
455            inner: Rc::new(Inner {
456                connector,
457                cfg: cfg.get(),
458                config: self.inner,
459            }),
460            waiters: Default::default(),
461        })
462    }
463}
464
465impl fmt::Debug for Client {
466    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
467        f.debug_struct("Client")
468            .field("scheme", &self.inner.config.scheme)
469            .field("authority", &self.inner.config.authority)
470            .field("conn_timeout", &self.inner.config.conn_timeout)
471            .field("conn_lifetime", &self.inner.config.conn_lifetime)
472            .field("disconnect_timeout", &self.inner.config.disconnect_timeout)
473            .field("minconn", &self.inner.config.minconn)
474            .field("maxconn", &self.inner.config.maxconn)
475            .field("max-streams", &self.inner.config.max_streams)
476            .finish()
477    }
478}
479
480impl<A, T> fmt::Debug for ClientBuilder<A, T> {
481    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
482        f.debug_struct("ClientBuilder")
483            .field("scheme", &self.inner.scheme)
484            .field("authority", &self.inner.authority)
485            .field("conn_timeout", &self.inner.conn_timeout)
486            .field("conn_lifetime", &self.inner.conn_lifetime)
487            .field("disconnect_timeout", &self.inner.disconnect_timeout)
488            .field("minconn", &self.inner.minconn)
489            .field("maxconn", &self.inner.maxconn)
490            .field("max-streams", &self.inner.max_streams)
491            .finish()
492    }
493}