Skip to main content

cache_manager/
cache_providers.rs

1//! Port map for upstream `lib/cache.providers.ts`.
2
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use async_trait::async_trait;
7use moka::future::Cache as MokaCache;
8use serde::Serialize;
9use serde_json::Value;
10use thiserror::Error;
11
12use crate::cache_constants::CACHE_MANAGER;
13use crate::cache_module::{Provider, ProviderKind};
14use crate::cache_module_definition::MODULE_OPTIONS_TOKEN;
15use crate::interfaces::CacheManagerOptions;
16
17pub type CacheValue = Value;
18
19#[derive(Debug, Error)]
20pub enum CacheManagerError {
21    #[error("cache store error: {0}")]
22    Store(String),
23    #[error("failed to serialize cache value: {0}")]
24    Serialize(#[from] serde_json::Error),
25}
26
27#[async_trait]
28pub trait KeyvStoreAdapter: Send + Sync {
29    async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError>;
30    async fn set(
31        &self,
32        key: &str,
33        value: CacheValue,
34        ttl: Option<Duration>,
35    ) -> Result<(), CacheManagerError>;
36    async fn del(&self, key: &str) -> Result<bool, CacheManagerError>;
37    async fn reset(&self) -> Result<(), CacheManagerError>;
38    async fn disconnect(&self) -> Result<(), CacheManagerError> {
39        Ok(())
40    }
41}
42
43#[derive(Clone)]
44pub struct Keyv {
45    store: Arc<dyn KeyvStoreAdapter>,
46    ttl: Option<Duration>,
47    namespace: Option<String>,
48}
49
50impl Keyv {
51    pub fn new(
52        store: Arc<dyn KeyvStoreAdapter>,
53        ttl: Option<Duration>,
54        namespace: Option<String>,
55    ) -> Self {
56        Self {
57            store,
58            ttl,
59            namespace,
60        }
61    }
62
63    fn key(&self, key: &str) -> String {
64        match &self.namespace {
65            Some(namespace) => format!("{namespace}:{key}"),
66            None => key.to_string(),
67        }
68    }
69}
70
71#[async_trait]
72impl KeyvStoreAdapter for Keyv {
73    async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError> {
74        self.store.get(&self.key(key)).await
75    }
76
77    async fn set(
78        &self,
79        key: &str,
80        value: CacheValue,
81        ttl: Option<Duration>,
82    ) -> Result<(), CacheManagerError> {
83        self.store
84            .set(&self.key(key), value, ttl.or(self.ttl))
85            .await
86    }
87
88    async fn del(&self, key: &str) -> Result<bool, CacheManagerError> {
89        self.store.del(&self.key(key)).await
90    }
91
92    async fn reset(&self) -> Result<(), CacheManagerError> {
93        self.store.reset().await
94    }
95
96    async fn disconnect(&self) -> Result<(), CacheManagerError> {
97        self.store.disconnect().await
98    }
99}
100
101#[derive(Clone)]
102struct CacheEntry {
103    value: CacheValue,
104    expires_at: Option<Instant>,
105}
106
107impl CacheEntry {
108    fn new(value: CacheValue, ttl: Option<Duration>) -> Self {
109        Self {
110            value,
111            expires_at: ttl.map(|ttl| Instant::now() + ttl),
112        }
113    }
114
115    fn is_expired(&self) -> bool {
116        self.expires_at
117            .map(|expires_at| Instant::now() >= expires_at)
118            .unwrap_or(false)
119    }
120}
121
122#[derive(Clone)]
123pub struct CacheManager {
124    cache: MokaCache<String, CacheEntry>,
125    options: CacheManagerOptions,
126    stores: Vec<Arc<dyn KeyvStoreAdapter>>,
127}
128
129impl CacheManager {
130    pub fn new(options: CacheManagerOptions) -> Self {
131        Self {
132            cache: MokaCache::builder().build(),
133            options,
134            stores: Vec::new(),
135        }
136    }
137
138    pub fn with_stores(
139        options: CacheManagerOptions,
140        stores: Vec<Arc<dyn KeyvStoreAdapter>>,
141    ) -> Self {
142        let mut manager = Self::new(options);
143        manager.stores = stores;
144        manager
145    }
146
147    pub async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError> {
148        if let Some(entry) = self.cache.get(key).await {
149            if entry.is_expired() {
150                self.cache.invalidate(key).await;
151            } else {
152                return Ok(Some(entry.value));
153            }
154        }
155
156        for store in &self.stores {
157            if let Some(value) = store.get(key).await? {
158                self.cache
159                    .insert(
160                        key.to_string(),
161                        CacheEntry::new(value.clone(), self.options.ttl_duration()),
162                    )
163                    .await;
164                return Ok(Some(value));
165            }
166        }
167        Ok(None)
168    }
169
170    pub async fn set<T: Serialize + Send + Sync>(
171        &self,
172        key: &str,
173        value: T,
174        ttl: Option<u64>,
175    ) -> Result<(), CacheManagerError> {
176        let value = serde_json::to_value(value)?;
177        self.set_value(key, value, ttl.map(Duration::from_millis))
178            .await
179    }
180
181    pub async fn set_value(
182        &self,
183        key: &str,
184        value: CacheValue,
185        ttl: Option<Duration>,
186    ) -> Result<(), CacheManagerError> {
187        match ttl.or_else(|| self.options.ttl_duration()) {
188            Some(ttl) => {
189                self.cache
190                    .insert(key.to_string(), CacheEntry::new(value.clone(), Some(ttl)))
191                    .await
192            }
193            None => {
194                self.cache
195                    .insert(key.to_string(), CacheEntry::new(value.clone(), None))
196                    .await
197            }
198        }
199
200        for store in &self.stores {
201            store.set(key, value.clone(), ttl).await?;
202        }
203        Ok(())
204    }
205
206    pub async fn del(&self, key: &str) -> Result<bool, CacheManagerError> {
207        self.cache.invalidate(key).await;
208        let mut deleted = true;
209        for store in &self.stores {
210            deleted &= store.del(key).await?;
211        }
212        Ok(deleted)
213    }
214
215    pub async fn reset(&self) -> Result<(), CacheManagerError> {
216        self.cache.invalidate_all();
217        for store in &self.stores {
218            store.reset().await?;
219        }
220        Ok(())
221    }
222
223    pub async fn onModuleDestroy(&self) -> Result<(), CacheManagerError> {
224        for store in &self.stores {
225            store.disconnect().await?;
226        }
227        Ok(())
228    }
229
230    pub fn options(&self) -> &CacheManagerOptions {
231        &self.options
232    }
233}
234
235pub fn isCacheable(store: &dyn KeyvStoreAdapter) -> bool {
236    let _ = store;
237    true
238}
239
240pub fn createCacheManager() -> Provider {
241    Provider {
242        provide: CACHE_MANAGER.to_string(),
243        kind: ProviderKind::Factory,
244        use_value: None,
245        inject: vec![MODULE_OPTIONS_TOKEN.to_string()],
246        use_existing: None,
247    }
248}
249
250pub fn create_cache(options: CacheManagerOptions) -> CacheManager {
251    CacheManager::new(options)
252}