use crate::time::Duration;
use crate::time::SystemTime;
use crate::{ConcurrentCacheBase, ConcurrentCacheTtl, ConcurrentCached};
use directories::BaseDirs;
use parking_lot::Mutex;
use redb::{Builder, Database, Durability, ReadableDatabase, ReadableTable, TableDefinition};
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
/// redb table definition shared by every cache operation in this file: string keys
/// mapped to raw byte values, stored under the table name `cached_disk_cache`.
const TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("cached_disk_cache");
/// Builder that collects the configuration for a [`RedbCache`] and opens the backing
/// database file when `build` is called.
pub struct RedbCacheBuilder<K, V> {
/// Optional time-to-live; `None` means entries are never treated as expired.
ttl: Option<Duration>,
/// When `true`, a successful read of a non-expired entry rewrites its creation time.
refresh: bool,
/// When `false`, write transactions are opened with `Durability::None`.
durable: bool,
/// Directory holding the database file; `None` selects a default directory at build time.
disk_dir: Option<PathBuf>,
/// Required cache name; it becomes part of the database file name.
cache_name: Option<String>,
/// Ties the builder to its key/value types without storing either.
_phantom: PhantomData<fn() -> (K, V)>,
}
use thiserror::Error;
/// Generates `From<Source>` impls for an error type by first widening each listed
/// redb error into `redb::Error` and then delegating to the target's own
/// `From<redb::Error>` impl.
///
/// Usage: `impl_from_redb!(Target; SourceA, SourceB, ...);` (a trailing comma is allowed).
macro_rules! impl_from_redb {
    ($target:ty; $($source:ty),+ $(,)?) => {
        $(
            impl From<$source> for $target {
                fn from(err: $source) -> Self {
                    // Two-step conversion: specific redb error -> redb::Error -> target.
                    let unified: redb::Error = err.into();
                    <$target>::from(unified)
                }
            }
        )+
    };
}
/// Errors that can be returned while building a [`RedbCache`].
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum RedbCacheBuildError {
/// A redb operation failed while creating or initializing the database.
#[error("Storage error")]
Storage {
#[from]
source: redb::Error,
},
/// A builder-level validation error, displayed transparently.
#[error(transparent)]
Build(#[from] super::BuildError),
/// The cache directory could not be created.
#[error("I/O error preparing the disk cache directory")]
Io(#[from] std::io::Error),
/// The configured cache name cannot be used as a file name component.
// NOTE(review): the `\\\\` in the literal below renders as two backslashes in the
// message, while `build` rejects a single backslash — confirm this is intended.
#[error(
"invalid cache_name: must not be empty, must not contain a path separator ('/' or '\\\\'), \
must not contain a NUL byte, and must not be '.' or '..'; cache_name is used as a filename component"
)]
InvalidCacheName,
}
// Lets `?` convert the specific redb error types raised during `build` into
// `RedbCacheBuildError` by routing them through `redb::Error`.
impl_from_redb!(
RedbCacheBuildError;
redb::DatabaseError,
redb::TransactionError,
redb::TableError,
redb::CommitError,
);
/// Suffix of the default cache directory name; the executable's file name plus `_`
/// is prepended when it can be determined.
static DISK_FILE_PREFIX: &str = "cached_disk_cache";
/// Version number embedded in the database file name (`<cache_name>_v<version>.redb`).
const DISK_FILE_VERSION: u64 = 3;
impl<K, V> Default for RedbCacheBuilder<K, V>
where
K: ToString,
V: Serialize + DeserializeOwned,
{
fn default() -> Self {
Self::new()
}
}
impl<K, V> RedbCacheBuilder<K, V>
where
K: ToString,
V: Serialize + DeserializeOwned,
{
#[must_use]
pub fn new() -> RedbCacheBuilder<K, V> {
Self {
ttl: None,
refresh: false,
durable: true,
disk_dir: None,
cache_name: None,
_phantom: Default::default(),
}
}
#[must_use]
pub fn name(mut self, name: impl Into<String>) -> Self {
self.cache_name = Some(name.into());
self
}
#[must_use]
pub fn ttl(mut self, ttl: Duration) -> Self {
self.ttl = Some(ttl);
self
}
#[must_use]
pub fn ttl_secs(self, secs: u64) -> Self {
self.ttl(Duration::from_secs(secs))
}
#[must_use]
pub fn ttl_millis(self, millis: u64) -> Self {
self.ttl(Duration::from_millis(millis))
}
#[must_use]
pub fn refresh_on_hit(mut self, refresh: bool) -> Self {
self.refresh = refresh;
self
}
#[must_use]
pub fn disk_directory<P: AsRef<Path>>(mut self, dir: P) -> Self {
self.disk_dir = Some(dir.as_ref().into());
self
}
#[must_use]
pub fn durable(mut self, durable: bool) -> Self {
self.durable = durable;
self
}
fn default_disk_dir_candidates() -> Vec<PathBuf> {
let exe_name = std::env::current_exe()
.ok()
.and_then(|path| {
path.file_name()
.and_then(|os_str| os_str.to_str().map(|s| format!("{}_", s)))
})
.unwrap_or_default();
let dir_prefix = format!("{}{}", exe_name, DISK_FILE_PREFIX);
let mut candidates = Vec::new();
if let Some(base_dirs) = BaseDirs::new() {
candidates.push(base_dirs.cache_dir().join(&dir_prefix));
}
candidates.push(std::env::temp_dir().join(dir_prefix));
candidates
}
fn default_disk_path() -> Result<PathBuf, std::io::Error> {
let mut last_error = None;
for disk_dir in Self::default_disk_dir_candidates() {
match std::fs::create_dir_all(&disk_dir) {
Ok(()) => return Ok(disk_dir),
Err(error) if error.kind() == ErrorKind::PermissionDenied => {
last_error = Some(error);
}
Err(error) => return Err(error),
}
}
Err(last_error.unwrap_or_else(|| {
std::io::Error::new(
ErrorKind::PermissionDenied,
"unable to create a writable default disk cache directory",
)
}))
}
pub fn build(self) -> Result<RedbCache<K, V>, RedbCacheBuildError> {
let cache_name = self
.cache_name
.ok_or(super::BuildError::MissingRequired("name"))?;
{
let n = &cache_name;
if n.is_empty()
|| n.contains('/')
|| n.contains('\\')
|| n.contains('\0')
|| n == "."
|| n == ".."
{
return Err(RedbCacheBuildError::InvalidCacheName);
}
}
if let Some(ttl) = self.ttl {
super::validate_ttl(ttl)?;
}
let cache_dir_name = format!("{}_v{}", cache_name, DISK_FILE_VERSION);
let disk_dir = match self.disk_dir {
Some(disk_dir) => {
std::fs::create_dir_all(&disk_dir)?;
disk_dir
}
None => Self::default_disk_path()?,
};
let disk_path = disk_dir.join(format!("{}.redb", cache_dir_name));
let db = Builder::new().create(&disk_path)?;
{
let wtxn = db.begin_write()?;
wtxn.open_table(TABLE)?;
wtxn.commit()?;
}
Ok(RedbCache {
ttl: Mutex::new(self.ttl),
refresh: AtomicBool::new(self.refresh),
durable: self.durable,
disk_path,
connection: Arc::new(db),
_phantom: self._phantom,
})
}
}
/// Disk-backed cache that stores entries in a redb database file.
///
/// Keys are stringified with `ToString`; values are encoded with `rmp_serde`
/// together with their creation time.
pub struct RedbCache<K, V> {
/// Current time-to-live; `None` disables expiry. Guarded by a mutex so it can be changed through `&self`.
pub(super) ttl: Mutex<Option<Duration>>,
/// Whether a read of a non-expired entry rewrites its creation time.
pub(super) refresh: AtomicBool,
/// When `false`, write transactions are opened with `Durability::None`.
durable: bool,
/// Full path of the database file.
disk_path: PathBuf,
/// Shared handle to the open database; cloned into the async operations.
connection: Arc<Database>,
/// Ties the cache to its key/value types without storing either.
_phantom: PhantomData<fn() -> (K, V)>,
}
/// Debug output shows the file path and the current settings; the database handle
/// is omitted and the struct is marked non-exhaustive.
impl<K, V> std::fmt::Debug for RedbCache<K, V> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Snapshot the interior-mutable settings before formatting.
        let ttl = *self.ttl.lock();
        let refresh = self.refresh.load(Ordering::Relaxed);

        let mut out = f.debug_struct("RedbCache");
        out.field("disk_path", &self.disk_path);
        out.field("ttl", &ttl);
        out.field("refresh", &refresh);
        out.field("durable", &self.durable);
        out.finish_non_exhaustive()
    }
}
impl<K, V> RedbCache<K, V>
where
    K: ToString,
    V: Serialize + DeserializeOwned,
{
    /// Returns a new [`RedbCacheBuilder`] for this key/value type pair.
    #[must_use]
    pub fn builder() -> RedbCacheBuilder<K, V> {
        RedbCacheBuilder::new()
    }

    /// Returns the full path of the database file backing this cache.
    #[must_use]
    pub fn disk_path(&self) -> &std::path::Path {
        &self.disk_path
    }

    /// Removes every entry whose age is at least the current TTL and returns how
    /// many entries were actually removed.
    ///
    /// When no TTL is configured nothing can be expired, so the table is not
    /// scanned and `Ok(0)` is returned.
    ///
    /// # Errors
    ///
    /// Returns [`RedbCacheError::CacheDeserialization`] when a stored value cannot
    /// be decoded during the scan, and [`RedbCacheError::Storage`] for redb failures.
    pub fn remove_expired_entries(&self) -> Result<usize, RedbCacheError> {
        let configured_ttl = *self.ttl.lock();
        let Some(ttl) = configured_ttl else {
            // No TTL means no entry can be expired; skip the full-table scan.
            return Ok(0);
        };
        let now = SystemTime::now();

        // Pass 1: collect the expired keys under a read transaction.
        let mut expired_keys: Vec<String> = Vec::new();
        {
            let rtxn = self.connection.begin_read()?;
            let table = rtxn.open_table(TABLE)?;
            for item in table.iter()? {
                let (key, value) = item?;
                let raw = value.value();
                let cached =
                    rmp_serde::from_slice::<CachedDiskValue<V>>(raw).map_err(|source| {
                        RedbCacheError::CacheDeserialization {
                            source,
                            cached_value: raw.to_vec(),
                        }
                    })?;
                // A creation time in the future counts as age zero.
                let age = now
                    .duration_since(cached.created_at)
                    .unwrap_or(Duration::from_secs(0));
                if age >= ttl {
                    expired_keys.push(key.value().to_string());
                }
            }
        }

        if expired_keys.is_empty() {
            return Ok(0);
        }

        // Pass 2: delete them in one write transaction, counting only keys that
        // were still present so the result reflects real removals.
        let mut removed = 0;
        let wtxn = begin_write(&self.connection, self.durable)?;
        {
            let mut table = wtxn.open_table(TABLE)?;
            for key in &expired_keys {
                if table.remove(key.as_str())?.is_some() {
                    removed += 1;
                }
            }
        }
        wtxn.commit()?;
        Ok(removed)
    }

    /// Commits an empty write transaction with `Durability::Immediate`.
    ///
    /// # Errors
    ///
    /// Returns [`RedbCacheError::Storage`] when the transaction cannot be started
    /// or committed.
    pub fn flush(&self) -> Result<(), RedbCacheError> {
        redb_flush(&self.connection)
    }
}
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
impl<K, V> RedbCache<K, V> {
    /// Async counterpart of `flush`: runs the flush through `blocking::unblock`
    /// on a clone of the database handle.
    pub async fn async_flush(&self) -> Result<(), RedbCacheError> {
        let db = Arc::clone(&self.connection);
        let flush = move || redb_flush(&db);
        blocking::unblock(flush).await
    }
}
/// Errors returned by [`RedbCache`] operations after the cache has been built.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum RedbCacheError {
/// A redb transaction, table, storage or commit operation failed.
#[error("Storage error")]
Storage {
#[from]
source: redb::Error,
},
/// Stored bytes could not be decoded; the raw bytes are kept for inspection.
#[error("Error deserializing cached value")]
CacheDeserialization {
#[source]
source: rmp_serde::decode::Error,
cached_value: Vec<u8>,
},
/// A value could not be encoded before being written.
#[error("Error serializing cached value")]
CacheSerialization {
#[from]
source: rmp_serde::encode::Error,
},
}
// Lets `?` convert the specific redb error types raised by cache operations into
// `RedbCacheError` by routing them through `redb::Error`.
impl_from_redb!(
RedbCacheError;
redb::TransactionError,
redb::TableError,
redb::StorageError,
redb::CommitError,
redb::SetDurabilityError,
);
/// Owned on-disk record: the cached value plus the time it was stored.
/// This is the type every read path in this file decodes stored bytes into.
#[derive(serde::Serialize, serde::Deserialize)]
struct CachedDiskValue<V> {
/// The cached value itself.
value: V,
/// Time the entry was created or last refreshed; compared against the TTL.
created_at: SystemTime,
}
impl<V> CachedDiskValue<V> {
    /// Wraps `value` and stamps it with the current time.
    fn new(value: V) -> Self {
        let created_at = SystemTime::now();
        Self { value, created_at }
    }

    /// Resets the creation time to now, restarting the entry's TTL window.
    fn refresh_created_at(&mut self) {
        let now = SystemTime::now();
        self.created_at = now;
    }
}
/// Borrowing, serialize-only counterpart of `CachedDiskValue`, used to encode a
/// value without taking ownership of it.
// NOTE(review): bytes written from this type are later decoded as `CachedDiskValue`,
// so the two field lists presumably have to stay in the same order — confirm.
#[derive(serde::Serialize)]
struct CachedDiskValueRef<'a, V> {
/// Borrowed value to encode.
value: &'a V,
/// Time the entry was created.
created_at: SystemTime,
}
impl<'a, V> CachedDiskValueRef<'a, V> {
    /// Wraps a borrowed `value` and stamps it with the current time.
    fn new(value: &'a V) -> Self {
        let created_at = SystemTime::now();
        Self { value, created_at }
    }
}
/// Starts a write transaction on `connection`.
///
/// With `durable == true` the transaction keeps redb's default durability;
/// otherwise it is switched to `Durability::None` before being returned.
fn begin_write(
    connection: &Database,
    durable: bool,
) -> Result<redb::WriteTransaction, RedbCacheError> {
    let mut txn = connection.begin_write()?;
    if durable {
        return Ok(txn);
    }
    txn.set_durability(Durability::None)?;
    Ok(txn)
}
/// Looks up `key` and returns its value when present and not expired.
///
/// * Missing key: `Ok(None)`.
/// * No TTL: the stored value is returned as-is.
/// * Age below the TTL: the value is returned; with `refresh` set, the entry is
///   first rewritten with a new creation time.
/// * Age at or above the TTL: the entry is removed and `Ok(None)` is returned.
///
/// # Errors
///
/// Returns `CacheDeserialization` when the stored bytes cannot be decoded,
/// `CacheSerialization` when the refreshed entry cannot be re-encoded, and
/// `Storage` for redb failures.
fn disk_cache_get<V>(
    connection: &Database,
    key: &str,
    ttl: Option<Duration>,
    refresh: bool,
    durable: bool,
) -> Result<Option<V>, RedbCacheError>
where
    V: Serialize + DeserializeOwned,
{
    // Decode inside a scope so the read transaction ends before any write begins.
    let mut entry = {
        let snapshot = connection.begin_read()?;
        let table = snapshot.open_table(TABLE)?;
        let stored = match table.get(key)? {
            Some(guard) => guard,
            None => return Ok(None),
        };
        let bytes = stored.value();
        let decoded = rmp_serde::from_slice::<CachedDiskValue<V>>(bytes);
        decoded.map_err(|source| RedbCacheError::CacheDeserialization {
            source,
            cached_value: bytes.to_vec(),
        })?
    };

    let Some(ttl) = ttl else {
        return Ok(Some(entry.value));
    };

    // A creation time in the future counts as age zero.
    let age = SystemTime::now()
        .duration_since(entry.created_at)
        .unwrap_or(Duration::from_secs(0));

    if age >= ttl {
        // Expired: evict the entry and report a miss.
        let txn = begin_write(connection, durable)?;
        {
            let mut table = txn.open_table(TABLE)?;
            table.remove(key)?;
        }
        txn.commit()?;
        return Ok(None);
    }

    if refresh {
        // Still fresh: persist the entry again with a new creation time.
        entry.refresh_created_at();
        let encoded = rmp_serde::to_vec(&entry)?;
        let txn = begin_write(connection, durable)?;
        {
            let mut table = txn.open_table(TABLE)?;
            table.insert(key, encoded.as_slice())?;
        }
        txn.commit()?;
    }
    Ok(Some(entry.value))
}
/// Stores the already-encoded entry under `key` and returns the value it replaced.
///
/// Previous bytes that cannot be decoded are treated as "no previous value"
/// rather than as an error, so a corrupted entry can always be overwritten.
fn disk_cache_set<V>(
    connection: &Database,
    key: &str,
    serialized: Vec<u8>,
    durable: bool,
) -> Result<Option<V>, RedbCacheError>
where
    V: DeserializeOwned,
{
    let txn = begin_write(connection, durable)?;
    let old_bytes: Option<Vec<u8>> = {
        let mut table = txn.open_table(TABLE)?;
        let replaced = table.insert(key, serialized.as_slice())?;
        replaced.map(|guard| guard.value().to_vec())
    };
    txn.commit()?;

    let Some(bytes) = old_bytes else {
        return Ok(None);
    };
    // Best-effort decode of the replaced entry.
    match rmp_serde::from_slice::<CachedDiskValue<V>>(&bytes) {
        Ok(previous) => Ok(Some(previous.value)),
        Err(_) => Ok(None),
    }
}
/// Removes `key` and returns its value only if the entry was still fresh.
///
/// The entry is always deleted. The returned value is `None` when the key was
/// missing, when the stored bytes cannot be decoded, or when a TTL is set and the
/// entry's age has reached it.
fn disk_cache_remove<V>(
    connection: &Database,
    key: &str,
    ttl: Option<Duration>,
    durable: bool,
) -> Result<Option<V>, RedbCacheError>
where
    V: DeserializeOwned,
{
    let txn = begin_write(connection, durable)?;
    let taken: Option<Vec<u8>> = {
        let mut table = txn.open_table(TABLE)?;
        let evicted = table.remove(key)?;
        evicted.map(|guard| guard.value().to_vec())
    };
    txn.commit()?;

    let Some(bytes) = taken else {
        return Ok(None);
    };
    // Undecodable bytes are dropped silently; the entry is already gone.
    let Ok(entry) = rmp_serde::from_slice::<CachedDiskValue<V>>(&bytes) else {
        return Ok(None);
    };

    let fresh = match ttl {
        None => true,
        Some(ttl) => {
            // A creation time in the future counts as age zero.
            let age = SystemTime::now()
                .duration_since(entry.created_at)
                .unwrap_or(Duration::from_secs(0));
            age < ttl
        }
    };
    Ok(fresh.then_some(entry.value))
}
/// Removes `key` and returns its value regardless of age (no TTL check).
///
/// Returns `None` when the key was missing or the stored bytes cannot be decoded;
/// the entry is deleted in both cases.
fn disk_cache_remove_entry<V>(
    connection: &Database,
    key: &str,
    durable: bool,
) -> Result<Option<V>, RedbCacheError>
where
    V: DeserializeOwned,
{
    let txn = begin_write(connection, durable)?;
    let taken: Option<Vec<u8>> = {
        let mut table = txn.open_table(TABLE)?;
        let evicted = table.remove(key)?;
        evicted.map(|guard| guard.value().to_vec())
    };
    txn.commit()?;

    let Some(bytes) = taken else {
        return Ok(None);
    };
    match rmp_serde::from_slice::<CachedDiskValue<V>>(&bytes) {
        Ok(entry) => Ok(Some(entry.value)),
        Err(_) => Ok(None),
    }
}
/// Deletes `key` without decoding its value and reports whether an entry existed.
fn disk_cache_delete(
    connection: &Database,
    key: &str,
    durable: bool,
) -> Result<bool, RedbCacheError> {
    let txn = begin_write(connection, durable)?;
    let existed = {
        let mut table = txn.open_table(TABLE)?;
        let prior = table.remove(key)?;
        prior.is_some()
    };
    txn.commit()?;
    Ok(existed)
}
/// Empties the cache by deleting the table and recreating it within a single
/// write transaction.
fn disk_cache_clear(connection: &Database, durable: bool) -> Result<(), RedbCacheError> {
    let txn = begin_write(connection, durable)?;
    let _existed = txn.delete_table(TABLE)?;
    // Reopen straight away so the table exists again once the transaction commits.
    let recreated = txn.open_table(TABLE)?;
    drop(recreated);
    txn.commit()?;
    Ok(())
}
/// Commits an empty write transaction with `Durability::Immediate`.
fn redb_flush(connection: &Database) -> Result<(), RedbCacheError> {
    let mut barrier = connection.begin_write()?;
    barrier.set_durability(Durability::Immediate)?;
    barrier.commit()?;
    Ok(())
}
/// All fallible cache operations on `RedbCache` report `RedbCacheError`.
impl<K, V> ConcurrentCacheBase for RedbCache<K, V> {
type Error = RedbCacheError;
}
/// Runtime access to the TTL and refresh-on-hit settings through `&self`.
impl<K, V> ConcurrentCacheTtl for RedbCache<K, V> {
    /// Returns the current TTL, or `None` when expiry is disabled.
    fn ttl(&self) -> Option<Duration> {
        let slot = self.ttl.lock();
        *slot
    }

    /// Installs a new TTL and returns the previous one. A zero duration clears
    /// the TTL instead of storing it.
    fn set_ttl(&self, ttl: Duration) -> Option<Duration> {
        let mut slot = self.ttl.lock();
        let next = (!ttl.is_zero()).then_some(ttl);
        std::mem::replace(&mut *slot, next)
    }

    /// Clears the TTL and returns the previous one.
    fn unset_ttl(&self) -> Option<Duration> {
        let mut slot = self.ttl.lock();
        slot.take()
    }

    /// Returns whether reads refresh an entry's creation time.
    fn refresh_on_hit(&self) -> bool {
        let flag = &self.refresh;
        flag.load(Ordering::Relaxed)
    }

    /// Sets the refresh-on-hit flag and returns its previous value.
    fn set_refresh_on_hit(&self, refresh: bool) -> bool {
        let flag = &self.refresh;
        flag.swap(refresh, Ordering::Relaxed)
    }
}
/// Synchronous cache operations; each one delegates to the matching `disk_cache_*`
/// helper with the key converted via `ToString`.
impl<K, V> ConcurrentCached<K, V> for RedbCache<K, V>
where
    K: ToString + Clone,
    V: Serialize + DeserializeOwned,
{
    /// Returns the value for `key` if present and not expired, using the current
    /// TTL and refresh-on-hit settings.
    fn cache_get(&self, key: &K) -> Result<Option<V>, RedbCacheError> {
        let ttl = *self.ttl.lock();
        let refresh = self.refresh.load(Ordering::Relaxed);
        let key = key.to_string();
        disk_cache_get(&self.connection, &key, ttl, refresh, self.durable)
    }

    /// Stores `value` under `key` with a fresh creation time and returns the
    /// previous value when one could be decoded.
    fn cache_set(&self, key: K, value: V) -> Result<Option<V>, RedbCacheError> {
        let entry = CachedDiskValue::new(value);
        let encoded = rmp_serde::to_vec(&entry)?;
        let key = key.to_string();
        disk_cache_set(&self.connection, &key, encoded, self.durable)
    }

    /// Removes `key`, returning its value only if it had not expired.
    fn cache_remove(&self, key: &K) -> Result<Option<V>, RedbCacheError> {
        let ttl = *self.ttl.lock();
        let key = key.to_string();
        disk_cache_remove(&self.connection, &key, ttl, self.durable)
    }

    /// Removes `key` and returns the key/value pair regardless of expiry.
    fn cache_remove_entry(&self, key: &K) -> Result<Option<(K, V)>, Self::Error> {
        let removed = disk_cache_remove_entry(&self.connection, &key.to_string(), self.durable)?;
        Ok(removed.map(|value| (key.clone(), value)))
    }

    /// Deletes `key` without decoding it; returns whether an entry existed.
    fn cache_delete(&self, key: &K) -> Result<bool, RedbCacheError> {
        let key = key.to_string();
        disk_cache_delete(&self.connection, &key, self.durable)
    }

    /// Removes every entry from the cache.
    fn cache_clear(&self) -> Result<(), RedbCacheError> {
        let durable = self.durable;
        disk_cache_clear(&self.connection, durable)
    }

    /// Performs the same table clear as `cache_clear`.
    fn cache_reset(&self) -> Result<(), RedbCacheError> {
        let durable = self.durable;
        disk_cache_clear(&self.connection, durable)
    }
}
/// By-reference insertion: the value is encoded from a borrow, so the caller keeps
/// ownership.
impl<K, V> crate::SerializeCached<K, V> for RedbCache<K, V>
where
    K: ToString + Clone,
    V: Serialize + DeserializeOwned,
{
    /// Stores a borrowed `value` under `key` and returns the previous value when
    /// one could be decoded.
    fn cache_set_ref(&self, key: &K, value: &V) -> Result<Option<V>, RedbCacheError> {
        let entry = CachedDiskValueRef::new(value);
        let encoded = rmp_serde::to_vec(&entry)?;
        let key = key.to_string();
        disk_cache_set(&self.connection, &key, encoded, self.durable)
    }
}
/// Async cache operations. Each method captures owned copies of what it needs and
/// runs the matching `disk_cache_*` helper through `blocking::unblock`.
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
impl<K, V> crate::ConcurrentCachedAsync<K, V> for RedbCache<K, V>
where
    K: ToString + Clone + Send + Sync,
    V: Serialize + DeserializeOwned + Send + 'static,
{
    /// Async counterpart of `cache_get`.
    async fn async_cache_get(&self, key: &K) -> Result<Option<V>, RedbCacheError> {
        let db = Arc::clone(&self.connection);
        let key = key.to_string();
        // Copy the settings out now; the lock guard is released within this statement.
        let ttl = *self.ttl.lock();
        let refresh = self.refresh.load(Ordering::Relaxed);
        let durable = self.durable;
        let lookup = move || disk_cache_get::<V>(&db, &key, ttl, refresh, durable);
        blocking::unblock(lookup).await
    }

    /// Async counterpart of `cache_set`; the value is encoded before the blocking
    /// call is scheduled.
    async fn async_cache_set(&self, key: K, value: V) -> Result<Option<V>, RedbCacheError> {
        let db = Arc::clone(&self.connection);
        let key = key.to_string();
        let durable = self.durable;
        let entry = CachedDiskValue::new(value);
        let encoded = rmp_serde::to_vec(&entry)?;
        let store = move || disk_cache_set::<V>(&db, &key, encoded, durable);
        blocking::unblock(store).await
    }

    /// Async counterpart of `cache_remove`.
    async fn async_cache_remove(&self, key: &K) -> Result<Option<V>, RedbCacheError> {
        let db = Arc::clone(&self.connection);
        let key = key.to_string();
        let ttl = *self.ttl.lock();
        let durable = self.durable;
        let remove = move || disk_cache_remove::<V>(&db, &key, ttl, durable);
        blocking::unblock(remove).await
    }

    /// Async counterpart of `cache_remove_entry`.
    async fn async_cache_remove_entry(&self, key: &K) -> Result<Option<(K, V)>, Self::Error> {
        let db = Arc::clone(&self.connection);
        let key_str = key.to_string();
        let durable = self.durable;
        let remove = move || disk_cache_remove_entry::<V>(&db, &key_str, durable);
        let removed: Option<V> = blocking::unblock(remove).await?;
        Ok(removed.map(|value| (key.clone(), value)))
    }

    /// Async counterpart of `cache_delete`.
    async fn async_cache_delete(&self, key: &K) -> Result<bool, RedbCacheError> {
        let db = Arc::clone(&self.connection);
        let key = key.to_string();
        let durable = self.durable;
        let delete = move || disk_cache_delete(&db, &key, durable);
        blocking::unblock(delete).await
    }

    /// Async counterpart of `cache_clear`.
    async fn async_cache_clear(&self) -> Result<(), RedbCacheError> {
        let db = Arc::clone(&self.connection);
        let durable = self.durable;
        let clear = move || disk_cache_clear(&db, durable);
        blocking::unblock(clear).await
    }

    /// Async counterpart of `cache_reset`; performs the same table clear as
    /// `async_cache_clear`.
    async fn async_cache_reset(&self) -> Result<(), RedbCacheError> {
        let db = Arc::clone(&self.connection);
        let durable = self.durable;
        let clear = move || disk_cache_clear(&db, durable);
        blocking::unblock(clear).await
    }
}
/// Async by-reference insertion.
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
impl<K, V> crate::SerializeCachedAsync<K, V> for RedbCache<K, V>
where
    K: ToString + Clone + Send + Sync,
    V: Serialize + DeserializeOwned + Send + 'static,
{
    /// Stores a borrowed `value` under `key`.
    ///
    /// The value is encoded eagerly, before the future is created, so the returned
    /// future owns only the encoded bytes and does not borrow `value`. An encoding
    /// failure is reported when the future is awaited.
    fn async_cache_set_ref(
        &self,
        key: &K,
        value: &V,
    ) -> impl std::future::Future<Output = Result<Option<V>, RedbCacheError>> + Send {
        let db = Arc::clone(&self.connection);
        let key = key.to_string();
        let durable = self.durable;
        let entry = CachedDiskValueRef::new(value);
        let encoded = rmp_serde::to_vec(&entry)
            .map_err(|source| RedbCacheError::CacheSerialization { source });
        async move {
            let bytes = encoded?;
            let store = move || disk_cache_set::<V>(&db, &key, bytes, durable);
            blocking::unblock(store).await
        }
    }
}
#[cfg(test)]
mod tests {
use crate::time::Duration;
use googletest::{
assert_that,
matchers::{anything, eq, none, ok, some},
};
use std::thread::sleep;
use tempfile::TempDir;
use super::*;
macro_rules! temp_dir {
() => {
TempDir::new().expect("Error creating temp dir")
};
}
#[test]
fn ttl_secs_and_ttl_millis_set_duration() {
let b = RedbCache::<u32, u32>::builder()
.name("ttl-secs-builder")
.ttl_secs(7);
assert_eq!(b.ttl, Some(Duration::from_secs(7)));
let b = RedbCache::<u32, u32>::builder()
.name("ttl-millis-builder")
.ttl_millis(250);
assert_eq!(b.ttl, Some(Duration::from_millis(250)));
}
#[test]
fn ttl_setters_override_last_writer_wins() {
let b = RedbCache::<u32, u32>::builder()
.name("ttl-override-a")
.ttl(Duration::from_secs(10))
.ttl_secs(5);
assert_eq!(b.ttl, Some(Duration::from_secs(5)));
let b = RedbCache::<u32, u32>::builder()
.name("ttl-override-b")
.ttl_secs(10)
.ttl_millis(500);
assert_eq!(b.ttl, Some(Duration::from_millis(500)));
let b = RedbCache::<u32, u32>::builder()
.name("ttl-override-c")
.ttl_millis(500)
.ttl(Duration::from_secs(3));
assert_eq!(b.ttl, Some(Duration::from_secs(3)));
}
#[test]
fn new_returns_ready_cache_via_builder_with_ttl_secs() {
let dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("ttl-secs-roundtrip")
.disk_directory(dir.path())
.ttl_secs(60)
.build()
.expect("build must succeed");
assert_eq!(cache.cache_set(1, 100).unwrap(), None);
assert_eq!(cache.cache_get(&1).unwrap(), Some(100));
}
#[test]
fn set_ttl_zero_disables_expiry() {
let dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("set-ttl-zero-disables")
.disk_directory(dir.path())
.ttl_millis(20)
.build()
.expect("build must succeed");
assert_eq!(cache.cache_set(1, 100).unwrap(), None);
assert_eq!(
cache.set_ttl(Duration::ZERO),
Some(Duration::from_millis(20))
);
assert_eq!(cache.ttl(), None);
std::thread::sleep(Duration::from_millis(60));
assert_eq!(cache.cache_get(&1).unwrap(), Some(100));
}
fn raw_insert(
cache: &RedbCache<u32, impl Serialize + DeserializeOwned>,
key: &str,
value: Vec<u8>,
) {
let wtxn = cache
.connection
.begin_write()
.expect("error beginning write txn");
{
let mut table = wtxn.open_table(TABLE).expect("error opening table");
table
.insert(key, value.as_slice())
.expect("error inserting fixture");
}
wtxn.commit().expect("error committing fixture");
}
fn raw_get(
cache: &RedbCache<u32, impl Serialize + DeserializeOwned>,
key: &str,
) -> Option<Vec<u8>> {
let rtxn = cache
.connection
.begin_read()
.expect("error beginning read txn");
let table = rtxn.open_table(TABLE).expect("error opening table");
table
.get(key)
.expect("error reading fixture")
.map(|guard| guard.value().to_vec())
}
fn now_millis() -> u128 {
crate::time::SystemTime::now()
.duration_since(crate::time::UNIX_EPOCH)
.unwrap()
.as_millis()
}
#[derive(Debug)]
struct SerializeFailsAfterDeserialize {
fail: bool,
}
impl serde::Serialize for SerializeFailsAfterDeserialize {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if self.fail {
Err(serde::ser::Error::custom("intentional serialize failure"))
} else {
serializer.serialize_bool(false)
}
}
}
impl<'de> serde::Deserialize<'de> for SerializeFailsAfterDeserialize {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let _ = bool::deserialize(deserializer)?;
Ok(Self { fail: true })
}
}
const TEST_KEY: u32 = 1;
const TEST_VAL: u32 = 100;
const TEST_VAL_1: u32 = 200;
#[test]
fn cache_get_returns_serialize_error_when_refresh_fails() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, SerializeFailsAfterDeserialize> = RedbCache::builder()
.name("serialize_error_on_refresh")
.disk_directory(tmp_dir.path())
.ttl(Duration::from_secs(10))
.refresh_on_hit(true)
.build()
.expect("error building disk cache");
let cached = CachedDiskValue::new(SerializeFailsAfterDeserialize { fail: false });
raw_insert(
&cache,
&TEST_KEY.to_string(),
rmp_serde::to_vec(&cached).expect("error serializing fixture"),
);
assert!(matches!(
cache.cache_get(&TEST_KEY),
Err(RedbCacheError::CacheSerialization { .. })
));
}
#[test]
fn cache_get_returns_decode_error_for_corrupted_value() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("corrupted-cache-get")
.disk_directory(tmp_dir.path())
.build()
.expect("error building disk cache");
raw_insert(&cache, &TEST_KEY.to_string(), vec![0xc1, 0xc1, 0xc1]);
assert!(matches!(
cache.cache_get(&TEST_KEY),
Err(RedbCacheError::CacheDeserialization { .. })
));
assert!(raw_get(&cache, &TEST_KEY.to_string()).is_some());
}
#[test]
fn cache_delete_removes_corrupted_value_without_decoding() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("corrupted-cache-delete")
.disk_directory(tmp_dir.path())
.build()
.expect("error building disk cache");
raw_insert(&cache, &TEST_KEY.to_string(), vec![0xc1, 0xc1, 0xc1]);
assert!(cache.cache_delete(&TEST_KEY).unwrap());
assert!(!cache.cache_delete(&TEST_KEY).unwrap());
assert_that!(cache.cache_get(&TEST_KEY), ok(none()));
}
#[test]
fn cache_set_overwrites_corrupted_value() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("corrupted-cache-set")
.disk_directory(tmp_dir.path())
.build()
.expect("error building disk cache");
raw_insert(&cache, &TEST_KEY.to_string(), vec![0xc1, 0xc1, 0xc1]);
assert_that!(cache.cache_set(TEST_KEY, TEST_VAL), ok(none()));
assert_that!(cache.cache_get(&TEST_KEY), ok(some(eq(&TEST_VAL))));
}
#[test]
fn cache_remove_removes_corrupted_value() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("corrupted-cache-remove")
.disk_directory(tmp_dir.path())
.build()
.expect("error building disk cache");
raw_insert(&cache, &TEST_KEY.to_string(), vec![0xc1, 0xc1, 0xc1]);
assert_that!(cache.cache_remove(&TEST_KEY), ok(none()));
assert!(raw_get(&cache, &TEST_KEY.to_string()).is_none());
}
#[test]
fn cache_remove_entry_round_trips_and_removes_corrupted_value() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("remove-entry-roundtrip")
.disk_directory(tmp_dir.path())
.build()
.expect("error building disk cache");
cache.cache_set(TEST_KEY, TEST_VAL).unwrap();
assert_eq!(
cache.cache_remove_entry(&TEST_KEY).unwrap(),
Some((TEST_KEY, TEST_VAL))
);
assert!(raw_get(&cache, &TEST_KEY.to_string()).is_none());
assert_eq!(cache.cache_remove_entry(&TEST_KEY).unwrap(), None);
raw_insert(&cache, &TEST_KEY.to_string(), vec![0xc1, 0xc1, 0xc1]);
assert_that!(cache.cache_remove_entry(&TEST_KEY), ok(none()));
assert!(raw_get(&cache, &TEST_KEY.to_string()).is_none());
}
#[test]
fn cache_remove_entry_returns_expired_but_present_entry() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("remove-entry-expired")
.disk_directory(tmp_dir.path())
.ttl(LIFE_SPAN_1_SEC)
.build()
.expect("error building disk cache");
cache.cache_set(TEST_KEY, TEST_VAL).unwrap();
cache.cache_set(2, TEST_VAL_1).unwrap();
sleep(LIFE_SPAN_1_SEC + Duration::from_millis(50));
assert_eq!(cache.cache_remove(&TEST_KEY).unwrap(), None);
assert_eq!(cache.cache_remove_entry(&2).unwrap(), Some((2, TEST_VAL_1)));
}
#[test]
fn flush_forces_durable_commit_and_preserves_data() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("flush-test")
.disk_directory(tmp_dir.path())
.durable(false)
.build()
.expect("error building disk cache");
cache.cache_set(TEST_KEY, TEST_VAL).unwrap();
cache.cache_set(2, TEST_VAL_1).unwrap();
cache.flush().expect("flush should succeed");
cache.flush().expect("flush is idempotent");
assert_that!(cache.cache_get(&TEST_KEY), ok(some(eq(&TEST_VAL))));
assert_that!(cache.cache_get(&2), ok(some(eq(&TEST_VAL_1))));
drop(cache);
let reopened: RedbCache<u32, u32> = RedbCache::builder()
.name("flush-test")
.disk_directory(tmp_dir.path())
.build()
.expect("error re-opening cache");
assert_that!(reopened.cache_get(&TEST_KEY), ok(some(eq(&TEST_VAL))));
let durable: RedbCache<u32, u32> = RedbCache::builder()
.name("flush-test-durable")
.disk_directory(tmp_dir.path())
.durable(true)
.build()
.unwrap();
durable
.flush()
.expect("flush on a durable/empty cache should succeed");
}
#[test]
fn flush_makes_durability_none_writes_visible_to_a_fresh_instance() {
const NAME: &str = "flush-visibility";
let file_name = format!("{NAME}_v{DISK_FILE_VERSION}.redb");
let dir_a = temp_dir!();
let src = dir_a.path().join(&file_name);
let a: RedbCache<u32, u32> = RedbCache::builder()
.name(NAME)
.disk_directory(dir_a.path())
.durable(false) .build()
.unwrap();
a.cache_set(TEST_KEY, TEST_VAL).unwrap();
let dir_before = temp_dir!();
std::fs::copy(&src, dir_before.path().join(&file_name)).unwrap();
let before: RedbCache<u32, u32> = RedbCache::builder()
.name(NAME)
.disk_directory(dir_before.path())
.build()
.unwrap();
assert_that!(
before.cache_get(&TEST_KEY),
ok(none()),
"an un-flushed Durability::None write must not be durable"
);
a.flush().unwrap();
let dir_after = temp_dir!();
std::fs::copy(&src, dir_after.path().join(&file_name)).unwrap();
let after: RedbCache<u32, u32> = RedbCache::builder()
.name(NAME)
.disk_directory(dir_after.path())
.build()
.unwrap();
assert_that!(
after.cache_get(&TEST_KEY),
ok(some(eq(&TEST_VAL))),
"after flush the write is durable and visible to a fresh instance"
);
}
#[test]
fn remove_expired_entries_returns_decode_error_for_corrupted_value() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("corrupted-sweep")
.disk_directory(tmp_dir.path())
.ttl(Duration::from_secs(1))
.build()
.expect("error building disk cache");
raw_insert(&cache, &TEST_KEY.to_string(), vec![0xc1, 0xc1, 0xc1]);
assert!(matches!(
cache.remove_expired_entries(),
Err(RedbCacheError::CacheDeserialization { .. })
));
}
#[test]
fn remove_expired_entries_returns_count_of_removed_entries() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("sweep-count")
.disk_directory(tmp_dir.path())
.ttl(LIFE_SPAN_1_SEC)
.build()
.expect("error building disk cache");
cache.cache_set(1, 10).unwrap();
cache.cache_set(2, 20).unwrap();
sleep(LIFE_SPAN_1_SEC + Duration::from_millis(50));
cache.cache_set(3, 30).unwrap();
assert_eq!(cache.remove_expired_entries().unwrap(), 2);
assert!(raw_get(&cache, &3u32.to_string()).is_some());
assert!(raw_get(&cache, &1u32.to_string()).is_none());
assert!(raw_get(&cache, &2u32.to_string()).is_none());
}
const LIFE_SPAN_2_SECS: Duration = Duration::from_secs(2);
const LIFE_SPAN_1_SEC: Duration = Duration::from_secs(1);
#[googletest::test]
fn cache_get_after_cache_remove_returns_none() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("test-cache")
.disk_directory(tmp_dir.path())
.build()
.unwrap();
let cached = cache.cache_get(&TEST_KEY).unwrap();
assert_that!(
cached,
none(),
"Getting a non-existent key-value should return None"
);
let cached = cache.cache_set(TEST_KEY, TEST_VAL).unwrap();
assert_that!(cached, none(), "Setting a new key-value should return None");
let cached = cache.cache_set(TEST_KEY, TEST_VAL_1).unwrap();
assert_that!(
cached,
some(eq(TEST_VAL)),
"Setting an existing key-value should return the old value"
);
let cached = cache.cache_get(&TEST_KEY).unwrap();
assert_that!(
cached,
some(eq(TEST_VAL_1)),
"Getting an existing key-value should return the value"
);
let cached = cache.cache_remove(&TEST_KEY).unwrap();
assert_that!(
cached,
some(eq(TEST_VAL_1)),
"Removing an existing key-value should return the value"
);
let cached = cache.cache_get(&TEST_KEY).unwrap();
assert_that!(cached, none(), "Getting a removed key should return None");
drop(cache);
}
#[googletest::test]
fn cache_clear_empties_the_table() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("test-cache-clear")
.disk_directory(tmp_dir.path())
.build()
.unwrap();
cache.cache_set(TEST_KEY, TEST_VAL).unwrap();
cache.cache_set(TEST_KEY + 1, TEST_VAL_1).unwrap();
cache.cache_clear().expect("error clearing cache");
assert_that!(
cache.cache_get(&TEST_KEY),
ok(none()),
"Getting a key after cache_clear should return None"
);
assert_that!(
cache.cache_get(&(TEST_KEY + 1)),
ok(none()),
"Getting a second key after cache_clear should return None"
);
}
#[googletest::test]
fn values_expire_when_lifespan_elapses_returning_none() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("test-cache")
.disk_directory(tmp_dir.path())
.ttl(LIFE_SPAN_2_SECS)
.build()
.unwrap();
assert_that!(
cache.cache_get(&TEST_KEY),
ok(none()),
"Getting a non-existent key-value should return None"
);
assert_that!(
cache.cache_set(TEST_KEY, 100),
ok(none()),
"Setting a new key-value should return None"
);
assert_that!(
cache.cache_get(&TEST_KEY),
ok(some(anything())),
"Getting an existing key-value before it expires should return the value"
);
sleep(LIFE_SPAN_2_SECS);
sleep(Duration::from_micros(500)); assert_that!(
cache.cache_get(&TEST_KEY),
ok(none()),
"Getting an expired key-value should return None"
);
}
#[googletest::test]
fn set_ttl_to_a_different_ttl_is_respected() {
let tmp_dir = temp_dir!();
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name("test-cache")
.disk_directory(tmp_dir.path())
.ttl(LIFE_SPAN_2_SECS)
.build()
.unwrap();
assert_that!(
cache.cache_get(&TEST_KEY),
ok(none()),
"Getting a non-existent key-value should return None"
);
assert_that!(
cache.cache_set(TEST_KEY, TEST_VAL),
ok(none()),
"Setting a new key-value should return None"
);
sleep(LIFE_SPAN_2_SECS);
sleep(Duration::from_micros(500)); assert_that!(
cache.cache_get(&TEST_KEY),
ok(none()),
"Getting an expired key-value should return None"
);
let old_from_setting_lifespan =
ConcurrentCacheTtl::set_ttl(&cache, LIFE_SPAN_1_SEC).expect("error setting new ttl");
assert_that!(
old_from_setting_lifespan,
eq(LIFE_SPAN_2_SECS),
"Setting ttl should return the old ttl"
);
assert_that!(
cache.cache_set(TEST_KEY, TEST_VAL),
ok(none()),
"Setting a previously expired key-value should return None"
);
assert_that!(
cache.cache_get(&TEST_KEY),
ok(some(eq(&TEST_VAL))),
"Getting a newly set (previously expired) key-value should return the value"
);
sleep(LIFE_SPAN_1_SEC);
sleep(Duration::from_micros(500)); assert_that!(
cache.cache_get(&TEST_KEY),
ok(none()),
"Getting an expired key-value should return None"
);
ConcurrentCacheTtl::set_ttl(&cache, Duration::from_secs(10)).expect("error setting ttl");
assert_that!(
cache.cache_set(TEST_KEY, TEST_VAL),
ok(none()),
"Setting a previously expired key-value should return None"
);
assert_that!(
cache.cache_get(&TEST_KEY),
ok(some(eq(&TEST_VAL))),
"Getting a newly set (previously expired) key-value should return the value"
);
assert_that!(
cache.cache_get(&TEST_KEY),
ok(some(eq(&TEST_VAL))),
"Getting the same value again should return the value"
);
}
// With `refresh_on_hit(true)`, a read before expiry is expected to push the
// expiry back, so the entry outlives its original TTL and only disappears
// once a full TTL passes without a read.
#[googletest::test]
fn refreshing_on_cache_get_delays_cache_expiry() {
    const LIFE_SPAN: Duration = LIFE_SPAN_2_SECS;
    const HALF_LIFE_SPAN: Duration = LIFE_SPAN_1_SEC;

    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, u32>::builder()
        .name("test-cache")
        .disk_directory(scratch_dir.path())
        .ttl(LIFE_SPAN)
        .refresh_on_hit(true)
        .build()
        .unwrap();

    assert_that!(cache.cache_set(TEST_KEY, TEST_VAL), ok(none()));

    // Halfway through the TTL: still present.
    sleep(HALF_LIFE_SPAN);
    assert_that!(
        cache.cache_get(&TEST_KEY),
        ok(some(eq(&TEST_VAL))),
        "Getting a value before expiry should return the value"
    );

    // A full TTL after the write, but only half a TTL after the last read.
    sleep(HALF_LIFE_SPAN);
    assert_that!(
        cache.cache_get(&TEST_KEY),
        ok(some(eq(&TEST_VAL))),
        "Getting a value after the initial expiry should return the value as we have refreshed"
    );

    // A full TTL with no reads in between: the entry must be gone.
    sleep(LIFE_SPAN);
    assert_that!(
        cache.cache_get(&TEST_KEY),
        ok(none()),
        "Getting a value after the refreshed expiry should return None"
    );

    drop(cache);
}
#[googletest::test]
fn does_not_break_when_constructed_using_default_disk_directory() {
let cache: RedbCache<u32, u32> = RedbCache::builder()
.name(format!("{}:disk-cache-test-default-dir", now_millis()))
.build()
.unwrap();
let cached = cache.cache_get(&TEST_KEY).unwrap();
assert_that!(
cached,
none(),
"Getting a non-existent key-value should return None"
);
let cached = cache.cache_set(TEST_KEY, TEST_VAL).unwrap();
assert_that!(cached, none(), "Setting a new key-value should return None");
let cached = cache.cache_set(TEST_KEY, TEST_VAL_1).unwrap();
assert_that!(
cached,
some(eq(TEST_VAL)),
"Setting an existing key-value should return the old value"
);
std::fs::remove_file(cache.disk_path()).expect("error in clean up removing the cache file")
}
// Reopen-the-file tests: write through one cache instance, drop it, open a
// second instance on the same directory and name, and check what it sees.
// Both `durable(true)` and `durable(false)` are covered.
mod set_durable {
    mod persistence_across_reopen {
        use super::super::*;

        // Shape of the two callbacks handed to `check_on_recovered_cache`.
        type CacheStep = fn(&RedbCache<u32, u32>);

        // Runs `run_on_original_cache` against a freshly built cache, drops that
        // cache, rebuilds it with the same name/directory/durability, and then
        // runs `run_on_recovered_cache` against the reopened instance.
        fn check_on_recovered_cache(
            set_durable: bool,
            run_on_original_cache: CacheStep,
            run_on_recovered_cache: CacheStep,
        ) {
            const CACHE_NAME: &str = "test-cache";
            let shared_dir = temp_dir!();
            // Both instances are built with identical settings.
            let open_cache = || {
                RedbCache::<u32, u32>::builder()
                    .name(CACHE_NAME)
                    .disk_directory(shared_dir.path())
                    .durable(set_durable)
                    .build()
            };

            {
                // Scoped so the first instance is dropped before reopening.
                let cache = open_cache().unwrap();
                run_on_original_cache(&cache);
            }

            let recovered_cache =
                open_cache().expect("error re-opening cache from persisted file");
            run_on_recovered_cache(&recovered_cache);
        }

        // `durable(true)` variants.
        mod changes_persist_after_recovery {
            use super::*;

            #[googletest::test]
            fn for_cache_set() {
                check_on_recovered_cache(
                    true,
                    |cache| {
                        cache
                            .cache_set(TEST_KEY, TEST_VAL)
                            .expect("error setting cache in assemble stage");
                    },
                    |recovered_cache| {
                        assert_that!(
                            recovered_cache.cache_get(&TEST_KEY),
                            ok(some(eq(&TEST_VAL))),
                            "Getting a set key should return the value after re-opening the file"
                        );
                    },
                )
            }

            #[googletest::test]
            fn for_cache_remove() {
                check_on_recovered_cache(
                    true,
                    |cache| {
                        cache
                            .cache_set(TEST_KEY, TEST_VAL)
                            .expect("error setting cache in assemble stage");
                        cache
                            .cache_remove(&TEST_KEY)
                            .expect("error removing cache in assemble stage");
                    },
                    |recovered_cache| {
                        assert_that!(
                            recovered_cache.cache_get(&TEST_KEY),
                            ok(none()),
                            "Getting a removed key should return None after re-opening the file"
                        );
                    },
                )
            }
        }

        // `durable(false)` variants; the expectations are the same as above.
        mod changes_persist_after_recovery_non_durable {
            use super::*;

            #[googletest::test]
            fn for_cache_set() {
                check_on_recovered_cache(
                    false,
                    |cache| {
                        cache
                            .cache_set(TEST_KEY, TEST_VAL)
                            .expect("error setting cache in assemble stage");
                    },
                    |recovered_cache| {
                        assert_that!(
                            recovered_cache.cache_get(&TEST_KEY),
                            ok(some(eq(&TEST_VAL))),
                            "Getting a set key should return the value after re-opening the file"
                        );
                    },
                )
            }

            #[googletest::test]
            fn for_cache_remove() {
                check_on_recovered_cache(
                    false,
                    |cache| {
                        cache
                            .cache_set(TEST_KEY, TEST_VAL)
                            .expect("error setting cache in assemble stage");
                        cache
                            .cache_remove(&TEST_KEY)
                            .expect("error removing cache in assemble stage");
                    },
                    |recovered_cache| {
                        assert_that!(
                            recovered_cache.cache_get(&TEST_KEY),
                            ok(none()),
                            "Getting a removed key should return None after re-opening the file"
                        );
                    },
                )
            }
        }
    }
}
// Walks the async API end to end: set/get, TTL expiry, remove, delete of a
// missing key, and clear.
#[cfg(feature = "async")]
#[tokio::test]
async fn async_path_respects_ttl_and_round_trips() {
    use crate::ConcurrentCachedAsync;

    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, u32>::builder()
        .name("test-cache-async")
        .disk_directory(scratch_dir.path())
        .ttl(LIFE_SPAN_1_SEC)
        .build()
        .unwrap();

    // Fresh write followed by an immediate read.
    let first_write = cache.async_cache_set(TEST_KEY, TEST_VAL).await.unwrap();
    assert_eq!(first_write, None);
    let fresh_read = cache.async_cache_get(&TEST_KEY).await.unwrap();
    assert_eq!(fresh_read, Some(TEST_VAL));

    // Past the TTL (plus 50 ms of slack) the entry must read as absent.
    tokio::time::sleep(LIFE_SPAN_1_SEC + Duration::from_millis(50)).await;
    let stale_read = cache.async_cache_get(&TEST_KEY).await.unwrap();
    assert_eq!(stale_read, None);

    // Writing over the expired entry reports no previous value.
    let rewrite = cache.async_cache_set(TEST_KEY, TEST_VAL).await.unwrap();
    assert_eq!(rewrite, None);

    // Remove hands back the value; deleting again reports nothing was deleted.
    let removed = cache.async_cache_remove(&TEST_KEY).await.unwrap();
    assert_eq!(removed, Some(TEST_VAL));
    assert!(!cache.async_cache_delete(&TEST_KEY).await.unwrap());

    // Clear wipes every key.
    cache.async_cache_set(TEST_KEY, TEST_VAL).await.unwrap();
    cache.async_cache_set(2, TEST_VAL_1).await.unwrap();
    cache.async_cache_clear().await.unwrap();
    let after_clear_first = cache.async_cache_get(&TEST_KEY).await.unwrap();
    assert_eq!(after_clear_first, None);
    let after_clear_second = cache.async_cache_get(&2).await.unwrap();
    assert_eq!(after_clear_second, None);
}
// `async_cache_remove_entry` should return the stored pair, return `None` for
// a missing key, and return `None` while still deleting an entry whose raw
// bytes were planted directly via `raw_insert`.
#[cfg(feature = "async")]
#[tokio::test]
async fn async_cache_remove_entry_round_trips_and_removes_corrupted_value() {
    use crate::ConcurrentCachedAsync;

    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, u32>::builder()
        .name("remove-entry-async")
        .disk_directory(scratch_dir.path())
        .build()
        .unwrap();

    // Well-formed entry: removal returns the (key, value) pair.
    cache.async_cache_set(TEST_KEY, TEST_VAL).await.unwrap();
    let taken = cache.async_cache_remove_entry(&TEST_KEY).await.unwrap();
    assert_eq!(taken, Some((TEST_KEY, TEST_VAL)));
    assert!(raw_get(&cache, &TEST_KEY.to_string()).is_none());

    // Nothing left to remove.
    let taken_again = cache.async_cache_remove_entry(&TEST_KEY).await.unwrap();
    assert_eq!(taken_again, None);

    // Planted raw bytes: removal yields `None`, and the raw row is gone after.
    raw_insert(&cache, &TEST_KEY.to_string(), vec![0xc1, 0xc1, 0xc1]);
    let taken_corrupt = cache.async_cache_remove_entry(&TEST_KEY).await.unwrap();
    assert_eq!(taken_corrupt, None);
    assert!(raw_get(&cache, &TEST_KEY.to_string()).is_none());
}
// `SerializeCached::cache_set_ref` writes from borrowed key/value and returns
// the previously stored value, if any.
#[test]
fn cache_set_ref_round_trips() {
    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, u32>::builder()
        .name("set-ref-roundtrip")
        .disk_directory(scratch_dir.path())
        .build()
        .expect("error building disk cache");

    let key = TEST_KEY;
    let value = TEST_VAL;
    let value2 = TEST_VAL_1;

    // First write: no prior value, and the value reads back.
    assert_that!(
        crate::SerializeCached::cache_set_ref(&cache, &key, &value),
        ok(none()),
        "cache_set_ref on a new key should return None"
    );
    assert_that!(
        cache.cache_get(&key),
        ok(some(eq(&value))),
        "cache_get after cache_set_ref should return the written value"
    );

    // Overwrite: the old value comes back, and the new one reads back.
    assert_that!(
        crate::SerializeCached::cache_set_ref(&cache, &key, &value2),
        ok(some(eq(&value))),
        "cache_set_ref over an existing entry should return the old value"
    );
    assert_that!(
        cache.cache_get(&key),
        ok(some(eq(&value2))),
        "cache_get should return the most recently set value"
    );
}
// Smoke test for the `Debug` rendering of `RedbCache`: it must name the type
// and its configuration fields, end non-exhaustively, and contain no
// connection-related text.
#[test]
fn debug_smoke_exposes_non_secret_fields_only() {
    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, u32>::builder()
        .name("debug-smoke")
        .disk_directory(scratch_dir.path())
        .ttl_secs(60)
        .refresh_on_hit(true)
        .build()
        .expect("error building disk cache");

    let s = format!("{cache:?}");
    assert!(!s.is_empty(), "Debug output must be non-empty");

    // Substrings that must appear, paired with the complaint raised if absent.
    for (needle, complaint) in [
        ("RedbCache", "Debug must name the type"),
        ("ttl", "Debug must show ttl"),
        ("refresh", "Debug must show refresh"),
        ("durable", "Debug must show durable"),
    ] {
        assert!(s.contains(needle), "{complaint}: {s}");
    }

    assert!(
        s.contains(".."),
        "Debug must be non-exhaustive (trailing ..): {s}"
    );

    // Substrings that must not appear.
    assert!(
        !s.contains("connection"),
        "Debug must not expose the connection handle: {s}"
    );
    assert!(
        !s.contains("redis://") && !s.contains("rediss://"),
        "Debug must not contain a connection scheme: {s}"
    );
}
// The builder must reject names that are empty, contain '/' or '\', or are
// exactly "." / "..", while still accepting ordinary names (including ':').
#[test]
fn build_rejects_cache_name_with_path_separator_or_dot_components() {
    let scratch_dir = temp_dir!();
    // Every case below differs only in the cache name.
    let attempt = |name: &'static str| {
        RedbCache::<u32, u32>::builder()
            .name(name)
            .disk_directory(scratch_dir.path())
            .build()
    };

    assert!(
        matches!(attempt(""), Err(RedbCacheBuildError::InvalidCacheName)),
        "empty cache_name must return InvalidCacheName"
    );
    assert!(
        matches!(
            attempt("bad/name"),
            Err(RedbCacheBuildError::InvalidCacheName)
        ),
        "cache_name containing '/' must return InvalidCacheName"
    );
    assert!(
        attempt("ok:name").is_ok(),
        "cache_name containing ':' must be accepted"
    );
    assert!(
        matches!(
            attempt("bad\\name"),
            Err(RedbCacheBuildError::InvalidCacheName)
        ),
        "cache_name containing '\\\\' must return InvalidCacheName"
    );
    assert!(
        matches!(attempt(".."), Err(RedbCacheBuildError::InvalidCacheName)),
        "cache_name '..' must return InvalidCacheName"
    );
    assert!(
        matches!(attempt("."), Err(RedbCacheBuildError::InvalidCacheName)),
        "cache_name '.' must return InvalidCacheName"
    );
    assert!(
        attempt("valid-cache-name").is_ok(),
        "a valid cache_name must build successfully"
    );
}
// A cache name with an embedded NUL byte must be refused by the builder.
#[test]
fn build_rejects_cache_name_with_nul_byte() {
    let scratch_dir = temp_dir!();
    let outcome = RedbCache::<u32, u32>::builder()
        .name("bad\0name")
        .disk_directory(scratch_dir.path())
        .build();
    assert!(
        matches!(outcome, Err(RedbCacheBuildError::InvalidCacheName)),
        "cache_name containing a NUL byte must return InvalidCacheName"
    );
}
// Async counterpart of the `cache_set_ref` round trip: write by reference,
// read back, overwrite, and check the previous value is returned.
#[cfg(feature = "async")]
#[tokio::test]
async fn async_cache_set_ref_round_trips() {
    use crate::ConcurrentCachedAsync;
    use crate::SerializeCachedAsync;

    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, u32>::builder()
        .name("set-ref-roundtrip-async")
        .disk_directory(scratch_dir.path())
        .build()
        .expect("error building disk cache");

    let key = TEST_KEY;
    let value = TEST_VAL;
    let value2 = TEST_VAL_1;

    // First write.
    let prior = cache.async_cache_set_ref(&key, &value).await.unwrap();
    assert_eq!(
        prior, None,
        "async_cache_set_ref on a new key should return None"
    );
    let stored = cache.async_cache_get(&key).await.unwrap();
    assert_eq!(
        stored,
        Some(value),
        "async_cache_get after async_cache_set_ref should return the written value"
    );

    // Overwrite.
    let displaced = cache.async_cache_set_ref(&key, &value2).await.unwrap();
    assert_eq!(
        displaced,
        Some(value),
        "async_cache_set_ref over an existing entry should return the old value"
    );
    let latest = cache.async_cache_get(&key).await.unwrap();
    assert_eq!(
        latest,
        Some(value2),
        "async_cache_get should return the most recently set value"
    );
}
// `async_flush` must return `Ok` and leave previously written data readable.
#[cfg(feature = "async")]
#[tokio::test]
async fn async_flush_succeeds_and_preserves_data() {
    use crate::ConcurrentCachedAsync;

    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, u32>::builder()
        .name("flush-test-async")
        .disk_directory(scratch_dir.path())
        .build()
        .unwrap();

    cache.async_cache_set(TEST_KEY, TEST_VAL).await.unwrap();

    let flushed = cache.async_flush().await;
    flushed.expect("async_flush should succeed");

    let after_flush = cache.async_cache_get(&TEST_KEY).await.unwrap();
    assert_eq!(after_flush, Some(TEST_VAL));
}
// Drives the async API with `futures::executor::block_on` from a plain
// `#[test]` (no `#[tokio::test]` attribute) and checks set/get/remove/flush.
#[cfg(feature = "async")]
#[test]
fn async_redb_cache_works_under_futures_block_on() {
    use crate::ConcurrentCachedAsync;
    use futures::executor::block_on;

    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, u32>::builder()
        .name("futures-block-on-test")
        .disk_directory(scratch_dir.path())
        .build()
        .unwrap();

    let before_write = block_on(cache.async_cache_set(TEST_KEY, TEST_VAL)).unwrap();
    assert_eq!(before_write, None, "first set returns no prior value");

    let read_back = block_on(cache.async_cache_get(&TEST_KEY)).unwrap();
    assert_eq!(
        read_back,
        Some(TEST_VAL),
        "get returns the value that was set"
    );

    let taken = block_on(cache.async_cache_remove(&TEST_KEY)).unwrap();
    assert_eq!(
        taken,
        Some(TEST_VAL),
        "remove returns the previously set value"
    );

    let read_after_remove = block_on(cache.async_cache_get(&TEST_KEY)).unwrap();
    assert_eq!(read_after_remove, None, "get after remove returns None");

    block_on(cache.async_flush()).expect("async_flush under futures::block_on should succeed");
}
// The `Storage` build-error variant must describe itself with the word
// "storage" and must not use the word "connection".
#[test]
fn build_error_storage_variant_name_and_display() {
    let synthetic_io = std::io::Error::other("synthetic redb io error");
    let err = RedbCacheBuildError::Storage {
        source: redb::Error::Io(synthetic_io),
    };

    let display = err.to_string();
    // Compare case-insensitively; the original text is kept for the messages.
    let lowered = display.to_lowercase();
    assert!(
        lowered.contains("storage"),
        "Storage variant display must mention storage: {display}"
    );
    assert!(
        !lowered.contains("connection"),
        "Storage variant display must not mention connection: {display}"
    );
}
// When `cache_get` hits bytes it cannot decode, the resulting
// `CacheDeserialization` error must carry those exact bytes, and the raw
// entry must still be present afterwards.
#[test]
fn cache_get_decode_error_carries_raw_bytes() {
    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, u32>::builder()
        .name("decode-error-carries-bytes")
        .disk_directory(scratch_dir.path())
        .build()
        .expect("error building disk cache");

    // Plant raw bytes directly under the test key, bypassing `cache_set`.
    let corrupt: Vec<u8> = vec![0xc1, 0xc1, 0xc1];
    raw_insert(&cache, &TEST_KEY.to_string(), corrupt.clone());

    let lookup = cache.cache_get(&TEST_KEY);
    match lookup {
        Err(RedbCacheError::CacheDeserialization {
            cached_value,
            source: _,
        }) => assert_eq!(
            cached_value, corrupt,
            "cached_value must carry the exact bytes that failed to decode"
        ),
        other => panic!("expected CacheDeserialization, got {other:?}"),
    }

    // The failed read must not have deleted the raw row.
    assert!(raw_get(&cache, &TEST_KEY.to_string()).is_some());
}
// When `remove_expired_entries` meets bytes it cannot decode, the resulting
// `CacheDeserialization` error must carry those exact bytes.
#[test]
fn remove_expired_entries_decode_error_carries_raw_bytes() {
    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, u32>::builder()
        .name("sweep-decode-error-bytes")
        .disk_directory(scratch_dir.path())
        .ttl(Duration::from_secs(1))
        .build()
        .expect("error building disk cache");

    // Plant raw bytes directly under the test key, bypassing `cache_set`.
    let corrupt: Vec<u8> = vec![0xc1, 0xc1, 0xc1];
    raw_insert(&cache, &TEST_KEY.to_string(), corrupt.clone());

    let sweep = cache.remove_expired_entries();
    match sweep {
        Err(RedbCacheError::CacheDeserialization {
            cached_value,
            source: _,
        }) => assert_eq!(
            cached_value, corrupt,
            "cached_value must carry the exact bytes that failed to decode"
        ),
        other => panic!("expected CacheDeserialization, got {other:?}"),
    }
}
// `RedbCacheError::CacheSerialization` must be a struct variant with a
// `source` field; the `source: _` pattern below checks that at compile time.
#[test]
fn cache_serialization_error_is_struct_variant() {
    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, SerializeFailsAfterDeserialize>::builder()
        .name("ser-error-struct-variant")
        .disk_directory(scratch_dir.path())
        .ttl(Duration::from_secs(10))
        .refresh_on_hit(true)
        .build()
        .expect("error building disk cache");

    // Pre-encode a fixture and plant it as the raw stored bytes.
    // NOTE(review): presumably the refresh-on-hit read path re-serializes the
    // entry and that is where the fixture type fails — confirm against
    // `SerializeFailsAfterDeserialize` and `cache_get`.
    let fixture = CachedDiskValue::new(SerializeFailsAfterDeserialize { fail: false });
    let encoded = rmp_serde::to_vec(&fixture).expect("error serializing fixture");
    raw_insert(&cache, &TEST_KEY.to_string(), encoded);

    let lookup = cache.cache_get(&TEST_KEY);
    match lookup {
        Err(RedbCacheError::CacheSerialization { source: _ }) => {}
        other => panic!("expected CacheSerialization, got {other:?}"),
    }
}
// The error returned for an undecodable entry must expose an inner error
// through `std::error::Error::source`.
#[test]
fn cache_deserialization_error_source_is_wired() {
    use std::error::Error;

    let scratch_dir = temp_dir!();
    let cache = RedbCache::<u32, u32>::builder()
        .name("deser-source-wired")
        .disk_directory(scratch_dir.path())
        .build()
        .expect("error building disk cache");

    // Plant raw bytes directly so the next read fails to decode.
    raw_insert(&cache, &TEST_KEY.to_string(), vec![0xc1, 0xc1, 0xc1]);

    let lookup = cache.cache_get(&TEST_KEY);
    let err = lookup.expect_err("expected a decode error");
    assert!(
        err.source().is_some(),
        "CacheDeserialization must expose its inner error via source()"
    );
}
// `RedbCacheBuildError::Storage` must expose the wrapped `redb::Error`
// through `std::error::Error::source`.
#[test]
fn build_error_storage_source_is_wired() {
    use std::error::Error;

    let err = RedbCacheBuildError::Storage {
        source: redb::Error::Io(std::io::Error::other("synthetic redb io error")),
    };
    assert!(
        err.source().is_some(),
        "RedbCacheBuildError::Storage must expose its inner error via source()"
    );
}
}