#![allow(dead_code)]
use crate::config::RedisConfig;
use crate::utils::error::{GatewayError, Result};
use redis::{AsyncCommands, Client, RedisResult, aio::MultiplexedConnection};
use std::collections::HashMap;
use futures::StreamExt;
use tracing::{debug, info};
/// Handle to a Redis server built around one shared multiplexed connection.
///
/// `Clone` is derived, so a clone carries its own copy of every field.
///
/// NOTE(review): the derived `Debug` output includes `client` and `config`;
/// confirm neither of them prints the Redis password before logging this type.
#[derive(Debug, Clone)]
pub struct RedisPool {
/// Client opened from the configured URL; `subscribe` uses it to open a
/// separate connection for pub/sub.
client: Client,
/// Multiplexed connection that `get_connection` clones for every command.
connection_manager: MultiplexedConnection,
/// Copy of the configuration the pool was created with.
config: RedisConfig,
}
/// Connection handle returned by `RedisPool::get_connection`.
pub struct RedisConnection {
/// Clone of the pool's multiplexed connection; commands are issued on it.
conn: MultiplexedConnection,
}
/// Active pub/sub subscription created by `RedisPool::subscribe`.
pub struct Subscription {
/// Pub/sub connection from which `next_message` reads incoming messages.
pubsub: redis::aio::PubSub,
}
impl RedisPool {
/// Connects to Redis and builds a [`RedisPool`].
///
/// Opens a client for `config.url`, establishes one multiplexed async
/// connection and stores a clone of `config`. The URL is logged at debug
/// level after passing through `sanitize_url`.
///
/// # Errors
/// Returns [`GatewayError::Redis`] if the client cannot be opened from the
/// URL or the connection cannot be established.
pub async fn new(config: &RedisConfig) -> Result<Self> {
    info!("Creating Redis connection pool");
    debug!("Redis URL: {}", Self::sanitize_url(&config.url));

    let client = Client::open(config.url.as_str()).map_err(GatewayError::Redis)?;
    let connection_manager = client
        .get_multiplexed_async_connection()
        .await
        .map_err(GatewayError::Redis)?;

    info!("Redis connection pool created successfully");
    let pool = Self {
        client,
        connection_manager,
        config: config.clone(),
    };
    Ok(pool)
}
/// Returns a [`RedisConnection`] wrapping a clone of the pool's multiplexed
/// connection.
///
/// The current implementation always returns `Ok`.
pub async fn get_connection(&self) -> Result<RedisConnection> {
    let conn = self.connection_manager.clone();
    Ok(RedisConnection { conn })
}
/// Verifies that Redis is reachable by sending `PING`.
///
/// # Errors
/// Returns [`GatewayError::Redis`] if the command fails or the reply cannot
/// be read as a string.
pub async fn health_check(&self) -> Result<()> {
    debug!("Performing Redis health check");

    let mut handle = self.get_connection().await?;
    let _pong: String = redis::cmd("PING")
        .query_async(&mut handle.conn)
        .await
        .map_err(GatewayError::Redis)?;

    debug!("Redis health check passed");
    Ok(())
}
/// Logs that the pool is being closed.
///
/// The body only emits two log lines and returns `Ok(())`; it does not
/// explicitly tear down the client or the multiplexed connection.
pub async fn close(&self) -> Result<()> {
    info!("Closing Redis connection pool");
    info!("Redis connection pool closed");
    Ok(())
}
/// Fetches the string stored at `key`.
///
/// Returns `Ok(None)` when Redis replies with nil, i.e. the key does not
/// exist.
///
/// # Errors
/// Returns [`GatewayError::Redis`] for any command failure, including a
/// reply that exists but cannot be converted to a `String`.
pub async fn get(&self, key: &str) -> Result<Option<String>> {
    let mut conn = self.get_connection().await?;
    // Decode into `Option<String>` so that only a nil reply becomes `None`.
    // Decoding into `String` and treating every `TypeError` as "missing"
    // would also hide values that exist but are not valid UTF-8.
    let value: Option<String> = conn.conn.get(key).await.map_err(GatewayError::Redis)?;
    Ok(value)
}
pub async fn set(&self, key: &str, value: &str, ttl: Option<u64>) -> Result<()> {
let mut conn = self.get_connection().await?;
if let Some(ttl_seconds) = ttl {
let _: () = conn
.conn
.set_ex(key, value, ttl_seconds)
.await
.map_err(|e| GatewayError::Redis(e))?;
} else {
let _: () = conn
.conn
.set(key, value)
.await
.map_err(|e| GatewayError::Redis(e))?;
}
Ok(())
}
pub async fn delete(&self, key: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = conn
.conn
.del(key)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(())
}
pub async fn exists(&self, key: &str) -> Result<bool> {
let mut conn = self.get_connection().await?;
let exists: bool = conn
.conn
.exists(key)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(exists)
}
pub async fn expire(&self, key: &str, ttl: u64) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = conn
.conn
.expire(key, ttl as i64)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(())
}
pub async fn ttl(&self, key: &str) -> Result<i64> {
let mut conn = self.get_connection().await?;
let ttl: i64 = conn
.conn
.ttl(key)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(ttl)
}
/// Fetches the values of several keys at once.
///
/// The result has one entry per requested key, in the same order; an entry
/// is `None` when Redis returns nil for that key. An empty `keys` slice
/// yields an empty vector without sending a command.
///
/// # Errors
/// Returns [`GatewayError::Redis`] if the command fails.
pub async fn mget(&self, keys: &[String]) -> Result<Vec<Option<String>>> {
    // MGET needs at least one key; short-circuit instead of sending a
    // command the server would reject (mirrors the empty check in `mset`).
    if keys.is_empty() {
        return Ok(Vec::new());
    }

    let mut conn = self.get_connection().await?;
    let values: Vec<Option<String>> = conn
        .conn
        .mget(keys)
        .await
        .map_err(GatewayError::Redis)?;
    Ok(values)
}
pub async fn mset(&self, pairs: &[(String, String)], ttl: Option<u64>) -> Result<()> {
let mut conn = self.get_connection().await?;
if pairs.is_empty() {
return Ok(());
}
let mut pipe = redis::pipe();
pipe.atomic();
for (key, value) in pairs {
if let Some(ttl_seconds) = ttl {
pipe.set_ex(key, value, ttl_seconds);
} else {
pipe.set(key, value);
}
}
let _: () = pipe
.query_async(&mut conn.conn)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(())
}
pub async fn list_push(&self, key: &str, value: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = conn
.conn
.lpush(key, value)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(())
}
/// Pops one element from the list at `key` using `rpop`.
///
/// Returns `Ok(None)` when Redis replies with nil, i.e. there is nothing to
/// pop.
///
/// # Errors
/// Returns [`GatewayError::Redis`] for any command failure, including an
/// element that exists but cannot be converted to a `String`.
pub async fn list_pop(&self, key: &str) -> Result<Option<String>> {
    let mut conn = self.get_connection().await?;
    // Decode into `Option<String>` so that only a nil reply becomes `None`;
    // mapping every `TypeError` to `None` would silently drop popped
    // elements that are not valid UTF-8.
    let value: Option<String> = conn
        .conn
        .rpop(key, None)
        .await
        .map_err(GatewayError::Redis)?;
    Ok(value)
}
pub async fn list_length(&self, key: &str) -> Result<usize> {
let mut conn = self.get_connection().await?;
let len: usize = conn
.conn
.llen(key)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(len)
}
pub async fn list_range(&self, key: &str, start: isize, stop: isize) -> Result<Vec<String>> {
let mut conn = self.get_connection().await?;
let values: Vec<String> = conn
.conn
.lrange(key, start, stop)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(values)
}
pub async fn set_add(&self, key: &str, member: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = conn
.conn
.sadd(key, member)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(())
}
pub async fn set_remove(&self, key: &str, member: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = conn
.conn
.srem(key, member)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(())
}
pub async fn set_members(&self, key: &str) -> Result<Vec<String>> {
let mut conn = self.get_connection().await?;
let members: Vec<String> = conn
.conn
.smembers(key)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(members)
}
pub async fn set_is_member(&self, key: &str, member: &str) -> Result<bool> {
let mut conn = self.get_connection().await?;
let is_member: bool = conn
.conn
.sismember(key, member)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(is_member)
}
pub async fn hash_set(&self, key: &str, field: &str, value: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = conn
.conn
.hset(key, field, value)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(())
}
/// Fetches the value of `field` in the hash at `key`.
///
/// Returns `Ok(None)` when Redis replies with nil, i.e. the hash or the
/// field does not exist.
///
/// # Errors
/// Returns [`GatewayError::Redis`] for any command failure, including a
/// value that exists but cannot be converted to a `String`.
pub async fn hash_get(&self, key: &str, field: &str) -> Result<Option<String>> {
    let mut conn = self.get_connection().await?;
    // Decode into `Option<String>` so that only a nil reply becomes `None`;
    // mapping every `TypeError` to `None` would report existing non-UTF-8
    // values as missing.
    let value: Option<String> = conn
        .conn
        .hget(key, field)
        .await
        .map_err(GatewayError::Redis)?;
    Ok(value)
}
pub async fn hash_delete(&self, key: &str, field: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = conn
.conn
.hdel(key, field)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(())
}
pub async fn hash_get_all(&self, key: &str) -> Result<HashMap<String, String>> {
let mut conn = self.get_connection().await?;
let hash: HashMap<String, String> = conn
.conn
.hgetall(key)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(hash)
}
pub async fn hash_exists(&self, key: &str, field: &str) -> Result<bool> {
let mut conn = self.get_connection().await?;
let exists: bool = conn
.conn
.hexists(key, field)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(exists)
}
/// Adds `member` with the given `score` to the sorted set at `key`.
///
/// # Errors
/// Returns [`GatewayError::Redis`] if the command fails.
pub async fn sorted_set_add(&self, key: &str, score: f64, member: &str) -> Result<()> {
    let mut conn = self.get_connection().await?;
    // The `zadd` helper takes its arguments as (key, member, score), which is
    // the reverse of this method's own parameter order. Passing them as
    // (key, score, member) would send the member where the score belongs.
    let _: () = conn
        .conn
        .zadd(key, member, score)
        .await
        .map_err(GatewayError::Redis)?;
    Ok(())
}
pub async fn sorted_set_range(
&self,
key: &str,
start: isize,
stop: isize,
) -> Result<Vec<String>> {
let mut conn = self.get_connection().await?;
let members: Vec<String> = conn
.conn
.zrange(key, start, stop)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(members)
}
pub async fn sorted_set_remove(&self, key: &str, member: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = conn
.conn
.zrem(key, member)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(())
}
pub async fn publish(&self, channel: &str, message: &str) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = conn
.conn
.publish(channel, message)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(())
}
/// Opens a separate connection, turns it into a pub/sub connection and
/// subscribes to every entry of `channels`, one at a time and in order.
///
/// # Errors
/// Returns [`GatewayError::Redis`] if the connection cannot be opened or
/// any subscribe call fails; channels after the failing one are not
/// subscribed.
pub async fn subscribe(&self, channels: &[String]) -> Result<Subscription> {
    let connection = self
        .client
        .get_async_connection()
        .await
        .map_err(GatewayError::Redis)?;
    let mut pubsub = connection.into_pubsub();

    for channel in channels {
        pubsub
            .subscribe(channel)
            .await
            .map_err(GatewayError::Redis)?;
    }

    Ok(Subscription { pubsub })
}
pub async fn increment(&self, key: &str, delta: i64) -> Result<i64> {
let mut conn = self.get_connection().await?;
let new_value: i64 = conn
.conn
.incr(key, delta)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(new_value)
}
pub async fn decrement(&self, key: &str, delta: i64) -> Result<i64> {
let mut conn = self.get_connection().await?;
let new_value: i64 = conn
.conn
.decr(key, delta)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(new_value)
}
/// Produces a loggable form of `url` with any password replaced by `***`.
///
/// Returns the literal string `invalid_url` when `url` cannot be parsed.
/// Only the password component is touched; the rest of the URL, including
/// the username, is kept.
fn sanitize_url(url: &str) -> String {
    match url::Url::parse(url) {
        Ok(mut parsed) => {
            if parsed.password().is_some() {
                // The result of the overwrite is deliberately ignored.
                let _ = parsed.set_password(Some("***"));
            }
            parsed.to_string()
        }
        Err(_) => "invalid_url".to_string(),
    }
}
pub async fn info(&self) -> Result<String> {
let mut conn = self.get_connection().await?;
let info: String = redis::cmd("INFO")
.query_async(&mut conn.conn)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(info)
}
pub async fn flush_db(&self) -> Result<()> {
let mut conn = self.get_connection().await?;
let _: () = redis::cmd("FLUSHDB")
.query_async(&mut conn.conn)
.await
.map_err(|e| GatewayError::Redis(e))?;
Ok(())
}
}
impl Subscription {
    /// Waits for the next message on the subscription.
    ///
    /// # Errors
    /// Returns [`GatewayError::Redis`] with an `IoError` kind and the text
    /// "Subscription closed" when the message stream ends.
    pub async fn next_message(&mut self) -> Result<redis::Msg> {
        match self.pubsub.on_message().next().await {
            Some(message) => Ok(message),
            None => Err(GatewayError::Redis(redis::RedisError::from((
                redis::ErrorKind::IoError,
                "Subscription closed",
            )))),
        }
    }

    /// Currently a no-op: returns `Ok(())` without sending any command, so
    /// existing channel subscriptions are left untouched.
    pub async fn unsubscribe_all(&mut self) -> Result<()> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A password embedded in the URL must be masked in the sanitized form.
    #[test]
    fn test_sanitize_url() {
        let url = "redis://user:password@localhost:6379/0";
        let sanitized = RedisPool::sanitize_url(url);
        assert!(sanitized.contains("user:***@localhost"));
        assert!(!sanitized.contains("password"));
    }

    /// A URL without a password passes through without a mask being added.
    #[test]
    fn test_sanitize_url_without_password() {
        let sanitized = RedisPool::sanitize_url("redis://localhost:6379/0");
        assert!(sanitized.contains("localhost:6379"));
        assert!(!sanitized.contains("***"));
    }

    /// Input that does not parse as a URL maps to the fixed placeholder.
    #[test]
    fn test_sanitize_url_invalid() {
        assert_eq!(RedisPool::sanitize_url("not a url"), "invalid_url");
    }

    /// Only checks that a `RedisConfig` can be constructed and read back;
    /// no connection to Redis is attempted here.
    #[tokio::test]
    async fn test_redis_pool_creation() {
        let config = RedisConfig {
            url: "redis://localhost:6379".to_string(),
            enabled: true,
            max_connections: 10,
            connection_timeout: 5,
            cluster: false,
        };
        assert_eq!(config.url, "redis://localhost:6379");
        assert_eq!(config.max_connections, 10);
    }
}