hyper_client_pool/pool/
mod.rs

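//! HTTP Client Worker Pool
//!
//! This module provides a simple API wrapping a pool of HTTP clients. Requests
//! are handed to worker executors, and the caller gets an error back when
//! every worker is busy.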
4use std::{cmp, net::SocketAddr};
5
6use fpool::RoundRobinPool;
7use hyper::client::connect::dns::Name;
8use tower_service::Service;
9
10use crate::config::Config;
11use crate::deliverable::Deliverable;
12use crate::error::{Error, ErrorKind, RequestError, SpawnError};
13use crate::executor::{Executor, ExecutorHandle};
14use crate::transaction::Transaction;
15use crate::util::RwLockExt;
16
17mod builder;
18
19pub use self::builder::{
20    ConnectorAdaptor, CreateResolver, DefaultConnectorAdapator, PoolBuilder, PoolConnector,
21};
22
23/// A pool of [`hyper::Client`]s.
24///
25/// Manages a set of `hyper::Client` for maximizing throughput while presenting
26/// a `request` API similar to using a `hyper::Client` directly. The number of
27/// active transactions running on each client is tracked so that max_transactions_per_worker
28/// is respected. When all clients are full, backpressure is provided in the
29/// form of an Error variant saying "busy; try again later".
30pub struct Pool<D: Deliverable> {
31    executor_handles: RoundRobinPool<ExecutorHandle<D>, SpawnError>,
32}
33
34impl<D: Deliverable> Pool<D> {
35    pub fn builder(config: Config) -> PoolBuilder<D> {
36        PoolBuilder::new(config)
37    }
38
39    pub(in crate::pool) fn new<A, CR>(builder: PoolBuilder<D>) -> Result<Pool<D>, SpawnError>
40    where
41        A: ConnectorAdaptor<CR::Resolver>,
42        A::Connect: 'static + Clone + Send + Sync,
43        CR: CreateResolver,
44        CR::Resolver: 'static + Clone + Send + Sync + Service<Name>,
45        CR::Error: 'static + Send + Sync + std::error::Error,
46        CR::Future: Send + std::future::Future<Output = Result<CR::Response, CR::Error>>,
47        CR::Response: Iterator<Item = SocketAddr>,
48    {
49        let PoolBuilder {
50            mut config,
51            transaction_counters,
52            ..
53        } = builder;
54
55        // Make sure config.workers is a reasonable value
56        let num_workers = cmp::max(1, config.workers);
57        config.workers = num_workers;
58
59        let executor_handles = RoundRobinPool::builder(config.workers, move || {
60            let resolver = CR::create_resolver();
61
62            let executor = Executor::spawn::<A, CR::Resolver>(&config, resolver);
63
64            // Push a transaction counter to the synchronized transaction_counters
65            // if executor creation was successful
66            if let (Ok(ref executor), Some(ref transaction_counters)) =
67                (executor.as_ref(), transaction_counters.as_ref())
68            {
69                transaction_counters
70                    .write_ignore_poison()
71                    .push(executor.transaction_counter())
72            }
73
74            executor
75        })
76        .build()?;
77
78        Ok(Pool { executor_handles })
79    }
80
81    /// Start or queue a request
82    ///
83    /// The request will be started immediately assuming one of the clients in
84    /// this pool is not at max_sockets.
85    pub fn request(&mut self, transaction: Transaction<D>) -> Result<(), Error<D>> {
86        let size = self.executor_handles.size();
87        self.request_inner(transaction, size)
88    }
89
90    fn request_inner(&mut self, transaction: Transaction<D>, count: usize) -> Result<(), Error<D>> {
91        if count == 0 {
92            return Err(Error::new(ErrorKind::PoolFull, transaction));
93        }
94
95        let transaction = match self.executor_handles.get() {
96            Err(spawn_err) => return Err(Error::new(ErrorKind::Spawn(spawn_err), transaction)),
97            Ok(handle) => {
98                match handle.send(transaction) {
99                    // Returning the transaction means that we will retry in next iteration
100                    Err(RequestError::PoolFull(transaction)) => transaction,
101                    Err(RequestError::FailedSend(transaction)) => {
102                        // invalidate the thread as it didn't send
103                        handle.invalidate();
104                        transaction
105                    }
106                    Ok(_) => return Ok(()),
107                }
108            }
109        };
110
111        self.request_inner(transaction, count - 1)
112    }
113
114    /// Shutdown the pool
115    ///
116    /// Waits for all workers to be empty before stopping.
117    pub async fn shutdown(self) {
118        let handles = self.executor_handles.into_items();
119        let join_handles: Vec<_> = handles
120            .into_iter()
121            .map(|handle| handle.shutdown())
122            .collect();
123
124        futures::future::join_all(join_handles).await;
125    }
126}