#![allow(dead_code)]
use std::sync::Arc;
use redis::aio::MultiplexedConnection;
use redis::cluster::ClusterClient;
use redis::cluster_async::ClusterConnection;
use crate::error::{Result, ShoveError};
use super::constants::DEFAULT_GROUP;
/// Connection topology used when talking to Redis.
pub enum RedisMode {
    /// A single Redis server addressed by one connection URL.
    Standalone {
        /// URL handed to `redis::Client::open`.
        url: String,
    },
    /// A Redis Cluster addressed by a list of node URLs.
    Cluster {
        /// URLs handed to `ClusterClient::new`; `RedisClient::connect`
        /// rejects an empty list.
        urls: Vec<String>,
    },
}
/// Settings consumed by `RedisClient::connect`.
pub struct RedisConfig {
    /// Which topology to connect to, together with its URL(s).
    pub mode: RedisMode,
    /// Optional group name. When `None`, `resolved_group` falls back to
    /// `DEFAULT_GROUP`.
    pub group: Option<String>,
}
impl RedisConfig {
    /// Returns the group name to use.
    ///
    /// A configured group is returned verbatim (an empty string included);
    /// only an absent group falls back to `DEFAULT_GROUP`.
    pub fn resolved_group(&self) -> &str {
        match &self.group {
            Some(name) => name.as_str(),
            None => DEFAULT_GROUP,
        }
    }
}
/// An open connection, wrapping whichever connection type matches the
/// client's mode so callers can issue commands without caring which it is.
pub(crate) enum RedisConnection {
    /// Connection obtained from a standalone `redis::Client`.
    Standalone(MultiplexedConnection),
    /// Connection obtained from a `ClusterClient`.
    Cluster(ClusterConnection),
}
impl RedisConnection {
    /// Executes `cmd` on the wrapped connection and converts the reply to `T`.
    ///
    /// # Errors
    ///
    /// Any error reported by the `redis` crate is returned as
    /// `ShoveError::Connection` carrying that error's display text.
    pub(crate) async fn query<T: redis::FromRedisValue + Send>(
        &mut self,
        cmd: &mut redis::Cmd,
    ) -> Result<T> {
        // The two arms differ only in the concrete connection type, so the
        // error conversion is applied once after dispatch.
        let reply: redis::RedisResult<T> = match self {
            Self::Standalone(conn) => cmd.query_async(conn).await,
            Self::Cluster(conn) => cmd.query_async(conn).await,
        };
        reply.map_err(|e| ShoveError::Connection(e.to_string()))
    }
}
/// The `redis` crate client backing a `RedisClient`, one variant per mode.
enum ClientInner {
    /// Client for a single Redis server.
    Standalone(redis::Client),
    /// Client for a Redis Cluster.
    Cluster(ClusterClient),
}
/// Handle used to open Redis connections.
///
/// Cloning copies the group name and bumps the reference count on the shared
/// inner client; clones therefore use the same underlying `redis` client.
#[derive(Clone)]
pub struct RedisClient {
    /// Mode-specific client, shared between clones.
    inner: Arc<ClientInner>,
    /// Group name resolved from the configuration at connect time.
    pub(super) group: String,
}
impl RedisClient {
    /// Builds a client for the configured mode and verifies that a connection
    /// can be opened.
    ///
    /// The connection opened here is only a reachability probe and is dropped
    /// right away; use [`Self::multiplexed_conn`] or [`Self::dedicated_conn`]
    /// to obtain a connection for actual work.
    ///
    /// # Errors
    ///
    /// Returns `ShoveError::Connection` when the `redis` crate rejects the
    /// URL(s), when the cluster URL list is empty, or when the probe
    /// connection cannot be established.
    pub(super) async fn connect(config: RedisConfig) -> Result<Self> {
        let group = config.resolved_group().to_owned();

        let inner = match &config.mode {
            RedisMode::Standalone { url } => {
                let client = redis::Client::open(url.as_str())
                    .map_err(|e| ShoveError::Connection(e.to_string()))?;
                let probe = client
                    .get_multiplexed_async_connection()
                    .await
                    .map_err(|e| ShoveError::Connection(format!("standalone ping failed: {e}")))?;
                drop(probe);
                ClientInner::Standalone(client)
            }
            RedisMode::Cluster { urls } => {
                // Reject an empty node list before handing it to the cluster client.
                if urls.is_empty() {
                    return Err(ShoveError::Connection(String::from(
                        "cluster URLs must not be empty",
                    )));
                }
                let nodes = urls.iter().map(String::as_str).collect::<Vec<&str>>();
                let client = ClusterClient::new(nodes)
                    .map_err(|e| ShoveError::Connection(e.to_string()))?;
                let probe = client
                    .get_async_connection()
                    .await
                    .map_err(|e| ShoveError::Connection(format!("cluster ping failed: {e}")))?;
                drop(probe);
                ClientInner::Cluster(client)
            }
        };

        Ok(Self {
            inner: Arc::new(inner),
            group,
        })
    }

    /// Opens a connection using the client's default connection settings.
    ///
    /// # Errors
    ///
    /// Returns `ShoveError::Connection` with the underlying error's display
    /// text when the connection cannot be opened.
    pub(super) async fn multiplexed_conn(&self) -> Result<RedisConnection> {
        let conn = match &*self.inner {
            ClientInner::Standalone(client) => {
                let raw = client
                    .get_multiplexed_async_connection()
                    .await
                    .map_err(|e| ShoveError::Connection(e.to_string()))?;
                RedisConnection::Standalone(raw)
            }
            ClientInner::Cluster(client) => {
                let raw = client
                    .get_async_connection()
                    .await
                    .map_err(|e| ShoveError::Connection(e.to_string()))?;
                RedisConnection::Cluster(raw)
            }
        };
        Ok(conn)
    }

    /// Opens a connection whose standalone variant has its response timeout
    /// set to `None`.
    ///
    /// In cluster mode this calls `get_async_connection` exactly as
    /// [`Self::multiplexed_conn`] does.
    ///
    /// NOTE(review): no timeout override is applied on the cluster path —
    /// confirm the cluster client's response timeout suits long-running
    /// commands.
    ///
    /// # Errors
    ///
    /// Returns `ShoveError::Connection` with the underlying error's display
    /// text when the connection cannot be opened.
    pub(super) async fn dedicated_conn(&self) -> Result<RedisConnection> {
        let conn = match &*self.inner {
            ClientInner::Standalone(client) => {
                let no_timeout = redis::AsyncConnectionConfig::new().set_response_timeout(None);
                let raw = client
                    .get_multiplexed_async_connection_with_config(&no_timeout)
                    .await
                    .map_err(|e| ShoveError::Connection(e.to_string()))?;
                RedisConnection::Standalone(raw)
            }
            ClientInner::Cluster(client) => {
                let raw = client
                    .get_async_connection()
                    .await
                    .map_err(|e| ShoveError::Connection(e.to_string()))?;
                RedisConnection::Cluster(raw)
            }
        };
        Ok(conn)
    }

    /// Returns the group name resolved when the client was created.
    pub(super) fn group(&self) -> &str {
        self.group.as_str()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// URL used by the tests that only exercise group resolution.
    const LOCAL_URL: &str = "redis://127.0.0.1:6379/";

    /// Builds a standalone configuration for `url` with the given group.
    fn standalone(url: &str, group: Option<&str>) -> RedisConfig {
        RedisConfig {
            mode: RedisMode::Standalone {
                url: url.to_string(),
            },
            group: group.map(String::from),
        }
    }

    /// Builds a cluster configuration for `urls` with no group set.
    fn cluster(urls: Vec<String>) -> RedisConfig {
        RedisConfig {
            mode: RedisMode::Cluster { urls },
            group: None,
        }
    }

    // An unset group resolves to the crate default.
    #[test]
    fn config_default_group() {
        let cfg = standalone(LOCAL_URL, None);
        assert_eq!(cfg.resolved_group(), "shove");
    }

    // An explicit group overrides the default.
    #[test]
    fn config_custom_group() {
        let cfg = standalone(LOCAL_URL, Some("myapp"));
        assert_eq!(cfg.resolved_group(), "myapp");
    }

    // The standalone URL is stored unchanged, credentials and scheme included.
    #[test]
    fn standalone_url_preserved() {
        let url = "rediss://user:pass@myhost:6380/".to_string();
        let config = standalone(&url, None);
        if let RedisMode::Standalone { url: stored } = config.mode {
            assert_eq!(stored, url);
        } else {
            panic!("expected Standalone");
        }
    }

    // Cluster URLs are stored unchanged and in order.
    #[test]
    fn cluster_urls_preserved() {
        let urls = vec![
            "redis://node1:6379/".to_string(),
            "redis://node2:6379/".to_string(),
            "redis://node3:6379/".to_string(),
        ];
        let config = cluster(urls.clone());
        if let RedisMode::Cluster { urls: stored } = config.mode {
            assert_eq!(stored, urls);
        } else {
            panic!("expected Cluster");
        }
    }

    // An explicitly empty group is kept rather than replaced by the default.
    #[test]
    fn resolved_group_empty_string_preserved() {
        let cfg = standalone(LOCAL_URL, Some(""));
        assert_eq!(cfg.resolved_group(), "");
    }

    #[test]
    fn redis_mode_standalone_variant_matches() {
        let cfg = standalone("redis://localhost/", None);
        assert!(matches!(cfg.mode, RedisMode::Standalone { .. }));
    }

    #[test]
    fn redis_mode_cluster_variant_matches() {
        let cfg = cluster(vec!["redis://node1/".to_string()]);
        assert!(matches!(cfg.mode, RedisMode::Cluster { .. }));
    }
}