1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
//! This is a simple asynchronous Redis pool. It currently only supports a connection pool to a single Redis instance; Cluster support will probably be added later.
//! If you want to understand how to use it, see the examples and/or the `CiseauxSingle` struct.
//!
//! The library currently supports tokio only (because of redis-rs; async-std support is coming) and requires at least Rust 1.39.
//!
//! ```toml
//! [dependencies]
//! ciseaux_client = "0.3"
//! ```
//!
//! # Example
//!
//! Create a connection pool with default settings and the provided redis::Client (from [redis-rs](https://crates.io/crates/redis))
//!
//! ```rust
//! use redis::{Client, Cmd};
//! use ciseaux_client::CiseauxSingle;
//!
//! #[tokio::main]
//! async fn main() {
//!     let redis_client = Client::open("redis://127.0.0.1:6379").unwrap();
//!     let db_pool = CiseauxSingle::new(redis_client).await.expect("Failed to create Pool");
//!     // You can safely share CiseauxSingle between threads
//!     // (since it uses Arcs and Mutexes under the hood)
//!     let res = match db_pool.query::<_, Option<String>>(&redis::Cmd::get("hello")).await {
//!         Ok(v) => v,
//!         Err(e) => return,// Handle Error
//!     };
//!     let hello = match res {
//!         Some(v) => v,
//!         None => return,// Handle Nil value
//!     };
//!     println!("{}", hello);
//! }
//! ```
//!

pub use redis;

mod single;
#[cfg(test)]
mod tests;
pub use single::CiseauxSingle;
pub use single::SingleInit;

use std::time::Duration;

/// Pool size returned by `ConnectionsCount::default()`: four connections for the whole pool.
const DEFAULT_CONNS_COUNT: ConnectionsCount = ConnectionsCount::Global(4);
/// Reconnect strategy returned by `ReconnectBehavior::default()`.
const DEFAULT_RECONNECT_BEHAVIOR: ReconnectBehavior = ReconnectBehavior::InstantRetry;
/// A two-second duration.
// NOTE(review): presumably the wait applied for `ReconnectBehavior::RetryWaitRetry(None)`;
// confirm against the `single` module, where it would be consumed.
const DEFAULT_WAIT_RETRY_DUR: Duration = Duration::from_secs(2);

/// Selects how many connections the pool holds, overriding the default pool size.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionsCount {
    /// The size of the entire pool.
    Global(usize),
    /// The pool size is the provided `usize` multiplied by the number of CPU cores
    /// (including virtual ones).
    PerThread(usize),
}

impl Default for ConnectionsCount {
    /// Yields the crate's built-in pool size, `DEFAULT_CONNS_COUNT`.
    fn default() -> Self {
        DEFAULT_CONNS_COUNT
    }
}

impl ConnectionsCount {
    /// Resolves this setting to an absolute number of connections.
    ///
    /// `Global(n)` yields `n` unchanged. `PerThread(n)` yields `n` multiplied by
    /// `num_cpus::get()`; the product saturates at `usize::MAX` rather than
    /// overflowing (which would panic in debug builds and wrap in release builds).
    fn into_flat(self) -> usize {
        match self {
            Self::Global(total) => total,
            Self::PerThread(per_cpu) => num_cpus::get().saturating_mul(per_cpu),
        }
    }
}

/// Selects how the pool reacts to a network/IO error, overriding the default behavior.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ReconnectBehavior {
    /// Just return a `RedisError`. The entire connection pool will not work after a
    /// disconnection, so you should probably not use this.
    NoReconnect,
    /// The default. On error, drop the previous connection, create a new one instantly,
    /// then retry the Redis command. If it keeps failing, return a `RedisError`.
    InstantRetry,
    /// Try to reconnect; on failure, wait (2 seconds by default), then try again.
    /// If it keeps failing, return a `RedisError`.
    RetryWaitRetry(Option<Duration>),
}

impl Default for ReconnectBehavior {
    /// Yields the crate's built-in reconnect strategy, `DEFAULT_RECONNECT_BEHAVIOR`.
    fn default() -> Self {
        DEFAULT_RECONNECT_BEHAVIOR
    }
}

/// Something that can be run asynchronously against a `redis::aio::Connection`.
///
/// In this file it is implemented for `redis::Cmd` and `redis::Pipeline`, as well as
/// for shared and mutable references to both.
#[async_trait::async_trait]
pub trait QueryAble {
    /// Runs `self` on `conn`.
    ///
    /// Returns a value of the caller-chosen type `T` (which must implement
    /// `redis::FromRedisValue`) on success, or a `redis::RedisError` on failure.
    async fn query<T: redis::FromRedisValue>(
        &self,
        conn: &mut redis::aio::Connection,
    ) -> Result<T, redis::RedisError>;
}

#[async_trait::async_trait]
impl QueryAble for redis::Cmd {
    /// Runs this command on `conn` by delegating to `redis::Cmd::query_async`.
    async fn query<T: redis::FromRedisValue>(
        &self,
        conn: &mut redis::aio::Connection,
    ) -> Result<T, redis::RedisError> {
        redis::Cmd::query_async::<redis::aio::Connection, T>(self, conn).await
    }
}

#[async_trait::async_trait]
impl QueryAble for &redis::Cmd {
    /// Runs the referenced command on `conn` by delegating to `redis::Cmd::query_async`.
    async fn query<T: redis::FromRedisValue>(
        &self,
        conn: &mut redis::aio::Connection,
    ) -> Result<T, redis::RedisError> {
        // `self` is a `&&Cmd` here; deref coercion hands the inner `&Cmd` to the call.
        redis::Cmd::query_async::<redis::aio::Connection, T>(self, conn).await
    }
}

#[async_trait::async_trait]
impl QueryAble for &mut redis::Cmd {
    /// Runs the referenced command on `conn` by delegating to `redis::Cmd::query_async`.
    async fn query<T: redis::FromRedisValue>(
        &self,
        conn: &mut redis::aio::Connection,
    ) -> Result<T, redis::RedisError> {
        // `self` is a `&&mut Cmd` here; deref coercion hands a shared `&Cmd` to the call.
        redis::Cmd::query_async::<redis::aio::Connection, T>(self, conn).await
    }
}

#[async_trait::async_trait]
impl QueryAble for redis::Pipeline {
    /// Runs this pipeline on `conn` by delegating to `redis::Pipeline::query_async`.
    async fn query<T: redis::FromRedisValue>(
        &self,
        conn: &mut redis::aio::Connection,
    ) -> Result<T, redis::RedisError> {
        redis::Pipeline::query_async::<redis::aio::Connection, T>(self, conn).await
    }
}

#[async_trait::async_trait]
impl QueryAble for &redis::Pipeline {
    /// Runs the referenced pipeline on `conn` by delegating to `redis::Pipeline::query_async`.
    async fn query<T: redis::FromRedisValue>(
        &self,
        conn: &mut redis::aio::Connection,
    ) -> Result<T, redis::RedisError> {
        // `self` is a `&&Pipeline` here; deref coercion hands the inner `&Pipeline` to the call.
        redis::Pipeline::query_async::<redis::aio::Connection, T>(self, conn).await
    }
}

#[async_trait::async_trait]
impl QueryAble for &mut redis::Pipeline {
    /// Runs the referenced pipeline on `conn` by delegating to `redis::Pipeline::query_async`.
    async fn query<T: redis::FromRedisValue>(
        &self,
        conn: &mut redis::aio::Connection,
    ) -> Result<T, redis::RedisError> {
        // `self` is a `&&mut Pipeline` here; deref coercion hands a shared `&Pipeline` to the call.
        redis::Pipeline::query_async::<redis::aio::Connection, T>(self, conn).await
    }
}