1use std::sync::Arc;
4use std::time::{Duration, Instant};
5use std::collections::{HashSet, VecDeque};
6
7use skp_cache_core::{
8 CacheBackend, CacheEntry, CacheKey, CacheMetrics, CacheOperation, CacheOptions,
9 CacheResult, CacheTier, DependencyBackend, JsonSerializer, NoopMetrics, Result, Serializer,
10 TaggableBackend,
11};
12
13mod coalescer;
14use coalescer::Coalescer;
15
16mod read_through;
17pub use read_through::{Loader, ReadThroughCache, CacheManagerReadThroughExt};
18
19mod groups;
20pub use groups::CacheGroup;
21
/// Tunables controlling [`CacheManager`] behavior.
#[derive(Debug, Clone)]
pub struct CacheManagerConfig {
    /// TTL applied to entries whose `CacheOptions` do not specify one.
    /// `None` means no default TTL is injected.
    pub default_ttl: Option<Duration>,
    /// Optional prefix prepended to every key as `"{namespace}:{key}"`.
    pub namespace: Option<String>,
    /// Fractional random extension added to TTLs (e.g. `0.1` = up to +10%)
    /// so entries written together do not all expire at once; `0.0` disables.
    pub ttl_jitter: f64,
}
32
33impl Default for CacheManagerConfig {
34 fn default() -> Self {
35 Self {
36 default_ttl: Some(Duration::from_secs(300)),
37 namespace: None,
38 ttl_jitter: 0.1, }
40 }
41}
42
43impl CacheManagerConfig {
44 pub fn with_ttl(ttl: Duration) -> Self {
46 Self {
47 default_ttl: Some(ttl),
48 ..Default::default()
49 }
50 }
51
52 pub fn with_namespace(namespace: impl Into<String>) -> Self {
54 Self {
55 namespace: Some(namespace.into()),
56 ..Default::default()
57 }
58 }
59
60 pub fn no_jitter(mut self) -> Self {
62 self.ttl_jitter = 0.0;
63 self
64 }
65}
66
/// High-level cache facade combining a storage backend, a serializer,
/// a metrics sink, and request coalescing.
///
/// Defaults to JSON serialization and no-op metrics; swap either via
/// `with_serializer_and_metrics`.
pub struct CacheManager<B, S = JsonSerializer, M = NoopMetrics>
where
    B: CacheBackend + DependencyBackend,
    S: Serializer,
    M: CacheMetrics,
{
    /// Storage backend; also tracks inter-key dependencies.
    backend: Arc<B>,
    /// Encodes/decodes cached values to/from bytes.
    serializer: Arc<S>,
    /// Sink for hit/miss/latency instrumentation.
    metrics: Arc<M>,
    config: CacheManagerConfig,
    /// De-duplicates concurrent requests for the same key (see `coalescer` module).
    coalescer: Coalescer,
}
85
86impl<B: CacheBackend + DependencyBackend> CacheManager<B, JsonSerializer, NoopMetrics> {
88 pub fn new(backend: B) -> Self {
90 Self::with_config(backend, CacheManagerConfig::default())
91 }
92
93 pub fn with_config(backend: B, config: CacheManagerConfig) -> Self {
95 Self {
96 backend: Arc::new(backend),
97 serializer: Arc::new(JsonSerializer),
98 metrics: Arc::new(NoopMetrics),
99 config,
100 coalescer: Coalescer::new(),
101 }
102 }
103}
104
impl<B, S, M> CacheManager<B, S, M>
where
    B: CacheBackend + DependencyBackend,
    S: Serializer,
    M: CacheMetrics,
{
    /// Builds a manager with caller-supplied serializer and metrics sink.
    pub fn with_serializer_and_metrics(
        backend: B,
        serializer: S,
        metrics: M,
        config: CacheManagerConfig,
    ) -> Self {
        Self {
            backend: Arc::new(backend),
            serializer: Arc::new(serializer),
            metrics: Arc::new(metrics),
            config,
            coalescer: Coalescer::new(),
        }
    }

    /// Returns a namespaced view over this manager (see the `groups` module).
    pub fn group(&self, namespace: impl Into<String>) -> CacheGroup<'_, B, S, M> {
        CacheGroup::new(self, namespace.into())
    }

    /// Prepends the configured namespace as `"{ns}:{key}"` when one is set.
    fn full_key(&self, key: &str) -> String {
        match &self.config.namespace {
            Some(ns) => format!("{}:{}", ns, key),
            None => key.to_string(),
        }
    }

    /// Extends `ttl` by a random amount in `[0, ttl * ttl_jitter)` so that
    /// entries written together do not all expire at the same instant.
    ///
    /// Jitter resolution is whole seconds: TTLs short enough that
    /// `ttl * jitter < 1s` truncate to a zero range and pass through unchanged.
    fn apply_ttl_jitter(&self, ttl: Duration) -> Duration {
        if self.config.ttl_jitter > 0.0 {
            let jitter_range = (ttl.as_secs_f64() * self.config.ttl_jitter) as u64;
            if jitter_range > 0 {
                let jitter = rand::random::<u64>() % jitter_range;
                return ttl + Duration::from_secs(jitter);
            }
        }
        ttl
    }

    /// Fetches and deserializes `key`, coalescing concurrent lookups of the
    /// same key into a single backend call.
    ///
    /// Returns `Hit` for a fresh entry, `Stale` for one past its TTL but still
    /// inside its stale window, and `Miss` for absent or fully expired entries.
    pub async fn get<T>(&self, key: impl CacheKey) -> Result<CacheResult<T>>
    where
        T: serde::de::DeserializeOwned,
    {
        let full_key = self.full_key(&key.full_key());
        let start = Instant::now();

        // Clones move into the coalescer closure; duplicate in-flight gets for
        // this key share one backend round-trip.
        let backend = self.backend.clone();
        let key_clone = full_key.clone();

        let req_result = self.coalescer.do_request(&full_key, move || async move {
            backend.get(&key_clone).await
        }).await?;

        let result = match req_result {
            Some(entry) => {
                if entry.is_expired() && !entry.is_stale() {
                    // Fully dead entry counts as a miss. The expired entry is
                    // not deleted here — presumably the backend reaps it;
                    // TODO(review): confirm.
                    self.metrics.record_miss(&full_key);
                    CacheResult::Miss
                } else if entry.is_stale() {
                    self.metrics.record_stale_hit(&full_key);
                    CacheResult::Stale(self.deserialize_entry(entry)?)
                } else {
                    // NOTE(review): the tier is hard-coded to L1Memory even
                    // though the backend may be multi-tier — the backend `get`
                    // does not report which tier served the entry.
                    self.metrics.record_hit(&full_key, CacheTier::L1Memory);
                    CacheResult::Hit(self.deserialize_entry(entry)?)
                }
            }
            None => {
                self.metrics.record_miss(&full_key);
                CacheResult::Miss
            }
        };

        self.metrics
            .record_latency(CacheOperation::Get, start.elapsed());
        Ok(result)
    }

    /// Serializes `value` and stores it under `key` with the given options
    /// (TTL defaulting and jitter are applied in `set_raw`).
    pub async fn set<T>(
        &self,
        key: impl CacheKey,
        value: T,
        options: impl Into<CacheOptions>,
    ) -> Result<()>
    where
        T: serde::Serialize,
    {
        let full_key = self.full_key(&key.full_key());
        let options = options.into();

        let serialize_start = Instant::now();
        let serialized = self.serializer.serialize(&value)?;
        self.metrics
            .record_latency(CacheOperation::Serialize, serialize_start.elapsed());

        self.set_raw(&full_key, serialized, options).await
    }

    /// Stores pre-serialized bytes: fills in the default TTL, applies jitter,
    /// writes to the backend, then invalidates everything that depended on
    /// the previous value of this key.
    async fn set_raw(&self, full_key: &str, value: Vec<u8>, mut options: CacheOptions) -> Result<()> {
        // No explicit TTL -> fall back to the configured default (may be None).
        if options.ttl.is_none() {
            options.ttl = self.config.default_ttl;
        }

        if let Some(ttl) = options.ttl {
            options.ttl = Some(self.apply_ttl_jitter(ttl));
        }

        // Snapshot dependents BEFORE overwriting, since the write may reset
        // dependency bookkeeping. Best-effort: a lookup failure is treated as
        // "no dependents" rather than failing the write.
        let dependents = self.backend.get_dependents(full_key).await.unwrap_or_default();

        let set_start = Instant::now();
        self.backend.set(full_key, value, &options).await?;
        self.metrics
            .record_latency(CacheOperation::Set, set_start.elapsed());

        // The stored value changed, so entries derived from it are stale.
        // Failures here are deliberately ignored (best-effort invalidation).
        for dep in dependents {
            let _ = self.invalidate_recursive(&dep).await;
        }

        Ok(())
    }

    /// Read-through with stampede protection: returns the cached entry when
    /// fresh; when stale, serves the stale entry and spawns a background
    /// refresh; on a miss, runs `computer`, caches, and returns the result.
    /// Concurrent callers for the same key are coalesced into one computation.
    pub async fn get_or_compute<T, F, Fut>(
        &self,
        key: impl CacheKey,
        computer: F,
        options: Option<CacheOptions>,
    ) -> Result<CacheResult<T>>
    where
        T: serde::Serialize + serde::de::DeserializeOwned + Send + 'static,
        F: FnOnce() -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<T>> + Send + 'static,
    {
        let full_key = self.full_key(&key.full_key());
        let backend = self.backend.clone();
        let key_str = full_key.clone();
        let opts = options.unwrap_or_default();
        // Cheap clone (Arc handles) so the closure can own a manager.
        let manager = self.clone();

        let req_result = self.coalescer.do_request(&full_key, move || async move {
            if let Some(entry) = backend.get(&key_str).await? {
                // Fresh entry: serve as-is.
                if !entry.is_expired() {
                    return Ok(Some(entry));
                }

                // Stale-while-revalidate: serve the stale entry now and
                // refresh in the background (at most one refresh per key —
                // `try_spawn_refresh` presumably enforces that; see coalescer).
                if entry.is_stale() {
                    let manager_bg = manager.clone();
                    let key_bg = key_str.clone();
                    let opts_bg = opts.clone();

                    manager.coalescer.try_spawn_refresh(&key_str, move || async move {
                        // Background refresh errors are silently dropped:
                        // the stale value remains served until it fully expires.
                        if let Ok(val) = computer().await {
                            if let Ok(serialized) = manager_bg.serializer.serialize(&val) {
                                let _ = manager_bg.set_raw(&key_bg, serialized, opts_bg).await;
                            }
                        }
                    });

                    return Ok(Some(entry));
                }
                // Expired and past the stale window: fall through to recompute.
            }

            // Miss (or dead entry): compute synchronously and cache the result.
            let val = computer().await?;
            let serialized = manager.serializer.serialize(&val)?;
            let size = serialized.len();

            manager.set_raw(&key_str, serialized.clone(), opts).await?;

            // Return a synthetic entry so the caller gets a Hit without a
            // second backend read.
            Ok(Some(CacheEntry::new(serialized, size)))
        }).await?;

        match req_result {
            Some(entry) => {
                if entry.is_stale() {
                    Ok(CacheResult::Stale(self.deserialize_entry(entry)?))
                } else {
                    Ok(CacheResult::Hit(self.deserialize_entry(entry)?))
                }
            },
            // The closure always returns Some on success, so this indicates a
            // coalescer invariant violation.
            None => Err(skp_cache_core::CacheError::Internal("Compute returned None".into()))
        }
    }

    /// Deletes `key` and, transitively, everything that depends on it.
    /// Returns whether the initial key itself existed and was deleted.
    pub async fn delete(&self, key: impl CacheKey) -> Result<bool> {
        let full_key = self.full_key(&key.full_key());
        let start = Instant::now();

        let result = self.invalidate_recursive(&full_key).await?;

        self.metrics
            .record_latency(CacheOperation::Delete, start.elapsed());
        Ok(result.0)
    }

    /// Same traversal as [`Self::delete`], but returns the total number of
    /// entries removed (the key plus all transitive dependents).
    pub async fn invalidate(&self, key: impl CacheKey) -> Result<u64> {
        let full_key = self.full_key(&key.full_key());
        let start = Instant::now();

        let result = self.invalidate_recursive(&full_key).await?;

        self.metrics
            .record_latency(CacheOperation::Invalidate, start.elapsed());
        Ok(result.1)
    }

    /// Breadth-first deletion over the dependency graph rooted at `key`.
    ///
    /// Returns `(initial_key_was_deleted, total_entries_deleted)`. The
    /// `visited` set guards against dependency cycles. Dependent-lookup
    /// failures are skipped (best-effort), but a failed `delete` aborts.
    async fn invalidate_recursive(&self, key: &str) -> Result<(bool, u64)> {
        let mut queue = VecDeque::new();
        queue.push_back(key.to_string());
        let mut visited = HashSet::new();
        visited.insert(key.to_string());

        let mut initial_deleted = false;
        let mut first = true;
        let mut count = 0u64;

        while let Some(k) = queue.pop_front() {
            // Enqueue dependents before deleting `k`, while the backend still
            // has its dependency records.
            if let Ok(deps) = self.backend.get_dependents(&k).await {
                for dep in deps {
                    // `insert` returns true only for unseen keys: cycle guard.
                    if visited.insert(dep.clone()) {
                        queue.push_back(dep);
                    }
                }
            }
            let deleted = self.backend.delete(&k).await?;
            if deleted {
                count += 1;
            }
            // The first pop is always the root key; remember its outcome.
            if first {
                initial_deleted = deleted;
                first = false;
            }
        }
        Ok((initial_deleted, count))
    }

    /// Whether `key` currently exists in the backend (no deserialization).
    pub async fn exists(&self, key: impl CacheKey) -> Result<bool> {
        let full_key = self.full_key(&key.full_key());
        self.backend.exists(&full_key).await
    }

    /// Removes every entry from the backend. NOTE(review): ignores the
    /// configured namespace — clears ALL namespaces sharing this backend.
    pub async fn clear(&self) -> Result<()> {
        self.backend.clear().await
    }

    /// Backend-reported statistics.
    pub async fn stats(&self) -> Result<skp_cache_core::CacheStats> {
        self.backend.stats().await
    }

    /// Number of entries in the backend (all namespaces).
    pub async fn len(&self) -> Result<usize> {
        self.backend.len().await
    }

    /// Whether the backend holds no entries.
    pub async fn is_empty(&self) -> Result<bool> {
        self.backend.is_empty().await
    }

    /// Decodes a raw entry's bytes into `T`, preserving all entry metadata
    /// (timestamps, TTL, tags, dependencies, etag, version, ...).
    fn deserialize_entry<T>(&self, entry: CacheEntry<Vec<u8>>) -> Result<CacheEntry<T>>
    where
        T: serde::de::DeserializeOwned,
    {
        let deserialize_start = Instant::now();
        let value: T = self.serializer.deserialize(&entry.value)?;
        self.metrics
            .record_latency(CacheOperation::Deserialize, deserialize_start.elapsed());

        Ok(CacheEntry {
            value,
            created_at: entry.created_at,
            last_accessed: entry.last_accessed,
            access_count: entry.access_count,
            ttl: entry.ttl,
            stale_while_revalidate: entry.stale_while_revalidate,
            tags: entry.tags,
            dependencies: entry.dependencies,
            cost: entry.cost,
            size: entry.size,
            etag: entry.etag,
            version: entry.version,
        })
    }
}
435
436impl<B, S, M> Clone for CacheManager<B, S, M>
437where
438 B: CacheBackend + DependencyBackend,
439 S: Serializer,
440 M: CacheMetrics,
441{
442 fn clone(&self) -> Self {
443 Self {
444 backend: self.backend.clone(),
445 serializer: self.serializer.clone(),
446 metrics: self.metrics.clone(),
447 config: self.config.clone(),
448 coalescer: self.coalescer.clone(),
449 }
450 }
451}
452
453impl<B, S, M> CacheManager<B, S, M>
455where
456 B: CacheBackend + DependencyBackend + TaggableBackend,
457 S: Serializer,
458 M: CacheMetrics,
459{
460 pub async fn delete_by_tag(&self, tag: &str) -> Result<u64> {
462 let start = Instant::now();
463 let count = self.backend.delete_by_tag(tag).await?;
464 self.metrics
465 .record_latency(CacheOperation::Invalidate, start.elapsed());
466 Ok(count)
467 }
468
469 pub async fn get_keys_by_tag(&self, tag: &str) -> Result<Vec<String>> {
471 self.backend.get_by_tag(tag).await
472 }
473}