use std::time::{Duration, Instant};
use futures::prelude::*;
use crate::config::Builder;
use crate::config::DefaultPoolCheckoutMode;
pub mod config;
pub mod instrumentation;
use future::BoxFuture;
pub use redis::{
aio::ConnectionLike, cmd, Cmd, FromRedisValue, NumericBehavior, RedisError, RedisFuture,
ToRedisArgs, Value,
};
pub use crate::error::{CheckoutError, CheckoutErrorKind};
pub use commands::Commands;
pub use pool_connection::{ConnectionFlavour, PoolConnection};
pub mod connection_factory;
pub mod error;
pub(crate) mod executor_flavour;
pub(crate) mod helpers;
mod activation_order;
mod backoff_strategy;
mod commands;
mod pool_connection;
mod pools;
mod redis_rs;
pub trait Poolable: Send + Sized + 'static {
fn connected_to(&self) -> &str;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckoutMode {
Immediately,
Wait,
PoolDefault,
Until(Instant),
}
impl CheckoutMode {
pub fn is_deadline_elapsed(self) -> bool {
match self {
CheckoutMode::Until(deadline) => deadline < Instant::now(),
_ => false,
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct Immediately;
#[derive(Debug, Clone, Copy)]
pub struct Wait;
#[derive(Debug, Clone, Copy)]
pub struct PoolDefault;
impl From<Immediately> for CheckoutMode {
fn from(_: Immediately) -> Self {
CheckoutMode::Immediately
}
}
impl From<Wait> for CheckoutMode {
fn from(_: Wait) -> Self {
CheckoutMode::Wait
}
}
impl From<PoolDefault> for CheckoutMode {
fn from(_: PoolDefault) -> Self {
CheckoutMode::PoolDefault
}
}
impl Default for CheckoutMode {
fn default() -> Self {
CheckoutMode::PoolDefault
}
}
impl From<Duration> for CheckoutMode {
fn from(d: Duration) -> Self {
let timeout = Instant::now() + d;
timeout.into()
}
}
impl From<Instant> for CheckoutMode {
fn from(until: Instant) -> Self {
CheckoutMode::Until(until)
}
}
enum RedisPoolFlavour<T: Poolable> {
Empty,
Single(pools::SinglePool<T>),
PerNode(pools::PoolPerNode<T>),
}
impl<T: Poolable> Clone for RedisPoolFlavour<T> {
fn clone(&self) -> Self {
use RedisPoolFlavour::*;
match self {
Empty => Empty,
Single(pool) => Single(pool.clone()),
PerNode(pool) => PerNode(pool.clone()),
}
}
}
pub struct RedisPool<T: Poolable = ConnectionFlavour> {
flavour: RedisPoolFlavour<T>,
default_checkout_mode: DefaultPoolCheckoutMode,
retry_on_checkout_limit: bool,
}
impl RedisPool {
pub fn builder() -> Builder {
Builder::default()
}
}
impl<T: Poolable> RedisPool<T> {
pub fn no_pool() -> Self {
RedisPool {
flavour: RedisPoolFlavour::Empty,
default_checkout_mode: DefaultPoolCheckoutMode::Wait,
retry_on_checkout_limit: false,
}
}
pub fn check_out_default<'a>(
&'a self,
) -> impl Future<Output = Result<PoolConnection<T>, CheckoutError>> + 'a {
self.check_out(CheckoutMode::PoolDefault)
}
pub async fn check_out<M: Into<CheckoutMode>>(
&self,
mode: M,
) -> Result<PoolConnection<T>, CheckoutError> {
let constraint = pools::CheckoutConstraint::from_checkout_mode_and_pool_default(
mode,
self.default_checkout_mode,
);
let managed = match self.flavour {
RedisPoolFlavour::Single(ref pool) => {
pools::check_out_maybe_retry_on_queue_limit_reached(
pool,
constraint,
self.retry_on_checkout_limit,
)
.await?
}
RedisPoolFlavour::PerNode(ref pool) => {
pools::check_out_maybe_retry_on_queue_limit_reached(
pool,
constraint,
self.retry_on_checkout_limit,
)
.await?
}
RedisPoolFlavour::Empty => {
let err = CheckoutError::new(CheckoutErrorKind::NoPool);
return Err(err);
}
};
Ok(PoolConnection {
managed,
connection_state_ok: true,
})
}
pub fn connected_to(&self) -> Vec<String> {
match self.flavour {
RedisPoolFlavour::Single(ref pool) => vec![pool.connected_to().to_string()],
RedisPoolFlavour::PerNode(ref pool) => pool.connected_to().to_vec(),
RedisPoolFlavour::Empty => vec![],
}
}
pub fn state(&self) -> PoolState {
match self.flavour {
RedisPoolFlavour::Single(ref pool) => pool.state(),
RedisPoolFlavour::PerNode(ref pool) => pool.state(),
RedisPoolFlavour::Empty => PoolState::default(),
}
}
pub fn ping<TO: Into<Timeout>>(&self, timeout: TO) -> BoxFuture<Vec<Ping>> {
let deadline = timeout.into().0;
match self.flavour {
RedisPoolFlavour::Single(ref pool) => pool.ping(deadline).map(|p| vec![p]).boxed(),
RedisPoolFlavour::PerNode(ref pool) => pool.ping(deadline).boxed(),
RedisPoolFlavour::Empty => future::ready(vec![]).boxed(),
}
}
}
impl<T: Poolable> Clone for RedisPool<T> {
fn clone(&self) -> Self {
RedisPool {
flavour: self.flavour.clone(),
default_checkout_mode: self.default_checkout_mode,
retry_on_checkout_limit: self.retry_on_checkout_limit,
}
}
}
pub struct Timeout(pub Instant);
impl From<Instant> for Timeout {
fn from(at: Instant) -> Self {
Self(at)
}
}
impl From<Duration> for Timeout {
fn from(d: Duration) -> Self {
Self(Instant::now() + d)
}
}
#[derive(Debug)]
pub enum PingState {
Ok,
Failed(Box<dyn std::error::Error + Send>),
}
impl PingState {
pub fn failed<E: std::error::Error + Send + 'static>(err: E) -> Self {
Self::Failed(Box::new(err))
}
pub fn failed_msg<T: Into<String>>(msg: T) -> Self {
#[derive(Debug)]
struct PingError(String);
impl std::fmt::Display for PingError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl std::error::Error for PingError {
fn description(&self) -> &str {
"ping failed"
}
fn cause(&self) -> Option<&dyn std::error::Error> {
None
}
}
Self::failed(PingError(msg.into()))
}
}
#[derive(Debug)]
pub struct Ping {
pub connect_time: Option<Duration>,
pub latency: Option<Duration>,
pub total_time: Duration,
pub uri: String,
pub state: PingState,
}
impl Ping {
pub fn is_ok(&self) -> bool {
match self.state {
PingState::Ok => true,
_ => false,
}
}
pub fn is_failed(&self) -> bool {
!self.is_ok()
}
}
#[derive(Debug, Clone, Copy)]
pub struct PoolState {
pub in_flight: usize,
pub connections: usize,
pub reservations: usize,
pub idle: usize,
pub pools: usize,
}
impl std::ops::Add for PoolState {
type Output = Self;
fn add(self, other: Self) -> Self {
Self {
in_flight: self.in_flight + other.in_flight,
reservations: self.reservations + other.reservations,
connections: self.connections + other.connections,
idle: self.idle + other.idle,
pools: self.pools + other.pools,
}
}
}
impl Default for PoolState {
fn default() -> Self {
Self {
in_flight: 0,
reservations: 0,
connections: 0,
idle: 0,
pools: 0,
}
}
}