1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use crate::{ConnectionsCount, QueryAble, ReconnectBehavior};

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use tokio::sync::{Mutex, MutexGuard};

use redis::RedisError;

/// An Init Struct to create a customized CiseauxSingle connections pool.
/// This is like a Builder, but using public fields instead of functions
#[derive(Debug)]
pub struct SingleInit {
    /// The redis-rs Client CiseauxSingle will use
    pub client: redis::Client,
    /// By default, 4 connections per Thread
    pub conns_count: ConnectionsCount,
    /// By default, Instant Retry
    pub reconnect_behavior: ReconnectBehavior,
}

impl SingleInit {
    /// This create a SingleInit with default settings and the provided redis::Client
    pub fn new(client: redis::Client) -> SingleInit {
        SingleInit {
            client,
            conns_count: ConnectionsCount::default(),
            reconnect_behavior: ReconnectBehavior::default(),
        }
    }

    /// Like SingleInit::new, but also opens a redis::Client on localhost (With redis default port : 6379)
    pub fn default_localhost() -> SingleInit {
        SingleInit {
            client: redis::Client::open("redis://127.0.0.1:6379").unwrap(), // Unwrap is OK since client open doesn't connect, but only checks URL Validity.
            conns_count: ConnectionsCount::default(),
            reconnect_behavior: ReconnectBehavior::default(),
        }
    }

    /// Asynchronously creates multiple connexions to a Redis instance
    pub async fn build(self) -> Result<CiseauxSingle, RedisError> {
        let conns_count = self.conns_count.into_flat();
        let mut conns_fut = Vec::with_capacity(conns_count);
        for _ in 0..conns_count {
            conns_fut.push(self.client.get_async_connection());
        }
        let mut conns = Vec::with_capacity(conns_count);
        for c in futures::future::join_all(conns_fut).await {
            conns.push(Mutex::new(c?));
        }
        Ok(CiseauxSingle {
            client: Arc::new(self.client),
            reconnect_behavior: self.reconnect_behavior,
            conns: Arc::new(conns),
            next: Arc::new(AtomicUsize::new(0)),
        })
    }
}

/// A connections pool to a single Redis instance
#[derive(Clone)]
pub struct CiseauxSingle {
    client: Arc<redis::Client>,
    reconnect_behavior: ReconnectBehavior,
    conns: Arc<Vec<Mutex<redis::aio::Connection>>>,
    next: Arc<AtomicUsize>,
}

impl CiseauxSingle {
    /// Shortcut to SingleInit::new
    pub fn builder(client: redis::Client) -> SingleInit {
        SingleInit::new(client)
    }

    /// Shortcut to SingleInit::new
    pub fn init(client: redis::Client) -> SingleInit {
        SingleInit::new(client)
    }

    /// Create a new pool using defaults settings
    pub async fn new(client: redis::Client) -> Result<CiseauxSingle, RedisError> {
        SingleInit::new(client).build().await
    }

    /// Asynchronously query QueryAble (trait, implemented for redis::Cmd and redis::Pipeline),
    /// but in case of network error, will try to reconnect once to the same database (by default),
    /// or follow the reconnect_behavior you provided
    pub async fn query<C: QueryAble, T: redis::FromRedisValue>(
        &self,
        cmd: C,
    ) -> Result<T, RedisError> {
        let mut conn = self.conns[self.next.fetch_add(1, Ordering::AcqRel) % self.conns.len()]
            .lock()
            .await;
        match cmd.query::<T>(&mut conn).await {
            Ok(v) => Ok(v),
            Err(e) => {
                if is_network_or_io_error(&e) {
                    if self.reconnect_behavior == ReconnectBehavior::NoReconnect {
                        return Err(e);
                    }
                    return self.retry_cmd(&mut conn, cmd).await;
                }
                return Err(e);
            }
        }
    }

    #[inline(always)]
    async fn try_reconnect<'a>(
        &self,
        conn: &mut MutexGuard<'a, redis::aio::Connection>,
    ) -> Result<(), RedisError> {
        match self.client.get_async_connection().await {
            Ok(c) => {
                **conn = c;
                Ok(())
            }
            Err(e) => Err(e),
        }
    }

    #[inline(always)]
    async fn retry_cmd<'a, C: QueryAble, T: redis::FromRedisValue>(
        &self,
        conn: &mut MutexGuard<'a, redis::aio::Connection>,
        cmd: C,
    ) -> Result<T, RedisError> {
        match self.try_reconnect(conn).await {
            Ok(()) => return cmd.query::<T>(conn).await,
            Err(e) => {
                if is_network_or_io_error(&e) {
                    match self.reconnect_behavior {
                        ReconnectBehavior::RetryWaitRetry(d) => {
                            tokio::time::delay_for(d.unwrap_or(crate::DEFAULT_WAIT_RETRY_DUR))
                                .await;
                            self.try_reconnect(conn).await?;
                            return cmd.query::<T>(conn).await;
                        }
                        _ => return Err(e),
                    }
                }
                return Err(e);
            }
        }
    }
}

fn is_network_or_io_error(error: &RedisError) -> bool {
    if error.is_timeout() || error.is_connection_dropped() || error.is_io_error() {
        return true;
    }
    false
}