use std::sync::Arc;
use std::time::Instant;
use future::BoxFuture;
use futures::prelude::*;
use log::info;
use crate::config::Config;
use crate::connection_factory::ConnectionFactory;
use crate::error::InitializationResult;
use crate::executor_flavour::ExecutorFlavour;
use crate::instrumentation::InstrumentationFlavour;
use crate::{CheckoutError, Ping, PoolState, Poolable};
use super::{pool_internal::Managed, CanCheckout, CheckoutConstraint};
mod inner;
use self::inner::*;
pub(crate) struct PoolPerNode<T: Poolable> {
inner: Arc<(Inner<T>, Vec<String>)>,
}
impl<T: Poolable> PoolPerNode<T> {
pub fn new<F, CF>(
config: Config,
create_connection_factory: F,
executor_flavour: ExecutorFlavour,
instrumentation: InstrumentationFlavour,
) -> InitializationResult<PoolPerNode<T>>
where
CF: ConnectionFactory<Connection = T> + Send + Sync + 'static,
F: Fn(String) -> InitializationResult<CF>,
{
info!(
"Creating pool per node for {:?} nodes",
config.connect_to_nodes
);
let connected_to = config.connect_to_nodes.clone();
let inner = Inner::new(
config,
create_connection_factory,
executor_flavour,
instrumentation,
)?;
Ok(PoolPerNode {
inner: Arc::new((inner, connected_to)),
})
}
pub fn connected_to(&self) -> &[String] {
&(self.inner.1)
}
pub fn state(&self) -> PoolState {
self.inner.0.state()
}
pub fn ping<'a>(&'a self, timeout: Instant) -> impl Future<Output = Vec<Ping>> + 'a {
self.inner.0.ping(timeout)
}
}
impl<T: Poolable> CanCheckout<T> for PoolPerNode<T> {
fn check_out<'a, M: Into<CheckoutConstraint> + Send + 'static>(
&'a self,
constraint: M,
) -> BoxFuture<'a, Result<Managed<T>, CheckoutError>> {
self.inner.0.check_out(constraint.into()).boxed()
}
}
impl<T: Poolable> Clone for PoolPerNode<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}