use std::time::{Duration, Instant};
use futures::prelude::*;
use crate::config::{Builder, DefaultCommandTimeout, DefaultPoolCheckoutMode};
pub mod config;
pub mod instrumentation;
use future::BoxFuture;
pub use redis::{
aio::ConnectionLike, cmd, AsyncCommands, Cmd, FromRedisValue, NumericBehavior, RedisError,
RedisFuture, ToRedisArgs, Value,
};
pub use crate::error::{CheckoutError, CheckoutErrorKind};
pub use pool_connection::{ConnectionFlavour, PoolConnection};
pub use redis_ops::RedisOps;
pub mod connection_factory;
pub mod error;
pub(crate) mod executor_flavour;
pub(crate) mod helpers;
mod activation_order;
mod backoff_strategy;
mod pool_connection;
mod pools;
mod redis_ops;
mod redis_rs;
pub use redis;
pub trait Poolable: Send + Sized + 'static {
fn connected_to(&self) -> &str;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckoutMode {
Immediately,
Wait,
PoolDefault,
Until(Instant),
}
impl CheckoutMode {
pub fn is_deadline_elapsed(self) -> bool {
match self {
CheckoutMode::Until(deadline) => deadline < Instant::now(),
_ => false,
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct Immediately;
#[derive(Debug, Clone, Copy)]
pub struct Wait;
#[derive(Debug, Clone, Copy)]
pub struct Millis(pub u64);
#[derive(Debug, Clone, Copy)]
pub struct Seconds(pub u64);
#[derive(Debug, Clone, Copy)]
pub struct PoolDefault;
impl From<Immediately> for CheckoutMode {
fn from(_: Immediately) -> Self {
CheckoutMode::Immediately
}
}
impl From<Wait> for CheckoutMode {
fn from(_: Wait) -> Self {
CheckoutMode::Wait
}
}
impl From<PoolDefault> for CheckoutMode {
fn from(_: PoolDefault) -> Self {
CheckoutMode::PoolDefault
}
}
impl Default for CheckoutMode {
fn default() -> Self {
CheckoutMode::PoolDefault
}
}
impl From<Duration> for CheckoutMode {
fn from(d: Duration) -> Self {
let timeout = Instant::now() + d;
timeout.into()
}
}
impl From<Millis> for CheckoutMode {
fn from(d: Millis) -> Self {
let timeout = Instant::now() + d.into();
timeout.into()
}
}
impl From<Seconds> for CheckoutMode {
fn from(d: Seconds) -> Self {
let timeout = Instant::now() + d.into();
timeout.into()
}
}
impl From<Instant> for CheckoutMode {
fn from(until: Instant) -> Self {
CheckoutMode::Until(until)
}
}
impl From<Duration> for Seconds {
fn from(d: Duration) -> Self {
Seconds(d.as_secs())
}
}
impl From<Seconds> for Duration {
fn from(v: Seconds) -> Self {
Duration::from_secs(v.0)
}
}
impl From<Millis> for Duration {
fn from(v: Millis) -> Self {
Duration::from_millis(v.0)
}
}
enum RedisPoolFlavour<T: Poolable> {
Empty,
Single(pools::SinglePool<T>),
PerNode(pools::PoolPerNode<T>),
}
impl<T: Poolable> Clone for RedisPoolFlavour<T> {
fn clone(&self) -> Self {
use RedisPoolFlavour::*;
match self {
Empty => Empty,
Single(pool) => Single(pool.clone()),
PerNode(pool) => PerNode(pool.clone()),
}
}
}
pub struct RedisPool<T: Poolable = ConnectionFlavour> {
flavour: RedisPoolFlavour<T>,
default_checkout_mode: DefaultPoolCheckoutMode,
retry_on_checkout_limit: bool,
default_command_timeout: DefaultCommandTimeout,
}
impl RedisPool {
pub fn builder() -> Builder {
Builder::default()
}
}
impl<T: Poolable> RedisPool<T> {
pub fn no_pool() -> Self {
RedisPool {
flavour: RedisPoolFlavour::Empty,
default_checkout_mode: DefaultPoolCheckoutMode::default(),
retry_on_checkout_limit: false,
default_command_timeout: DefaultCommandTimeout::default(),
}
}
pub fn check_out_default<'a>(
&'a self,
) -> impl Future<Output = Result<PoolConnection<T>, CheckoutError>> + 'a {
self.check_out(CheckoutMode::PoolDefault)
}
pub async fn check_out<M: Into<CheckoutMode>>(
&self,
mode: M,
) -> Result<PoolConnection<T>, CheckoutError> {
let constraint = pools::CheckoutConstraint::from_checkout_mode_and_pool_default(
mode,
self.default_checkout_mode,
);
let managed = match self.flavour {
RedisPoolFlavour::Single(ref pool) => {
pools::check_out_maybe_retry_on_queue_limit_reached(
pool,
constraint,
self.retry_on_checkout_limit,
)
.await?
}
RedisPoolFlavour::PerNode(ref pool) => {
pools::check_out_maybe_retry_on_queue_limit_reached(
pool,
constraint,
self.retry_on_checkout_limit,
)
.await?
}
RedisPoolFlavour::Empty => {
let err = CheckoutError::new(CheckoutErrorKind::NoPool);
return Err(err);
}
};
Ok(PoolConnection {
managed: Some(managed),
connection_state_ok: true,
command_timeout: self.default_command_timeout.to_duration_opt(),
})
}
pub fn default_command_timeout<TO: Into<DefaultCommandTimeout>>(
&self,
default_command_timeout: TO,
) -> Self {
let mut me = self.clone();
me.default_command_timeout = default_command_timeout.into();
me
}
pub fn connected_to(&self) -> Vec<String> {
match self.flavour {
RedisPoolFlavour::Single(ref pool) => vec![pool.connected_to().to_string()],
RedisPoolFlavour::PerNode(ref pool) => pool.connected_to().to_vec(),
RedisPoolFlavour::Empty => vec![],
}
}
pub fn state(&self) -> PoolState {
match self.flavour {
RedisPoolFlavour::Single(ref pool) => pool.state(),
RedisPoolFlavour::PerNode(ref pool) => pool.state(),
RedisPoolFlavour::Empty => PoolState::default(),
}
}
pub fn ping_nodes<TO: Into<Timeout>>(&self, timeout: TO) -> BoxFuture<Vec<Ping>> {
let deadline = timeout.into().0;
match self.flavour {
RedisPoolFlavour::Single(ref pool) => pool.ping(deadline).map(|p| vec![p]).boxed(),
RedisPoolFlavour::PerNode(ref pool) => pool.ping(deadline).boxed(),
RedisPoolFlavour::Empty => future::ready(vec![]).boxed(),
}
}
}
impl<T: Poolable> Clone for RedisPool<T> {
fn clone(&self) -> Self {
RedisPool {
flavour: self.flavour.clone(),
default_checkout_mode: self.default_checkout_mode,
retry_on_checkout_limit: self.retry_on_checkout_limit,
default_command_timeout: self.default_command_timeout,
}
}
}
pub struct Timeout(pub Instant);
impl From<Instant> for Timeout {
fn from(at: Instant) -> Self {
Self(at)
}
}
impl From<Duration> for Timeout {
fn from(d: Duration) -> Self {
Self(Instant::now() + d)
}
}
#[derive(Debug)]
pub enum PingState {
Ok,
Failed(Box<dyn std::error::Error + Send>),
}
impl PingState {
pub fn failed<E: std::error::Error + Send + 'static>(err: E) -> Self {
Self::Failed(Box::new(err))
}
pub fn failed_msg<T: Into<String>>(msg: T) -> Self {
#[derive(Debug)]
struct PingError(String);
impl std::fmt::Display for PingError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl std::error::Error for PingError {
fn description(&self) -> &str {
"ping failed"
}
fn cause(&self) -> Option<&dyn std::error::Error> {
None
}
}
Self::failed(PingError(msg.into()))
}
}
#[derive(Debug)]
pub struct Ping {
pub connect_time: Option<Duration>,
pub latency: Option<Duration>,
pub total_time: Duration,
pub uri: String,
pub state: PingState,
}
impl Ping {
pub fn is_ok(&self) -> bool {
match self.state {
PingState::Ok => true,
_ => false,
}
}
pub fn is_failed(&self) -> bool {
!self.is_ok()
}
}
#[derive(Debug, Clone, Copy)]
pub struct PoolState {
pub in_flight: usize,
pub connections: usize,
pub reservations: usize,
pub idle: usize,
pub pools: usize,
}
impl std::ops::Add for PoolState {
type Output = Self;
fn add(self, other: Self) -> Self {
Self {
in_flight: self.in_flight + other.in_flight,
reservations: self.reservations + other.reservations,
connections: self.connections + other.connections,
idle: self.idle + other.idle,
pools: self.pools + other.pools,
}
}
}
impl Default for PoolState {
fn default() -> Self {
Self {
in_flight: 0,
reservations: 0,
connections: 0,
idle: 0,
pools: 0,
}
}
}
impl ConnectionLike for RedisPool {
fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
async move {
let mut conn = self.check_out_default().await?;
conn.req_packed_command(cmd).await
}
.boxed()
}
fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a redis::Pipeline,
offset: usize,
count: usize,
) -> RedisFuture<'a, Vec<Value>> {
async move {
let mut conn = self.check_out_default().await?;
conn.req_packed_commands(cmd, offset, count).await
}
.boxed()
}
fn get_db(&self) -> i64 {
-1
}
}