hyper_client_pool/pool/
mod.rs1use std::{cmp, net::SocketAddr};
5
6use fpool::RoundRobinPool;
7use hyper::client::connect::dns::Name;
8use tower_service::Service;
9
10use crate::config::Config;
11use crate::deliverable::Deliverable;
12use crate::error::{Error, ErrorKind, RequestError, SpawnError};
13use crate::executor::{Executor, ExecutorHandle};
14use crate::transaction::Transaction;
15use crate::util::RwLockExt;
16
17mod builder;
18
19pub use self::builder::{
20 ConnectorAdaptor, CreateResolver, DefaultConnectorAdapator, PoolBuilder, PoolConnector,
21};
22
23pub struct Pool<D: Deliverable> {
31 executor_handles: RoundRobinPool<ExecutorHandle<D>, SpawnError>,
32}
33
34impl<D: Deliverable> Pool<D> {
35 pub fn builder(config: Config) -> PoolBuilder<D> {
36 PoolBuilder::new(config)
37 }
38
39 pub(in crate::pool) fn new<A, CR>(builder: PoolBuilder<D>) -> Result<Pool<D>, SpawnError>
40 where
41 A: ConnectorAdaptor<CR::Resolver>,
42 A::Connect: 'static + Clone + Send + Sync,
43 CR: CreateResolver,
44 CR::Resolver: 'static + Clone + Send + Sync + Service<Name>,
45 CR::Error: 'static + Send + Sync + std::error::Error,
46 CR::Future: Send + std::future::Future<Output = Result<CR::Response, CR::Error>>,
47 CR::Response: Iterator<Item = SocketAddr>,
48 {
49 let PoolBuilder {
50 mut config,
51 transaction_counters,
52 ..
53 } = builder;
54
55 let num_workers = cmp::max(1, config.workers);
57 config.workers = num_workers;
58
59 let executor_handles = RoundRobinPool::builder(config.workers, move || {
60 let resolver = CR::create_resolver();
61
62 let executor = Executor::spawn::<A, CR::Resolver>(&config, resolver);
63
64 if let (Ok(ref executor), Some(ref transaction_counters)) =
67 (executor.as_ref(), transaction_counters.as_ref())
68 {
69 transaction_counters
70 .write_ignore_poison()
71 .push(executor.transaction_counter())
72 }
73
74 executor
75 })
76 .build()?;
77
78 Ok(Pool { executor_handles })
79 }
80
81 pub fn request(&mut self, transaction: Transaction<D>) -> Result<(), Error<D>> {
86 let size = self.executor_handles.size();
87 self.request_inner(transaction, size)
88 }
89
90 fn request_inner(&mut self, transaction: Transaction<D>, count: usize) -> Result<(), Error<D>> {
91 if count == 0 {
92 return Err(Error::new(ErrorKind::PoolFull, transaction));
93 }
94
95 let transaction = match self.executor_handles.get() {
96 Err(spawn_err) => return Err(Error::new(ErrorKind::Spawn(spawn_err), transaction)),
97 Ok(handle) => {
98 match handle.send(transaction) {
99 Err(RequestError::PoolFull(transaction)) => transaction,
101 Err(RequestError::FailedSend(transaction)) => {
102 handle.invalidate();
104 transaction
105 }
106 Ok(_) => return Ok(()),
107 }
108 }
109 };
110
111 self.request_inner(transaction, count - 1)
112 }
113
114 pub async fn shutdown(self) {
118 let handles = self.executor_handles.into_items();
119 let join_handles: Vec<_> = handles
120 .into_iter()
121 .map(|handle| handle.shutdown())
122 .collect();
123
124 futures::future::join_all(join_handles).await;
125 }
126}