use redis::{
Client, ErrorKind, RedisConnectionInfo, RedisError, RedisResult, cluster::ClusterClient,
};
/// A Redis client handle that wraps either a plain [`Client`] or a
/// [`ClusterClient`], so callers can hold one type for both deployments.
#[derive(Clone)]
pub enum UniversalClient {
/// Wraps a non-cluster `redis::Client`.
Client(Client),
/// Wraps a `redis::cluster::ClusterClient`.
Cluster(ClusterClient),
}
impl UniversalClient {
    /// Opens an async connection matching the wrapped client kind.
    ///
    /// A plain client yields a multiplexed async connection; a cluster client
    /// yields a boxed cluster async connection.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the underlying client reports while
    /// establishing the connection.
    pub async fn get_connection(&self) -> RedisResult<UniversalConnection> {
        let connection = match self {
            Self::Client(single) => {
                UniversalConnection::Client(single.get_multiplexed_async_connection().await?)
            }
            Self::Cluster(cluster) => {
                UniversalConnection::Cluster(Box::new(cluster.get_async_connection().await?))
            }
        };
        Ok(connection)
    }

    /// Creates a client from a list of addresses, picking the kind by count.
    ///
    /// Exactly one address produces a plain client; two or more produce a
    /// cluster client built from all of them.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidClientConfig` when `addrs` is empty, or the
    /// error reported by the underlying client constructor.
    pub fn open<T: redis::IntoConnectionInfo + Clone>(
        mut addrs: Vec<T>,
    ) -> RedisResult<UniversalClient> {
        match addrs.len() {
            0 => Err(RedisError::from((
                ErrorKind::InvalidClientConfig,
                "No address specified",
            ))),
            1 => Client::open(addrs.remove(0)).map(Self::Client),
            _ => ClusterClient::new(addrs).map(Self::Cluster),
        }
    }
}
/// Builder for [`UniversalClient`] that lets the caller pick cluster mode
/// explicitly and supply optional credentials.
pub struct UniversalBuilder<T> {
// Candidate server addresses; in non-cluster mode `build` uses only the first.
addrs: Vec<T>,
// When true, `build` creates a cluster client regardless of address count.
cluster: bool,
// Optional username applied to the client being built.
username: Option<String>,
// Optional password applied to the client being built.
password: Option<String>,
}
impl<T> UniversalBuilder<T> {
    /// Creates a builder over `addrs` with cluster mode off and no credentials.
    pub fn new(addrs: Vec<T>) -> UniversalBuilder<T> {
        UniversalBuilder {
            addrs,
            cluster: false,
            username: None,
            password: None,
        }
    }

    /// Selects cluster mode (`true`) or single-node mode (`false`) for
    /// [`build`](Self::build).
    pub fn cluster(mut self, flag: bool) -> UniversalBuilder<T> {
        self.cluster = flag;
        self
    }

    /// Sets the username the built client authenticates with.
    pub fn username(mut self, username: impl Into<String>) -> UniversalBuilder<T> {
        self.username = Some(username.into());
        self
    }

    /// Sets the password the built client authenticates with.
    pub fn password(mut self, password: impl Into<String>) -> UniversalBuilder<T> {
        self.password = Some(password.into());
        self
    }

    /// Consumes the builder and creates the configured [`UniversalClient`].
    ///
    /// * In cluster mode every address is handed to the cluster client
    ///   builder, together with the username/password if they were set.
    /// * Otherwise only the first address is used and the remaining ones are
    ///   ignored. A username/password set on the builder overrides the one
    ///   carried by that address; every other connection setting of the
    ///   address is kept as-is.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidClientConfig` when no address was supplied,
    /// and otherwise propagates address-parsing or client-construction errors.
    pub fn build(self) -> RedisResult<UniversalClient>
    where
        T: redis::IntoConnectionInfo + Clone,
    {
        let UniversalBuilder {
            mut addrs,
            cluster,
            username,
            password,
        } = self;

        if addrs.is_empty() {
            return Err(RedisError::from((
                ErrorKind::InvalidClientConfig,
                "No address specified",
            )));
        }

        if cluster {
            let mut builder = ClusterClient::builder(addrs);
            if let Some(u) = username {
                builder = builder.username(u);
            }
            if let Some(p) = password {
                builder = builder.password(p);
            }
            return builder.build().map(UniversalClient::Cluster);
        }

        // Single-node mode: the emptiness check above guarantees index 0 exists.
        let first = addrs.remove(0);
        if username.is_none() && password.is_none() {
            return Client::open(first).map(UniversalClient::Client);
        }

        let conn_info = first.into_connection_info()?;
        // Start from the address's own settings rather than from defaults, so
        // that anything the builder does not override (e.g. a username embedded
        // in the URL when only a password is supplied here) is not lost.
        let mut redis_info = conn_info.redis_settings().clone();
        if let Some(u) = username {
            redis_info = redis_info.set_username(u);
        }
        if let Some(p) = password {
            redis_info = redis_info.set_password(p);
        }
        Client::open(conn_info.set_redis_settings(redis_info)).map(UniversalClient::Client)
    }
}
/// An async connection obtained from a [`UniversalClient`]; wraps either a
/// multiplexed connection or a cluster connection.
#[derive(Clone)]
pub enum UniversalConnection {
/// Multiplexed async connection produced by the non-cluster client.
Client(redis::aio::MultiplexedConnection),
/// Async cluster connection, stored behind a `Box`
/// (presumably to keep this enum small — confirm).
Cluster(Box<redis::cluster_async::ClusterConnection>),
}
#[cfg(test)]
impl UniversalClient {
    /// Test helper: `true` when this wraps a plain (non-cluster) client.
    fn is_client(&self) -> bool {
        match self {
            Self::Client(_) => true,
            Self::Cluster(_) => false,
        }
    }

    /// Test helper: `true` when this wraps a cluster client.
    fn is_cluster(&self) -> bool {
        match self {
            Self::Cluster(_) => true,
            Self::Client(_) => false,
        }
    }
}
/// Forwards every `ConnectionLike` operation to whichever connection this
/// value wraps, so it can be used wherever an async Redis connection is
/// expected.
impl redis::aio::ConnectionLike for UniversalConnection {
    /// Sends a single packed command through the wrapped connection.
    fn req_packed_command<'a>(
        &'a mut self,
        cmd: &'a redis::Cmd,
    ) -> redis::RedisFuture<'a, redis::Value> {
        match self {
            Self::Cluster(cluster) => cluster.req_packed_command(cmd),
            Self::Client(single) => single.req_packed_command(cmd),
        }
    }

    /// Sends a pipeline through the wrapped connection, passing `offset` and
    /// `count` along unchanged.
    fn req_packed_commands<'a>(
        &'a mut self,
        cmd: &'a redis::Pipeline,
        offset: usize,
        count: usize,
    ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
        match self {
            Self::Cluster(cluster) => cluster.req_packed_commands(cmd, offset, count),
            Self::Client(single) => single.req_packed_commands(cmd, offset, count),
        }
    }

    /// Reports the database index as given by the wrapped connection.
    fn get_db(&self) -> i64 {
        match self {
            Self::Cluster(cluster) => cluster.get_db(),
            Self::Client(single) => single.get_db(),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // These tests only construct clients; none of them opens a connection.

    #[test]
    fn open_empty_addrs_error() {
        let no_addrs: Vec<String> = Vec::new();
        assert!(UniversalClient::open(no_addrs).is_err());
    }

    #[test]
    fn open_single_addr_is_client() {
        let client = UniversalClient::open(vec!["redis://127.0.0.1:6379"]).unwrap();
        assert!(client.is_client());
    }

    #[test]
    fn open_multiple_addrs_is_cluster() {
        let nodes = vec!["redis://127.0.0.1:7000", "redis://127.0.0.1:7001"];
        let client = UniversalClient::open(nodes).unwrap();
        assert!(client.is_cluster());
    }

    #[test]
    fn builder_empty_addrs_error() {
        let no_addrs: Vec<String> = Vec::new();
        assert!(UniversalBuilder::new(no_addrs).build().is_err());
    }

    #[test]
    fn builder_cluster_true_forces_cluster() {
        // A single address still yields a cluster client when asked for one.
        let nodes = vec!["redis://127.0.0.1:6379".to_string()];
        let client = UniversalBuilder::new(nodes).cluster(true).build().unwrap();
        assert!(client.is_cluster());
    }

    #[test]
    fn builder_cluster_false_uses_first_addr() {
        // Several addresses still yield a plain client when cluster mode is off.
        let nodes = vec![
            "redis://127.0.0.1:7000".to_string(),
            "redis://127.0.0.1:7001".to_string(),
        ];
        let client = UniversalBuilder::new(nodes).cluster(false).build().unwrap();
        assert!(client.is_client());
    }

    #[test]
    fn builder_with_password_is_client() {
        let nodes = vec!["redis://127.0.0.1:6379".to_string()];
        let client = UniversalBuilder::new(nodes)
            .password("secret")
            .build()
            .unwrap();
        assert!(client.is_client());
    }

    #[test]
    fn builder_with_username_and_password_is_client() {
        let nodes = vec!["redis://127.0.0.1:6379".to_string()];
        let client = UniversalBuilder::new(nodes)
            .username("alice")
            .password("secret")
            .build()
            .unwrap();
        assert!(client.is_client());
    }

    #[test]
    fn builder_with_password_cluster_is_cluster() {
        let nodes = vec![
            "redis://127.0.0.1:7000".to_string(),
            "redis://127.0.0.1:7001".to_string(),
        ];
        let client = UniversalBuilder::new(nodes)
            .password("secret")
            .cluster(true)
            .build()
            .unwrap();
        assert!(client.is_cluster());
    }
}