do_memory_storage_turso/cache/
adaptive_ttl.rs1use super::ttl_config::{TTLConfig, TTLConfigError};
33#[path = "adaptive_ttl_stats.rs"]
34mod stats;
35pub use stats::{CacheStats, CacheStatsSnapshot};
36use std::collections::HashMap;
37use std::hash::Hash;
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40use tokio::sync::RwLock;
41use tokio::task::JoinHandle;
42use tokio::time::{Duration as TokioDuration, interval};
43use tracing::{debug, info, trace};
44
45#[derive(Debug, Clone)]
47pub struct CacheEntry<V> {
48 pub value: V,
50 pub created_at: Instant,
52 pub access_count: u64,
54 pub last_accessed: Instant,
56 pub current_ttl: Duration,
58 access_history: Vec<Instant>,
60}
61
62impl<V> CacheEntry<V> {
63 pub fn new(value: V, initial_ttl: Duration) -> Self {
65 let now = Instant::now();
66 Self {
67 value,
68 created_at: now,
69 access_count: 0,
70 last_accessed: now,
71 current_ttl: initial_ttl,
72 access_history: Vec::with_capacity(20),
73 }
74 }
75
76 pub fn record_access(&mut self, config: &TTLConfig) {
78 let now = Instant::now();
79 self.access_count += 1;
80 self.last_accessed = now;
81
82 self.access_history.push(now);
84
85 let window_start = now - Duration::from_secs(config.access_window_secs);
87 self.access_history.retain(|&t| t >= window_start);
88
89 if config.enable_adaptive_ttl {
91 let window_accesses = self.access_history.len() as u64;
92 self.current_ttl = config.calculate_ttl(self.current_ttl, window_accesses);
93 }
94 }
95
96 pub fn is_expired(&self) -> bool {
98 Instant::now().duration_since(self.created_at) > self.current_ttl
99 }
100
101 pub fn remaining_ttl(&self) -> Duration {
103 let elapsed = Instant::now().duration_since(self.created_at);
104 self.current_ttl.saturating_sub(elapsed)
105 }
106
107 pub fn access_frequency(&self, window_secs: u64) -> f64 {
109 if self.access_history.is_empty() {
110 return 0.0;
111 }
112
113 let window_duration = Duration::from_secs(window_secs);
114 let actual_window = Instant::now()
115 .duration_since(self.created_at)
116 .min(window_duration);
117
118 if actual_window.as_secs() == 0 {
119 return self.access_history.len() as f64;
120 }
121
122 let accesses = self.access_history.len() as f64;
123 let minutes = actual_window.as_secs_f64() / 60.0;
124 accesses / minutes
125 }
126
127 pub fn reset(&mut self, value: V, ttl: Duration) {
129 let now = Instant::now();
130 self.value = value;
131 self.created_at = now;
132 self.access_count = 0;
133 self.last_accessed = now;
134 self.current_ttl = ttl;
135 self.access_history.clear();
136 }
137}
138
139pub struct AdaptiveTTLCache<K, V> {
146 entries: Arc<RwLock<HashMap<K, CacheEntry<V>>>>,
148 config: TTLConfig,
150 stats: Arc<CacheStats>,
152 cleanup_task: Option<JoinHandle<()>>,
154}
155
156impl<K, V> AdaptiveTTLCache<K, V>
157where
158 K: Eq + Hash + Clone + Send + Sync + 'static,
159 V: Clone + Send + Sync + 'static,
160{
161 pub fn new(config: TTLConfig) -> Result<Self, TTLConfigError> {
167 config.validate()?;
168
169 let entries = Arc::new(RwLock::new(HashMap::new()));
170 let stats = Arc::new(CacheStats::new());
171
172 let cleanup_task = if config.enable_background_cleanup {
174 Some(Self::start_cleanup_task(
175 Arc::clone(&entries),
176 Arc::clone(&stats),
177 config.cleanup_interval,
178 config.max_entries,
179 ))
180 } else {
181 None
182 };
183
184 info!(
185 "AdaptiveTTLCache initialized: max_entries={}, base_ttl={:?}, cleanup_interval={:?}",
186 config.max_entries, config.base_ttl, config.cleanup_interval
187 );
188
189 Ok(Self {
190 entries,
191 config,
192 stats,
193 cleanup_task,
194 })
195 }
196
197 pub fn default_config() -> Result<Self, TTLConfigError> {
199 Self::new(TTLConfig::default())
200 }
201
202 pub async fn get(&self, key: &K) -> Option<V> {
207 let mut entries = self.entries.write().await;
208
209 if let Some(entry) = entries.get_mut(key) {
210 if entry.is_expired() {
211 trace!("Entry expired for key, removing");
213 self.stats.record_ttl_expiration();
214 entries.remove(key);
215 self.stats.update_entry_count(entries.len());
216 return None;
217 }
218
219 entry.record_access(&self.config);
221 self.stats.record_hit();
222 self.stats.record_ttl_adaptation(entry.current_ttl);
223
224 Some(entry.value.clone())
225 } else {
226 self.stats.record_miss();
227 None
228 }
229 }
230
231 pub async fn insert(&self, key: K, value: V) {
235 let mut entries = self.entries.write().await;
236
237 if entries.len() >= self.config.max_entries && !entries.contains_key(&key) {
239 self.evict_oldest(&mut entries).await;
240 }
241
242 let entry = CacheEntry::new(value, self.config.base_ttl);
243 entries.insert(key, entry);
244 self.stats.update_entry_count(entries.len());
245
246 debug!(
247 "Inserted entry, cache size: {}/{}",
248 entries.len(),
249 self.config.max_entries
250 );
251 }
252
253 pub async fn remove(&self, key: &K) -> bool {
257 let mut entries = self.entries.write().await;
258
259 if entries.remove(key).is_some() {
260 self.stats.record_removal();
261 self.stats.update_entry_count(entries.len());
262 true
263 } else {
264 false
265 }
266 }
267
268 pub async fn contains(&self, key: &K) -> bool {
270 let entries = self.entries.read().await;
271
272 if let Some(entry) = entries.get(key) {
273 !entry.is_expired()
274 } else {
275 false
276 }
277 }
278
279 pub async fn ttl(&self, key: &K) -> Option<Duration> {
281 let entries = self.entries.read().await;
282 entries.get(key).map(|e| e.current_ttl)
283 }
284
285 pub async fn remaining_ttl(&self, key: &K) -> Option<Duration> {
287 let entries = self.entries.read().await;
288 entries.get(key).map(|e| e.remaining_ttl())
289 }
290
291 #[cfg(test)]
292 pub async fn force_set_entry_created_at(&self, key: &K, created_at: Instant) {
293 let mut entries = self.entries.write().await;
294 if let Some(entry) = entries.get_mut(key) {
295 entry.created_at = created_at;
296 entry.last_accessed = created_at;
297 entry.access_history.clear();
298 }
299 }
300
301 pub async fn access_count(&self, key: &K) -> Option<u64> {
303 let entries = self.entries.read().await;
304 entries.get(key).map(|e| e.access_count)
305 }
306
307 pub async fn len(&self) -> usize {
309 let entries = self.entries.read().await;
310 entries.len()
311 }
312
313 pub async fn is_empty(&self) -> bool {
315 self.len().await == 0
316 }
317
318 pub async fn clear(&self) {
320 let mut entries = self.entries.write().await;
321 let count = entries.len();
322 entries.clear();
323 self.stats.update_entry_count(0);
324 info!("Cleared {} entries from cache", count);
325 }
326
327 pub fn stats(&self) -> CacheStatsSnapshot {
329 self.stats.snapshot()
330 }
331
332 pub fn config(&self) -> &TTLConfig {
334 &self.config
335 }
336
337 pub async fn cleanup_expired(&self) -> usize {
341 let mut entries = self.entries.write().await;
342 let before_count = entries.len();
343
344 entries.retain(|_key, entry| {
345 if entry.is_expired() {
346 self.stats.record_ttl_expiration();
347 false
348 } else {
349 true
350 }
351 });
352
353 let removed = before_count - entries.len();
354 self.stats.update_entry_count(entries.len());
355 self.stats.record_cleanup();
356
357 if removed > 0 {
358 debug!("Cleaned up {} expired entries", removed);
359 }
360
361 removed
362 }
363
364 pub async fn keys(&self) -> Vec<K> {
366 let entries = self.entries.read().await;
367 entries
368 .iter()
369 .filter(|(_, entry)| !entry.is_expired())
370 .map(|(key, _)| key.clone())
371 .collect()
372 }
373
374 async fn evict_oldest(&self, entries: &mut HashMap<K, CacheEntry<V>>) {
376 if let Some(oldest_key) = entries
377 .iter()
378 .min_by_key(|(_, entry)| entry.last_accessed)
379 .map(|(key, _)| key.clone())
380 {
381 if let Some(entry) = entries.remove(&oldest_key) {
382 let estimated_bytes = std::mem::size_of_val(&entry.value) as u64;
384 self.stats.record_eviction(estimated_bytes);
385 debug!("Evicted oldest entry");
386 }
387 }
388 }
389
390 fn start_cleanup_task(
392 entries: Arc<RwLock<HashMap<K, CacheEntry<V>>>>,
393 stats: Arc<CacheStats>,
394 interval_duration: Duration,
395 _max_entries: usize,
396 ) -> JoinHandle<()> {
397 tokio::spawn(async move {
398 let mut ticker = interval(TokioDuration::from_secs(interval_duration.as_secs()));
399
400 loop {
401 ticker.tick().await;
402
403 let mut entries_guard = entries.write().await;
404 let before_count = entries_guard.len();
405
406 entries_guard.retain(|_key, entry| !entry.is_expired());
408 let expired_count = before_count - entries_guard.len();
409 for _ in 0..expired_count {
410 stats.record_ttl_expiration();
411 }
412
413 let removed = before_count - entries_guard.len();
414 stats.update_entry_count(entries_guard.len());
415 stats.record_cleanup();
416
417 if removed > 0 {
418 debug!("Background cleanup removed {} expired entries", removed);
419 }
420
421 drop(entries_guard);
422 }
423 })
424 }
425
426 pub fn stop_cleanup(&mut self) {
428 if let Some(task) = self.cleanup_task.take() {
429 task.abort();
430 info!("Background cleanup task stopped");
431 }
432 }
433}
434
435impl<K, V> Drop for AdaptiveTTLCache<K, V> {
436 fn drop(&mut self) {
437 if let Some(task) = self.cleanup_task.take() {
438 task.abort();
439 }
440 }
441}
442
443#[cfg(test)]
444#[path = "adaptive_ttl_tests.rs"]
445mod tests;