//! Redis connector.
//!
//! Provides [`RedisConnector`], a builder that connects to a redis server,
//! optionally authenticates, and produces a `Client` or `SimpleClient`.

use ntex::connect::{self, Address, Connect, Connector};
use ntex::service::{Pipeline, Service};
use ntex::{io::IoBoxed, time::Seconds, util::ByteString, util::PoolId, util::PoolRef};

use super::{cmd, errors::ConnectError, Client, SimpleClient};

/// Redis connector
///
/// Bundles everything needed to open a connection to a redis server: the
/// target address, the connector service that establishes the connection,
/// the passwords to try with `cmd::Auth`, and the memory pool assigned to
/// every new connection.
pub struct RedisConnector<A, T> {
    /// Address of the redis server; cloned for each connection attempt.
    address: A,
    /// Service used to establish the underlying connection.
    connector: Pipeline<T>,
    /// Passwords tried in insertion order after connecting; when empty, no
    /// authentication is performed.
    passwords: Vec<ByteString>,
    /// Memory pool applied to each newly established connection.
    pool: PoolRef,
}

impl<A> RedisConnector<A, ()>
where
    A: Address + Clone,
{
    /// Create new redis connector
    ///
    /// The returned connector targets `address`, uses the default
    /// [`Connector`] service, has no passwords configured and allocates
    /// from the `P7` memory pool.
    // `new` deliberately returns `RedisConnector<A, Connector<A>>` instead of
    // `Self` (`RedisConnector<A, ()>`), hence the lint exemption.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(address: A) -> RedisConnector<A, Connector<A>> {
        let connector = Pipeline::new(Connector::default());
        let pool = PoolId::P7.pool_ref();
        let passwords = Vec::new();

        RedisConnector {
            address,
            connector,
            passwords,
            pool,
        }
    }
}

impl<A, T> RedisConnector<A, T>
where
    A: Address + Clone,
{
    /// Add redis auth password
    ///
    /// May be called several times; every password is appended to the list
    /// and they are tried in the order they were added.
    pub fn password<U>(mut self, password: U) -> Self
    where
        U: AsRef<str>,
    {
        let secret: String = password.as_ref().to_owned();
        self.passwords.push(ByteString::from(secret));
        self
    }

    /// Set memory pool.
    ///
    /// Connections created by this connector allocate from the pool
    /// identified by `id`. By default P7 memory pool is used.
    pub fn memory_pool(self, id: PoolId) -> Self {
        Self {
            pool: id.pool_ref(),
            ..self
        }
    }

    /// Use custom connector
    ///
    /// Replaces the current connector service with `connector`, keeping the
    /// configured address, passwords and memory pool.
    pub fn connector<U>(self, connector: U) -> RedisConnector<A, U>
    where
        U: Service<Connect<A>, Error = connect::ConnectError>,
        IoBoxed: From<U::Response>,
    {
        // Carry over every setting except the previous connector service.
        let Self {
            address,
            passwords,
            pool,
            ..
        } = self;

        RedisConnector {
            address,
            passwords,
            pool,
            connector: Pipeline::new(connector),
        }
    }
}

impl<A, T> RedisConnector<A, T>
where
    A: Address + Clone,
    T: Service<Connect<A>, Error = connect::ConnectError>,
    IoBoxed: From<T::Response>,
{
    async fn _connect(&self) -> Result<IoBoxed, ConnectError> {
        let io: IoBoxed = self
            .connector
            .call(Connect::new(self.address.clone()))
            .await?
            .into();
        io.set_memory_pool(self.pool);
        io.set_disconnect_timeout(Seconds::ZERO);

        if self.passwords.is_empty() {
            Ok(io)
        } else {
            let client = SimpleClient::new(io);

            for password in &self.passwords {
                if client.exec(cmd::Auth(password)).await? {
                    return Ok(client.into_inner());
                }
            }
            Err(ConnectError::Unauthorized)
        }
    }

    /// Connect to redis server and create shared client
    pub async fn connect(&self) -> Result<Client, ConnectError> {
        self._connect().await.map(Client::new)
    }

    /// Connect to redis server and create simple client
    pub async fn connect_simple(&self) -> Result<SimpleClient, ConnectError> {
        self._connect().await.map(SimpleClient::new)
    }
}