nestrs-cache-manager 0.1.0

Rust port of @nestjs/cache-manager backed by moka.
Documentation
//! Port map for upstream `lib/cache.providers.ts`.

use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use moka::future::Cache as MokaCache;
use serde::Serialize;
use serde_json::Value;
use thiserror::Error;

use crate::cache_constants::CACHE_MANAGER;
use crate::cache_module::{Provider, ProviderKind};
use crate::cache_module_definition::MODULE_OPTIONS_TOKEN;
use crate::interfaces::CacheManagerOptions;

pub type CacheValue = Value;

#[derive(Debug, Error)]
pub enum CacheManagerError {
    #[error("cache store error: {0}")]
    Store(String),
    #[error("failed to serialize cache value: {0}")]
    Serialize(#[from] serde_json::Error),
}

#[async_trait]
pub trait KeyvStoreAdapter: Send + Sync {
    async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError>;
    async fn set(
        &self,
        key: &str,
        value: CacheValue,
        ttl: Option<Duration>,
    ) -> Result<(), CacheManagerError>;
    async fn del(&self, key: &str) -> Result<bool, CacheManagerError>;
    async fn reset(&self) -> Result<(), CacheManagerError>;
    async fn disconnect(&self) -> Result<(), CacheManagerError> {
        Ok(())
    }
}

#[derive(Clone)]
pub struct Keyv {
    store: Arc<dyn KeyvStoreAdapter>,
    ttl: Option<Duration>,
    namespace: Option<String>,
}

impl Keyv {
    pub fn new(
        store: Arc<dyn KeyvStoreAdapter>,
        ttl: Option<Duration>,
        namespace: Option<String>,
    ) -> Self {
        Self {
            store,
            ttl,
            namespace,
        }
    }

    fn key(&self, key: &str) -> String {
        match &self.namespace {
            Some(namespace) => format!("{namespace}:{key}"),
            None => key.to_string(),
        }
    }
}

#[async_trait]
impl KeyvStoreAdapter for Keyv {
    async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError> {
        self.store.get(&self.key(key)).await
    }

    async fn set(
        &self,
        key: &str,
        value: CacheValue,
        ttl: Option<Duration>,
    ) -> Result<(), CacheManagerError> {
        self.store
            .set(&self.key(key), value, ttl.or(self.ttl))
            .await
    }

    async fn del(&self, key: &str) -> Result<bool, CacheManagerError> {
        self.store.del(&self.key(key)).await
    }

    async fn reset(&self) -> Result<(), CacheManagerError> {
        self.store.reset().await
    }

    async fn disconnect(&self) -> Result<(), CacheManagerError> {
        self.store.disconnect().await
    }
}

#[derive(Clone)]
struct CacheEntry {
    value: CacheValue,
    expires_at: Option<Instant>,
}

impl CacheEntry {
    fn new(value: CacheValue, ttl: Option<Duration>) -> Self {
        Self {
            value,
            expires_at: ttl.map(|ttl| Instant::now() + ttl),
        }
    }

    fn is_expired(&self) -> bool {
        self.expires_at
            .map(|expires_at| Instant::now() >= expires_at)
            .unwrap_or(false)
    }
}

#[derive(Clone)]
pub struct CacheManager {
    cache: MokaCache<String, CacheEntry>,
    options: CacheManagerOptions,
    stores: Vec<Arc<dyn KeyvStoreAdapter>>,
}

impl CacheManager {
    pub fn new(options: CacheManagerOptions) -> Self {
        Self {
            cache: MokaCache::builder().build(),
            options,
            stores: Vec::new(),
        }
    }

    pub fn with_stores(
        options: CacheManagerOptions,
        stores: Vec<Arc<dyn KeyvStoreAdapter>>,
    ) -> Self {
        let mut manager = Self::new(options);
        manager.stores = stores;
        manager
    }

    pub async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError> {
        if let Some(entry) = self.cache.get(key).await {
            if entry.is_expired() {
                self.cache.invalidate(key).await;
            } else {
                return Ok(Some(entry.value));
            }
        }

        for store in &self.stores {
            if let Some(value) = store.get(key).await? {
                self.cache
                    .insert(
                        key.to_string(),
                        CacheEntry::new(value.clone(), self.options.ttl_duration()),
                    )
                    .await;
                return Ok(Some(value));
            }
        }
        Ok(None)
    }

    pub async fn set<T: Serialize + Send + Sync>(
        &self,
        key: &str,
        value: T,
        ttl: Option<u64>,
    ) -> Result<(), CacheManagerError> {
        let value = serde_json::to_value(value)?;
        self.set_value(key, value, ttl.map(Duration::from_millis))
            .await
    }

    pub async fn set_value(
        &self,
        key: &str,
        value: CacheValue,
        ttl: Option<Duration>,
    ) -> Result<(), CacheManagerError> {
        match ttl.or_else(|| self.options.ttl_duration()) {
            Some(ttl) => {
                self.cache
                    .insert(key.to_string(), CacheEntry::new(value.clone(), Some(ttl)))
                    .await
            }
            None => {
                self.cache
                    .insert(key.to_string(), CacheEntry::new(value.clone(), None))
                    .await
            }
        }

        for store in &self.stores {
            store.set(key, value.clone(), ttl).await?;
        }
        Ok(())
    }

    pub async fn del(&self, key: &str) -> Result<bool, CacheManagerError> {
        self.cache.invalidate(key).await;
        let mut deleted = true;
        for store in &self.stores {
            deleted &= store.del(key).await?;
        }
        Ok(deleted)
    }

    pub async fn reset(&self) -> Result<(), CacheManagerError> {
        self.cache.invalidate_all();
        for store in &self.stores {
            store.reset().await?;
        }
        Ok(())
    }

    pub async fn onModuleDestroy(&self) -> Result<(), CacheManagerError> {
        for store in &self.stores {
            store.disconnect().await?;
        }
        Ok(())
    }

    pub fn options(&self) -> &CacheManagerOptions {
        &self.options
    }
}

pub fn isCacheable(store: &dyn KeyvStoreAdapter) -> bool {
    let _ = store;
    true
}

pub fn createCacheManager() -> Provider {
    Provider {
        provide: CACHE_MANAGER.to_string(),
        kind: ProviderKind::Factory,
        use_value: None,
        inject: vec![MODULE_OPTIONS_TOKEN.to_string()],
        use_existing: None,
    }
}

pub fn create_cache(options: CacheManagerOptions) -> CacheManager {
    CacheManager::new(options)
}