1use async_trait::async_trait;
2use serde::de::DeserializeOwned;
3use serde::Serialize;
4use std::marker::PhantomData;
5use std::sync::Arc;
6
7use skp_cache_core::{
8 CacheBackend, CacheError, CacheKey, CacheMetrics, CacheOptions, CacheResult, DependencyBackend,
9 Result, Serializer,
10};
11
12use crate::CacheManager;
13
14#[async_trait]
16pub trait Loader<K, V>: Send + Sync + 'static {
17 async fn load(&self, key: &K) -> Result<Option<V>>;
19}
20
21pub struct ReadThroughCache<B, S, M, K, V, L>
23where
24 B: CacheBackend + DependencyBackend,
25 S: Serializer,
26 M: CacheMetrics,
27{
28 manager: CacheManager<B, S, M>,
29 loader: Arc<L>,
30 options: CacheOptions,
31 _phantom: PhantomData<(K, V)>,
32}
33
34impl<B, S, M, K, V, L> ReadThroughCache<B, S, M, K, V, L>
35where
36 B: CacheBackend + DependencyBackend,
37 S: Serializer,
38 M: CacheMetrics,
39 K: CacheKey + Clone + Send + Sync + 'static,
40 V: Serialize + DeserializeOwned + Send + Sync + Clone + 'static,
41 L: Loader<K, V>,
42{
43 pub fn new(manager: CacheManager<B, S, M>, loader: L, options: CacheOptions) -> Self {
45 Self {
46 manager,
47 loader: Arc::new(loader),
48 options,
49 _phantom: PhantomData,
50 }
51 }
52
53 pub async fn get(&self, key: K) -> Result<Option<V>> {
55 match self.manager.get::<V>(key.clone()).await? {
57 CacheResult::Hit(entry) => Ok(Some(entry.value)),
58 CacheResult::Stale(entry) => {
59 self.refresh_background(key.clone());
61 Ok(Some(entry.value))
62 }
63 CacheResult::Miss | CacheResult::NegativeHit => {
64 let loader = self.loader.clone();
66 let key_clone = key.clone();
67
68 let result = self.manager.get_or_compute(
69 key,
70 move || async move {
71 loader.load(&key_clone).await?
72 .ok_or_else(|| CacheError::NotFound("Loader returned None".into()))
73 },
74 Some(self.options.clone())
75 ).await;
76
77 match result {
78 Ok(CacheResult::Hit(entry)) => Ok(Some(entry.value)),
79 Ok(CacheResult::Stale(entry)) => Ok(Some(entry.value)),
80 Err(CacheError::NotFound(_)) => Ok(None),
81 Err(e) => Err(e),
82 _ => Ok(None),
83 }
84 }
85 }
86 }
87
88 pub async fn refresh(&self, key: K) -> Result<()> {
90 if let Some(val) = self.loader.load(&key).await? {
91 self.manager.set(key, val, self.options.clone()).await?;
92 }
93 Ok(())
94 }
95
96 fn refresh_background(&self, key: K) {
98 let loader = self.loader.clone();
99 let manager = self.manager.clone();
100 let options = self.options.clone();
101
102 tokio::spawn(async move {
103 if let Ok(Some(val)) = loader.load(&key).await {
104 let _ = manager.set(key, val, options).await;
105 }
106 });
107 }
108}
109
110pub trait CacheManagerReadThroughExt<B, S, M> {
112 fn read_through<K, V, L>(
113 self,
114 loader: L,
115 options: CacheOptions
116 ) -> ReadThroughCache<B, S, M, K, V, L>
117 where
118 B: CacheBackend + DependencyBackend,
119 S: Serializer,
120 M: CacheMetrics,
121 L: Loader<K, V>,
122 K: CacheKey + Clone + Send + Sync + 'static,
124 V: Serialize + DeserializeOwned + Send + Sync + Clone + 'static;
125}
126
127impl<B, S, M> CacheManagerReadThroughExt<B, S, M> for CacheManager<B, S, M>
128where
129 B: CacheBackend + DependencyBackend,
130 S: Serializer,
131 M: CacheMetrics,
132{
133 fn read_through<K, V, L>(
134 self,
135 loader: L,
136 options: CacheOptions
137 ) -> ReadThroughCache<B, S, M, K, V, L>
138 where
139 L: Loader<K, V>,
140 K: CacheKey + Clone + Send + Sync + 'static,
141 V: Serialize + DeserializeOwned + Send + Sync + Clone + 'static,
142 {
143 ReadThroughCache::new(self, loader, options)
144 }
145}