use crate::time::Duration;
#[cfg(feature = "time_stores")]
use crate::CacheTtl;
use crate::ConcurrentCached;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Display;
use std::marker::PhantomData;
/// Builder for the synchronous, r2d2-pooled [`RedisCache`].
///
/// Collects the TTL, key layout, connection string and optional pool settings,
/// and turns them into a cache via `build`.
pub struct RedisCacheBuilder<K, V> {
/// Time-to-live handed to the built cache.
ttl: Duration,
/// Whether the built cache re-applies the TTL on a cache hit.
refresh: bool,
/// First key segment; starts as `DEFAULT_NAMESPACE`.
namespace: String,
/// Second key segment, placed between the namespace and the key.
prefix: String,
/// Explicit connection string; when `None`, the `ENV_KEY` environment variable is read.
connection_string: Option<String>,
/// Optional r2d2 `max_size`; only applied to the pool builder when `Some`.
pool_max_size: Option<u32>,
/// Optional r2d2 `min_idle`; only applied to the pool builder when `Some`.
pool_min_idle: Option<u32>,
/// Optional r2d2 `max_lifetime`; only applied to the pool builder when `Some`.
pool_max_lifetime: Option<Duration>,
/// Optional r2d2 `idle_timeout`; only applied to the pool builder when `Some`.
pool_idle_timeout: Option<Duration>,
/// Records the key/value types without storing a `K` or `V`.
_phantom: PhantomData<fn() -> (K, V)>,
}
/// Name of the environment variable read for the connection string when the
/// builder was not given one explicitly.
const ENV_KEY: &str = "CACHED_REDIS_CONNECTION_STRING";
/// Namespace a new builder starts with. The trailing `:` is trimmed and
/// re-inserted by `generate_redis_key`, so it does not produce a doubled colon.
const DEFAULT_NAMESPACE: &str = "cached-redis-store:";
/// Converts `ttl` into a whole number of seconds for Redis expiry commands.
///
/// Any sub-second remainder rounds the result up, so a non-zero TTL never
/// becomes `0`. The result is capped at `i64::MAX` so it can later be
/// represented as a signed value.
///
/// # Errors
///
/// Returns an `InvalidClientConfig` Redis error (wrapped in
/// `RedisCacheError`) when `ttl` is zero.
fn ttl_seconds(ttl: Duration) -> Result<u64, RedisCacheError> {
    // Upper bound that still fits in an `i64`.
    const MAX_SECS: u64 = i64::MAX as u64;

    if ttl.is_zero() {
        let detail = format!("got {ttl:?}");
        let config_error = redis::RedisError::from((
            redis::ErrorKind::InvalidClientConfig,
            "TTL must be greater than zero for Redis",
            detail,
        ));
        return Err(config_error.into());
    }

    let whole = ttl.as_secs();
    let rounded_up = if ttl.subsec_nanos() == 0 {
        whole
    } else {
        // Saturate so a TTL of `u64::MAX` seconds plus a fraction cannot wrap.
        whole.saturating_add(1)
    };
    Ok(rounded_up.min(MAX_SECS))
}
/// Same conversion as `ttl_seconds`, returned as an `i64`.
///
/// # Errors
///
/// Propagates the zero-TTL error from `ttl_seconds`.
fn ttl_seconds_i64(ttl: Duration) -> Result<i64, RedisCacheError> {
    // `ttl_seconds` already caps its result at `i64::MAX`, so the fallback is
    // never taken in practice; it merely avoids a bare `as` cast.
    ttl_seconds(ttl).map(|secs| i64::try_from(secs).unwrap_or(i64::MAX))
}
/// Joins `namespace`, `prefix` and `key` into a single `:`-separated Redis key.
///
/// Trailing colons on `namespace` are stripped before joining. An empty
/// namespace (after stripping) or an empty prefix is skipped together with
/// its separator. `prefix` and `key` are used verbatim.
fn generate_redis_key(namespace: &str, prefix: &str, key: &str) -> String {
    let segments = [namespace.trim_end_matches(':'), prefix];

    // Each non-empty segment contributes its bytes plus one separator.
    let capacity = segments
        .iter()
        .filter(|segment| !segment.is_empty())
        .map(|segment| segment.len() + 1)
        .sum::<usize>()
        + key.len();

    let mut joined = String::with_capacity(capacity);
    for segment in segments.iter().filter(|segment| !segment.is_empty()) {
        joined.push_str(segment);
        joined.push(':');
    }
    joined.push_str(key);
    joined
}
/// Unit tests for `generate_redis_key`; they need no Redis server.
#[cfg(test)]
mod generate_key_tests {
    use super::{generate_redis_key, DEFAULT_NAMESPACE};

    /// Checks every `(namespace, prefix, key, expected)` row in order.
    fn check_all(rows: &[(&str, &str, &str, &str)]) {
        for &(namespace, prefix, key, expected) in rows {
            assert_eq!(generate_redis_key(namespace, prefix, key), expected);
        }
    }

    #[test]
    fn default_namespace_trailing_colon_trimmed_and_rejoined() {
        let built = generate_redis_key(DEFAULT_NAMESPACE, "my_prefix", "my_key");
        assert_eq!(built, "cached-redis-store:my_prefix:my_key");
        // The constant itself keeps its trailing colon; only the join trims it.
        assert!(DEFAULT_NAMESPACE.ends_with(':'));
    }

    #[test]
    fn empty_segments_are_skipped() {
        check_all(&[
            ("", "p", "k", "p:k"),
            ("ns", "", "k", "ns:k"),
            ("", "", "k", "k"),
            (":", "", "k", "k"),
        ]);
    }

    #[test]
    fn full_form_and_multiple_trailing_colons() {
        check_all(&[
            ("ns", "p", "k", "ns:p:k"),
            ("ns:::", "p", "k", "ns:p:k"),
            ("a:b", "p", "k", "a:b:p:k"),
        ]);
    }

    /// Pins the known limitation that interior colons are not escaped.
    #[test]
    fn interior_colon_collision() {
        let colon_in_namespace = generate_redis_key("ns:evil", "", "k");
        let separate_segments = generate_redis_key("ns", "evil", "k");
        assert_eq!(
            colon_in_namespace, separate_segments,
            "interior colons can cause key collisions"
        );
    }
}
/// Unit tests for the TTL-to-seconds helpers; they need no Redis server.
#[cfg(test)]
mod ttl_seconds_tests {
    use super::{ttl_seconds, ttl_seconds_i64};
    use crate::time::Duration;

    /// Converts `ttl` with `ttl_seconds`, panicking if it is rejected.
    fn secs(ttl: Duration) -> u64 {
        ttl_seconds(ttl).unwrap()
    }

    #[test]
    fn zero_is_rejected() {
        let zero = Duration::ZERO;
        assert!(ttl_seconds(zero).is_err());
        assert!(ttl_seconds_i64(zero).is_err());
    }

    #[test]
    fn whole_seconds_pass_through() {
        for whole in [1_u64, 60] {
            assert_eq!(secs(Duration::from_secs(whole)), whole);
        }
        assert_eq!(ttl_seconds_i64(Duration::from_secs(60)).unwrap(), 60);
    }

    #[test]
    fn subsecond_rounds_up_to_one() {
        let sub_second = [
            Duration::from_nanos(1),
            Duration::from_millis(1),
            Duration::from_millis(999),
        ];
        for ttl in sub_second {
            assert_eq!(secs(ttl), 1);
        }
    }

    #[test]
    fn fractional_rounds_up() {
        let cases = [
            (Duration::from_millis(1_500), 2_u64),
            (Duration::new(5, 1), 6),
        ];
        for (ttl, expected) in cases {
            assert_eq!(secs(ttl), expected);
        }
    }

    #[test]
    fn very_large_clamps_to_i64_max() {
        let huge = Duration::from_secs(u64::MAX);
        assert_eq!(secs(huge), i64::MAX as u64);
        assert_eq!(ttl_seconds_i64(huge).unwrap(), i64::MAX);
    }
}
/// Owned connection string whose `Debug` and `Display` output is a fixed
/// placeholder, so formatting a value that contains it does not print the
/// underlying text.
#[derive(Clone)]
struct ConnectionString(String);

impl ConnectionString {
    /// Borrows the real, unredacted connection string.
    fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Debug for ConnectionString {
    /// Always writes the placeholder, ignoring the wrapped string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[REDACTED connection string]")
    }
}

impl std::fmt::Display for ConnectionString {
    /// Writes the same placeholder as the `Debug` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Debug::fmt(self, f)
    }
}
use thiserror::Error;
/// Errors that can occur while building a Redis-backed cache.
#[derive(Error, Debug)]
pub enum RedisCacheBuildError {
/// An error reported by the `redis` crate (converted automatically via `?`).
#[error("redis connection error")]
Connection(#[from] redis::RedisError),
/// An error reported by the r2d2 pool (converted automatically via `?`).
#[error("redis pool error")]
Pool(#[from] r2d2::Error),
/// No explicit connection string was configured and reading the fallback
/// environment variable failed.
#[error("Connection string not specified or invalid in env var {env_key:?}: {error:?}")]
MissingConnectionString {
/// Name of the environment variable that was read.
env_key: String,
/// The failure returned by `std::env::var`.
error: std::env::VarError,
},
}
impl<K, V> RedisCacheBuilder<K, V>
where
K: Display,
V: Serialize + DeserializeOwned,
{
pub fn new<S: AsRef<str>>(prefix: S, ttl: Duration) -> RedisCacheBuilder<K, V> {
Self {
ttl,
refresh: false,
namespace: DEFAULT_NAMESPACE.to_string(),
prefix: prefix.as_ref().to_string(),
connection_string: None,
pool_max_size: None,
pool_min_idle: None,
pool_max_lifetime: None,
pool_idle_timeout: None,
_phantom: PhantomData,
}
}
#[must_use]
pub fn ttl(mut self, ttl: Duration) -> Self {
self.ttl = ttl;
self
}
#[must_use]
pub fn refresh(mut self, refresh: bool) -> Self {
self.refresh = refresh;
self
}
#[must_use]
pub fn namespace<S: AsRef<str>>(mut self, namespace: S) -> Self {
self.namespace = namespace.as_ref().to_string();
self
}
#[must_use]
pub fn prefix<S: AsRef<str>>(mut self, prefix: S) -> Self {
self.prefix = prefix.as_ref().to_string();
self
}
#[must_use]
pub fn connection_string(mut self, cs: &str) -> Self {
self.connection_string = Some(cs.to_string());
self
}
#[must_use]
pub fn connection_pool_max_size(mut self, max_size: u32) -> Self {
self.pool_max_size = Some(max_size);
self
}
#[must_use]
pub fn connection_pool_min_idle(mut self, min_idle: u32) -> Self {
self.pool_min_idle = Some(min_idle);
self
}
#[must_use]
pub fn connection_pool_max_lifetime(mut self, max_lifetime: Duration) -> Self {
self.pool_max_lifetime = Some(max_lifetime);
self
}
#[must_use]
pub fn connection_pool_idle_timeout(mut self, idle_timeout: Duration) -> Self {
self.pool_idle_timeout = Some(idle_timeout);
self
}
pub fn resolve_connection_string(&self) -> Result<String, RedisCacheBuildError> {
match self.connection_string {
Some(ref s) => Ok(s.to_string()),
None => {
std::env::var(ENV_KEY).map_err(|e| RedisCacheBuildError::MissingConnectionString {
env_key: ENV_KEY.to_string(),
error: e,
})
}
}
}
fn create_pool(&self) -> Result<r2d2::Pool<redis::Client>, RedisCacheBuildError> {
let s = self.resolve_connection_string()?;
let client: redis::Client = redis::Client::open(s)?;
let pool_builder = r2d2::Pool::builder();
let pool_builder = if let Some(max_size) = self.pool_max_size {
pool_builder.max_size(max_size)
} else {
pool_builder
};
let pool_builder = if let Some(min_idle) = self.pool_min_idle {
pool_builder.min_idle(Some(min_idle))
} else {
pool_builder
};
let pool_builder = if let Some(max_lifetime) = self.pool_max_lifetime {
pool_builder.max_lifetime(Some(max_lifetime))
} else {
pool_builder
};
let pool_builder = if let Some(idle_timeout) = self.pool_idle_timeout {
pool_builder.idle_timeout(Some(idle_timeout))
} else {
pool_builder
};
let pool: r2d2::Pool<redis::Client> = pool_builder.build(client)?;
Ok(pool)
}
pub fn build(self) -> Result<RedisCache<K, V>, RedisCacheBuildError> {
Ok(RedisCache {
ttl: self.ttl,
refresh: self.refresh,
connection_string: ConnectionString(self.resolve_connection_string()?),
pool: self.create_pool()?,
namespace: self.namespace,
prefix: self.prefix,
_phantom: PhantomData,
})
}
}
/// Synchronous Redis-backed cache that takes connections from an r2d2 pool.
///
/// Values are stored as JSON; keys are built from the namespace, the prefix
/// and the key's `Display` output.
pub struct RedisCache<K, V> {
/// Time-to-live applied when entries are written (and on hits when `refresh` is set).
pub(super) ttl: Duration,
/// Whether a cache hit re-applies the TTL to the entry.
pub(super) refresh: bool,
/// First key segment.
pub(super) namespace: String,
/// Second key segment, placed between the namespace and the key.
pub(super) prefix: String,
/// Connection string resolved at build time; redacted when formatted.
connection_string: ConnectionString,
/// Pool that every cache operation draws its connection from.
pool: r2d2::Pool<redis::Client>,
/// Records the key/value types without storing a `K` or `V`.
_phantom: PhantomData<fn() -> (K, V)>,
}
impl<K, V> RedisCache<K, V>
where
    K: Display,
    V: Serialize + DeserializeOwned,
{
    /// Starts a [`RedisCacheBuilder`] for a cache using `prefix` and `ttl`.
    ///
    /// Note that this returns the builder, not the cache; call `build` on it.
    #[allow(clippy::new_ret_no_self)]
    pub fn new<S: AsRef<str>>(prefix: S, ttl: Duration) -> RedisCacheBuilder<K, V> {
        RedisCacheBuilder::<K, V>::new(prefix, ttl)
    }

    /// Builds the full Redis key for `key` from the namespace, the prefix and
    /// the key's `Display` output.
    fn generate_key(&self, key: &K) -> String {
        let rendered = key.to_string();
        generate_redis_key(
            self.namespace.as_str(),
            self.prefix.as_str(),
            rendered.as_str(),
        )
    }

    /// Returns a copy of the real (unredacted) connection string this cache
    /// was built with.
    #[must_use]
    pub fn connection_string(&self) -> String {
        String::from(self.connection_string.as_str())
    }
}
/// Errors that can occur while reading from or writing to a Redis-backed cache.
#[derive(Error, Debug)]
pub enum RedisCacheError {
/// An error reported by the `redis` crate (converted automatically via `?`).
#[error("redis error")]
RedisCacheError(#[from] redis::RedisError),
/// An error reported by the r2d2 pool (converted automatically via `?`).
#[error("redis pool error")]
PoolError(#[from] r2d2::Error),
/// The string stored in Redis could not be parsed as the expected JSON envelope.
#[error("Error deserializing cached value {cached_value:?}: {error:?}")]
CacheDeserializationError {
/// The raw string that was read from Redis.
cached_value: String,
/// The parse failure reported by `serde_json`.
error: serde_json::Error,
},
/// The value could not be serialized to JSON before being written.
#[error("Error serializing cached value: {error:?}")]
CacheSerializationError { error: serde_json::Error },
}
/// JSON envelope stored in Redis: the cached value plus a format version.
#[derive(serde::Serialize, serde::Deserialize)]
struct CachedRedisValue<V> {
/// The cached value itself.
pub(crate) value: V,
/// Envelope version; `CachedRedisValue::new` writes `Some(1)`.
pub(crate) version: Option<u64>,
}
impl<V> CachedRedisValue<V> {
    /// Wraps `value` in an envelope stamped with version `1`.
    fn new(value: V) -> Self {
        let version = Some(1);
        Self { value, version }
    }
}
impl<K, V> ConcurrentCached<K, V> for RedisCache<K, V>
where
K: Display,
V: Serialize + DeserializeOwned,
{
type Error = RedisCacheError;
fn cache_get(&self, key: &K) -> Result<Option<V>, RedisCacheError> {
let mut conn = self.pool.get()?;
let mut pipe = redis::pipe();
let key = self.generate_key(key);
pipe.get(&key);
if self.refresh {
pipe.expire(key, ttl_seconds_i64(self.ttl)?).ignore();
}
let res: (Option<String>,) = pipe.query(&mut *conn)?;
match res.0 {
None => Ok(None),
Some(s) => {
let v: CachedRedisValue<V> = serde_json::from_str(&s).map_err(|e| {
RedisCacheError::CacheDeserializationError {
cached_value: s,
error: e,
}
})?;
Ok(Some(v.value))
}
}
}
fn cache_set(&self, key: K, val: V) -> Result<Option<V>, RedisCacheError> {
let mut conn = self.pool.get()?;
let mut pipe = redis::pipe();
let key = self.generate_key(&key);
let ttl_secs = ttl_seconds(self.ttl)?;
let val = CachedRedisValue::new(val);
pipe.get(&key);
pipe.set_ex::<String, String>(
key,
serde_json::to_string(&val)
.map_err(|e| RedisCacheError::CacheSerializationError { error: e })?,
ttl_secs,
)
.ignore();
let res: (Option<String>,) = pipe.query(&mut *conn)?;
match res.0 {
None => Ok(None),
Some(s) => {
let v: CachedRedisValue<V> = serde_json::from_str(&s).map_err(|e| {
RedisCacheError::CacheDeserializationError {
cached_value: s,
error: e,
}
})?;
Ok(Some(v.value))
}
}
}
fn cache_remove(&self, key: &K) -> Result<Option<V>, RedisCacheError> {
let mut conn = self.pool.get()?;
let mut pipe = redis::pipe();
let key = self.generate_key(key);
pipe.get(&key);
pipe.del::<String>(key).ignore();
let res: (Option<String>,) = pipe.query(&mut *conn)?;
match res.0 {
None => Ok(None),
Some(s) => {
let v: CachedRedisValue<V> = serde_json::from_str(&s).map_err(|e| {
RedisCacheError::CacheDeserializationError {
cached_value: s,
error: e,
}
})?;
Ok(Some(v.value))
}
}
}
fn cache_delete(&self, key: &K) -> Result<bool, RedisCacheError> {
let mut conn = self.pool.get()?;
let key = self.generate_key(key);
let removed: usize = redis::cmd("DEL").arg(key).query(&mut *conn)?;
Ok(removed > 0)
}
fn ttl(&self) -> Option<Duration> {
Some(self.ttl)
}
fn set_ttl(&mut self, ttl: Duration) -> Option<Duration> {
let old = self.ttl;
self.ttl = ttl;
Some(old)
}
fn set_refresh_on_hit(&mut self, refresh: bool) -> bool {
let old = self.refresh;
self.refresh = refresh;
old
}
fn unset_ttl(&mut self) -> Option<Duration> {
None
}
}
#[cfg(feature = "time_stores")]
impl<K, V> CacheTtl for RedisCache<K, V> {
    /// Returns the configured TTL (always `Some`).
    fn ttl(&self) -> Option<Duration> {
        Some(self.ttl)
    }

    /// Installs `ttl` and returns the TTL it replaced.
    fn set_ttl(&mut self, ttl: Duration) -> Option<Duration> {
        Some(std::mem::replace(&mut self.ttl, ttl))
    }

    /// Does nothing and returns `None`; the stored TTL is left unchanged.
    fn unset_ttl(&mut self) -> Option<Duration> {
        None
    }

    /// Reports whether a cache hit re-applies the TTL.
    fn refresh_on_hit(&self) -> bool {
        self.refresh
    }

    /// Installs `refresh` and returns the setting it replaced.
    fn set_refresh_on_hit(&mut self, refresh: bool) -> bool {
        std::mem::replace(&mut self.refresh, refresh)
    }
}
#[cfg(all(
feature = "async",
any(
feature = "redis_smol",
feature = "redis_tokio",
feature = "redis_connection_manager"
)
))]
mod async_redis {
use crate::time::Duration;
use super::{
CachedRedisValue, ConnectionString, DeserializeOwned, Display, PhantomData,
RedisCacheBuildError, RedisCacheError, Serialize, DEFAULT_NAMESPACE, ENV_KEY,
};
#[cfg(feature = "time_stores")]
use crate::CacheTtl;
use crate::ConcurrentCachedAsync;
#[cfg(feature = "redis_async_cache")]
use redis::IntoConnectionInfo;
/// Builder for the asynchronous [`AsyncRedisCache`].
pub struct AsyncRedisCacheBuilder<K, V> {
/// Time-to-live handed to the built cache.
ttl: Duration,
/// Whether the built cache re-applies the TTL on a cache hit.
refresh: bool,
/// First key segment; starts as `DEFAULT_NAMESPACE`.
namespace: String,
/// Second key segment, placed between the namespace and the key.
prefix: String,
/// Explicit connection string; when `None`, the `ENV_KEY` environment variable is read.
connection_string: Option<String>,
/// When set, the connection is opened with RESP3 and a default cache config.
#[cfg(feature = "redis_async_cache")]
client_side_caching: bool,
/// Records the key/value types without storing a `K` or `V`.
_phantom: PhantomData<fn() -> (K, V)>,
}
impl<K, V> AsyncRedisCacheBuilder<K, V>
where
K: Display,
V: Serialize + DeserializeOwned,
{
pub fn new<S: AsRef<str>>(prefix: S, ttl: Duration) -> AsyncRedisCacheBuilder<K, V> {
Self {
ttl,
refresh: false,
namespace: DEFAULT_NAMESPACE.to_string(),
prefix: prefix.as_ref().to_string(),
connection_string: None,
#[cfg(feature = "redis_async_cache")]
client_side_caching: false,
_phantom: PhantomData,
}
}
#[must_use]
pub fn ttl(mut self, ttl: Duration) -> Self {
self.ttl = ttl;
self
}
#[must_use]
pub fn refresh(mut self, refresh: bool) -> Self {
self.refresh = refresh;
self
}
#[must_use]
pub fn namespace<S: AsRef<str>>(mut self, namespace: S) -> Self {
self.namespace = namespace.as_ref().to_string();
self
}
#[must_use]
pub fn prefix<S: AsRef<str>>(mut self, prefix: S) -> Self {
self.prefix = prefix.as_ref().to_string();
self
}
#[must_use]
pub fn connection_string(mut self, cs: &str) -> Self {
self.connection_string = Some(cs.to_string());
self
}
#[cfg(feature = "redis_async_cache")]
#[must_use]
pub fn client_side_caching(mut self, enable: bool) -> Self {
self.client_side_caching = enable;
self
}
pub fn resolve_connection_string(&self) -> Result<String, RedisCacheBuildError> {
match self.connection_string {
Some(ref s) => Ok(s.to_string()),
None => std::env::var(ENV_KEY).map_err(|e| {
RedisCacheBuildError::MissingConnectionString {
env_key: ENV_KEY.to_string(),
error: e,
}
}),
}
}
#[cfg(not(feature = "redis_connection_manager"))]
async fn create_multiplexed_connection(
&self,
) -> Result<redis::aio::MultiplexedConnection, RedisCacheBuildError> {
let s = self.resolve_connection_string()?;
#[cfg(feature = "redis_async_cache")]
if self.client_side_caching {
let mut connection_info = s.into_connection_info()?;
let mut config = redis::AsyncConnectionConfig::default();
let redis_settings = connection_info
.redis_settings()
.clone()
.set_protocol(redis::ProtocolVersion::RESP3);
connection_info = connection_info.set_redis_settings(redis_settings);
config = config.set_cache_config(redis::caching::CacheConfig::default());
let client = redis::Client::open(connection_info)?;
let conn = client
.get_multiplexed_async_connection_with_config(&config)
.await?;
return Ok(conn);
}
let client = redis::Client::open(s)?;
let conn = client.get_multiplexed_async_connection().await?;
Ok(conn)
}
#[cfg(feature = "redis_connection_manager")]
async fn create_connection_manager(
&self,
) -> Result<redis::aio::ConnectionManager, RedisCacheBuildError> {
let s = self.resolve_connection_string()?;
#[cfg(feature = "redis_async_cache")]
if self.client_side_caching {
let mut connection_info = s.into_connection_info()?;
let redis_settings = connection_info
.redis_settings()
.clone()
.set_protocol(redis::ProtocolVersion::RESP3);
connection_info = connection_info.set_redis_settings(redis_settings);
let config = redis::aio::ConnectionManagerConfig::default()
.set_cache_config(redis::caching::CacheConfig::default());
let client = redis::Client::open(connection_info)?;
let conn = redis::aio::ConnectionManager::new_with_config(client, config).await?;
return Ok(conn);
}
let client = redis::Client::open(s)?;
let conn = redis::aio::ConnectionManager::new(client).await?;
Ok(conn)
}
pub async fn build(self) -> Result<AsyncRedisCache<K, V>, RedisCacheBuildError> {
Ok(AsyncRedisCache {
ttl: self.ttl,
refresh: self.refresh,
connection_string: ConnectionString(self.resolve_connection_string()?),
#[cfg(not(feature = "redis_connection_manager"))]
connection: self.create_multiplexed_connection().await?,
#[cfg(feature = "redis_connection_manager")]
connection: self.create_connection_manager().await?,
namespace: self.namespace,
prefix: self.prefix,
_phantom: PhantomData,
})
}
}
/// Asynchronous Redis-backed cache.
///
/// Values are stored as JSON; keys are built from the namespace, the prefix
/// and the key's `Display` output.
pub struct AsyncRedisCache<K, V> {
/// Time-to-live applied when entries are written (and on hits when `refresh` is set).
pub(super) ttl: Duration,
/// Whether a cache hit re-applies the TTL to the entry.
pub(super) refresh: bool,
/// First key segment.
pub(super) namespace: String,
/// Second key segment, placed between the namespace and the key.
pub(super) prefix: String,
/// Connection string resolved at build time; redacted when formatted.
connection_string: ConnectionString,
/// Connection handle; each cache operation works on a clone of it.
#[cfg(not(feature = "redis_connection_manager"))]
connection: redis::aio::MultiplexedConnection,
/// Connection handle; each cache operation works on a clone of it.
#[cfg(feature = "redis_connection_manager")]
connection: redis::aio::ConnectionManager,
/// Records the key/value types without storing a `K` or `V`.
_phantom: PhantomData<fn() -> (K, V)>,
}
impl<K, V> AsyncRedisCache<K, V>
where
    K: Display + Send + Sync,
    V: Serialize + DeserializeOwned + Send,
{
    /// Starts an [`AsyncRedisCacheBuilder`] for a cache using `prefix` and `ttl`.
    ///
    /// Note that this returns the builder, not the cache; call `build` on it.
    #[allow(clippy::new_ret_no_self)]
    pub fn new<S: AsRef<str>>(prefix: S, ttl: Duration) -> AsyncRedisCacheBuilder<K, V> {
        AsyncRedisCacheBuilder::<K, V>::new(prefix, ttl)
    }

    /// Builds the full Redis key for `key` from the namespace, the prefix and
    /// the key's `Display` output.
    fn generate_key(&self, key: &K) -> String {
        let rendered = key.to_string();
        super::generate_redis_key(
            self.namespace.as_str(),
            self.prefix.as_str(),
            rendered.as_str(),
        )
    }

    /// Returns a copy of the real (unredacted) connection string this cache
    /// was built with.
    #[must_use]
    pub fn connection_string(&self) -> String {
        String::from(self.connection_string.as_str())
    }
}
impl<K, V> ConcurrentCachedAsync<K, V> for AsyncRedisCache<K, V>
where
K: Display + Send + Sync,
V: Serialize + DeserializeOwned + Send,
{
type Error = RedisCacheError;
async fn cache_get(&self, key: &K) -> Result<Option<V>, Self::Error> {
let mut conn = self.connection.clone();
let mut pipe = redis::pipe();
let key = self.generate_key(key);
pipe.get(&key);
if self.refresh {
pipe.expire(key, super::ttl_seconds_i64(self.ttl)?).ignore();
}
let res: (Option<String>,) = pipe.query_async(&mut conn).await?;
match res.0 {
None => Ok(None),
Some(s) => {
let v: CachedRedisValue<V> = serde_json::from_str(&s).map_err(|e| {
RedisCacheError::CacheDeserializationError {
cached_value: s,
error: e,
}
})?;
Ok(Some(v.value))
}
}
}
async fn cache_set(&self, key: K, val: V) -> Result<Option<V>, Self::Error> {
let mut conn = self.connection.clone();
let mut pipe = redis::pipe();
let key = self.generate_key(&key);
let ttl_secs = super::ttl_seconds(self.ttl)?;
let val = CachedRedisValue::new(val);
pipe.get(&key);
pipe.set_ex::<String, String>(
key,
serde_json::to_string(&val)
.map_err(|e| RedisCacheError::CacheSerializationError { error: e })?,
ttl_secs,
)
.ignore();
let res: (Option<String>,) = pipe.query_async(&mut conn).await?;
match res.0 {
None => Ok(None),
Some(s) => {
let v: CachedRedisValue<V> = serde_json::from_str(&s).map_err(|e| {
RedisCacheError::CacheDeserializationError {
cached_value: s,
error: e,
}
})?;
Ok(Some(v.value))
}
}
}
async fn cache_remove(&self, key: &K) -> Result<Option<V>, Self::Error> {
let mut conn = self.connection.clone();
let mut pipe = redis::pipe();
let key = self.generate_key(key);
pipe.get(&key);
pipe.del::<String>(key).ignore();
let res: (Option<String>,) = pipe.query_async(&mut conn).await?;
match res.0 {
None => Ok(None),
Some(s) => {
let v: CachedRedisValue<V> = serde_json::from_str(&s).map_err(|e| {
RedisCacheError::CacheDeserializationError {
cached_value: s,
error: e,
}
})?;
Ok(Some(v.value))
}
}
}
async fn cache_delete(&self, key: &K) -> Result<bool, Self::Error> {
let mut conn = self.connection.clone();
let key = self.generate_key(key);
let removed: usize = redis::cmd("DEL").arg(key).query_async(&mut conn).await?;
Ok(removed > 0)
}
fn set_refresh_on_hit(&mut self, refresh: bool) -> bool {
let old = self.refresh;
self.refresh = refresh;
old
}
fn ttl(&self) -> Option<Duration> {
Some(self.ttl)
}
fn set_ttl(&mut self, ttl: Duration) -> Option<Duration> {
let old = self.ttl;
self.ttl = ttl;
Some(old)
}
fn unset_ttl(&mut self) -> Option<Duration> {
None
}
}
#[cfg(feature = "time_stores")]
impl<K, V> CacheTtl for AsyncRedisCache<K, V> {
    /// Returns the configured TTL (always `Some`).
    fn ttl(&self) -> Option<Duration> {
        Some(self.ttl)
    }

    /// Installs `ttl` and returns the TTL it replaced.
    fn set_ttl(&mut self, ttl: Duration) -> Option<Duration> {
        Some(std::mem::replace(&mut self.ttl, ttl))
    }

    /// Does nothing and returns `None`; the stored TTL is left unchanged.
    fn unset_ttl(&mut self) -> Option<Duration> {
        None
    }

    /// Reports whether a cache hit re-applies the TTL.
    fn refresh_on_hit(&self) -> bool {
        self.refresh
    }

    /// Installs `refresh` and returns the setting it replaced.
    fn set_refresh_on_hit(&mut self, refresh: bool) -> bool {
        std::mem::replace(&mut self.refresh, refresh)
    }
}
// Integration test for the async cache. The builder is given no explicit
// connection string, so it is read from the `ENV_KEY` environment variable;
// a reachable Redis server is required.
#[cfg(test)]
mod tests {
use super::*;
use crate::time::Duration;
use std::thread::sleep;
// Milliseconds since the Unix epoch; used to give each run a distinct key prefix.
fn now_millis() -> u128 {
crate::time::SystemTime::now()
.duration_since(crate::time::UNIX_EPOCH)
.unwrap()
.as_millis()
}
// Walks through miss -> set -> hit -> expiry, then changes the TTL twice and
// repeats. Statement order and the sleeps are significant.
#[tokio::test]
async fn test_async_redis_cache() {
let mut c: AsyncRedisCache<u32, u32> = AsyncRedisCache::new(
format!("{}:async-redis-cache-test", now_millis()),
Duration::from_secs(2),
)
.build()
.await
.unwrap();
assert!(c.cache_get(&1).await.unwrap().is_none());
assert!(c.cache_set(1, 100).await.unwrap().is_none());
assert!(c.cache_get(&1).await.unwrap().is_some());
// NOTE(review): this is a blocking `std::thread::sleep` inside an async test,
// and `Duration::new(2, 500_000)` is 2 s + 0.5 ms (the second argument is
// nanoseconds) -- confirm the margin over the 2 s TTL is intended.
sleep(Duration::new(2, 500_000));
assert!(c.cache_get(&1).await.unwrap().is_none());
// `set_ttl` returns the previous TTL (2 s).
let old = ConcurrentCachedAsync::set_ttl(&mut c, Duration::from_secs(1)).unwrap();
assert_eq!(2, old.as_secs());
assert!(c.cache_set(1, 100).await.unwrap().is_none());
assert!(c.cache_get(&1).await.unwrap().is_some());
// 1 s + 0.6 ms: just past the new 1 s TTL.
sleep(Duration::new(1, 600_000));
assert!(c.cache_get(&1).await.unwrap().is_none());
// With a 10 s TTL, repeated reads keep returning the stored value.
ConcurrentCachedAsync::set_ttl(&mut c, Duration::from_secs(10)).unwrap();
assert!(c.cache_set(1, 100).await.unwrap().is_none());
assert!(c.cache_set(2, 100).await.unwrap().is_none());
assert_eq!(c.cache_get(&1).await.unwrap().unwrap(), 100);
assert_eq!(c.cache_get(&1).await.unwrap().unwrap(), 100);
}
}
}
#[cfg(all(
feature = "async",
any(
feature = "redis_smol",
feature = "redis_tokio",
feature = "redis_connection_manager"
)
))]
#[cfg_attr(
docsrs,
doc(cfg(all(
feature = "async",
any(
feature = "redis_smol",
feature = "redis_tokio",
feature = "redis_connection_manager"
)
)))
)]
pub use async_redis::{AsyncRedisCache, AsyncRedisCacheBuilder};
// Integration tests for the synchronous cache. The builders are given no
// explicit connection string, so it is read from the `ENV_KEY` environment
// variable; a reachable Redis server is required.
#[cfg(test)]
mod tests {
use crate::time::Duration;
use std::thread::sleep;
use super::*;
// Milliseconds since the Unix epoch; used to give each run a distinct key prefix.
fn now_millis() -> u128 {
crate::time::SystemTime::now()
.duration_since(crate::time::UNIX_EPOCH)
.unwrap()
.as_millis()
}
// Walks through miss -> set -> hit -> expiry, then changes the TTL twice and
// repeats. Statement order and the sleeps are significant.
#[test]
fn redis_cache() {
let mut c: RedisCache<u32, u32> = RedisCache::new(
format!("{}:redis-cache-test", now_millis()),
Duration::from_secs(2),
)
.namespace("in-tests:")
.build()
.unwrap();
assert!(c.cache_get(&1).unwrap().is_none());
assert!(c.cache_set(1, 100).unwrap().is_none());
assert!(c.cache_get(&1).unwrap().is_some());
// NOTE(review): `Duration::new(2, 500_000)` is 2 s + 0.5 ms (the second
// argument is nanoseconds) -- confirm the margin over the 2 s TTL is intended.
sleep(Duration::new(2, 500_000));
assert!(c.cache_get(&1).unwrap().is_none());
// `set_ttl` returns the previous TTL (2 s).
let old = ConcurrentCached::set_ttl(&mut c, Duration::from_secs(1)).unwrap();
assert_eq!(2, old.as_secs());
assert!(c.cache_set(1, 100).unwrap().is_none());
assert!(c.cache_get(&1).unwrap().is_some());
// 1 s + 0.6 ms: just past the new 1 s TTL.
sleep(Duration::new(1, 600_000));
assert!(c.cache_get(&1).unwrap().is_none());
// With a 10 s TTL, repeated reads keep returning the stored value.
ConcurrentCached::set_ttl(&mut c, Duration::from_secs(10)).unwrap();
assert!(c.cache_set(1, 100).unwrap().is_none());
assert!(c.cache_set(2, 100).unwrap().is_none());
assert_eq!(c.cache_get(&1).unwrap().unwrap(), 100);
assert_eq!(c.cache_get(&1).unwrap().unwrap(), 100);
}
// `cache_remove` returns the value it deleted. Keys 2 and 3 are not removed
// by the test and are left to expire via their one-hour TTL.
#[test]
fn remove() {
let c: RedisCache<u32, u32> = RedisCache::new(
format!("{}:redis-cache-test-remove", now_millis()),
Duration::from_secs(3600),
)
.build()
.unwrap();
assert!(c.cache_set(1, 100).unwrap().is_none());
assert!(c.cache_set(2, 200).unwrap().is_none());
assert!(c.cache_set(3, 300).unwrap().is_none());
assert_eq!(100, c.cache_remove(&1).unwrap().unwrap());
}
}