ntex_redis/
connector.rs

1use ntex::connect::{self, Address, Connect, Connector};
2use ntex::service::{Pipeline, Service};
3use ntex::{io::IoBoxed, time::Seconds, util::ByteString, util::PoolId, util::PoolRef};
4
5use super::{cmd, errors::ConnectError, Client, SimpleClient};
6
7/// Redis connector
8pub struct RedisConnector<A, T> {
9    address: A,
10    connector: Pipeline<T>,
11    passwords: Vec<ByteString>,
12    pool: PoolRef,
13}
14
15impl<A> RedisConnector<A, ()>
16where
17    A: Address + Clone,
18{
19    #[allow(clippy::new_ret_no_self)]
20    /// Create new redis connector
21    pub fn new(address: A) -> RedisConnector<A, Connector<A>> {
22        RedisConnector {
23            address,
24            passwords: Vec::new(),
25            connector: Pipeline::new(Connector::default()),
26            pool: PoolId::P7.pool_ref(),
27        }
28    }
29}
30
31impl<A, T> RedisConnector<A, T>
32where
33    A: Address + Clone,
34{
35    /// Add redis auth password
36    pub fn password<U>(mut self, password: U) -> Self
37    where
38        U: AsRef<str>,
39    {
40        self.passwords
41            .push(ByteString::from(password.as_ref().to_string()));
42        self
43    }
44
45    /// Set memory pool.
46    ///
47    /// Use specified memory pool for memory allocations. By default P7
48    /// memory pool is used.
49    pub fn memory_pool(mut self, id: PoolId) -> Self {
50        self.pool = id.pool_ref();
51        self
52    }
53
54    /// Use custom connector
55    pub fn connector<U>(self, connector: U) -> RedisConnector<A, U>
56    where
57        U: Service<Connect<A>, Error = connect::ConnectError>,
58        IoBoxed: From<U::Response>,
59    {
60        RedisConnector {
61            connector: Pipeline::new(connector),
62            address: self.address,
63            passwords: self.passwords,
64            pool: self.pool,
65        }
66    }
67}
68
69impl<A, T> RedisConnector<A, T>
70where
71    A: Address + Clone,
72    T: Service<Connect<A>, Error = connect::ConnectError>,
73    IoBoxed: From<T::Response>,
74{
75    async fn _connect(&self) -> Result<IoBoxed, ConnectError> {
76        let io: IoBoxed = self
77            .connector
78            .call(Connect::new(self.address.clone()))
79            .await?
80            .into();
81        io.set_memory_pool(self.pool);
82        io.set_disconnect_timeout(Seconds::ZERO);
83
84        if self.passwords.is_empty() {
85            Ok(io)
86        } else {
87            let client = SimpleClient::new(io);
88
89            for password in &self.passwords {
90                if client.exec(cmd::Auth(password)).await? {
91                    return Ok(client.into_inner());
92                }
93            }
94            Err(ConnectError::Unauthorized)
95        }
96    }
97
98    /// Connect to redis server and create shared client
99    pub async fn connect(&self) -> Result<Client, ConnectError> {
100        self._connect().await.map(Client::new)
101    }
102
103    /// Connect to redis server and create simple client
104    pub async fn connect_simple(&self) -> Result<SimpleClient, ConnectError> {
105        self._connect().await.map(SimpleClient::new)
106    }
107}