use crate::config::models::storage::RedisConfig;
use crate::utils::error::gateway_error::{GatewayError, Result};
use redis::{AsyncConnectionConfig, Client, aio::MultiplexedConnection};
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::{debug, info};
/// Shared handle to Redis that hands out [`RedisConnection`] values.
///
/// A pool is either live (built by `RedisPool::new` with Redis enabled) or a
/// no-op placeholder (Redis disabled or unavailable). Cloning the pool shares
/// the same semaphore, because it is stored behind an `Arc`.
#[derive(Debug, Clone)]
pub struct RedisPool {
/// Redis client the connection was opened from; `None` in no-op mode.
pub(crate) client: Option<Client>,
/// Multiplexed connection that `get_connection` clones into every
/// [`RedisConnection`]; `None` in no-op mode.
pub(crate) connection_manager: Option<MultiplexedConnection>,
/// Configuration the pool was built with.
pub(crate) config: RedisConfig,
/// `true` when the pool has no Redis behind it; `get_connection` and
/// `health_check` short-circuit in that case.
pub(crate) noop_mode: bool,
/// Caps how many [`RedisConnection`] handles can be outstanding at once.
/// Sized to `max_connections` (minimum 1) for a live pool.
pub(crate) semaphore: Arc<Semaphore>,
}
/// Handle returned by `RedisPool::get_connection`.
///
/// For a live pool it carries a clone of the pool's multiplexed connection
/// together with the semaphore permit that was acquired for it. For a no-op
/// pool both fields are `None`.
pub struct RedisConnection {
/// Connection to issue commands on; `None` when the pool is in no-op mode.
pub(crate) conn: Option<MultiplexedConnection>,
/// Semaphore permit kept alive for as long as this handle exists; never read
/// directly. Dropping the handle drops the permit, returning it to the pool.
pub(crate) _permit: Option<OwnedSemaphorePermit>,
}
impl RedisPool {
pub async fn new(config: &RedisConfig) -> Result<Self> {
if !config.enabled {
info!("Redis disabled in config; using no-op Redis pool");
return Ok(Self {
client: None,
connection_manager: None,
config: config.clone(),
noop_mode: true,
semaphore: Arc::new(Semaphore::new(1)),
});
}
info!("Creating Redis connection pool");
debug!("Redis URL: {}", Self::sanitize_url(&config.url));
debug!(
"Redis max_connections: {}, connection_timeout: {}s",
config.max_connections, config.connection_timeout
);
let client = Client::open(config.url.as_str()).map_err(GatewayError::from)?;
let async_config = AsyncConnectionConfig::new().set_connection_timeout(Some(
std::time::Duration::from_secs(config.connection_timeout),
));
let connection_manager = client
.get_multiplexed_async_connection_with_config(&async_config)
.await
.map_err(GatewayError::from)?;
let max_connections = config.max_connections.max(1) as usize;
info!(
"Redis connection pool created successfully (max_connections={})",
max_connections
);
Ok(Self {
client: Some(client),
connection_manager: Some(connection_manager),
config: config.clone(),
noop_mode: false,
semaphore: Arc::new(Semaphore::new(max_connections)),
})
}
pub fn create_noop() -> Self {
info!("Creating no-op Redis pool (Redis unavailable)");
Self {
client: None,
connection_manager: None,
config: RedisConfig {
url: String::new(),
enabled: false,
max_connections: 0,
connection_timeout: 0,
cluster: false,
},
noop_mode: true,
semaphore: Arc::new(Semaphore::new(1)),
}
}
pub fn is_noop(&self) -> bool {
self.noop_mode
}
pub async fn get_connection(&self) -> Result<RedisConnection> {
if self.noop_mode {
return Ok(RedisConnection {
conn: None,
_permit: None,
});
}
let permit = self
.semaphore
.clone()
.acquire_owned()
.await
.map_err(|_| GatewayError::Internal("Redis semaphore closed".to_string()))?;
Ok(RedisConnection {
conn: self.connection_manager.clone(),
_permit: Some(permit),
})
}
pub async fn health_check(&self) -> Result<()> {
if self.noop_mode {
debug!("Redis health check skipped (no-op mode)");
return Ok(());
}
debug!("Performing Redis health check");
let mut conn = self.get_connection().await?;
if let Some(ref mut c) = conn.conn {
let _: String = redis::cmd("PING")
.query_async(c)
.await
.map_err(GatewayError::from)?;
}
debug!("Redis health check passed");
Ok(())
}
pub async fn close(&self) -> Result<()> {
info!("Closing Redis connection pool");
info!("Redis connection pool closed");
Ok(())
}
pub(crate) fn sanitize_url(url: &str) -> String {
if let Ok(parsed) = url::Url::parse(url) {
let mut sanitized = parsed.clone();
if sanitized.password().is_some() {
let _ = sanitized.set_password(Some("***"));
}
sanitized.to_string()
} else {
"invalid_url".to_string()
}
}
}