do_memory_storage_redb/cache/adaptive/
mod.rs1mod entry;
19mod state;
20mod types;
21
22pub use types::{AdaptiveCacheConfig, AdaptiveCacheMetrics};
23
24use self::entry::AdaptiveCacheEntry;
25use self::state::AdaptiveCacheState;
26use std::sync::Arc;
27use std::sync::atomic::Ordering;
28use std::time::{Duration, Instant};
29use tokio::sync::RwLock;
30use tokio::task::JoinHandle;
31use tokio::time::{Duration as TokioDuration, interval};
32use tracing::{debug, info};
33use uuid::Uuid;
34
35pub struct AdaptiveCache<V: Clone + Send + Sync + 'static> {
49 config: AdaptiveCacheConfig,
50 state: Arc<RwLock<AdaptiveCacheState<V>>>,
51 cleanup_task: Option<JoinHandle<()>>,
52}
53
54impl<V: Clone + Send + Sync + 'static> AdaptiveCache<V> {
55 pub fn new(config: AdaptiveCacheConfig) -> Self {
57 let state = Arc::new(RwLock::new(AdaptiveCacheState::<V>::new()));
58
59 let cleanup_task = if config.enable_background_cleanup && config.cleanup_interval_secs > 0 {
60 Some(Self::start_cleanup_task(
61 Arc::clone(&state),
62 config.cleanup_interval_secs,
63 ))
64 } else {
65 None
66 };
67
68 info!(
69 "Initialized adaptive cache: default_ttl={}s, min_ttl={}s, max_ttl={}s, hot={}, cold={}",
70 config.default_ttl.as_secs(),
71 config.min_ttl.as_secs(),
72 config.max_ttl.as_secs(),
73 config.hot_threshold,
74 config.cold_threshold
75 );
76
77 Self {
78 config,
79 state,
80 cleanup_task,
81 }
82 }
83
84 pub async fn record_access(&self, id: Uuid, hit: bool, value: Option<V>) -> bool {
96 let now = Instant::now();
97 let mut state = self.state.write().await;
98
99 if hit {
100 if let Some(entry) = state.entries.get_mut(&id) {
102 if entry.is_expired(now) {
104 debug!("Cache entry expired on access: {}", id);
105 state.metrics.base.expirations += 1;
106 state.metrics.base.misses += 1;
107
108 state.remove_entry(&id);
110 state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
111 return false;
112 }
113
114 entry.record_access(now, &self.config);
116
117 state.lru_queue.retain(|&qid| qid != id);
119 state.lru_queue.push_back(id);
120
121 state.metrics.base.hits += 1;
122 state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
123 true
124 } else {
125 state.metrics.base.misses += 1;
127 state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
128 false
129 }
130 } else {
131 state.metrics.base.misses += 1;
133
134 if let Some(v) = value {
135 if state.entries.len() >= self.config.max_size {
137 if let Some(oldest_id) = state.lru_queue.pop_front() {
139 state.entries.remove(&oldest_id);
140 state.metrics.base.evictions += 1;
141 debug!("Evicted LRU entry: {}", oldest_id);
142 }
143 }
144
145 let entry = AdaptiveCacheEntry::new(v, self.config.default_ttl);
147 state.entries.insert(id, entry);
148 state.lru_queue.push_back(id);
149 }
150
151 state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
152 false
153 }
154 }
155
156 pub async fn get(&self, id: Uuid) -> Option<V> {
158 let state = self.state.read().await;
159 state.entries.get(&id).map(|entry| entry.value.clone())
160 }
161
162 pub async fn get_and_record(&self, id: Uuid) -> Option<V> {
164 let now = Instant::now();
165 let mut state = self.state.write().await;
166
167 if let Some(entry) = state.entries.get_mut(&id) {
168 if !entry.is_expired(now) {
169 let value = entry.value.clone();
171 entry.record_access(now, &self.config);
172 state.metrics.base.hits += 1;
173 state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
174 return Some(value);
175 }
176 }
177
178 state.metrics.base.misses += 1;
179 state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
180 None
181 }
182
183 pub async fn remove(&self, id: Uuid) {
185 let mut state = self.state.write().await;
186 state.remove_entry(&id);
187 state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
188 }
189
190 pub async fn contains(&self, id: Uuid) -> bool {
192 let now = Instant::now();
193 let state = self.state.read().await;
194 if let Some(entry) = state.entries.get(&id) {
195 !entry.is_expired(now)
196 } else {
197 false
198 }
199 }
200
201 pub async fn access_count(&self, id: Uuid) -> Option<usize> {
203 let state = self.state.read().await;
204 state.entries.get(&id).map(|entry| entry.access_count())
205 }
206
207 pub async fn ttl(&self, id: Uuid) -> Option<Duration> {
209 let state = self.state.read().await;
210 state.entries.get(&id).map(|entry| entry.ttl())
211 }
212
213 pub async fn get_metrics(&self) -> AdaptiveCacheMetrics {
215 let state = self.state.read().await;
216 state.metrics.clone()
217 }
218
219 pub async fn clear(&self) {
221 let mut state = self.state.write().await;
222 state.clear();
223 }
224
225 pub async fn cleanup_expired(&self) -> usize {
227 let now = Instant::now();
228 let mut state = self.state.write().await;
229 let mut expired_ids = Vec::new();
230
231 for (id, entry) in &state.entries {
233 let ttl_secs = entry.ttl_seconds.load(Ordering::SeqCst);
234 let created_at = entry.created_at;
235 let expires_at = created_at + Duration::from_secs(ttl_secs);
236 let is_expired = now >= expires_at;
237 if is_expired {
238 expired_ids.push(*id);
239 }
240 }
241
242 let count = expired_ids.len();
244 for id in expired_ids {
245 state.remove_entry(&id);
246 state.metrics.base.expirations += 1;
247 }
248
249 state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
250
251 if count > 0 {
252 debug!("Cleaned up {} expired cache entries", count);
253 }
254
255 count
256 }
257
258 pub async fn hot_count(&self) -> usize {
260 let state = self.state.read().await;
261 state.metrics.hot_item_count
262 }
263
264 pub async fn cold_count(&self) -> usize {
266 let state = self.state.read().await;
267 state.metrics.cold_item_count
268 }
269
270 pub async fn len(&self) -> usize {
272 let state = self.state.read().await;
273 state.entries.len()
274 }
275
276 pub async fn is_empty(&self) -> bool {
278 self.len().await == 0
279 }
280
281 fn start_cleanup_task(
283 state: Arc<RwLock<AdaptiveCacheState<V>>>,
284 interval_secs: u64,
285 ) -> JoinHandle<()> {
286 tokio::spawn(async move {
287 let mut ticker = interval(TokioDuration::from_secs(interval_secs));
288 loop {
289 ticker.tick().await;
290
291 let now = Instant::now();
292 let mut state_guard = state.write().await;
293 let mut expired_ids = Vec::new();
294
295 for (id, entry) in &state_guard.entries {
297 let ttl_secs = entry.ttl_seconds.load(Ordering::SeqCst);
298 let expires_at = entry.created_at + Duration::from_secs(ttl_secs);
299 if now >= expires_at {
300 expired_ids.push(*id);
301 }
302 }
303
304 let count = expired_ids.len();
306 for id in expired_ids {
307 state_guard.remove_entry(&id);
308 state_guard.metrics.base.expirations += 1;
309 }
310
311 state_guard.update_metrics(10, 2); drop(state_guard);
313
314 if count > 0 {
315 debug!("Background cleanup removed {} expired entries", count);
316 }
317 }
318 })
319 }
320
321 pub fn stop_cleanup(&mut self) {
323 if let Some(task) = self.cleanup_task.take() {
324 task.abort();
325 }
326 }
327}
328
329impl<V: Clone + Send + Sync + 'static> Drop for AdaptiveCache<V> {
330 fn drop(&mut self) {
331 self.stop_cleanup();
332 }
333}
334
335impl From<AdaptiveCacheConfig> for super::super::CacheConfig {
337 fn from(config: AdaptiveCacheConfig) -> Self {
338 Self {
339 max_size: 1000,
340 default_ttl_secs: config.default_ttl.as_secs(),
341 cleanup_interval_secs: config.cleanup_interval_secs,
342 enable_background_cleanup: config.enable_background_cleanup,
343 }
344 }
345}