use r2d2::{Pool, PooledConnection};
use redis::{cmd, Commands, Connection, FromRedisValue, RedisResult, ToRedisArgs};
use std::env;
use std::fs;
use std::sync::OnceLock;
static REDIS_GO: OnceLock<RedisGo> = OnceLock::new();
const DEFAULT_POOL_SIZE: u32 = 16;
fn get_redis_url() -> Option<String> {
if let Ok(url) = env::var("REDIS_URL") {
return Some(url);
}
if let Ok(content) = fs::read_to_string(".env") {
for line in content.lines() {
let line = line.trim();
if line.starts_with('#') || line.is_empty() {
continue;
}
if let Some((k, v)) = line.split_once('=') {
if k.trim() == "REDIS_URL" {
return Some(v.trim().to_string());
}
}
}
}
None
}
pub struct RedisGo {
client: Option<redis::Client>,
pool: Option<Pool<redis::Client>>,
}
impl RedisGo {
pub fn new() -> RedisResult<Self> {
let redis_url = get_redis_url();
let client = match redis_url {
Some(url) => redis::Client::open(url).ok(),
None => None,
};
let pool = match &client {
Some(client) => Some(Self::create_pool(client)?),
None => None,
};
Ok(RedisGo { client, pool })
}
fn create_pool(client: &redis::Client) -> RedisResult<Pool<redis::Client>> {
Pool::builder()
.max_size(DEFAULT_POOL_SIZE)
.build(client.clone())
.map_err(|error| {
redis::RedisError::from((
redis::ErrorKind::Io,
"Failed to build Redis connection pool",
error.to_string(),
))
})
}
fn get_pool(&self) -> RedisResult<&Pool<redis::Client>> {
self.pool.as_ref().ok_or_else(|| {
redis::RedisError::from((redis::ErrorKind::Io, "Redis client not initialized"))
})
}
fn get_connection(&self) -> RedisResult<PooledConnection<redis::Client>> {
self.get_pool()?.get().map_err(|error| {
redis::RedisError::from((
redis::ErrorKind::Io,
"Failed to get Redis connection from pool",
error.to_string(),
))
})
}
fn should_reconnect(error: &redis::RedisError) -> bool {
error.is_connection_dropped() || error.is_io_error()
}
fn execute_operation<F, T>(
&self,
operation: &mut F,
) -> RedisResult<T>
where
F: FnMut(&mut Connection) -> RedisResult<T>,
{
let mut conn = self.get_connection()?;
operation(&mut conn)
}
fn execute_with_connection<F, T>(&self, operation: F) -> RedisResult<T>
where
F: FnMut(&mut Connection) -> RedisResult<T>,
{
let mut operation = operation;
match self.execute_operation(&mut operation) {
Ok(result) => Ok(result),
Err(error) if Self::should_reconnect(&error) => self.execute_operation(&mut operation),
Err(error) => Err(error),
}
}
pub fn set<K, V>(key: K, value: V) -> RedisResult<()>
where
K: ToRedisArgs,
V: ToRedisArgs,
{
get_redisgo().execute_with_connection(|conn| {
cmd("SET").arg(&key).arg(&value).query::<()>(conn)
})
}
pub fn set_ex<K, V>(key: K, value: V, ttl: u64) -> RedisResult<()>
where
K: ToRedisArgs,
V: ToRedisArgs,
{
get_redisgo().execute_with_connection(|conn| {
cmd("SETEX")
.arg(&key)
.arg(ttl)
.arg(&value)
.query::<()>(conn)
})
}
pub fn get<K, V>(key: K) -> RedisResult<V>
where
K: ToRedisArgs,
V: FromRedisValue,
{
get_redisgo().execute_with_connection(|conn| cmd("GET").arg(&key).query(conn))
}
pub fn delete<K>(key: K) -> RedisResult<()>
where
K: ToRedisArgs,
{
get_redisgo().execute_with_connection(|conn| cmd("DEL").arg(&key).query::<()>(conn))
}
pub fn exists<K>(key: K) -> RedisResult<bool>
where
K: ToRedisArgs,
{
get_redisgo().execute_with_connection(|conn| cmd("EXISTS").arg(&key).query(conn))
}
#[cfg(feature = "dangerous")]
pub fn flush_all() -> RedisResult<()> {
get_redisgo().execute_with_connection(|conn| conn.flushall())
}
pub fn get_client(&self) -> &redis::Client {
self.client.as_ref().expect("Redis client not initialized")
}
pub fn is_connected(&self) -> bool {
self.ping().is_ok()
}
pub fn ping(&self) -> RedisResult<String> {
self.execute_with_connection(|conn| conn.ping())
}
pub fn get_connection_status(&self) -> String {
if self.is_connected() {
"Connected".to_string()
} else {
"Not connected".to_string()
}
}
pub fn get_client_info(&self) -> String {
format!("Client Info: {:?}", self.client.as_ref().map(|c| c.get_connection_info()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use redis::cmd;
use std::sync::{Arc, Barrier, Mutex, OnceLock};
use std::thread;
use std::time::{Duration, Instant};
fn test_lock() -> &'static Mutex<()> {
static TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
TEST_LOCK.get_or_init(|| Mutex::new(()))
}
#[test]
fn test_concurrent_commands_use_separate_connections() {
let _guard = test_lock().lock().unwrap();
let redisgo = Arc::new(RedisGo::new().expect("Failed to initialize RedisGo"));
let barrier = Arc::new(Barrier::new(2));
let list_key = "redisgo_blocking_list";
redisgo
.execute_with_connection(|conn| cmd("DEL").arg(list_key).query::<usize>(conn).map(|_| ()))
.expect("Failed to reset blocking list");
let worker = {
let redisgo = Arc::clone(&redisgo);
let barrier = Arc::clone(&barrier);
thread::spawn(move || {
barrier.wait();
redisgo
.execute_with_connection(|conn| {
cmd("BLPOP")
.arg(list_key)
.arg(2)
.query::<Option<(String, String)>>(conn)
.map(|_| ())
})
.expect("Failed to run blocking Redis command");
})
};
barrier.wait();
thread::sleep(Duration::from_millis(100));
let start = Instant::now();
let response = redisgo.ping().expect("Failed to ping Redis");
let elapsed = start.elapsed();
assert_eq!(response, "PONG");
assert!(
elapsed < Duration::from_secs(1),
"Ping should not wait on another thread's blocking Redis command"
);
worker.join().unwrap();
}
}
impl Default for RedisGo {
fn default() -> Self {
Self::new().expect("Failed to initialize RedisGo")
}
}
pub fn get_redisgo() -> &'static RedisGo {
REDIS_GO.get_or_init(|| RedisGo::new().expect("Failed to initialize RedisGo"))
}