1use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6use async_trait::async_trait;
7use moka::future::Cache as MokaCache;
8use serde::Serialize;
9use serde_json::Value;
10use thiserror::Error;
11
12use crate::cache_constants::CACHE_MANAGER;
13use crate::cache_module::{Provider, ProviderKind};
14use crate::cache_module_definition::MODULE_OPTIONS_TOKEN;
15use crate::interfaces::CacheManagerOptions;
16
17pub type CacheValue = Value;
18
19#[derive(Debug, Error)]
20pub enum CacheManagerError {
21 #[error("cache store error: {0}")]
22 Store(String),
23 #[error("failed to serialize cache value: {0}")]
24 Serialize(#[from] serde_json::Error),
25}
26
27#[async_trait]
28pub trait KeyvStoreAdapter: Send + Sync {
29 async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError>;
30 async fn set(
31 &self,
32 key: &str,
33 value: CacheValue,
34 ttl: Option<Duration>,
35 ) -> Result<(), CacheManagerError>;
36 async fn del(&self, key: &str) -> Result<bool, CacheManagerError>;
37 async fn reset(&self) -> Result<(), CacheManagerError>;
38 async fn disconnect(&self) -> Result<(), CacheManagerError> {
39 Ok(())
40 }
41}
42
43#[derive(Clone)]
44pub struct Keyv {
45 store: Arc<dyn KeyvStoreAdapter>,
46 ttl: Option<Duration>,
47 namespace: Option<String>,
48}
49
50impl Keyv {
51 pub fn new(
52 store: Arc<dyn KeyvStoreAdapter>,
53 ttl: Option<Duration>,
54 namespace: Option<String>,
55 ) -> Self {
56 Self {
57 store,
58 ttl,
59 namespace,
60 }
61 }
62
63 fn key(&self, key: &str) -> String {
64 match &self.namespace {
65 Some(namespace) => format!("{namespace}:{key}"),
66 None => key.to_string(),
67 }
68 }
69}
70
71#[async_trait]
72impl KeyvStoreAdapter for Keyv {
73 async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError> {
74 self.store.get(&self.key(key)).await
75 }
76
77 async fn set(
78 &self,
79 key: &str,
80 value: CacheValue,
81 ttl: Option<Duration>,
82 ) -> Result<(), CacheManagerError> {
83 self.store
84 .set(&self.key(key), value, ttl.or(self.ttl))
85 .await
86 }
87
88 async fn del(&self, key: &str) -> Result<bool, CacheManagerError> {
89 self.store.del(&self.key(key)).await
90 }
91
92 async fn reset(&self) -> Result<(), CacheManagerError> {
93 self.store.reset().await
94 }
95
96 async fn disconnect(&self) -> Result<(), CacheManagerError> {
97 self.store.disconnect().await
98 }
99}
100
101#[derive(Clone)]
102struct CacheEntry {
103 value: CacheValue,
104 expires_at: Option<Instant>,
105}
106
107impl CacheEntry {
108 fn new(value: CacheValue, ttl: Option<Duration>) -> Self {
109 Self {
110 value,
111 expires_at: ttl.map(|ttl| Instant::now() + ttl),
112 }
113 }
114
115 fn is_expired(&self) -> bool {
116 self.expires_at
117 .map(|expires_at| Instant::now() >= expires_at)
118 .unwrap_or(false)
119 }
120}
121
122#[derive(Clone)]
123pub struct CacheManager {
124 cache: MokaCache<String, CacheEntry>,
125 options: CacheManagerOptions,
126 stores: Vec<Arc<dyn KeyvStoreAdapter>>,
127}
128
129impl CacheManager {
130 pub fn new(options: CacheManagerOptions) -> Self {
131 Self {
132 cache: MokaCache::builder().build(),
133 options,
134 stores: Vec::new(),
135 }
136 }
137
138 pub fn with_stores(
139 options: CacheManagerOptions,
140 stores: Vec<Arc<dyn KeyvStoreAdapter>>,
141 ) -> Self {
142 let mut manager = Self::new(options);
143 manager.stores = stores;
144 manager
145 }
146
147 pub async fn get(&self, key: &str) -> Result<Option<CacheValue>, CacheManagerError> {
148 if let Some(entry) = self.cache.get(key).await {
149 if entry.is_expired() {
150 self.cache.invalidate(key).await;
151 } else {
152 return Ok(Some(entry.value));
153 }
154 }
155
156 for store in &self.stores {
157 if let Some(value) = store.get(key).await? {
158 self.cache
159 .insert(
160 key.to_string(),
161 CacheEntry::new(value.clone(), self.options.ttl_duration()),
162 )
163 .await;
164 return Ok(Some(value));
165 }
166 }
167 Ok(None)
168 }
169
170 pub async fn set<T: Serialize + Send + Sync>(
171 &self,
172 key: &str,
173 value: T,
174 ttl: Option<u64>,
175 ) -> Result<(), CacheManagerError> {
176 let value = serde_json::to_value(value)?;
177 self.set_value(key, value, ttl.map(Duration::from_millis))
178 .await
179 }
180
181 pub async fn set_value(
182 &self,
183 key: &str,
184 value: CacheValue,
185 ttl: Option<Duration>,
186 ) -> Result<(), CacheManagerError> {
187 match ttl.or_else(|| self.options.ttl_duration()) {
188 Some(ttl) => {
189 self.cache
190 .insert(key.to_string(), CacheEntry::new(value.clone(), Some(ttl)))
191 .await
192 }
193 None => {
194 self.cache
195 .insert(key.to_string(), CacheEntry::new(value.clone(), None))
196 .await
197 }
198 }
199
200 for store in &self.stores {
201 store.set(key, value.clone(), ttl).await?;
202 }
203 Ok(())
204 }
205
206 pub async fn del(&self, key: &str) -> Result<bool, CacheManagerError> {
207 self.cache.invalidate(key).await;
208 let mut deleted = true;
209 for store in &self.stores {
210 deleted &= store.del(key).await?;
211 }
212 Ok(deleted)
213 }
214
215 pub async fn reset(&self) -> Result<(), CacheManagerError> {
216 self.cache.invalidate_all();
217 for store in &self.stores {
218 store.reset().await?;
219 }
220 Ok(())
221 }
222
223 pub async fn onModuleDestroy(&self) -> Result<(), CacheManagerError> {
224 for store in &self.stores {
225 store.disconnect().await?;
226 }
227 Ok(())
228 }
229
230 pub fn options(&self) -> &CacheManagerOptions {
231 &self.options
232 }
233}
234
235pub fn isCacheable(store: &dyn KeyvStoreAdapter) -> bool {
236 let _ = store;
237 true
238}
239
240pub fn createCacheManager() -> Provider {
241 Provider {
242 provide: CACHE_MANAGER.to_string(),
243 kind: ProviderKind::Factory,
244 use_value: None,
245 inject: vec![MODULE_OPTIONS_TOKEN.to_string()],
246 use_existing: None,
247 }
248}
249
250pub fn create_cache(options: CacheManagerOptions) -> CacheManager {
251 CacheManager::new(options)
252}