use std::time::Duration;
use redis::{r#async::Connection, Client, IntoConnectionInfo};
use crate::error::{InitializationError, InitializationResult};
use crate::executor_flavour::ExecutorFlavour;
use crate::helpers;
use crate::instrumentation::{Instrumentation, NoInstrumentation};
use crate::pool::{Config as PoolConfig, Pool};
use crate::{Checkout, RedisPool};
pub use crate::backoff_strategy::BackoffStrategy;
pub use crate::pool::PoolStats;
/// Settings used to create a `SingleNodePool`.
///
/// `Config::default()` supplies a baseline; the chainable setters on this
/// type (or on `Builder`) override individual values.
pub struct Config {
/// Pool size forwarded to the underlying pool. Pool creation fails when
/// this is `0`. `Default` sets it to 20.
pub desired_pool_size: usize,
/// Timeout used by `check_out` when the caller does not supply one
/// explicitly. `Default` sets it to 20 ms.
/// NOTE(review): `None` is passed through as-is, presumably meaning
/// "no timeout" - confirm in `Pool::check_out`.
pub checkout_timeout: Option<Duration>,
/// Forwarded unchanged to the underlying pool.
/// NOTE(review): presumably controls retry delays - confirm in `Pool`.
pub backoff_strategy: BackoffStrategy,
/// Forwarded unchanged to the underlying pool. `Default` sets it to
/// `Some(100)`.
/// NOTE(review): presumably caps waiting checkouts - confirm in `Pool`.
pub reservation_limit: Option<usize>,
/// Forwarded unchanged to the underlying pool. `Default` sets it to 100 ms.
pub stats_interval: Duration,
}
impl Config {
pub fn desired_pool_size(mut self, v: usize) -> Self {
self.desired_pool_size = v;
self
}
pub fn checkout_timeout(mut self, v: Option<Duration>) -> Self {
self.checkout_timeout = v;
self
}
pub fn backoff_strategy(mut self, v: BackoffStrategy) -> Self {
self.backoff_strategy = v;
self
}
pub fn reservation_limit(mut self, v: Option<usize>) -> Self {
self.reservation_limit = v;
self
}
pub fn stats_interval(mut self, v: Duration) -> Self {
self.stats_interval = v;
self
}
pub fn update_from_environment(mut self, prefix: Option<&str>) -> InitializationResult<Self> {
helpers::set_desired_pool_size(prefix, |v| {
self.desired_pool_size = v;
})?;
helpers::set_checkout_timeout(prefix, |v| {
self.checkout_timeout = v;
})?;
helpers::set_reservation_limit(prefix, |v| {
self.reservation_limit = v;
})?;
helpers::set_stats_interval(prefix, |v| {
self.stats_interval = v;
})?;
Ok(self)
}
pub fn builder(&self) -> Builder<(), NoInstrumentation> {
Builder::default()
.desired_pool_size(self.desired_pool_size)
.checkout_timeout(self.checkout_timeout)
.backoff_strategy(self.backoff_strategy)
.reservation_limit(self.reservation_limit)
.stats_interval(self.stats_interval)
}
}
impl Default for Config {
fn default() -> Self {
Self {
desired_pool_size: 20,
checkout_timeout: Some(Duration::from_millis(20)),
backoff_strategy: BackoffStrategy::default(),
reservation_limit: Some(100),
stats_interval: Duration::from_millis(100),
}
}
}
/// Step-by-step constructor for a `SingleNodePool`.
///
/// `T` is the type of the connection target (`()` until `connect_to` or
/// `update_from_environment` supplies one) and `I` is the type of the
/// instrumentation.
pub struct Builder<T, I> {
// Pool settings collected so far.
config: Config,
// Executor selection handed to the pool on creation.
executor_flavour: ExecutorFlavour,
// Where to connect to; `()` while unset.
connect_to: T,
// Optional instrumentation handed to the pool on creation.
instrumentation: Option<I>,
}
impl Default for Builder<(), ()> {
fn default() -> Self {
Self {
config: Config::default(),
executor_flavour: ExecutorFlavour::Runtime,
connect_to: (),
instrumentation: None,
}
}
}
impl<T, I> Builder<T, I> {
    /// Sets `desired_pool_size` in the embedded configuration.
    pub fn desired_pool_size(mut self, v: usize) -> Self {
        self.config = self.config.desired_pool_size(v);
        self
    }

    /// Sets `checkout_timeout` in the embedded configuration.
    pub fn checkout_timeout(mut self, v: Option<Duration>) -> Self {
        self.config = self.config.checkout_timeout(v);
        self
    }

    /// Sets `backoff_strategy` in the embedded configuration.
    pub fn backoff_strategy(mut self, v: BackoffStrategy) -> Self {
        self.config = self.config.backoff_strategy(v);
        self
    }

    /// Sets `reservation_limit` in the embedded configuration.
    pub fn reservation_limit(mut self, v: Option<usize>) -> Self {
        self.config = self.config.reservation_limit(v);
        self
    }

    /// Sets `stats_interval` in the embedded configuration.
    pub fn stats_interval(mut self, v: Duration) -> Self {
        self.config = self.config.stats_interval(v);
        self
    }

    /// Sets the connection target, replacing whatever was set before.
    ///
    /// The builder's first type parameter changes to `C`.
    pub fn connect_to<C: IntoConnectionInfo>(self, connect_to: C) -> Builder<C, I> {
        let Builder {
            config,
            executor_flavour,
            instrumentation,
            ..
        } = self;

        Builder {
            config,
            executor_flavour,
            connect_to,
            instrumentation,
        }
    }

    /// Selects the given tokio `TaskExecutor` as the executor flavour.
    pub fn task_executor(self, executor: ::tokio::runtime::TaskExecutor) -> Self {
        Builder {
            executor_flavour: ExecutorFlavour::TokioTaskExecutor(executor),
            ..self
        }
    }

    /// Applies `Config::update_from_environment` to the embedded
    /// configuration; the connection target, executor flavour and
    /// instrumentation are kept as they are.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `Config::update_from_environment`.
    pub fn update_config_from_environment(
        self,
        prefix: Option<&str>,
    ) -> InitializationResult<Builder<T, I>> {
        let config = self.config.update_from_environment(prefix)?;
        Ok(Builder { config, ..self })
    }

    /// Installs `instrumentation`, replacing any previous one.
    ///
    /// The builder's second type parameter changes to `II`.
    pub fn instrumented<II>(self, instrumentation: II) -> Builder<T, II>
    where
        II: Instrumentation + Send + Sync + 'static,
    {
        let Builder {
            config,
            executor_flavour,
            connect_to,
            ..
        } = self;

        Builder {
            config,
            executor_flavour,
            connect_to,
            instrumentation: Some(instrumentation),
        }
    }

    /// Installs the instrumentation created by
    /// `crate::instrumentation::metrix::create` from `aggregates_processors`
    /// and `config`, replacing any previous one.
    #[cfg(feature = "metrix")]
    pub fn instrumented_with_metrix<A: metrix::processor::AggregatesProcessors>(
        self,
        aggregates_processors: &mut A,
        config: crate::instrumentation::MetrixConfig,
    ) -> Builder<T, crate::instrumentation::metrix::MetrixInstrumentation> {
        let metrix_instrumentation =
            crate::instrumentation::metrix::create(aggregates_processors, config);
        let Builder {
            config: pool_config,
            executor_flavour,
            connect_to,
            ..
        } = self;

        Builder {
            config: pool_config,
            executor_flavour,
            connect_to,
            instrumentation: Some(metrix_instrumentation),
        }
    }
}
impl<I> Builder<(), I>
where
    I: Instrumentation + Send + Sync + 'static,
{
    /// Updates the configuration via `Config::update_from_environment` and
    /// takes the connection target from `helpers::get_connect_to`.
    ///
    /// The first entry returned by `helpers::get_connect_to` becomes the
    /// connection target; any further entries are ignored.
    ///
    /// # Errors
    ///
    /// Returns an error if
    /// * updating the configuration fails,
    /// * `helpers::get_connect_to` fails,
    /// * `helpers::get_connect_to` yields no value at all
    ///   ("'CONNECT_TO' was not found"), or
    /// * it yields a value without any entries
    ///   ("'CONNECT_TO' was found but empty").
    pub fn update_from_environment(
        self,
        prefix: Option<&str>,
    ) -> InitializationResult<Builder<String, I>> {
        let config = self.config.update_from_environment(prefix)?;

        // `None` means there was nothing to read at all. This is a different
        // situation from a value that is present but has no entries, so it
        // gets its own message.
        let candidates = helpers::get_connect_to(prefix)?
            .ok_or_else(|| InitializationError::message_only("'CONNECT_TO' was not found"))?;

        // A single-node pool only needs the first entry.
        let connect_to = candidates.into_iter().next().ok_or_else(|| {
            InitializationError::message_only("'CONNECT_TO' was found but empty")
        })?;

        Ok(Builder {
            config,
            executor_flavour: self.executor_flavour,
            connect_to,
            instrumentation: self.instrumentation,
        })
    }
}
impl<T, I> Builder<T, I>
where
T: IntoConnectionInfo,
I: Instrumentation + Send + Sync + 'static,
{
pub fn finish(self) -> InitializationResult<SingleNodePool> {
SingleNodePool::create(
self.config,
self.connect_to,
self.executor_flavour,
self.instrumentation,
)
}
}
impl<I> Builder<String, I>
where
I: Instrumentation + Send + Sync + 'static,
{
pub fn finish2(self) -> InitializationResult<SingleNodePool> {
SingleNodePool::create(
self.config,
&*self.connect_to,
self.executor_flavour,
self.instrumentation,
)
}
}
/// A pool of Redis connections created from a single `redis::Client`.
///
/// Connections are obtained through the `RedisPool` implementation.
#[derive(Clone)]
pub struct SingleNodePool {
// The underlying generic pool holding the connections.
pool: Pool<Connection>,
// Timeout used by `check_out` when no explicit timeout is given.
checkout_timeout: Option<Duration>,
}
impl SingleNodePool {
pub fn builder() -> Builder<(), ()> {
Builder::default()
}
pub fn new<T>(config: Config, connect_to: T) -> InitializationResult<Self>
where
T: IntoConnectionInfo,
{
Self::create::<T, NoInstrumentation>(config, connect_to, ExecutorFlavour::Runtime, None)
}
pub(crate) fn create<T, I>(
config: Config,
connect_to: T,
executor_flavour: ExecutorFlavour,
instrumentation: Option<I>,
) -> InitializationResult<Self>
where
T: IntoConnectionInfo,
I: Instrumentation + Send + Sync + 'static,
{
if config.desired_pool_size == 0 {
return Err(InitializationError::message_only(
"'desired_pool_size' must be at least 1",
));
}
let client =
Client::open(connect_to).map_err(|err| InitializationError::cause_only(err))?;
let pool_conf = PoolConfig {
desired_pool_size: config.desired_pool_size,
backoff_strategy: config.backoff_strategy,
reservation_limit: config.reservation_limit,
stats_interval: config.stats_interval,
};
let pool = Pool::new(pool_conf, client, executor_flavour, instrumentation);
Ok(Self {
pool,
checkout_timeout: config.checkout_timeout,
})
}
pub fn add_connections(&self, n: usize) {
(0..n).for_each(|_| {
self.pool.add_new_connection();
});
}
pub fn remove_connection(&self) {
self.pool.remove_connection();
}
pub fn stats(&self) -> PoolStats {
self.pool.stats()
}
}
impl RedisPool for SingleNodePool {
    /// Checks out a connection using the timeout this pool was configured
    /// with.
    fn check_out(&self) -> Checkout {
        self.check_out_explicit_timeout(self.checkout_timeout)
    }

    /// Checks out a connection using `timeout` instead of the configured
    /// timeout.
    fn check_out_explicit_timeout(&self, timeout: Option<Duration>) -> Checkout {
        let pending = self.pool.check_out(timeout);
        Checkout(pending)
    }
}