#[cfg(feature = "redis")]
use redis::Client;
use super::store_trait::Store;
use crate::{TorshDistributedError, TorshResult};
use async_trait::async_trait;
#[cfg(feature = "redis")]
use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "redis")]
use tokio::sync::RwLock;
#[cfg(feature = "redis")]
use tracing::info;
#[cfg(feature = "redis")]
pub struct RedisStore {
redis_url: String,
timeout: Duration,
client: Arc<RwLock<Option<Client>>>,
}
#[cfg(not(feature = "redis"))]
pub struct RedisStore {
#[allow(dead_code)]
redis_url: String,
timeout: Duration,
}
#[cfg(feature = "redis")]
impl std::fmt::Debug for RedisStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedisStore")
.field("redis_url", &"<redacted>")
.field("timeout", &self.timeout)
.finish()
}
}
#[cfg(not(feature = "redis"))]
impl std::fmt::Debug for RedisStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedisStore (disabled)")
.field("redis_url", &"<redacted>")
.field("timeout", &self.timeout)
.finish()
}
}
#[cfg(feature = "redis")]
impl RedisStore {
pub fn new(redis_url: String, timeout: Duration) -> TorshResult<Self> {
Ok(Self {
redis_url,
timeout,
client: Arc::new(RwLock::new(None)),
})
}
pub async fn connect(&mut self) -> TorshResult<()> {
let client = Client::open(self.redis_url.as_str()).map_err(|e| {
TorshDistributedError::backend_error(
"RedisStore",
format!("Failed to create Redis client: {}", e),
)
})?;
info!(
"Connected to Redis at {} (simplified client)",
self.redis_url
);
let mut client_lock = self.client.write().await;
*client_lock = Some(client);
Ok(())
}
}
#[cfg(not(feature = "redis"))]
impl RedisStore {
pub fn new(redis_url: String, timeout: Duration) -> TorshResult<Self> {
Ok(Self { redis_url, timeout })
}
pub async fn connect(&mut self) -> TorshResult<()> {
Err(TorshDistributedError::not_implemented(
"RedisStore requires 'redis' feature to be enabled",
))
}
}
#[cfg(feature = "redis")]
#[async_trait]
impl Store for RedisStore {
async fn set(&self, _key: &str, _value: &[u8]) -> TorshResult<()> {
Err(TorshDistributedError::not_implemented(
"RedisStore ConnectionManager not available - awaiting redis crate update",
))
}
async fn get(&self, _key: &str) -> TorshResult<Option<Vec<u8>>> {
Err(TorshDistributedError::not_implemented(
"RedisStore ConnectionManager not available - awaiting redis crate update",
))
}
async fn wait(&self, _keys: &[String]) -> TorshResult<()> {
Err(TorshDistributedError::not_implemented(
"RedisStore ConnectionManager not available - awaiting redis crate update",
))
}
async fn delete(&self, _key: &str) -> TorshResult<()> {
Err(TorshDistributedError::not_implemented(
"RedisStore ConnectionManager not available - awaiting redis crate update",
))
}
async fn num_keys(&self) -> TorshResult<usize> {
Err(TorshDistributedError::not_implemented(
"RedisStore ConnectionManager not available - awaiting redis crate update",
))
}
async fn contains(&self, _key: &str) -> TorshResult<bool> {
Err(TorshDistributedError::not_implemented(
"RedisStore ConnectionManager not available - awaiting redis crate update",
))
}
async fn set_with_expiry(&self, _key: &str, _value: &[u8], _ttl: Duration) -> TorshResult<()> {
Err(TorshDistributedError::not_implemented(
"RedisStore ConnectionManager not available - awaiting redis crate update",
))
}
async fn compare_and_swap(
&self,
_key: &str,
_expected: Option<&[u8]>,
_value: &[u8],
) -> TorshResult<bool> {
Err(TorshDistributedError::not_implemented(
"RedisStore ConnectionManager not available - awaiting redis crate update",
))
}
async fn add(&self, _key: &str, _value: i64) -> TorshResult<i64> {
Err(TorshDistributedError::not_implemented(
"RedisStore ConnectionManager not available - awaiting redis crate update",
))
}
}
#[cfg(not(feature = "redis"))]
#[async_trait]
impl Store for RedisStore {
async fn set(&self, _key: &str, _value: &[u8]) -> TorshResult<()> {
Err(TorshDistributedError::not_implemented(
"RedisStore requires 'redis' feature to be enabled",
))
}
async fn get(&self, _key: &str) -> TorshResult<Option<Vec<u8>>> {
Err(TorshDistributedError::not_implemented(
"RedisStore requires 'redis' feature to be enabled",
))
}
async fn wait(&self, _keys: &[String]) -> TorshResult<()> {
Err(TorshDistributedError::not_implemented(
"RedisStore requires 'redis' feature to be enabled",
))
}
async fn delete(&self, _key: &str) -> TorshResult<()> {
Err(TorshDistributedError::not_implemented(
"RedisStore requires 'redis' feature to be enabled",
))
}
async fn num_keys(&self) -> TorshResult<usize> {
Err(TorshDistributedError::not_implemented(
"RedisStore requires 'redis' feature to be enabled",
))
}
async fn contains(&self, _key: &str) -> TorshResult<bool> {
Err(TorshDistributedError::not_implemented(
"RedisStore requires 'redis' feature to be enabled",
))
}
async fn set_with_expiry(&self, _key: &str, _value: &[u8], _ttl: Duration) -> TorshResult<()> {
Err(TorshDistributedError::not_implemented(
"RedisStore requires 'redis' feature to be enabled",
))
}
async fn compare_and_swap(
&self,
_key: &str,
_expected: Option<&[u8]>,
_value: &[u8],
) -> TorshResult<bool> {
Err(TorshDistributedError::not_implemented(
"RedisStore requires 'redis' feature to be enabled",
))
}
async fn add(&self, _key: &str, _value: i64) -> TorshResult<i64> {
Err(TorshDistributedError::not_implemented(
"RedisStore requires 'redis' feature to be enabled",
))
}
}
#[cfg(all(test, feature = "redis"))]
mod tests {
use super::*;
async fn create_test_store() -> RedisStore {
let mut store =
RedisStore::new("redis://127.0.0.1:6379".to_string(), Duration::from_secs(5)).unwrap();
store.connect().await.expect("operation should succeed");
store
}
#[tokio::test]
#[ignore] async fn test_redis_store_basic_operations() {
let store = create_test_store().await;
store
.set("test_key1", b"test_value1")
.await
.expect("operation should succeed");
let value = store
.get("test_key1")
.await
.expect("operation should succeed");
assert_eq!(value, Some(b"test_value1".to_vec()));
assert!(store
.contains("test_key1")
.await
.expect("operation should succeed"));
assert!(!store
.contains("nonexistent")
.await
.expect("operation should succeed"));
store
.delete("test_key1")
.await
.expect("operation should succeed");
assert!(!store
.contains("test_key1")
.await
.expect("operation should succeed"));
}
#[tokio::test]
#[ignore] async fn test_redis_store_atomic_operations() {
let store = create_test_store().await;
let _ = store.delete("test_counter").await;
let _ = store.delete("test_num").await;
let success = store
.compare_and_swap("test_counter", None, b"0")
.await
.expect("operation should succeed");
assert!(success);
let success = store
.compare_and_swap("test_counter", Some(b"0"), b"1")
.await
.expect("operation should succeed");
assert!(success);
let success = store
.compare_and_swap("test_counter", Some(b"0"), b"2")
.await
.expect("operation should succeed");
assert!(!success);
let result = store
.add("test_num", 5)
.await
.expect("operation should succeed");
assert_eq!(result, 5);
let result = store
.add("test_num", 3)
.await
.expect("operation should succeed");
assert_eq!(result, 8);
store
.delete("test_counter")
.await
.expect("operation should succeed");
store
.delete("test_num")
.await
.expect("operation should succeed");
}
#[tokio::test]
#[ignore] async fn test_redis_store_expiry() {
let store = create_test_store().await;
store
.set_with_expiry("temp_key", b"temp_value", Duration::from_secs(2))
.await
.expect("operation should succeed");
assert!(store
.contains("temp_key")
.await
.expect("operation should succeed"));
tokio::time::sleep(Duration::from_secs(3)).await;
assert!(!store
.contains("temp_key")
.await
.expect("operation should succeed"));
}
#[tokio::test]
#[ignore] async fn test_redis_store_wait() {
let store = create_test_store().await;
let _ = store.delete("wait_key1").await;
let _ = store.delete("wait_key2").await;
let store_clone = create_test_store().await;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
store_clone
.set("wait_key1", b"value1")
.await
.expect("operation should succeed");
tokio::time::sleep(Duration::from_millis(100)).await;
store_clone
.set("wait_key2", b"value2")
.await
.expect("operation should succeed");
});
store
.wait(&["wait_key1".to_string(), "wait_key2".to_string()])
.await
.expect("operation should succeed");
assert!(store
.contains("wait_key1")
.await
.expect("operation should succeed"));
assert!(store
.contains("wait_key2")
.await
.expect("operation should succeed"));
store
.delete("wait_key1")
.await
.expect("operation should succeed");
store
.delete("wait_key2")
.await
.expect("operation should succeed");
}
}