use crate::error::CacheResult;
use bytes::Bytes;
use futures_util::future::BoxFuture;
use moka::future::Cache;
use std::any::Any;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tracing::{debug, info};
/// A byte payload stored in the cache together with its own expiry deadline.
#[derive(Debug, Clone)]
struct CacheEntry {
    /// The cached bytes.
    value: Bytes,
    /// The instant after which the entry counts as expired.
    expires_at: Instant,
}

impl CacheEntry {
    /// Creates an entry holding `value` that expires `ttl` after the current instant.
    ///
    /// `Instant + Duration` panics when the sum cannot be represented, so an
    /// oversized `ttl` (for example `Duration::MAX`) is shortened by repeated
    /// halving until the deadline fits, instead of panicking.
    fn new(value: Bytes, ttl: Duration) -> Self {
        Self {
            value,
            expires_at: Self::deadline_after(ttl),
        }
    }

    /// Returns `true` once the current instant is strictly later than `expires_at`.
    fn is_expired(&self) -> bool {
        Instant::now() > self.expires_at
    }

    /// Computes `now + ttl`, shrinking `ttl` until the sum is representable.
    fn deadline_after(ttl: Duration) -> Instant {
        let now = Instant::now();
        let mut horizon = ttl;
        loop {
            match now.checked_add(horizon) {
                Some(deadline) => return deadline,
                // Halving terminates: the duration eventually reaches zero,
                // and adding zero to `now` always succeeds.
                None => horizon /= 2,
            }
        }
    }
}
/// A type-erased value stored in the cache together with its own expiry deadline.
#[derive(Clone)]
pub struct TypedCacheEntry {
    /// The cached value; callers downcast it back to the concrete type.
    pub value: Arc<dyn Any + Send + Sync>,
    /// The instant after which the entry counts as expired.
    pub expires_at: Instant,
}

impl TypedCacheEntry {
    /// Creates an entry holding `value` that expires `ttl` after the current instant.
    ///
    /// `Instant + Duration` panics when the sum cannot be represented, so an
    /// oversized `ttl` (for example `Duration::MAX`) is shortened by repeated
    /// halving until the deadline fits, instead of panicking.
    pub fn new(value: Arc<dyn Any + Send + Sync>, ttl: Duration) -> Self {
        Self {
            value,
            expires_at: Self::deadline_after(ttl),
        }
    }

    /// Returns `true` once the current instant is strictly later than `expires_at`.
    #[must_use]
    pub fn is_expired(&self) -> bool {
        Instant::now() > self.expires_at
    }

    /// Computes `now + ttl`, shrinking `ttl` until the sum is representable.
    fn deadline_after(ttl: Duration) -> Instant {
        let now = Instant::now();
        let mut horizon = ttl;
        loop {
            match now.checked_add(horizon) {
                Some(deadline) => return deadline,
                // Halving terminates: the duration eventually reaches zero,
                // and adding zero to `now` always succeeds.
                None => horizon /= 2,
            }
        }
    }
}
/// Settings applied to each Moka store that `MokaCache::new` builds.
#[derive(Debug, Clone, Copy)]
pub struct MokaCacheConfig {
    /// Value passed to the cache builder's `max_capacity`.
    pub max_capacity: u64,
    /// Value passed to the cache builder's `time_to_live`.
    pub time_to_live: Duration,
    /// Value passed to the cache builder's `time_to_idle`.
    pub time_to_idle: Duration,
}

impl Default for MokaCacheConfig {
    /// Returns the stock settings: capacity 5000, a one-hour time-to-live and
    /// a two-minute time-to-idle.
    fn default() -> Self {
        let time_to_live = Duration::from_secs(60 * 60);
        let time_to_idle = Duration::from_secs(2 * 60);
        Self {
            max_capacity: 5_000,
            time_to_live,
            time_to_idle,
        }
    }
}
/// In-memory cache backed by two Moka stores (raw bytes and type-erased
/// values) plus a set of shared atomic counters.
pub struct MokaCache {
/// Store for byte payloads, keyed by string.
cache: Cache<String, CacheEntry>,
/// Store for type-erased values, keyed by string.
typed_cache: Cache<String, TypedCacheEntry>,
/// Number of lookups counted as hits (a live entry was returned).
hits: Arc<AtomicU64>,
/// Number of lookups counted as misses.
misses: Arc<AtomicU64>,
/// Number of inserts performed through the `set_*` methods.
sets: Arc<AtomicU64>,
// Initialised to zero in `new`; nothing in the visible part of this file
// reads or updates it afterwards, hence the `dead_code` allowance.
#[allow(dead_code)]
coalesced_requests: Arc<AtomicU64>,
}
impl MokaCache {
    /// Builds a cache with separate byte and typed stores.
    ///
    /// Both stores are configured with the same `max_capacity`,
    /// `time_to_live` and `time_to_idle` from `config`; the capacity limit is
    /// therefore applied to each store independently. All counters start at
    /// zero.
    ///
    /// # Errors
    ///
    /// None of the construction steps visible here can fail, so this
    /// currently always returns `Ok`.
    pub fn new(config: MokaCacheConfig) -> CacheResult<Self> {
        info!("Initializing Moka Cache");
        let cache = Cache::builder()
            .max_capacity(config.max_capacity)
            .time_to_live(config.time_to_live)
            .time_to_idle(config.time_to_idle)
            .build();
        let typed_cache = Cache::builder()
            .max_capacity(config.max_capacity)
            .time_to_live(config.time_to_live)
            .time_to_idle(config.time_to_idle)
            .build();
        info!(
            capacity = config.max_capacity,
            "Moka Cache initialized with Byte and Typed storage"
        );
        Ok(Self {
            cache,
            typed_cache,
            hits: Arc::new(AtomicU64::new(0)),
            misses: Arc::new(AtomicU64::new(0)),
            sets: Arc::new(AtomicU64::new(0)),
            coalesced_requests: Arc::new(AtomicU64::new(0)),
        })
    }

    /// Stores a type-erased `value` under `key` with its own `ttl` and bumps
    /// the `sets` counter.
    ///
    /// # Errors
    ///
    /// The insert itself cannot fail; this always returns `Ok(())`.
    pub async fn set_typed(
        &self,
        key: &str,
        value: Arc<dyn Any + Send + Sync>,
        ttl: Duration,
    ) -> CacheResult<()> {
        let entry = TypedCacheEntry::new(value, ttl);
        self.typed_cache.insert(key.to_string(), entry).await;
        self.sets.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    /// Looks up the type-erased value stored under `key`.
    ///
    /// Returns `Some(value)` and counts a hit when a live entry exists. An
    /// entry whose own deadline has passed is removed from the typed store
    /// and counted as a miss. An absent key is counted as a miss as well, so
    /// the statistics agree with the byte-oriented lookups.
    pub async fn get_typed(&self, key: &str) -> Option<Arc<dyn Any + Send + Sync>> {
        match self.typed_cache.get(key).await {
            Some(entry) => {
                if entry.is_expired() {
                    // Evict eagerly so the stale value is not served again.
                    let _ = self.typed_cache.remove(key).await;
                    self.misses.fetch_add(1, Ordering::Relaxed);
                    None
                } else {
                    self.hits.fetch_add(1, Ordering::Relaxed);
                    Some(entry.value)
                }
            }
            None => {
                // An absent key is a miss too; previously it went uncounted.
                self.misses.fetch_add(1, Ordering::Relaxed);
                None
            }
        }
    }
}
use crate::traits::{CacheBackend, L2CacheBackend};
impl CacheBackend for MokaCache {
    /// Looks up the bytes stored under `key`.
    ///
    /// A live entry is returned and counted as a hit. An entry past its own
    /// deadline is removed from the byte store and counted as a miss, as is
    /// an absent key.
    fn get<'a>(&'a self, key: &'a str) -> BoxFuture<'a, Option<Bytes>> {
        Box::pin(async move {
            match self.cache.get(key).await {
                Some(stale) if stale.is_expired() => {
                    let _ = self.cache.remove(key).await;
                    self.misses.fetch_add(1, Ordering::Relaxed);
                    None
                }
                Some(live) => {
                    self.hits.fetch_add(1, Ordering::Relaxed);
                    Some(live.value)
                }
                None => {
                    self.misses.fetch_add(1, Ordering::Relaxed);
                    None
                }
            }
        })
    }

    /// Stores `value` under `key` with the given `ttl`, bumps the `sets`
    /// counter and emits a debug log line. Always resolves to `Ok(())`.
    fn set_with_ttl<'a>(
        &'a self,
        key: &'a str,
        value: Bytes,
        ttl: Duration,
    ) -> BoxFuture<'a, CacheResult<()>> {
        Box::pin(async move {
            let record = CacheEntry::new(value, ttl);
            self.cache.insert(key.to_owned(), record).await;
            self.sets.fetch_add(1, Ordering::Relaxed);
            debug!(key = %key, ttl_secs = %ttl.as_secs(), "[Moka] Cached key bytes with TTL");
            Ok(())
        })
    }

    /// Invalidates `key` in the byte store and then in the typed store.
    /// Always resolves to `Ok(())`.
    fn remove<'a>(&'a self, key: &'a str) -> BoxFuture<'a, CacheResult<()>> {
        Box::pin(async move {
            // Byte store first, typed store second.
            self.cache.invalidate(key).await;
            self.typed_cache.invalidate(key).await;
            Ok(())
        })
    }

    /// Round-trips a probe value through the byte store.
    ///
    /// Writes a fixed key, reads it back, removes it again and resolves to
    /// `true` only when the value read equals the value written.
    fn health_check(&self) -> BoxFuture<'_, bool> {
        Box::pin(async move {
            let probe_key = "health_check_moka";
            let probe_value = Bytes::from("health_check");

            if self
                .set_with_ttl(probe_key, probe_value.clone(), Duration::from_secs(60))
                .await
                .is_err()
            {
                return false;
            }

            match self.get(probe_key).await {
                Some(read_back) => {
                    let _ = self.remove(probe_key).await;
                    read_back == probe_value
                }
                None => false,
            }
        })
    }

    /// Handles a pattern-removal request by clearing BOTH stores entirely.
    ///
    /// `pattern` is only logged; it is not used to select entries. After
    /// invalidating everything, pending maintenance tasks are run on both
    /// stores. Always resolves to `Ok(())`.
    fn remove_pattern<'a>(&'a self, pattern: &'a str) -> BoxFuture<'a, CacheResult<()>> {
        Box::pin(async move {
            // Mark every entry in both stores as invalid ...
            self.cache.invalidate_all();
            self.typed_cache.invalidate_all();
            // ... then run the pending maintenance tasks on each store.
            self.cache.run_pending_tasks().await;
            self.typed_cache.run_pending_tasks().await;
            debug!(pattern = %pattern, "[Moka] Invalidated all entries due to pattern '{}' request", pattern);
            Ok(())
        })
    }

    /// Returns the backend's display name.
    fn name(&self) -> &'static str {
        "Moka"
    }
}
impl L2CacheBackend for MokaCache {
    /// Looks up the bytes stored under `key` together with the time left
    /// until the entry's own deadline.
    ///
    /// The clock is sampled once and that single instant drives both the
    /// expiry decision and the remaining-TTL computation. Sampling twice
    /// allowed an entry that expired between the two reads to be reported as
    /// a hit with no remaining TTL.
    ///
    /// Returns `None` (counted as a miss) when the key is absent or the entry
    /// is past its deadline; an expired entry is also removed from the byte
    /// store. On a hit, the remaining duration is `None` only when the
    /// deadline equals the sampled instant exactly.
    fn get_with_ttl<'a>(
        &'a self,
        key: &'a str,
    ) -> BoxFuture<'a, Option<(Bytes, Option<Duration>)>> {
        Box::pin(async move {
            match self.cache.get(key).await {
                Some(entry) => {
                    let now = Instant::now();
                    if now > entry.expires_at {
                        let _ = self.cache.remove(key).await;
                        self.misses.fetch_add(1, Ordering::Relaxed);
                        None
                    } else {
                        self.hits.fetch_add(1, Ordering::Relaxed);
                        let remaining = if entry.expires_at > now {
                            Some(entry.expires_at.duration_since(now))
                        } else {
                            None
                        };
                        Some((entry.value, remaining))
                    }
                }
                None => {
                    self.misses.fetch_add(1, Ordering::Relaxed);
                    None
                }
            }
        })
    }
}
// Nothing in the visible part of this file constructs or reads this type,
// hence the `dead_code` allowance.
#[allow(dead_code)]
/// Plain snapshot of cache counters.
///
/// The first four fields share their names with the counters kept on
/// `MokaCache`.
#[derive(Debug, Clone)]
pub struct CacheStats {
/// Hit count.
pub hits: u64,
/// Miss count.
pub misses: u64,
/// Insert count.
pub sets: u64,
/// Coalesced-request count.
pub coalesced_requests: u64,
/// NOTE(review): presumably the number of entries currently cached —
/// confirm against whatever code populates this struct.
pub size: u64,
}