1use ntex::connect::{self, Address, Connect, Connector};
2use ntex::service::{Pipeline, Service};
3use ntex::{io::IoBoxed, time::Seconds, util::ByteString, util::PoolId, util::PoolRef};
4
5use super::{cmd, errors::ConnectError, Client, SimpleClient};
6
7pub struct RedisConnector<A, T> {
9 address: A,
10 connector: Pipeline<T>,
11 passwords: Vec<ByteString>,
12 pool: PoolRef,
13}
14
15impl<A> RedisConnector<A, ()>
16where
17 A: Address + Clone,
18{
19 #[allow(clippy::new_ret_no_self)]
20 pub fn new(address: A) -> RedisConnector<A, Connector<A>> {
22 RedisConnector {
23 address,
24 passwords: Vec::new(),
25 connector: Pipeline::new(Connector::default()),
26 pool: PoolId::P7.pool_ref(),
27 }
28 }
29}
30
31impl<A, T> RedisConnector<A, T>
32where
33 A: Address + Clone,
34{
35 pub fn password<U>(mut self, password: U) -> Self
37 where
38 U: AsRef<str>,
39 {
40 self.passwords
41 .push(ByteString::from(password.as_ref().to_string()));
42 self
43 }
44
45 pub fn memory_pool(mut self, id: PoolId) -> Self {
50 self.pool = id.pool_ref();
51 self
52 }
53
54 pub fn connector<U>(self, connector: U) -> RedisConnector<A, U>
56 where
57 U: Service<Connect<A>, Error = connect::ConnectError>,
58 IoBoxed: From<U::Response>,
59 {
60 RedisConnector {
61 connector: Pipeline::new(connector),
62 address: self.address,
63 passwords: self.passwords,
64 pool: self.pool,
65 }
66 }
67}
68
69impl<A, T> RedisConnector<A, T>
70where
71 A: Address + Clone,
72 T: Service<Connect<A>, Error = connect::ConnectError>,
73 IoBoxed: From<T::Response>,
74{
75 async fn _connect(&self) -> Result<IoBoxed, ConnectError> {
76 let io: IoBoxed = self
77 .connector
78 .call(Connect::new(self.address.clone()))
79 .await?
80 .into();
81 io.set_memory_pool(self.pool);
82 io.set_disconnect_timeout(Seconds::ZERO);
83
84 if self.passwords.is_empty() {
85 Ok(io)
86 } else {
87 let client = SimpleClient::new(io);
88
89 for password in &self.passwords {
90 if client.exec(cmd::Auth(password)).await? {
91 return Ok(client.into_inner());
92 }
93 }
94 Err(ConnectError::Unauthorized)
95 }
96 }
97
98 pub async fn connect(&self) -> Result<Client, ConnectError> {
100 self._connect().await.map(Client::new)
101 }
102
103 pub async fn connect_simple(&self) -> Result<SimpleClient, ConnectError> {
105 self._connect().await.map(SimpleClient::new)
106 }
107}