use crate::domain::{ConfigError, ConfigKey, ConfigValue, Result};
use crate::ports::ConfigSource;
use once_cell::sync::Lazy;
use redis::aio::MultiplexedConnection;
use redis::{AsyncCommands, Client};
use std::collections::HashMap;
use std::sync::Arc;
/// Builds the Tokio runtime stored in [`RELOAD_RUNTIME`].
///
/// # Panics
///
/// Panics if the runtime cannot be created.
fn build_reload_runtime() -> tokio::runtime::Runtime {
    tokio::runtime::Runtime::new().expect("Failed to create reload runtime for Redis adapter")
}

/// Runtime shared by every adapter in the process, created lazily on first access.
///
/// `RedisAdapter::reload_sync` uses it to drive its async Redis calls to completion
/// from synchronous code.
static RELOAD_RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(build_reload_runtime);
/// Layout of the configuration entries inside Redis.
#[derive(Clone, Copy, Debug)]
#[derive(PartialEq, Eq)]
pub enum RedisStorageMode {
    /// Each entry is its own string key whose name starts with the adapter's namespace.
    StringKeys,
    /// All entries are fields of a single Redis hash stored under the adapter's namespace.
    Hash,
}
/// Configuration source that reads its entries from Redis and serves them from an
/// in-memory snapshot.
///
/// The snapshot is filled when the adapter is constructed and replaced on reload;
/// lookups through `ConfigSource::get` only consult the snapshot.
#[derive(Debug)]
pub struct RedisAdapter {
/// Redis client from which a new connection is opened for every load.
client: Arc<Client>,
/// Name of the hash (in `Hash` mode) or key prefix (in `StringKeys` mode) to read.
namespace: String,
/// How entries are laid out in Redis.
storage_mode: RedisStorageMode,
/// Value reported by `ConfigSource::priority`.
priority: u8,
/// Snapshot of the loaded entries; in `StringKeys` mode the namespace prefix is
/// stripped from the key names.
cache: HashMap<String, String>,
}
impl RedisAdapter {
fn validate_namespace(namespace: &str) -> Result<()> {
if namespace.contains(['*', '?', '[', ']', '\\']) {
return Err(ConfigError::SourceError {
source_name: "redis".to_string(),
message: "Namespace contains invalid characters (* ? [ ] \\)".to_string(),
source: None,
});
}
Ok(())
}
pub async fn new(url: &str, namespace: &str, storage_mode: RedisStorageMode) -> Result<Self> {
Self::validate_namespace(namespace)?;
let client = Client::open(url).map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to create Redis client: {}", e),
source: Some(Box::new(e)),
})?;
let mut adapter = Self {
client: Arc::new(client),
namespace: namespace.to_string(),
storage_mode,
priority: 1,
cache: HashMap::new(),
};
adapter.load_all_keys().await?;
Ok(adapter)
}
pub async fn with_priority(
url: &str,
namespace: &str,
storage_mode: RedisStorageMode,
priority: u8,
) -> Result<Self> {
let mut adapter = Self::new(url, namespace, storage_mode).await?;
adapter.priority = priority;
Ok(adapter)
}
async fn get_connection(&self) -> Result<MultiplexedConnection> {
self.client
.get_multiplexed_async_connection()
.await
.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to connect to Redis: {}", e),
source: Some(Box::new(e)),
})
}
async fn load_all_keys(&mut self) -> Result<()> {
let mut conn = self.get_connection().await?;
self.cache.clear();
match self.storage_mode {
RedisStorageMode::Hash => {
let hash: HashMap<String, String> =
conn.hgetall(&self.namespace)
.await
.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to fetch hash from Redis: {}", e),
source: Some(Box::new(e)),
})?;
self.cache = hash;
}
RedisStorageMode::StringKeys => {
let pattern = format!("{}*", self.namespace);
let mut cursor: u64 = 0;
let mut all_keys = Vec::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&pattern)
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await
.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to scan keys from Redis: {}", e),
source: Some(Box::new(e)),
})?;
all_keys.extend(keys);
cursor = new_cursor;
if cursor == 0 {
break;
}
}
for key in all_keys {
let value: String =
conn.get(&key).await.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to fetch value from Redis: {}", e),
source: Some(Box::new(e)),
})?;
let key = if key.starts_with(&self.namespace) {
&key[self.namespace.len()..]
} else {
&key
};
self.cache.insert(key.to_string(), value);
}
}
}
Ok(())
}
fn reload_sync(&mut self) -> Result<()> {
let client = Arc::clone(&self.client);
let namespace = self.namespace.clone();
let storage_mode = self.storage_mode;
let new_cache = if tokio::runtime::Handle::try_current().is_ok() {
let handle = std::thread::spawn(move || {
RELOAD_RUNTIME.block_on(async move {
let mut conn =
client
.get_multiplexed_async_connection()
.await
.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to connect to Redis: {}", e),
source: Some(Box::new(e)),
})?;
let mut new_cache = HashMap::new();
match storage_mode {
RedisStorageMode::Hash => {
let hash: HashMap<String, String> = conn
.hgetall(&namespace)
.await
.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to fetch hash from Redis: {}", e),
source: Some(Box::new(e)),
})?;
new_cache = hash;
}
RedisStorageMode::StringKeys => {
let pattern = format!("{}*", namespace);
let mut cursor: u64 = 0;
let mut all_keys = Vec::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&pattern)
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await
.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to scan keys from Redis: {}", e),
source: Some(Box::new(e)),
})?;
all_keys.extend(keys);
cursor = new_cursor;
if cursor == 0 {
break;
}
}
for key in all_keys {
let value: String =
conn.get(&key).await.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to fetch value from Redis: {}", e),
source: Some(Box::new(e)),
})?;
let key = if key.starts_with(&namespace) {
&key[namespace.len()..]
} else {
&key
};
new_cache.insert(key.to_string(), value);
}
}
}
Ok::<HashMap<String, String>, ConfigError>(new_cache)
})
});
handle.join().map_err(|_| ConfigError::SourceError {
source_name: "redis".to_string(),
message: "Failed to join reload thread".to_string(),
source: None,
})?
} else {
RELOAD_RUNTIME.block_on(async move {
let mut conn = client
.get_multiplexed_async_connection()
.await
.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to connect to Redis: {}", e),
source: Some(Box::new(e)),
})?;
let mut new_cache = HashMap::new();
match storage_mode {
RedisStorageMode::Hash => {
let hash: HashMap<String, String> = conn
.hgetall(&namespace)
.await
.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to fetch hash from Redis: {}", e),
source: Some(Box::new(e)),
})?;
new_cache = hash;
}
RedisStorageMode::StringKeys => {
let pattern = format!("{}*", namespace);
let mut cursor: u64 = 0;
let mut all_keys = Vec::new();
loop {
let (new_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg(&pattern)
.arg("COUNT")
.arg(100)
.query_async(&mut conn)
.await
.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to scan keys from Redis: {}", e),
source: Some(Box::new(e)),
})?;
all_keys.extend(keys);
cursor = new_cursor;
if cursor == 0 {
break;
}
}
for key in all_keys {
let value: String =
conn.get(&key).await.map_err(|e| ConfigError::SourceError {
source_name: "redis".to_string(),
message: format!("Failed to fetch value from Redis: {}", e),
source: Some(Box::new(e)),
})?;
let key = if key.starts_with(&namespace) {
&key[namespace.len()..]
} else {
&key
};
new_cache.insert(key.to_string(), value);
}
}
}
Ok::<HashMap<String, String>, ConfigError>(new_cache)
})
}?;
self.cache = new_cache;
Ok(())
}
}
impl ConfigSource for RedisAdapter {
    /// Fixed identifier of this source: `"redis"`.
    fn name(&self) -> &str {
        "redis"
    }

    /// Priority stored on the adapter.
    fn priority(&self) -> u8 {
        self.priority
    }

    /// Looks `key` up in the in-memory cache; Redis is not contacted.
    ///
    /// Returns `Ok(None)` when the cache has no entry for `key`.
    fn get(&self, key: &ConfigKey) -> Result<Option<ConfigValue>> {
        let cached = self.cache.get(key.as_str());
        Ok(cached.map(|raw| ConfigValue::from(raw.as_str())))
    }

    /// Lists every key currently held in the in-memory cache.
    fn all_keys(&self) -> Result<Vec<ConfigKey>> {
        let mut keys = Vec::with_capacity(self.cache.len());
        for name in self.cache.keys() {
            keys.push(ConfigKey::from(name.as_str()));
        }
        Ok(keys)
    }

    /// Refreshes the cache from Redis, blocking until the reload finishes.
    fn reload(&mut self) -> Result<()> {
        self.reload_sync()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each storage mode equals itself and differs from the other one.
    #[test]
    fn test_redis_storage_modes() {
        let string_keys = RedisStorageMode::StringKeys;
        let hash = RedisStorageMode::Hash;

        assert_eq!(string_keys, RedisStorageMode::StringKeys);
        assert_eq!(hash, RedisStorageMode::Hash);
        assert_ne!(string_keys, hash);
    }
}