use std::{cmp, net::SocketAddr};
use fpool::RoundRobinPool;
use hyper::client::connect::dns::Name;
use tower_service::Service;
use crate::config::Config;
use crate::deliverable::Deliverable;
use crate::error::{Error, ErrorKind, RequestError, SpawnError};
use crate::executor::{Executor, ExecutorHandle};
use crate::transaction::Transaction;
use crate::util::RwLockExt;
mod builder;
pub use self::builder::{
ConnectorAdaptor, CreateResolver, DefaultConnectorAdapator, PoolBuilder, PoolConnector,
};
pub struct Pool<D: Deliverable> {
executor_handles: RoundRobinPool<ExecutorHandle<D>, SpawnError>,
}
impl<D: Deliverable> Pool<D> {
pub fn builder(config: Config) -> PoolBuilder<D> {
PoolBuilder::new(config)
}
pub(in crate::pool) fn new<A, CR>(builder: PoolBuilder<D>) -> Result<Pool<D>, SpawnError>
where
A: ConnectorAdaptor<CR::Resolver>,
A::Connect: 'static + Clone + Send + Sync,
CR: CreateResolver,
CR::Resolver: 'static + Clone + Send + Sync + Service<Name>,
CR::Error: 'static + Send + Sync + std::error::Error,
CR::Future: Send + std::future::Future<Output = Result<CR::Response, CR::Error>>,
CR::Response: Iterator<Item = SocketAddr>,
{
let PoolBuilder {
mut config,
transaction_counters,
..
} = builder;
let num_workers = cmp::max(1, config.workers);
config.workers = num_workers;
let executor_handles = RoundRobinPool::builder(config.workers, move || {
let resolver = CR::create_resolver();
let executor = Executor::spawn::<A, CR::Resolver>(&config, resolver);
if let (Ok(ref executor), Some(ref transaction_counters)) =
(executor.as_ref(), transaction_counters.as_ref())
{
transaction_counters
.write_ignore_poison()
.push(executor.transaction_counter())
}
executor
})
.build()?;
Ok(Pool { executor_handles })
}
pub fn request(&mut self, transaction: Transaction<D>) -> Result<(), Error<D>> {
let size = self.executor_handles.size();
self.request_inner(transaction, size)
}
fn request_inner(&mut self, transaction: Transaction<D>, count: usize) -> Result<(), Error<D>> {
if count == 0 {
return Err(Error::new(ErrorKind::PoolFull, transaction));
}
let transaction = match self.executor_handles.get() {
Err(spawn_err) => return Err(Error::new(ErrorKind::Spawn(spawn_err), transaction)),
Ok(handle) => {
match handle.send(transaction) {
Err(RequestError::PoolFull(transaction)) => transaction,
Err(RequestError::FailedSend(transaction)) => {
handle.invalidate();
transaction
}
Ok(_) => return Ok(()),
}
}
};
self.request_inner(transaction, count - 1)
}
pub async fn shutdown(self) {
let handles = self.executor_handles.into_items();
let join_handles: Vec<_> = handles
.into_iter()
.map(|handle| handle.shutdown())
.collect();
futures::future::join_all(join_handles).await;
}
}