#[cfg(feature = "redis")]
use deadpool_redis::{
redis::{cmd, AsyncCommands, RedisResult, Value},
Config, Pool, Runtime,
};
use std::sync::Arc;
use super::{RedisConfig, RedisError, RedisMode, Result};
#[derive(Clone)]
#[cfg(feature = "redis")]
pub struct RedisClient {
mode: RedisMode,
pool: Arc<Pool>,
}
#[cfg(feature = "redis")]
impl RedisClient {
pub async fn new(config: RedisConfig) -> Result<Self> {
config.validate()?;
let pool = match config.mode {
RedisMode::Standalone => Self::create_standalone_pool(&config).await?,
RedisMode::Sentinel => Self::create_sentinel_pool(&config).await?,
RedisMode::Cluster => Self::create_cluster_pool(&config).await?,
};
Ok(Self {
mode: config.mode,
pool: Arc::new(pool),
})
}
pub async fn get_conn(&self) -> Result<deadpool_redis::Connection> {
self.pool
.get()
.await
.map_err(|e| RedisError::Connection(e.to_string()))
}
async fn create_standalone_pool(config: &RedisConfig) -> Result<Pool> {
let node = config.node.as_ref().unwrap();
let mut url = format!("redis://{}:{}", node.host, node.port);
if let Some(password) = &config.password {
url = format!("redis://:{}@{}:{}", password, node.host, node.port);
}
if let Some(database) = config.database {
url = format!("{}/{}", url, database);
}
let cfg = Config::from_url(url);
let pool = cfg
.builder()
.map_err(|e| RedisError::Connection(e.to_string()))?
.max_size(config.pool.max_connections as usize)
.runtime(Runtime::Tokio1)
.build()
.map_err(|e| RedisError::Connection(e.to_string()))?;
Ok(pool)
}
async fn create_sentinel_pool(config: &RedisConfig) -> Result<Pool> {
let sentinel = config.sentinel.as_ref().unwrap();
let mut url = format!(
"redis://{}:{}",
sentinel.nodes[0].host, sentinel.nodes[0].port
);
if let Some(password) = &sentinel.password {
url = format!(
"redis://:{}@{}:{}",
password, sentinel.nodes[0].host, sentinel.nodes[0].port
);
}
if let Some(database) = config.database {
url = format!("{}/{}", url, database);
}
let cfg = Config::from_url(url);
let pool = cfg
.builder()
.map_err(|e| RedisError::Sentinel(e.to_string()))?
.max_size(config.pool.max_connections as usize)
.runtime(Runtime::Tokio1)
.build()
.map_err(|e| RedisError::Connection(e.to_string()))?;
Ok(pool)
}
async fn create_cluster_pool(config: &RedisConfig) -> Result<Pool> {
let cluster = config.cluster.as_ref().unwrap();
let mut url = format!(
"redis://{}:{}",
cluster.nodes[0].host, cluster.nodes[0].port
);
if let Some(password) = &config.password {
url = format!(
"redis://:{}@{}:{}",
password, cluster.nodes[0].host, cluster.nodes[0].port
);
}
if let Some(database) = config.database {
url = format!("{}/{}", url, database);
}
let cfg = Config::from_url(url);
let pool = cfg
.builder()
.map_err(|e| RedisError::Cluster(e.to_string()))?
.max_size(config.pool.max_connections as usize)
.runtime(Runtime::Tokio1)
.build()
.map_err(|e| RedisError::Connection(e.to_string()))?;
Ok(pool)
}
pub async fn set<T: ToString>(&self, key: &str, value: T) -> Result<()> {
let mut conn = self.get_conn().await?;
conn.set(key, value.to_string())
.await
.map_err(|e| RedisError::Redis(e))
}
pub async fn get(&self, key: &str) -> Result<Option<String>> {
let mut conn = self.get_conn().await?;
conn.get(key).await.map_err(|e| RedisError::Redis(e))
}
pub async fn del(&self, key: &str) -> Result<bool> {
let mut conn = self.get_conn().await?;
let result: i64 = conn.del(key).await.map_err(|e| RedisError::Redis(e))?;
Ok(result > 0)
}
pub async fn keys(&self, pattern: &str) -> Result<Vec<String>> {
let mut conn = self.get_conn().await?;
let keys: Vec<String> = conn.keys(pattern).await.map_err(|e| RedisError::Redis(e))?;
Ok(keys)
}
pub async fn execute<T>(&self, command: &str, args: &[&str]) -> Result<T>
where
T: TryFrom<Value>,
T::Error: std::error::Error + Send + Sync + 'static,
{
let mut conn = self.get_conn().await?;
let mut cmd = cmd(command);
for arg in args {
cmd.arg(arg);
}
let result: RedisResult<Value> = cmd.query_async(&mut *conn).await;
match result {
Ok(value) => T::try_from(value).map_err(|e| RedisError::Operation(e.to_string())),
Err(e) => Err(RedisError::Redis(e)),
}
}
pub fn mode(&self) -> RedisMode {
self.mode
}
pub fn is_cluster(&self) -> bool {
self.mode == RedisMode::Cluster
}
pub fn is_sentinel(&self) -> bool {
self.mode == RedisMode::Sentinel
}
pub fn is_standalone(&self) -> bool {
self.mode == RedisMode::Standalone
}
pub async fn clear(&self) -> Result<()> {
let mut conn = self.get_conn().await?;
cmd("FLUSHDB")
.query_async(&mut *conn)
.await
.map_err(|e| RedisError::Redis(e))
}
}
#[cfg(all(test, feature = "redis"))]
mod tests {
use crate::{PoolConfig, RedisNode};
use super::*;
#[tokio::test]
async fn test_redis_basic_operations() {
let config = RedisConfig {
mode: RedisMode::Standalone,
node: Some(RedisNode {
host: "localhost".to_string(),
port: 6379,
}),
sentinel: None,
cluster: None,
pool: PoolConfig::default(),
password: Some("1qaz!QAZ".to_string()),
database: None,
};
let client = RedisClient::new(config).await.unwrap();
client.set("test_key", "test_value").await.unwrap();
let value = client.get("test_key").await.unwrap();
assert_eq!(value, Some("test_value".to_string()));
let keys = client.keys("*").await.unwrap();
assert!(keys.contains(&"test_key".to_string()));
let deleted = client.del("test_key").await.unwrap();
assert!(deleted);
let value = client.get("test_key").await.unwrap();
assert_eq!(value, None);
}
}