use crate::backend::interface::{BackendKind, CacheConnector, CacheReader, CacheWriter};
use crate::backend::score::{BackendScore, Scores};
use crate::error::Result;
use crate::impl_backend_builder;
use async_trait::async_trait;
use moka::ops::compute::{CompResult, Op};
use moka::Expiry;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Value stored in the moka cache: the payload bytes plus an optional
/// absolute expiry deadline.
#[derive(Clone, Debug)]
pub struct MokaEntry {
/// Raw bytes of the cached value.
pub value: Vec<u8>,
/// Absolute instant at which this entry should expire. `None` means the
/// entry carries no per-entry deadline.
pub expires_at: Option<Instant>,
}
/// Stateless expiry policy that converts [`MokaEntry::expires_at`] into the
/// remaining lifetime reported to moka when an entry is created or updated.
#[derive(Default, Clone)]
pub struct MokaExpiry;
impl Expiry<String, MokaEntry> for MokaExpiry {
    /// Remaining lifetime of a newly created entry.
    ///
    /// Returns the time from `created_at` until the entry's stored deadline
    /// (clamped to zero if the deadline is already in the past), or `None`
    /// when the entry has no deadline.
    fn expire_after_create(&self, _key: &String, val: &MokaEntry, created_at: Instant) -> Option<Duration> {
        let deadline = val.expires_at?;
        Some(deadline.saturating_duration_since(created_at))
    }

    /// Remaining lifetime of an entry whose value was just replaced.
    ///
    /// The previous remaining duration is ignored: the result depends only on
    /// the new value's deadline, measured from `updated_at`. A new value
    /// without a deadline yields `None`.
    fn expire_after_update(
        &self,
        _key: &String,
        val: &MokaEntry,
        updated_at: Instant,
        _duration_until_expiry: Option<Duration>,
    ) -> Option<Duration> {
        let deadline = val.expires_at?;
        Some(deadline.saturating_duration_since(updated_at))
    }
}
/// In-memory cache backend backed by a `moka::future::Cache`.
///
/// The cache lives behind an `Arc`, so clones of this struct share the same
/// underlying cache.
#[derive(Clone)]
pub struct MokaMemoryBackend {
// Shared handle to the moka cache; keys are strings, values carry bytes + deadline.
cache: Arc<moka::future::Cache<String, MokaEntry>>,
// Maximum capacity the cache was built with, kept for reporting.
capacity: u64,
}
// NOTE(review): this macro is defined elsewhere. `MokaMemoryBackend::new()` and
// `MokaMemoryBackend::builder()` are called in this file but not defined here,
// so they presumably come from this invocation — confirm against the macro.
impl_backend_builder!(MokaMemoryBackend, MokaMemoryBackendBuilder);
impl MokaMemoryBackend {
    /// Maximum capacity this backend was configured with.
    pub fn capacity(&self) -> u64 {
        let Self { capacity, .. } = self;
        *capacity
    }

    /// Entry count as reported by the underlying moka cache.
    ///
    /// NOTE(review): moka documents `entry_count` as an approximation that
    /// may lag behind recent writes — confirm callers tolerate that.
    pub fn entry_count(&self) -> u64 {
        let cache = &self.cache;
        cache.entry_count()
    }
}
impl Default for MokaMemoryBackend {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for MokaMemoryBackend {
    /// Prints the configured capacity and the current entry count; cached
    /// keys and values are not included in the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let entry_count = self.cache.entry_count();
        let mut out = f.debug_struct("MokaMemoryBackend");
        out.field("capacity", &self.capacity);
        out.field("entry_count", &entry_count);
        out.finish()
    }
}
#[async_trait]
impl CacheReader for MokaMemoryBackend {
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
Ok(self.cache.get(key).await.map(|e| e.value))
}
async fn exists(&self, key: &str) -> Result<bool> {
Ok(self.cache.contains_key(key))
}
async fn ttl(&self, key: &str) -> Result<Option<Duration>> {
let now = Instant::now();
Ok(self
.cache
.get(key)
.await
.and_then(|e| e.expires_at.and_then(|exp| exp.checked_duration_since(now))))
}
async fn len(&self) -> Result<u64> {
Ok(self.cache.entry_count())
}
async fn is_empty(&self) -> Result<bool> {
Ok(self.cache.entry_count() == 0)
}
async fn capacity(&self) -> Result<u64> {
Ok(self.capacity)
}
async fn stats(&self) -> Result<HashMap<String, String>> {
let mut stats = HashMap::new();
stats.insert("type".to_string(), "moka".to_string());
stats.insert("capacity".to_string(), self.capacity.to_string());
stats.insert("entry_count".to_string(), self.cache.entry_count().to_string());
Ok(stats)
}
}
#[async_trait]
impl CacheWriter for MokaMemoryBackend {
    /// Stores `value` under `key`, replacing any existing entry.
    ///
    /// With `ttl = Some(d)` the entry records the absolute deadline `now + d`;
    /// with `ttl = None` no per-entry deadline is recorded. A `ttl` so large
    /// that `now + d` cannot be represented is treated as "no per-entry
    /// deadline" rather than panicking on `Instant` overflow.
    async fn set(&self, key: &str, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
        // `Instant + Duration` panics on overflow; `checked_add` yields `None`
        // instead, which maps naturally onto "never expires".
        let expires_at = ttl.and_then(|d| Instant::now().checked_add(d));
        let entry = MokaEntry { value, expires_at };
        self.cache.insert(key.to_string(), entry).await;
        Ok(())
    }

    /// Removes the entry for `key`, if any.
    async fn delete(&self, key: &str) -> Result<()> {
        self.cache.invalidate(key).await;
        Ok(())
    }

    /// Invalidates every entry in the cache.
    async fn clear(&self) -> Result<()> {
        self.cache.invalidate_all();
        Ok(())
    }

    /// Replaces the per-entry deadline of an existing key with `now + ttl`.
    ///
    /// Returns `Ok(true)` when the key existed and its entry was rewritten,
    /// `Ok(false)` when the key was absent (nothing is inserted in that case).
    /// A `ttl` too large to represent as an `Instant` clears the per-entry
    /// deadline instead of panicking.
    async fn expire(&self, key: &str, ttl: Duration) -> Result<bool> {
        // `None` here means the requested deadline is unrepresentably far away.
        let new_expires_at = Instant::now().checked_add(ttl);
        let result = self
            .cache
            .entry(key.to_string())
            .and_compute_with(|maybe_entry: Option<moka::Entry<String, MokaEntry>>| async move {
                match maybe_entry {
                    Some(entry) => {
                        // Keep the stored bytes, swap only the deadline; the
                        // `Put` makes moka re-evaluate the expiry policy.
                        let mut updated = entry.into_value();
                        updated.expires_at = new_expires_at;
                        Op::Put(updated)
                    }
                    None => Op::Nop,
                }
            })
            .await;
        Ok(matches!(result, CompResult::ReplacedWith(_)))
    }
}
#[async_trait]
impl CacheConnector for MokaMemoryBackend {
    /// Always succeeds: this implementation performs no checks.
    async fn health_check(&self) -> Result<()> {
        Ok(())
    }

    /// Invalidates all cached entries as part of shutdown.
    async fn shutdown(&self) {
        let cache = &self.cache;
        cache.invalidate_all();
    }

    /// Identifies this backend as [`BackendKind::Moka`].
    fn backend_kind(&self) -> BackendKind {
        BackendKind::Moka
    }
}
/// Drives `fut` to completion from synchronous code and returns its output.
///
/// * Inside a multi-threaded tokio runtime, the future is run on the current
///   runtime's handle within `block_in_place`.
/// * Otherwise (no current runtime, or a runtime of another flavor) the
///   future is run on a process-wide current-thread runtime that is created
///   on first use.
///
/// NOTE(review): when called from inside a *current-thread* tokio runtime the
/// fallback path calls `Runtime::block_on` from within an async context, which
/// tokio documents as a panic — confirm no caller reaches this from such a
/// runtime.
///
/// # Panics
///
/// Panics if the fallback runtime cannot be built.
fn sync_block_on<F: std::future::Future>(fut: F) -> F::Output {
    use tokio::runtime::{Builder, Handle, Runtime, RuntimeFlavor};

    // Shared by every call (and every instantiation of this generic function).
    static FALLBACK_RT: std::sync::OnceLock<Runtime> = std::sync::OnceLock::new();

    if let Ok(handle) = Handle::try_current() {
        if handle.runtime_flavor() == RuntimeFlavor::MultiThread {
            return tokio::task::block_in_place(|| handle.block_on(fut));
        }
    }

    let fallback = FALLBACK_RT.get_or_init(|| {
        Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("failed to build tokio runtime for sync moka ops")
    });
    fallback.block_on(fut)
}
impl crate::backend::interface::SyncCacheReader for MokaMemoryBackend {
fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
Ok(sync_block_on(self.cache.get(key)).map(|e| e.value))
}
fn exists(&self, key: &str) -> Result<bool> {
Ok(self.cache.contains_key(key))
}
fn ttl(&self, key: &str) -> Result<Option<Duration>> {
let now = Instant::now();
Ok(sync_block_on(self.cache.get(key))
.and_then(|e| e.expires_at.and_then(|exp| exp.checked_duration_since(now))))
}
fn len(&self) -> Result<u64> {
Ok(self.cache.entry_count())
}
fn capacity(&self) -> Result<u64> {
Ok(self.capacity)
}
fn stats(&self) -> Result<HashMap<String, String>> {
let mut stats = HashMap::new();
stats.insert("type".to_string(), "moka".to_string());
stats.insert("capacity".to_string(), self.capacity.to_string());
stats.insert("entry_count".to_string(), self.cache.entry_count().to_string());
Ok(stats)
}
}
impl crate::backend::interface::SyncCacheWriter for MokaMemoryBackend {
    /// Blocking insert: stores `value` under `key`, replacing any existing
    /// entry.
    ///
    /// With `ttl = Some(d)` the entry records the absolute deadline `now + d`;
    /// with `ttl = None` no per-entry deadline is recorded. A `ttl` so large
    /// that `now + d` cannot be represented is treated as "no per-entry
    /// deadline" rather than panicking on `Instant` overflow.
    fn set(&self, key: &str, value: Vec<u8>, ttl: Option<Duration>) -> Result<()> {
        // `Instant + Duration` panics on overflow; `checked_add` yields `None`
        // instead, which maps naturally onto "never expires".
        let expires_at = ttl.and_then(|d| Instant::now().checked_add(d));
        let entry = MokaEntry { value, expires_at };
        sync_block_on(self.cache.insert(key.to_string(), entry));
        Ok(())
    }

    /// Blocking removal of the entry for `key`, if any.
    fn delete(&self, key: &str) -> Result<()> {
        sync_block_on(self.cache.invalidate(key));
        Ok(())
    }

    /// Invalidates every entry in the cache.
    fn clear(&self) -> Result<()> {
        self.cache.invalidate_all();
        Ok(())
    }

    /// Replaces the per-entry deadline of an existing key with `now + ttl`.
    ///
    /// Returns `Ok(true)` when the key existed and its entry was rewritten,
    /// `Ok(false)` when the key was absent (nothing is inserted in that case).
    /// A `ttl` too large to represent as an `Instant` clears the per-entry
    /// deadline instead of panicking.
    fn expire(&self, key: &str, ttl: Duration) -> Result<bool> {
        // `None` here means the requested deadline is unrepresentably far away.
        let new_expires_at = Instant::now().checked_add(ttl);
        let result = sync_block_on(self.cache.entry(key.to_string()).and_compute_with(
            |maybe_entry: Option<moka::Entry<String, MokaEntry>>| async move {
                match maybe_entry {
                    Some(entry) => {
                        // Keep the stored bytes, swap only the deadline; the
                        // `Put` makes moka re-evaluate the expiry policy.
                        let mut updated = entry.into_value();
                        updated.expires_at = new_expires_at;
                        Op::Put(updated)
                    }
                    None => Op::Nop,
                }
            },
        ));
        Ok(matches!(result, CompResult::ReplacedWith(_)))
    }
}
impl crate::backend::interface::SyncCacheConnector for MokaMemoryBackend {
    /// Always succeeds: this implementation performs no checks.
    fn health_check(&self) -> Result<()> {
        Ok(())
    }

    /// Invalidates all cached entries as part of shutdown.
    fn shutdown(&self) {
        let cache = &self.cache;
        cache.invalidate_all();
    }

    /// Identifies this backend as [`BackendKind::Moka`].
    fn backend_kind(&self) -> BackendKind {
        BackendKind::Moka
    }
}
impl BackendScore for MokaMemoryBackend {
    /// Returns the fixed score assigned to the moka backend.
    fn score(&self) -> u8 {
        Scores::MOKA
    }

    /// Always `false` for this backend.
    fn is_persistent(&self) -> bool {
        false
    }

    /// Short identifier for this backend: `"moka"`.
    fn backend_name(&self) -> &'static str {
        "moka"
    }
}
/// Builder for [`MokaMemoryBackend`].
///
/// All fields start at their `Default` values (capacity `0`, no TTL, no
/// time-to-idle); `build` substitutes a default capacity when it is `0`.
#[derive(Default)]
pub struct MokaMemoryBackendBuilder {
// Requested maximum capacity; `0` is treated as "not set" by `build`.
capacity: u64,
// Optional cache-wide time-to-live passed to moka's builder.
ttl: Option<Duration>,
// Optional cache-wide time-to-idle passed to moka's builder.
time_to_idle: Option<Duration>,
}
impl MokaMemoryBackendBuilder {
    /// Sets the maximum capacity. A value of `0` makes `build` fall back to
    /// its default capacity.
    pub fn capacity(self, capacity: u64) -> Self {
        Self { capacity, ..self }
    }

    /// Sets the cache-wide time-to-live.
    pub fn ttl(self, ttl: Duration) -> Self {
        Self {
            ttl: Some(ttl),
            ..self
        }
    }

    /// Sets the cache-wide time-to-idle.
    pub fn time_to_idle(self, ttl: Duration) -> Self {
        Self {
            time_to_idle: Some(ttl),
            ..self
        }
    }

    /// Builds the backend.
    ///
    /// The moka cache is always created with the per-entry [`MokaExpiry`]
    /// policy; the cache-wide TTL and time-to-idle are applied only when set.
    pub fn build(self) -> MokaMemoryBackend {
        // Capacity used when the caller never set one (or set it to zero).
        const DEFAULT_CAPACITY: u64 = 10_000;

        let capacity = match self.capacity {
            0 => DEFAULT_CAPACITY,
            requested => requested,
        };

        let mut cache_builder = moka::future::Cache::builder()
            .max_capacity(capacity)
            .expire_after(MokaExpiry);
        if let Some(ttl) = self.ttl {
            cache_builder = cache_builder.time_to_live(ttl);
        }
        if let Some(tti) = self.time_to_idle {
            cache_builder = cache_builder.time_to_idle(tti);
        }

        MokaMemoryBackend {
            cache: Arc::new(cache_builder.build()),
            capacity,
        }
    }
}
/// Creates a backend via [`MokaMemoryBackend::new`].
pub fn moka_memory() -> MokaMemoryBackend {
    MokaMemoryBackend::new()
}
/// Creates a backend with the given maximum capacity and no cache-wide TTL.
pub fn moka_memory_with_capacity(capacity: u64) -> MokaMemoryBackend {
    let builder = MokaMemoryBackend::builder().capacity(capacity);
    builder.build()
}
/// Creates a backend with the given maximum capacity and cache-wide
/// time-to-live.
pub fn moka_memory_with_capacity_and_ttl(capacity: u64, ttl: Duration) -> MokaMemoryBackend {
    let builder = MokaMemoryBackend::builder().capacity(capacity).ttl(ttl);
    builder.build()
}
/// Returns the default in-memory backend; currently the same as
/// [`moka_memory`].
pub fn default_memory_backend() -> MokaMemoryBackend {
    moka_memory()
}
// Unit tests for the moka backend: builder/constructor wiring, the async
// reader/writer traits, and (in `sync_tests`) the blocking trait impls.
#[cfg(test)]
mod tests {
use super::*;
// The builder's explicit capacity is what the backend reports.
#[test]
fn test_moka_backend_builder() {
let backend = MokaMemoryBackend::builder()
.capacity(1000)
.ttl(Duration::from_secs(3600))
.time_to_idle(Duration::from_secs(1800))
.build();
assert_eq!(backend.capacity(), 1000);
}
// `Default` yields a backend with a non-zero capacity.
#[test]
fn test_moka_backend_default() {
let backend = MokaMemoryBackend::default();
assert!(backend.capacity() > 0);
}
// Round trip: set, get, exists, delete, exists again.
#[tokio::test]
async fn test_moka_basic_operations() {
let backend = MokaMemoryBackend::new();
backend.set("key1", b"value1".to_vec(), None).await.unwrap();
// Short pause between the write and the read.
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let result = backend.get("key1").await.unwrap();
assert_eq!(result, Some(b"value1".to_vec()));
let exists = backend.exists("key1").await.unwrap();
assert!(exists);
backend.delete("key1").await.unwrap();
let exists_after = backend.exists("key1").await.unwrap();
assert!(!exists_after);
}
// Each convenience constructor produces a backend with the expected capacity.
#[test]
fn test_convenience_functions() {
let backend1 = moka_memory();
let backend2 = moka_memory_with_capacity(1000);
let backend3 = moka_memory_with_capacity_and_ttl(1000, Duration::from_secs(3600));
assert!(backend1.capacity() > 0);
assert_eq!(backend2.capacity(), 1000);
assert_eq!(backend3.capacity(), 1000);
}
// An entry written with a 50 ms TTL is readable at first and gone afterwards.
#[tokio::test]
async fn test_moka_set_with_ttl_expires_after_timeout() {
let backend = MokaMemoryBackend::new();
backend
.set("k", b"v".to_vec(), Some(Duration::from_millis(50)))
.await
.unwrap();
assert_eq!(backend.get("k").await.unwrap(), Some(b"v".to_vec()));
tokio::time::sleep(Duration::from_millis(100)).await;
// Poll up to 10 more times (50 ms apart) instead of asserting once, so the
// test tolerates some delay before the entry stops being returned.
let mut expired = false;
for _ in 0..10 {
if backend.get("k").await.unwrap().is_none() {
expired = true;
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(expired, "entry should expire after TTL");
}
// An entry with a long TTL is readable immediately after the write.
#[tokio::test]
async fn test_moka_set_with_ttl_readable_within_window() {
let backend = MokaMemoryBackend::new();
backend
.set("k", b"v".to_vec(), Some(Duration::from_secs(60)))
.await
.unwrap();
assert_eq!(backend.get("k").await.unwrap(), Some(b"v".to_vec()));
}
// With only a cache-wide TTL configured, `ttl()` still reports `None`
// because it reflects the per-entry deadline only.
#[tokio::test]
async fn test_moka_set_without_ttl_uses_global_ttl() {
let backend = MokaMemoryBackend::builder()
.capacity(1000)
.ttl(Duration::from_secs(30))
.build();
backend.set("k", b"v".to_vec(), None).await.unwrap();
assert_eq!(backend.get("k").await.unwrap(), Some(b"v".to_vec()));
let ttl = backend.ttl("k").await.unwrap();
assert_eq!(ttl, None, "set(None) with global TTL should report None per-entry");
}
// `ttl()` returns the remaining time, within a 2 s tolerance of the 60 s TTL.
#[tokio::test]
async fn test_moka_ttl_returns_remaining() {
let backend = MokaMemoryBackend::new();
backend
.set("k", b"v".to_vec(), Some(Duration::from_secs(60)))
.await
.unwrap();
let ttl = backend.ttl("k").await.unwrap().expect("ttl should be Some");
assert!(
ttl > Duration::from_secs(58),
"ttl={} should be > 58s",
ttl.as_secs_f64()
);
assert!(
ttl <= Duration::from_secs(60),
"ttl={} should be <= 60s",
ttl.as_secs_f64()
);
}
// `ttl()` on a key that was never written is `None`.
#[tokio::test]
async fn test_moka_ttl_returns_none_for_missing_key() {
let backend = MokaMemoryBackend::new();
assert_eq!(backend.ttl("missing").await.unwrap(), None);
}
// `ttl()` on a key written without a TTL is `None`.
#[tokio::test]
async fn test_moka_ttl_returns_none_for_no_ttl_key() {
let backend = MokaMemoryBackend::new();
backend.set("k", b"v".to_vec(), None).await.unwrap();
assert_eq!(backend.ttl("k").await.unwrap(), None);
}
// `expire()` can lengthen an existing deadline (60 s -> 120 s).
#[tokio::test]
async fn test_moka_expire_extends_ttl() {
let backend = MokaMemoryBackend::new();
backend
.set("k", b"v".to_vec(), Some(Duration::from_secs(60)))
.await
.unwrap();
let ok = backend.expire("k", Duration::from_secs(120)).await.unwrap();
assert!(ok, "expire on existing key should return true");
let ttl = backend
.ttl("k")
.await
.unwrap()
.expect("ttl should be Some after expire");
assert!(
ttl > Duration::from_secs(118),
"ttl={} should be > 118s",
ttl.as_secs_f64()
);
}
// `expire()` can shorten an existing deadline (60 s -> 50 ms), after which
// the entry stops being returned.
#[tokio::test]
async fn test_moka_expire_shrinks_ttl() {
let backend = MokaMemoryBackend::new();
backend
.set("k", b"v".to_vec(), Some(Duration::from_secs(60)))
.await
.unwrap();
let ok = backend.expire("k", Duration::from_millis(50)).await.unwrap();
assert!(ok, "expire on existing key should return true");
tokio::time::sleep(Duration::from_millis(100)).await;
// Same polling strategy as the TTL expiry test above.
let mut expired = false;
for _ in 0..10 {
if backend.get("k").await.unwrap().is_none() {
expired = true;
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(expired, "entry should expire after shrunk TTL");
}
// `expire()` on an absent key reports `false`.
#[tokio::test]
async fn test_moka_expire_missing_key_returns_false() {
let backend = MokaMemoryBackend::new();
let ok = backend.expire("missing", Duration::from_secs(60)).await.unwrap();
assert!(!ok, "expire on missing key should return false");
}
// Tests for the blocking trait impls. These are plain `#[test]` functions,
// and the backend is accessed through `&dyn Sync*` trait objects so the
// sync methods are the ones being called.
mod sync_tests {
use super::MokaMemoryBackend;
use crate::backend::interface::{BackendKind, SyncCacheConnector, SyncCacheReader, SyncCacheWriter};
use std::time::Duration;
// Sync set/get/exists plus capacity and stats reporting.
#[test]
fn test_moka_sync_get_set_basic() {
let backend = MokaMemoryBackend::new();
let writer: &dyn SyncCacheWriter = &backend;
writer.set("key1", b"value1".to_vec(), None).unwrap();
let reader: &dyn SyncCacheReader = &backend;
assert_eq!(reader.get("key1").unwrap(), Some(b"value1".to_vec()));
assert!(reader.exists("key1").unwrap());
assert!(!reader.exists("key2").unwrap());
assert!(reader.capacity().unwrap() > 0);
let stats = reader.stats().unwrap();
assert_eq!(stats.get("type"), Some(&"moka".to_string()));
}
// An entry written through the sync API with a 50 ms TTL expires.
#[test]
fn test_moka_sync_set_with_ttl_expires() {
let backend = MokaMemoryBackend::new();
let writer: &dyn SyncCacheWriter = &backend;
writer.set("k", b"v".to_vec(), Some(Duration::from_millis(50))).unwrap();
let reader: &dyn SyncCacheReader = &backend;
assert_eq!(reader.get("k").unwrap(), Some(b"v".to_vec()));
std::thread::sleep(Duration::from_millis(120));
// Poll up to 10 more times (50 ms apart) before declaring failure.
let mut expired = false;
for _ in 0..10 {
if reader.get("k").unwrap().is_none() {
expired = true;
break;
}
std::thread::sleep(Duration::from_millis(50));
}
assert!(expired, "entry should expire after TTL via sync get");
}
// Sync `ttl()`: remaining time for a TTL'd key, `None` for a key without a
// TTL and for a missing key.
#[test]
fn test_moka_sync_ttl_returns_remaining() {
let backend = MokaMemoryBackend::new();
let writer: &dyn SyncCacheWriter = &backend;
writer.set("k", b"v".to_vec(), Some(Duration::from_secs(60))).unwrap();
let reader: &dyn SyncCacheReader = &backend;
let ttl = reader.ttl("k").unwrap().expect("ttl should be Some for TTL'd key");
assert!(
ttl > Duration::from_secs(58),
"ttl={} should be > 58s",
ttl.as_secs_f64()
);
assert!(
ttl <= Duration::from_secs(60),
"ttl={} should be <= 60s",
ttl.as_secs_f64()
);
writer.set("no_ttl", b"v".to_vec(), None).unwrap();
assert_eq!(reader.ttl("no_ttl").unwrap(), None);
assert_eq!(reader.ttl("missing").unwrap(), None);
}
// Sync `expire()`: extends an existing key's deadline and reports `false`
// for a missing key.
#[test]
fn test_moka_sync_expire_works() {
let backend = MokaMemoryBackend::new();
let writer: &dyn SyncCacheWriter = &backend;
writer.set("k", b"v".to_vec(), Some(Duration::from_secs(60))).unwrap();
let ok = writer.expire("k", Duration::from_secs(120)).unwrap();
assert!(ok, "expire on existing key should return true");
let reader: &dyn SyncCacheReader = &backend;
let new_ttl = reader.ttl("k").unwrap().expect("ttl should be Some after expire");
assert!(
new_ttl > Duration::from_secs(118),
"new_ttl={} should be > 118s",
new_ttl.as_secs_f64()
);
let ok = writer.expire("missing", Duration::from_secs(10)).unwrap();
assert!(!ok, "expire on missing key should return false");
}
// Sync delete/clear, followed by a smoke test of the connector methods.
#[test]
fn test_moka_sync_delete_clear() {
let backend = MokaMemoryBackend::new();
let writer: &dyn SyncCacheWriter = &backend;
writer.set("k1", b"v1".to_vec(), None).unwrap();
writer.set("k2", b"v2".to_vec(), None).unwrap();
let reader: &dyn SyncCacheReader = &backend;
assert!(reader.exists("k1").unwrap());
assert!(reader.exists("k2").unwrap());
// Deleting one key leaves the other in place.
writer.delete("k1").unwrap();
assert!(!reader.exists("k1").unwrap());
assert!(reader.exists("k2").unwrap());
writer.clear().unwrap();
assert!(!reader.exists("k2").unwrap());
assert_eq!(reader.len().unwrap(), 0);
// NOTE(review): `is_empty` is not defined in this file's `SyncCacheReader`
// impl, so it presumably is a provided trait method — confirm.
assert!(reader.is_empty().unwrap());
let connector: &dyn SyncCacheConnector = &backend;
connector.health_check().unwrap();
assert_eq!(connector.backend_kind(), BackendKind::Moka);
connector.shutdown();
}
}
}