use ntex::connect::{self, Address, Connect, Connector};
use ntex::service::{Pipeline, Service};
use ntex::{io::IoBoxed, time::Seconds, util::ByteString, util::PoolId, util::PoolRef};
use super::{cmd, errors::ConnectError, Client, SimpleClient};
pub struct RedisConnector<A, T> {
address: A,
connector: Pipeline<T>,
passwords: Vec<ByteString>,
pool: PoolRef,
}
impl<A> RedisConnector<A, ()>
where
A: Address + Clone,
{
#[allow(clippy::new_ret_no_self)]
pub fn new(address: A) -> RedisConnector<A, Connector<A>> {
RedisConnector {
address,
passwords: Vec::new(),
connector: Pipeline::new(Connector::default()),
pool: PoolId::P7.pool_ref(),
}
}
}
impl<A, T> RedisConnector<A, T>
where
A: Address + Clone,
{
pub fn password<U>(mut self, password: U) -> Self
where
U: AsRef<str>,
{
self.passwords
.push(ByteString::from(password.as_ref().to_string()));
self
}
pub fn memory_pool(mut self, id: PoolId) -> Self {
self.pool = id.pool_ref();
self
}
pub fn connector<U>(self, connector: U) -> RedisConnector<A, U>
where
U: Service<Connect<A>, Error = connect::ConnectError>,
IoBoxed: From<U::Response>,
{
RedisConnector {
connector: Pipeline::new(connector),
address: self.address,
passwords: self.passwords,
pool: self.pool,
}
}
}
impl<A, T> RedisConnector<A, T>
where
A: Address + Clone,
T: Service<Connect<A>, Error = connect::ConnectError>,
IoBoxed: From<T::Response>,
{
async fn _connect(&self) -> Result<IoBoxed, ConnectError> {
let io: IoBoxed = self
.connector
.call(Connect::new(self.address.clone()))
.await?
.into();
io.set_memory_pool(self.pool);
io.set_disconnect_timeout(Seconds::ZERO);
if self.passwords.is_empty() {
Ok(io)
} else {
let client = SimpleClient::new(io);
for password in &self.passwords {
if client.exec(cmd::Auth(password)).await? {
return Ok(client.into_inner());
}
}
Err(ConnectError::Unauthorized)
}
}
pub async fn connect(&self) -> Result<Client, ConnectError> {
self._connect().await.map(Client::new)
}
pub async fn connect_simple(&self) -> Result<SimpleClient, ConnectError> {
self._connect().await.map(SimpleClient::new)
}
}