use std::time::{Duration, Instant};
use futures::{
future::{self, Future},
try_ready, Async, Poll,
};
use crate::config::Builder;
use crate::config::DefaultPoolCheckoutMode;
use crate::pools::pool_internal::CheckoutManaged;
pub mod config;
pub mod instrumentation;
pub use crate::error::{CheckoutError, CheckoutErrorKind};
pub use commands::Commands;
pub use pool_connection::{ConnectionFlavour, PoolConnection};
pub mod connection_factory;
pub(crate) mod executor_flavour;
pub(crate) mod helpers;
mod activation_order;
mod backoff_strategy;
mod commands;
mod error;
mod pool_connection;
mod pools;
mod redis_rs;
/// Bound required of the connection type `T` used by [`Checkout`] and
/// [`RedisPool`] in this module.
///
/// Implementors must be `Send + Sized + 'static`.
pub trait Poolable: Send + Sized + 'static {
/// Returns a string slice describing what this value is connected to.
///
/// NOTE(review): presumably the address/URI of the Redis node — confirm
/// against the implementors.
fn connected_to(&self) -> &str;
}
pub struct Checkout<T: Poolable = ConnectionFlavour>(CheckoutManaged<T>);
impl<T: Poolable> Checkout<T> {
pub fn error<E: Into<CheckoutError>>(err: E) -> Self {
Checkout(CheckoutManaged::error(err))
}
}
impl<T: Poolable> Future for Checkout<T> {
    type Item = PoolConnection<T>;
    type Error = CheckoutError;

    /// Polls the wrapped `CheckoutManaged`.
    ///
    /// * Ready: the managed value is wrapped in a `PoolConnection` whose
    ///   `connection_state_ok` flag starts out as `true`.
    /// * Not ready: `Async::NotReady` is passed through.
    /// * Error: the error is converted with `From` and returned.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.0.poll() {
            Ok(Async::Ready(managed)) => {
                let connection = PoolConnection {
                    managed,
                    connection_state_ok: true,
                };
                Ok(Async::Ready(connection))
            }
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(err) => Err(From::from(err)),
        }
    }
}
/// Selects how a checkout request should be handled.
///
/// Values are usually created through the `From` conversions below: the
/// marker types [`Immediately`], [`Wait`] and [`PoolDefault`], an [`Instant`]
/// (absolute deadline) or a [`Duration`] (deadline relative to now).
///
/// NOTE(review): the precise behaviour of each variant is implemented in the
/// `pools` module and is not visible here; the variant comments are hedged
/// accordingly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckoutMode {
    /// Presumably: do not wait for a connection — confirm in `pools`.
    Immediately,
    /// Presumably: wait for a connection without a deadline — confirm in `pools`.
    Wait,
    /// Use the default configured on the pool (`RedisPool::check_out` passes
    /// the pool's `default_checkout_mode` alongside this mode).
    PoolDefault,
    /// Carries an absolute deadline.
    Until(Instant),
}

impl CheckoutMode {
    /// Returns `true` only for `Until(deadline)` when `deadline` lies strictly
    /// before `Instant::now()`. All other variants never report an elapsed
    /// deadline.
    pub fn is_deadline_elapsed(self) -> bool {
        match self {
            CheckoutMode::Until(deadline) => deadline < Instant::now(),
            _ => false,
        }
    }
}

/// Marker type that converts into [`CheckoutMode::Immediately`].
#[derive(Debug, Clone, Copy)]
pub struct Immediately;

/// Marker type that converts into [`CheckoutMode::Wait`].
#[derive(Debug, Clone, Copy)]
pub struct Wait;

/// Marker type that converts into [`CheckoutMode::PoolDefault`].
#[derive(Debug, Clone, Copy)]
pub struct PoolDefault;

impl From<Immediately> for CheckoutMode {
    fn from(_: Immediately) -> Self {
        CheckoutMode::Immediately
    }
}

impl From<Wait> for CheckoutMode {
    fn from(_: Wait) -> Self {
        CheckoutMode::Wait
    }
}

impl From<PoolDefault> for CheckoutMode {
    fn from(_: PoolDefault) -> Self {
        CheckoutMode::PoolDefault
    }
}

impl Default for CheckoutMode {
    /// The default mode is [`CheckoutMode::PoolDefault`].
    fn default() -> Self {
        CheckoutMode::PoolDefault
    }
}

impl From<Duration> for CheckoutMode {
    /// Converts a relative timeout into `Until(now + d)`.
    ///
    /// `Instant + Duration` panics when the result cannot be represented, so
    /// the addition is done with `checked_add`. A timeout too large to be
    /// expressed as a deadline (e.g. `Duration::from_secs(u64::MAX)` used as
    /// "forever") maps to [`CheckoutMode::Wait`] instead of panicking.
    ///
    /// NOTE(review): this relies on `Wait` meaning "no deadline" — confirm.
    fn from(d: Duration) -> Self {
        match Instant::now().checked_add(d) {
            Some(deadline) => CheckoutMode::Until(deadline),
            None => CheckoutMode::Wait,
        }
    }
}

impl From<Instant> for CheckoutMode {
    /// Uses the given instant as the absolute deadline.
    fn from(until: Instant) -> Self {
        CheckoutMode::Until(until)
    }
}
/// The concrete pool implementation backing a `RedisPool`.
enum RedisPoolFlavour<T: Poolable> {
    /// No pool at all.
    Empty,
    /// Backed by a `pools::SinglePool`.
    Single(pools::SinglePool<T>),
    /// Backed by a `pools::PoolPerNode`.
    PerNode(pools::PoolPerNode<T>),
}

// Written by hand so that cloning only requires `T: Poolable`; a derived
// `Clone` would additionally demand `T: Clone`.
impl<T: Poolable> Clone for RedisPoolFlavour<T> {
    fn clone(&self) -> Self {
        match *self {
            RedisPoolFlavour::Empty => RedisPoolFlavour::Empty,
            RedisPoolFlavour::Single(ref inner) => RedisPoolFlavour::Single(inner.clone()),
            RedisPoolFlavour::PerNode(ref inner) => RedisPoolFlavour::PerNode(inner.clone()),
        }
    }
}
/// Handle to a pool of connections of type `T` (defaults to
/// `ConnectionFlavour`).
///
/// Connections are requested with `check_out` / `check_out_default`.
pub struct RedisPool<T: Poolable = ConnectionFlavour> {
// Which pool implementation (if any) serves the requests.
flavour: RedisPoolFlavour<T>,
// Passed to `CheckoutConstraint::from_checkout_mode_and_pool_default`
// together with the caller's mode on every checkout.
default_checkout_mode: DefaultPoolCheckoutMode,
// Forwarded to `pools::check_out_maybe_retry_on_queue_limit_reached`.
retry_on_checkout_limit: bool,
}
impl RedisPool {
/// Returns `Builder::default()`, the starting point for configuring a pool.
pub fn builder() -> Builder {
Builder::default()
}
}
impl<T: Poolable> RedisPool<T> {
pub fn no_pool() -> Self {
RedisPool {
flavour: RedisPoolFlavour::Empty,
default_checkout_mode: DefaultPoolCheckoutMode::Wait,
retry_on_checkout_limit: false,
}
}
/// Checks out a connection using `CheckoutMode::PoolDefault`.
///
/// Shorthand for `self.check_out(CheckoutMode::PoolDefault)`.
pub fn check_out_default(&self) -> Checkout<T> {
    let mode = CheckoutMode::PoolDefault;
    self.check_out(mode)
}
/// Requests a connection from the pool.
///
/// `mode` and the pool's `default_checkout_mode` are combined into a
/// `pools::CheckoutConstraint`. For the `Single` and `PerNode` flavours the
/// request goes to `pools::check_out_maybe_retry_on_queue_limit_reached`,
/// together with the pool's `retry_on_checkout_limit` flag. For the `Empty`
/// flavour the returned `Checkout` wraps a future that fails with
/// `CheckoutErrorKind::NoPool`.
pub fn check_out<M: Into<CheckoutMode>>(&self, mode: M) -> Checkout<T> {
    let constraint = pools::CheckoutConstraint::from_checkout_mode_and_pool_default(
        mode,
        self.default_checkout_mode,
    );
    let retry = self.retry_on_checkout_limit;

    let managed = match &self.flavour {
        RedisPoolFlavour::Single(single) => {
            pools::check_out_maybe_retry_on_queue_limit_reached(single, constraint, retry)
        }
        RedisPoolFlavour::PerNode(per_node) => {
            pools::check_out_maybe_retry_on_queue_limit_reached(per_node, constraint, retry)
        }
        RedisPoolFlavour::Empty => {
            let no_pool = CheckoutError::new(CheckoutErrorKind::NoPool);
            CheckoutManaged::new(future::err(no_pool))
        }
    };

    Checkout(managed)
}
/// Lists what the underlying pool(s) report being connected to.
///
/// A `Single` pool contributes exactly one entry, a `PerNode` pool
/// contributes a copy of all of its entries, and an `Empty` pool yields an
/// empty vector.
pub fn connected_to(&self) -> Vec<String> {
    match &self.flavour {
        RedisPoolFlavour::Single(single) => {
            let node = single.connected_to().to_string();
            vec![node]
        }
        RedisPoolFlavour::PerNode(per_node) => per_node.connected_to().to_vec(),
        RedisPoolFlavour::Empty => Vec::new(),
    }
}
/// Returns the current `PoolState` as reported by the underlying pool.
///
/// A pool without a backing implementation (`Empty`) reports
/// `PoolState::default()`.
pub fn state(&self) -> PoolState {
    match &self.flavour {
        RedisPoolFlavour::Empty => PoolState::default(),
        RedisPoolFlavour::Single(single) => single.state(),
        RedisPoolFlavour::PerNode(per_node) => per_node.state(),
    }
}
pub fn ping<TO: Into<Timeout>>(
&self,
timeout: TO,
) -> impl Future<Item = Vec<Ping>, Error = ()> + Send {
let deadline = timeout.into().0;
match self.flavour {
RedisPoolFlavour::Single(ref pool) => Box::new(pool.ping(deadline).map(|p| vec![p]))
as Box<dyn Future<Item = _, Error = ()> + Send>,
RedisPoolFlavour::PerNode(ref pool) => Box::new(pool.ping(deadline)),
RedisPoolFlavour::Empty => Box::new(future::ok(vec![])),
}
}
}
// Implemented by hand because a derived `Clone` would require `T: Clone`,
// which `Poolable` does not demand.
impl<T: Poolable> Clone for RedisPool<T> {
    /// Clones the flavour and copies the two configuration values.
    fn clone(&self) -> Self {
        let RedisPool {
            flavour,
            default_checkout_mode,
            retry_on_checkout_limit,
        } = self;

        Self {
            flavour: flavour.clone(),
            default_checkout_mode: *default_checkout_mode,
            retry_on_checkout_limit: *retry_on_checkout_limit,
        }
    }
}
/// An absolute point in time used as a deadline (see `RedisPool::ping`).
///
/// Can be created from an [`Instant`] directly or from a [`Duration`], which
/// is interpreted relative to the moment of conversion.
pub struct Timeout(pub Instant);

impl From<Instant> for Timeout {
    /// Uses `at` as the deadline unchanged.
    fn from(at: Instant) -> Self {
        Timeout(at)
    }
}

impl From<Duration> for Timeout {
    /// Sets the deadline to `Instant::now() + d`.
    ///
    /// # Panics
    ///
    /// Panics if `now + d` cannot be represented as an `Instant`.
    fn from(d: Duration) -> Self {
        let deadline = Instant::now() + d;
        Timeout::from(deadline)
    }
}
/// Outcome of a ping: either success or a boxed error.
#[derive(Debug)]
pub enum PingState {
    /// The ping succeeded.
    Ok,
    /// The ping failed with the contained error.
    Failed(Box<dyn std::error::Error + Send>),
}

impl PingState {
    /// Creates a `Failed` state by boxing `err`.
    pub fn failed<E: std::error::Error + Send + 'static>(err: E) -> Self {
        let boxed: Box<dyn std::error::Error + Send> = Box::new(err);
        PingState::Failed(boxed)
    }

    /// Creates a `Failed` state from a plain message.
    ///
    /// The message is wrapped in a private error type whose `Display` output
    /// is the message itself.
    pub fn failed_msg<T: Into<String>>(msg: T) -> Self {
        use std::error::Error;
        use std::fmt;

        #[derive(Debug)]
        struct PingError(String);

        impl fmt::Display for PingError {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                f.write_str(&self.0)
            }
        }

        impl Error for PingError {
            fn description(&self) -> &str {
                "ping failed"
            }

            fn cause(&self) -> Option<&dyn Error> {
                None
            }
        }

        let message: String = msg.into();
        PingState::failed(PingError(message))
    }
}
/// Result of a ping as returned by `RedisPool::ping`.
///
/// NOTE(review): the values are filled in by the pool implementations, which
/// are not visible here; the field comments are therefore hedged.
#[derive(Debug)]
pub struct Ping {
    /// Presumably the time needed to establish the connection, if measured.
    pub connect_time: Option<Duration>,
    /// Presumably the round-trip time of the ping itself, if measured.
    pub latency: Option<Duration>,
    /// Presumably the overall time spent on this ping.
    pub total_time: Duration,
    /// Presumably the URI of the pinged node.
    pub uri: String,
    /// Whether the ping succeeded or failed.
    pub state: PingState,
}

impl Ping {
    /// Returns `true` if `state` is `PingState::Ok`.
    pub fn is_ok(&self) -> bool {
        if let PingState::Ok = self.state {
            true
        } else {
            false
        }
    }

    /// Returns `true` if `state` is anything other than `PingState::Ok`.
    pub fn is_failed(&self) -> bool {
        match self.state {
            PingState::Failed(_) => true,
            PingState::Ok => false,
        }
    }
}
/// Snapshot of pool counters as returned by `RedisPool::state`.
///
/// Two snapshots can be combined with `+`, which sums every counter. The
/// default value has all counters set to zero.
///
/// NOTE(review): the exact meaning of each counter is defined by the pool
/// implementations, which are not visible here.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PoolState {
    pub in_flight: usize,
    pub connections: usize,
    pub reservations: usize,
    pub idle: usize,
    pub pools: usize,
}

impl std::ops::Add for PoolState {
    type Output = Self;

    /// Adds the two snapshots field by field.
    fn add(self, other: Self) -> Self {
        Self {
            in_flight: self.in_flight + other.in_flight,
            connections: self.connections + other.connections,
            reservations: self.reservations + other.reservations,
            idle: self.idle + other.idle,
            pools: self.pools + other.pools,
        }
    }
}