ntex_h2/client/
pool.rs

1use std::collections::VecDeque;
2use std::{cell::Cell, cell::RefCell, fmt, marker::PhantomData, rc::Rc, time::Duration};
3
4use nanorand::{Rng, WyRand};
5use ntex_bytes::ByteString;
6use ntex_http::{HeaderMap, Method, uri::Scheme};
7use ntex_io::IoBoxed;
8use ntex_net::connect::{Address, Connect, ConnectError, Connector as DefaultConnector};
9use ntex_service::cfg::{Cfg, SharedCfg};
10use ntex_service::{IntoServiceFactory, Pipeline, ServiceFactory};
11use ntex_util::time::{Millis, Seconds, timeout_checked};
12use ntex_util::{channel::oneshot, channel::pool, future::BoxFuture};
13
14use super::stream::{InflightStorage, RecvStream, SendStream};
15use super::{ClientError, simple::SimpleClient};
16use crate::ServiceConfig;
17
18type Fut = BoxFuture<'static, Result<IoBoxed, ConnectError>>;
19type Connector = Box<dyn Fn() -> BoxFuture<'static, Result<IoBoxed, ConnectError>>>;
20
#[derive(Clone)]
/// Manages http client network connectivity.
///
/// Both fields are reference-counted, so clones of a `Client` share the same
/// configuration, connection list and waiter queue.
pub struct Client {
    // Shared connector, service configuration and pool state.
    inner: Rc<Inner>,
    // Senders of tasks parked in `connect()` / `ready()`; they are signalled
    // from the in-flight storage callback and after every connection attempt.
    waiters: Rc<RefCell<VecDeque<pool::Sender<()>>>>,
}
27
/// State shared by all clones of a [`Client`].
struct Inner {
    // Service configuration handed to every new `SimpleClient`.
    cfg: Cfg<ServiceConfig>,
    // Pool limits, timeouts and runtime bookkeeping.
    config: InnerConfig,
    // Factory closure that starts a new connection attempt on each call.
    connector: Connector,
}
33
34/// Notify one active waiter
35fn notify(waiters: &mut VecDeque<pool::Sender<()>>) {
36    log::debug!("Notify waiter, total {:?}", waiters.len());
37    while let Some(waiter) = waiters.pop_front() {
38        if waiter.send(()).is_ok() {
39            break;
40        }
41    }
42}
43
44impl Client {
    #[inline]
    /// Configure and build client
    ///
    /// `addr` is converted into the [`Connect`] message used for the pool's
    /// connections and `connector` into the service factory that opens them.
    /// Returns a [`ClientBuilder`] initialised with the default pool settings.
    pub fn builder<A, U, T, F>(addr: U, connector: F) -> ClientBuilder<A, T>
    where
        A: Address + Clone,
        F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
        T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
        IoBoxed: From<T::Response>,
        Connect<A>: From<U>,
    {
        ClientBuilder::new(addr, connector)
    }
57
58    /// Send request to the peer
59    pub async fn send(
60        &self,
61        method: Method,
62        path: ByteString,
63        headers: HeaderMap,
64        eof: bool,
65    ) -> Result<(SendStream, RecvStream), ClientError> {
66        self.client()
67            .await?
68            .send(method, path, headers, eof)
69            .await
70            .map_err(From::from)
71    }
72
73    /// Get client from the pool
74    pub async fn client(&self) -> Result<SimpleClient, ClientError> {
75        loop {
76            let (client, num) = self.get_client();
77
78            if let Some(client) = client {
79                return Ok(client);
80            } else {
81                self.connect(num).await?;
82            }
83        }
84    }
85
86    async fn connect(&self, num: usize) -> Result<(), ClientError> {
87        let cfg = &self.inner.config;
88
89        // can create new connection
90        if !cfg.connecting.get() && (num < cfg.maxconn || (cfg.minconn > 0 && num < cfg.minconn)) {
91            // create new connection
92            cfg.connecting.set(true);
93
94            self.create_connection().await?;
95        } else {
96            log::debug!(
97                "New connection is being established {:?} or number of existing cons {} greater than allowed {}",
98                cfg.connecting.get(),
99                num,
100                cfg.maxconn
101            );
102
103            // wait for available connection
104            let (tx, rx) = cfg.pool.channel();
105            self.waiters.borrow_mut().push_back(tx);
106            rx.await?;
107        }
108        Ok(())
109    }
110
111    fn get_client(&self) -> (Option<SimpleClient>, usize) {
112        let cfg = &self.inner.config;
113        let mut connections = cfg.connections.borrow_mut();
114
115        // cleanup connections
116        let mut idx = 0;
117        while idx < connections.len() {
118            if connections[idx].is_closed() {
119                connections.remove(idx);
120            } else if connections[idx].is_disconnecting() {
121                let con = connections.remove(idx);
122                let timeout = cfg.disconnect_timeout;
123                let _ = ntex_util::spawn(async move {
124                    let _ = con.disconnect().disconnect_timeout(timeout).await;
125                });
126            } else {
127                idx += 1;
128            }
129        }
130        let num = connections.len();
131        if cfg.minconn > 0 && num < cfg.minconn {
132            // create new connection
133            (None, num)
134        } else {
135            // first search for connections with less than 50% capacity usage
136            let client = connections.iter().find(|item| {
137                let cap = item.max_streams().unwrap_or(cfg.max_streams) >> 1;
138                item.active_streams() <= cap
139            });
140            if let Some(client) = client {
141                (Some(client.clone()), num)
142            } else {
143                // check existing connections
144                let available = connections.iter().filter(|item| item.is_ready()).count();
145                let client = if available > 0 {
146                    let idx = WyRand::new().generate_range(0_usize..available);
147                    connections
148                        .iter()
149                        .filter(|item| item.is_ready())
150                        .nth(idx)
151                        .cloned()
152                } else {
153                    None
154                };
155
156                (client, num)
157            }
158        }
159    }
160
    /// Open one new connection and add it to the pool.
    ///
    /// The attempt runs in a spawned task and its outcome is delivered back
    /// through a oneshot channel. Whatever the outcome, the task clears the
    /// `connecting` flag and wakes every queued waiter; failures also bump the
    /// `connect_errors` counter.
    ///
    /// # Errors
    ///
    /// Returns the converted connector error, `ClientError::HandshakeTimeout`
    /// when `timeout_checked` reports an error for the `conn_timeout` period,
    /// or the converted channel error if the result never arrives.
    async fn create_connection(&self) -> Result<(), ClientError> {
        let (tx, rx) = oneshot::channel();

        let inner = self.inner.clone();
        let waiters = self.waiters.clone();

        // NOTE(review): presumably spawned so the attempt finishes even if the
        // calling future is dropped - confirm.
        let _ = ntex_util::spawn(async move {
            let res = match timeout_checked(inner.config.conn_timeout, (*inner.connector)()).await {
                Ok(Ok(io)) => {
                    // callbacks for end of stream
                    let waiters2 = waiters.clone();
                    let storage = InflightStorage::new(move |_| {
                        notify(&mut waiters2.borrow_mut());
                    });
                    // construct client
                    let client = SimpleClient::with_params(
                        io,
                        inner.cfg,
                        inner.config.scheme.clone(),
                        inner.config.authority.clone(),
                        inner.config.skip_unknown_streams,
                        storage,
                        inner.config.pool.clone(),
                    );
                    // register the connection and count it
                    inner.config.connections.borrow_mut().push(client);
                    inner
                        .config
                        .total_connections
                        .set(inner.config.total_connections.get() + 1);
                    Ok(())
                }
                Ok(Err(err)) => Err(ClientError::from(err)),
                Err(_) => Err(ClientError::HandshakeTimeout),
            };
            // allow the next attempt before waking anyone up
            inner.config.connecting.set(false);
            // wake all waiters so each of them re-checks the pool
            for waiter in waiters.borrow_mut().drain(..) {
                let _ = waiter.send(());
            }

            if res.is_err() {
                inner
                    .config
                    .connect_errors
                    .set(inner.config.connect_errors.get() + 1);
            }
            // the caller may be gone already; a failed send is ignored
            let _ = tx.send(res);
        });

        rx.await?
    }
211
212    #[inline]
213    /// Check if client is allowed to send new request
214    ///
215    /// Readiness depends on number of opened streams and max concurrency setting
216    pub fn is_ready(&self) -> bool {
217        let connections = self.inner.config.connections.borrow();
218        for client in &*connections {
219            if client.is_ready() {
220                return true;
221            }
222        }
223
224        !self.inner.config.connecting.get() && connections.len() < self.inner.config.maxconn
225    }
226
227    #[inline]
228    /// Check client readiness
229    ///
230    /// Client is ready when it is possible to start new stream
231    pub async fn ready(&self) {
232        loop {
233            if !self.is_ready() {
234                // add waiter
235                let (tx, rx) = self.inner.config.pool.channel();
236                self.waiters.borrow_mut().push_back(tx);
237                let _ = rx.await;
238                'inner: while let Some(tx) = self.waiters.borrow_mut().pop_front() {
239                    if tx.send(()).is_ok() {
240                        break 'inner;
241                    }
242                }
243            } else {
244                break;
245            }
246        }
247    }
248}
249
// Statistics accessors; hidden from the generated documentation.
#[doc(hidden)]
impl Client {
    /// Number of connections currently held in the pool.
    pub fn stat_active_connections(&self) -> usize {
        self.inner.config.connections.borrow().len()
    }

    /// Number of connections successfully established so far.
    pub fn stat_total_connections(&self) -> usize {
        self.inner.config.total_connections.get()
    }

    /// Number of failed connection attempts so far.
    pub fn stat_connect_errors(&self) -> usize {
        self.inner.config.connect_errors.get()
    }

    /// Run `f` against the current list of pooled connections and return its result.
    ///
    /// The connection list stays immutably borrowed while `f` runs.
    pub fn stat_connections<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&[SimpleClient]) -> R,
    {
        f(&self.inner.config.connections.borrow())
    }
}
271
/// Builder for a pooled http/2 [`Client`].
///
/// The `ClientBuilder` type uses a builder-like combinator pattern for service
/// construction that finishes by calling the `.build()` method.
pub struct ClientBuilder<A, T> {
    // Connect message cloned for every new connection attempt.
    connect: Connect<A>,
    // Pool settings collected by the builder methods.
    inner: InnerConfig,
    // Service factory used to open connections.
    connector: T,
    _t: PhantomData<A>,
}
282
/// Pool settings plus the mutable bookkeeping shared by all client clones.
struct InnerConfig {
    /// While the pool holds fewer connections than this, new ones are requested.
    minconn: usize,
    /// Upper bound used when deciding whether another connection may be opened.
    maxconn: usize,
    /// Time limit applied to a single connection attempt.
    conn_timeout: Millis,
    /// Value set via `ClientBuilder::lifetime`; in this file it is only reported by `Debug`.
    conn_lifetime: Duration,
    /// Time limit for shutting down a connection that is disconnecting.
    disconnect_timeout: Millis,
    /// Fallback stream limit for connections that do not report their own.
    max_streams: u32,
    /// Passed through to every `SimpleClient`.
    skip_unknown_streams: bool,
    /// Scheme passed through to every `SimpleClient`.
    scheme: Scheme,
    /// Authority passed through to every `SimpleClient`; taken from the target host.
    authority: ByteString,
    /// True while a connection attempt is in progress.
    connecting: Cell<bool>,
    /// Connections currently held by the pool.
    connections: RefCell<Vec<SimpleClient>>,
    /// Count of successfully established connections.
    total_connections: Cell<usize>,
    /// Count of failed connection attempts.
    connect_errors: Cell<usize>,
    /// Channel pool used for waiter notifications; also handed to every `SimpleClient`.
    pool: pool::Pool<()>,
}
299
300impl<A, T> ClientBuilder<A, T>
301where
302    A: Address + Clone,
303    T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError>,
304    IoBoxed: From<T::Response>,
305{
306    fn new<U, F>(addr: U, connector: F) -> Self
307    where
308        Connect<A>: From<U>,
309        F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
310    {
311        let connect = Connect::from(addr);
312        let authority = ByteString::from(connect.host());
313        let connector = connector.into_factory();
314
315        ClientBuilder {
316            connect,
317            connector,
318            inner: InnerConfig {
319                authority,
320                conn_timeout: Millis(1_000),
321                conn_lifetime: Duration::from_secs(0),
322                disconnect_timeout: Millis(15_000),
323                max_streams: 100,
324                skip_unknown_streams: false,
325                minconn: 1,
326                maxconn: 16,
327                scheme: Scheme::HTTP,
328                connecting: Cell::new(false),
329                connections: Default::default(),
330                total_connections: Cell::new(0),
331                connect_errors: Cell::new(0),
332                pool: pool::new(),
333            },
334            _t: PhantomData,
335        }
336    }
337}
338
impl<A> ClientBuilder<A, DefaultConnector<A>>
where
    A: Address + Clone,
{
    /// Create a builder for `addr` that uses the default connector.
    pub fn with_default<U>(addr: U) -> Self
    where
        Connect<A>: From<U>,
    {
        Self::new(addr, DefaultConnector::default())
    }
}
350
// Configuration methods; each consumes the builder and returns it for chaining.
impl<A, T> ClientBuilder<A, T>
where
    A: Address + Clone,
{
    #[inline]
    /// Set client's connection scheme.
    ///
    /// The default scheme is `Scheme::HTTP`.
    pub fn scheme(mut self, scheme: Scheme) -> Self {
        self.inner.scheme = scheme;
        self
    }

    /// Set total number of simultaneous streams per connection.
    ///
    /// If limit is 0, the connector uses "MAX_CONCURRENT_STREAMS" config from connection
    /// settings.
    /// The default limit size is 100.
    ///
    /// Within the pool this value serves as the fallback when a connection
    /// does not report its own stream limit.
    pub fn max_streams(mut self, limit: u32) -> Self {
        self.inner.max_streams = limit;
        self
    }

    /// Do not return error for frames for unknown streams.
    ///
    /// This includes pending resets, data and window update frames.
    pub fn skip_unknown_streams(mut self) -> Self {
        self.inner.skip_unknown_streams = true;
        self
    }

    /// Set max lifetime period for connection.
    ///
    /// Connection lifetime is max lifetime of any opened connection
    /// until it is closed regardless of keep-alive period.
    ///
    /// Default lifetime period is not set.
    pub fn lifetime(mut self, dur: Seconds) -> Self {
        self.inner.conn_lifetime = dur.into();
        self
    }

    /// Sets the minimum number of concurrent connections.
    ///
    /// By default the minimum is 1.
    pub fn minconn(mut self, num: usize) -> Self {
        self.inner.minconn = num;
        self
    }

    /// Sets the maximum number of concurrent connections.
    ///
    /// By default the maximum is 16.
    pub fn maxconn(mut self, num: usize) -> Self {
        self.inner.maxconn = num;
        self
    }

    /// Use custom connector.
    ///
    /// Replaces the connector factory while keeping the target address and all
    /// settings configured so far.
    pub fn connector<U, F>(self, connector: F) -> ClientBuilder<A, U>
    where
        F: IntoServiceFactory<U, Connect<A>, SharedCfg>,
        U: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
        IoBoxed: From<U::Response>,
    {
        ClientBuilder {
            connect: self.connect,
            connector: connector.into_factory(),
            inner: self.inner,
            _t: PhantomData,
        }
    }
}
422
impl<A, T> ClientBuilder<A, T>
where
    A: Address + Clone,
    T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError> + 'static,
    IoBoxed: From<T::Response>,
{
    /// Finish configuration process and create connections pool.
    ///
    /// The connector service is created from the factory here, but no
    /// connection is opened; the pool starts empty and connections are
    /// established when a client is first requested.
    ///
    /// # Errors
    ///
    /// Returns the factory's `InitError` if the connector service cannot be created.
    pub async fn build(self, cfg: SharedCfg) -> Result<Client, T::InitError> {
        let connect = self.connect;
        let svc = Pipeline::new(self.connector.create(cfg).await?);

        // Each call starts a fresh connection attempt with a clone of the
        // connect message and converts the response into `IoBoxed`.
        let connector = Box::new(move || {
            log::trace!(
                "{}: Opening http/2 connection to {}",
                cfg.tag(),
                connect.host()
            );
            let fut = svc.call_static(connect.clone());
            Box::pin(async move { fut.await.map(IoBoxed::from) }) as Fut
        });

        Ok(Client {
            inner: Rc::new(Inner {
                connector,
                cfg: cfg.get(),
                config: self.inner,
            }),
            waiters: Default::default(),
        })
    }
}
454
455impl fmt::Debug for Client {
456    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
457        f.debug_struct("Client")
458            .field("scheme", &self.inner.config.scheme)
459            .field("authority", &self.inner.config.authority)
460            .field("conn_timeout", &self.inner.config.conn_timeout)
461            .field("conn_lifetime", &self.inner.config.conn_lifetime)
462            .field("disconnect_timeout", &self.inner.config.disconnect_timeout)
463            .field("minconn", &self.inner.config.minconn)
464            .field("maxconn", &self.inner.config.maxconn)
465            .field("max-streams", &self.inner.config.max_streams)
466            .finish()
467    }
468}
469
470impl<A, T> fmt::Debug for ClientBuilder<A, T> {
471    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
472        f.debug_struct("ClientBuilder")
473            .field("scheme", &self.inner.scheme)
474            .field("authority", &self.inner.authority)
475            .field("conn_timeout", &self.inner.conn_timeout)
476            .field("conn_lifetime", &self.inner.conn_lifetime)
477            .field("disconnect_timeout", &self.inner.disconnect_timeout)
478            .field("minconn", &self.inner.minconn)
479            .field("maxconn", &self.inner.maxconn)
480            .field("max-streams", &self.inner.max_streams)
481            .finish()
482    }
483}