use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use serde::Serialize;
use serde_json::Value;
use thiserror::Error;
use crate::cache_constants::CACHE_MANAGER;
use crate::cache_module::{Provider, ProviderKind};
use crate::cache_module_definition::MODULE_OPTIONS_TOKEN;
use crate::interfaces::CacheManagerOptions;
pub type CacheValue = Value;
#[derive(Debug, Error)]
pub enum CacheManagerError {
#[error("cache store error: {0}")]
Store(String),
#[error("failed to serialize cache value: {0}")]
Serialize(#[from] serde_json::Error),
}
#[async_trait]
pub trait KeyvStoreAdapter: Send + Sync {
async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError>;
async fn set(
&self,
key: &str,
value: CacheValue,
ttl: Option<Duration>,
) -> Result<(), CacheManagerError>;
async fn del(&self, key: &str) -> Result<bool, CacheManagerError>;
async fn reset(&self) -> Result<(), CacheManagerError>;
async fn disconnect(&self) -> Result<(), CacheManagerError> {
Ok(())
}
}
#[derive(Clone)]
pub struct Keyv {
store: Arc<dyn KeyvStoreAdapter>,
ttl: Option<Duration>,
namespace: Option<String>,
}
impl Keyv {
pub fn new(
store: Arc<dyn KeyvStoreAdapter>,
ttl: Option<Duration>,
namespace: Option<String>,
) -> Self {
Self {
store,
ttl,
namespace,
}
}
fn key(&self, key: &str) -> String {
match &self.namespace {
Some(namespace) => format!("{namespace}:{key}"),
None => key.to_string(),
}
}
}
#[async_trait]
impl KeyvStoreAdapter for Keyv {
async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError> {
self.store.get(&self.key(key)).await
}
async fn set(
&self,
key: &str,
value: CacheValue,
ttl: Option<Duration>,
) -> Result<(), CacheManagerError> {
self.store
.set(&self.key(key), value, ttl.or(self.ttl))
.await
}
async fn del(&self, key: &str) -> Result<bool, CacheManagerError> {
self.store.del(&self.key(key)).await
}
async fn reset(&self) -> Result<(), CacheManagerError> {
self.store.reset().await
}
async fn disconnect(&self) -> Result<(), CacheManagerError> {
self.store.disconnect().await
}
}
#[derive(Clone)]
struct CacheEntry {
value: CacheValue,
expires_at: Option<Instant>,
}
impl CacheEntry {
fn new(value: CacheValue, ttl: Option<Duration>) -> Self {
Self {
value,
expires_at: ttl.map(|ttl| Instant::now() + ttl),
}
}
fn is_expired(&self) -> bool {
self.expires_at
.map(|expires_at| Instant::now() >= expires_at)
.unwrap_or(false)
}
}
#[derive(Clone)]
pub struct CacheManager {
cache: MokaCache<String, CacheEntry>,
options: CacheManagerOptions,
stores: Vec<Arc<dyn KeyvStoreAdapter>>,
}
impl CacheManager {
pub fn new(options: CacheManagerOptions) -> Self {
Self {
cache: MokaCache::builder().build(),
options,
stores: Vec::new(),
}
}
pub fn with_stores(
options: CacheManagerOptions,
stores: Vec<Arc<dyn KeyvStoreAdapter>>,
) -> Self {
let mut manager = Self::new(options);
manager.stores = stores;
manager
}
pub async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError> {
if let Some(entry) = self.cache.get(key).await {
if entry.is_expired() {
self.cache.invalidate(key).await;
} else {
return Ok(Some(entry.value));
}
}
for store in &self.stores {
if let Some(value) = store.get(key).await? {
self.cache
.insert(
key.to_string(),
CacheEntry::new(value.clone(), self.options.ttl_duration()),
)
.await;
return Ok(Some(value));
}
}
Ok(None)
}
pub async fn set<T: Serialize + Send + Sync>(
&self,
key: &str,
value: T,
ttl: Option<u64>,
) -> Result<(), CacheManagerError> {
let value = serde_json::to_value(value)?;
self.set_value(key, value, ttl.map(Duration::from_millis))
.await
}
pub async fn set_value(
&self,
key: &str,
value: CacheValue,
ttl: Option<Duration>,
) -> Result<(), CacheManagerError> {
match ttl.or_else(|| self.options.ttl_duration()) {
Some(ttl) => {
self.cache
.insert(key.to_string(), CacheEntry::new(value.clone(), Some(ttl)))
.await
}
None => {
self.cache
.insert(key.to_string(), CacheEntry::new(value.clone(), None))
.await
}
}
for store in &self.stores {
store.set(key, value.clone(), ttl).await?;
}
Ok(())
}
pub async fn del(&self, key: &str) -> Result<bool, CacheManagerError> {
self.cache.invalidate(key).await;
let mut deleted = true;
for store in &self.stores {
deleted &= store.del(key).await?;
}
Ok(deleted)
}
pub async fn reset(&self) -> Result<(), CacheManagerError> {
self.cache.invalidate_all();
for store in &self.stores {
store.reset().await?;
}
Ok(())
}
pub async fn onModuleDestroy(&self) -> Result<(), CacheManagerError> {
for store in &self.stores {
store.disconnect().await?;
}
Ok(())
}
pub fn options(&self) -> &CacheManagerOptions {
&self.options
}
}
pub fn isCacheable(store: &dyn KeyvStoreAdapter) -> bool {
let _ = store;
true
}
pub fn createCacheManager() -> Provider {
Provider {
provide: CACHE_MANAGER.to_string(),
kind: ProviderKind::Factory,
use_value: None,
inject: vec![MODULE_OPTIONS_TOKEN.to_string()],
use_existing: None,
}
}
pub fn create_cache(options: CacheManagerOptions) -> CacheManager {
CacheManager::new(options)
}