Skip to main content

do_memory_storage_turso/cache/
adaptive_ttl.rs

1//! Adaptive TTL Cache Implementation
2//!
3//! This module provides a generic adaptive TTL cache that:
4//! - Adjusts TTL based on access patterns (hot items get longer TTL, cold items get shorter)
5//! - Performs background cleanup of expired entries
6//! - Tracks statistics (hit rate, avg TTL, evictions)
7//! - Provides thread-safe operations
8//!
9//! ## Architecture
10//!
11//! ```text
12//! ┌─────────────────────────────────────────────────────────────┐
13//! │                    AdaptiveTTLCache<K, V>                   │
14//! ├─────────────────────────────────────────────────────────────┤
15//! │  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────┐  │
16//! │  │   entries   │  │    stats    │  │   cleanup_task      │  │
17//! │  │  HashMap    │  │ CacheStats  │  │   (background)      │  │
18//! │  └─────────────┘  └─────────────┘  └─────────────────────┘  │
19//! └─────────────────────────────────────────────────────────────┘
20//!                              │
21//!                    ┌─────────┴─────────┐
22//!                    ▼                   ▼
23//!           ┌────────────────┐  ┌────────────────┐
24//!           │  CacheEntry<V> │  │   TTLConfig    │
25//!           │ - value: V     │  │ - base_ttl     │
26//!           │ - created_at   │  │ - min/max_ttl  │
27//!           │ - access_count │  │ - thresholds   │
28//!           │ - last_access  │  │ - adaptation   │
29//!           └────────────────┘  └────────────────┘
30//! ```
31
32use super::ttl_config::{TTLConfig, TTLConfigError};
33#[path = "adaptive_ttl_stats.rs"]
34mod stats;
35pub use stats::{CacheStats, CacheStatsSnapshot};
36use std::collections::HashMap;
37use std::hash::Hash;
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40use tokio::sync::RwLock;
41use tokio::task::JoinHandle;
42use tokio::time::{Duration as TokioDuration, interval};
43use tracing::{debug, info, trace};
44
45/// A cache entry with metadata for adaptive TTL management
46#[derive(Debug, Clone)]
47pub struct CacheEntry<V> {
48    /// The cached value
49    pub value: V,
50    /// When the entry was created
51    pub created_at: Instant,
52    /// Number of times this entry has been accessed
53    pub access_count: u64,
54    /// When the entry was last accessed
55    pub last_accessed: Instant,
56    /// Current TTL for this entry (adaptive)
57    pub current_ttl: Duration,
58    /// Access timestamps for pattern analysis (sliding window)
59    access_history: Vec<Instant>,
60}
61
62impl<V> CacheEntry<V> {
63    /// Create a new cache entry with the given value and initial TTL
64    pub fn new(value: V, initial_ttl: Duration) -> Self {
65        let now = Instant::now();
66        Self {
67            value,
68            created_at: now,
69            access_count: 0,
70            last_accessed: now,
71            current_ttl: initial_ttl,
72            access_history: Vec::with_capacity(20),
73        }
74    }
75
76    /// Record an access to this entry
77    pub fn record_access(&mut self, config: &TTLConfig) {
78        let now = Instant::now();
79        self.access_count += 1;
80        self.last_accessed = now;
81
82        // Add to access history
83        self.access_history.push(now);
84
85        // Trim access history to window size
86        let window_start = now - Duration::from_secs(config.access_window_secs);
87        self.access_history.retain(|&t| t >= window_start);
88
89        // Update TTL based on access pattern
90        if config.enable_adaptive_ttl {
91            let window_accesses = self.access_history.len() as u64;
92            self.current_ttl = config.calculate_ttl(self.current_ttl, window_accesses);
93        }
94    }
95
96    /// Check if this entry has expired
97    pub fn is_expired(&self) -> bool {
98        Instant::now().duration_since(self.created_at) > self.current_ttl
99    }
100
101    /// Get the remaining TTL for this entry
102    pub fn remaining_ttl(&self) -> Duration {
103        let elapsed = Instant::now().duration_since(self.created_at);
104        self.current_ttl.saturating_sub(elapsed)
105    }
106
107    /// Get the access frequency (accesses per minute) over the window
108    pub fn access_frequency(&self, window_secs: u64) -> f64 {
109        if self.access_history.is_empty() {
110            return 0.0;
111        }
112
113        let window_duration = Duration::from_secs(window_secs);
114        let actual_window = Instant::now()
115            .duration_since(self.created_at)
116            .min(window_duration);
117
118        if actual_window.as_secs() == 0 {
119            return self.access_history.len() as f64;
120        }
121
122        let accesses = self.access_history.len() as f64;
123        let minutes = actual_window.as_secs_f64() / 60.0;
124        accesses / minutes
125    }
126
127    /// Reset the entry with a new value and TTL
128    pub fn reset(&mut self, value: V, ttl: Duration) {
129        let now = Instant::now();
130        self.value = value;
131        self.created_at = now;
132        self.access_count = 0;
133        self.last_accessed = now;
134        self.current_ttl = ttl;
135        self.access_history.clear();
136    }
137}
138
139/// Adaptive TTL cache with generic key and value types
140///
141/// This cache automatically adjusts the TTL of entries based on their access patterns:
142/// - Frequently accessed items (hot) get extended TTL
143/// - Rarely accessed items (cold) get reduced TTL
144/// - Expired entries are cleaned up in the background
145pub struct AdaptiveTTLCache<K, V> {
146    /// Cache entries
147    entries: Arc<RwLock<HashMap<K, CacheEntry<V>>>>,
148    /// Cache configuration
149    config: TTLConfig,
150    /// Cache statistics
151    stats: Arc<CacheStats>,
152    /// Background cleanup task handle
153    cleanup_task: Option<JoinHandle<()>>,
154}
155
156impl<K, V> AdaptiveTTLCache<K, V>
157where
158    K: Eq + Hash + Clone + Send + Sync + 'static,
159    V: Clone + Send + Sync + 'static,
160{
161    /// Create a new adaptive TTL cache with the given configuration
162    ///
163    /// # Errors
164    ///
165    /// Returns `TTLConfigError` if the configuration is invalid
166    pub fn new(config: TTLConfig) -> Result<Self, TTLConfigError> {
167        config.validate()?;
168
169        let entries = Arc::new(RwLock::new(HashMap::new()));
170        let stats = Arc::new(CacheStats::new());
171
172        // Start background cleanup task if enabled
173        let cleanup_task = if config.enable_background_cleanup {
174            Some(Self::start_cleanup_task(
175                Arc::clone(&entries),
176                Arc::clone(&stats),
177                config.cleanup_interval,
178                config.max_entries,
179            ))
180        } else {
181            None
182        };
183
184        info!(
185            "AdaptiveTTLCache initialized: max_entries={}, base_ttl={:?}, cleanup_interval={:?}",
186            config.max_entries, config.base_ttl, config.cleanup_interval
187        );
188
189        Ok(Self {
190            entries,
191            config,
192            stats,
193            cleanup_task,
194        })
195    }
196
197    /// Create a new cache with default configuration
198    pub fn default_config() -> Result<Self, TTLConfigError> {
199        Self::new(TTLConfig::default())
200    }
201
202    /// Get a value from the cache
203    ///
204    /// Returns `Some(value)` if the key exists and hasn't expired,
205    /// `None` otherwise.
206    pub async fn get(&self, key: &K) -> Option<V> {
207        let mut entries = self.entries.write().await;
208
209        if let Some(entry) = entries.get_mut(key) {
210            if entry.is_expired() {
211                // Entry has expired, remove it
212                trace!("Entry expired for key, removing");
213                self.stats.record_ttl_expiration();
214                entries.remove(key);
215                self.stats.update_entry_count(entries.len());
216                return None;
217            }
218
219            // Record the access and update TTL
220            entry.record_access(&self.config);
221            self.stats.record_hit();
222            self.stats.record_ttl_adaptation(entry.current_ttl);
223
224            Some(entry.value.clone())
225        } else {
226            self.stats.record_miss();
227            None
228        }
229    }
230
231    /// Insert a value into the cache
232    ///
233    /// If the key already exists, the value is updated and the entry is reset.
234    pub async fn insert(&self, key: K, value: V) {
235        let mut entries = self.entries.write().await;
236
237        // Check if we need to evict entries
238        if entries.len() >= self.config.max_entries && !entries.contains_key(&key) {
239            self.evict_oldest(&mut entries).await;
240        }
241
242        let entry = CacheEntry::new(value, self.config.base_ttl);
243        entries.insert(key, entry);
244        self.stats.update_entry_count(entries.len());
245
246        debug!(
247            "Inserted entry, cache size: {}/{}",
248            entries.len(),
249            self.config.max_entries
250        );
251    }
252
253    /// Remove a value from the cache
254    ///
255    /// Returns `true` if the key existed and was removed, `false` otherwise.
256    pub async fn remove(&self, key: &K) -> bool {
257        let mut entries = self.entries.write().await;
258
259        if entries.remove(key).is_some() {
260            self.stats.record_removal();
261            self.stats.update_entry_count(entries.len());
262            true
263        } else {
264            false
265        }
266    }
267
268    /// Check if a key exists in the cache (and hasn't expired)
269    pub async fn contains(&self, key: &K) -> bool {
270        let entries = self.entries.read().await;
271
272        if let Some(entry) = entries.get(key) {
273            !entry.is_expired()
274        } else {
275            false
276        }
277    }
278
279    /// Get the current TTL for an entry
280    pub async fn ttl(&self, key: &K) -> Option<Duration> {
281        let entries = self.entries.read().await;
282        entries.get(key).map(|e| e.current_ttl)
283    }
284
285    /// Get the remaining TTL for an entry
286    pub async fn remaining_ttl(&self, key: &K) -> Option<Duration> {
287        let entries = self.entries.read().await;
288        entries.get(key).map(|e| e.remaining_ttl())
289    }
290
291    #[cfg(test)]
292    pub async fn force_set_entry_created_at(&self, key: &K, created_at: Instant) {
293        let mut entries = self.entries.write().await;
294        if let Some(entry) = entries.get_mut(key) {
295            entry.created_at = created_at;
296            entry.last_accessed = created_at;
297            entry.access_history.clear();
298        }
299    }
300
301    /// Get the access count for an entry
302    pub async fn access_count(&self, key: &K) -> Option<u64> {
303        let entries = self.entries.read().await;
304        entries.get(key).map(|e| e.access_count)
305    }
306
307    /// Get the number of entries in the cache
308    pub async fn len(&self) -> usize {
309        let entries = self.entries.read().await;
310        entries.len()
311    }
312
313    /// Check if the cache is empty
314    pub async fn is_empty(&self) -> bool {
315        self.len().await == 0
316    }
317
318    /// Clear all entries from the cache
319    pub async fn clear(&self) {
320        let mut entries = self.entries.write().await;
321        let count = entries.len();
322        entries.clear();
323        self.stats.update_entry_count(0);
324        info!("Cleared {} entries from cache", count);
325    }
326
327    /// Get a snapshot of the current statistics
328    pub fn stats(&self) -> CacheStatsSnapshot {
329        self.stats.snapshot()
330    }
331
332    /// Get the cache configuration
333    pub fn config(&self) -> &TTLConfig {
334        &self.config
335    }
336
337    /// Manually trigger cleanup of expired entries
338    ///
339    /// Returns the number of entries removed.
340    pub async fn cleanup_expired(&self) -> usize {
341        let mut entries = self.entries.write().await;
342        let before_count = entries.len();
343
344        entries.retain(|_key, entry| {
345            if entry.is_expired() {
346                self.stats.record_ttl_expiration();
347                false
348            } else {
349                true
350            }
351        });
352
353        let removed = before_count - entries.len();
354        self.stats.update_entry_count(entries.len());
355        self.stats.record_cleanup();
356
357        if removed > 0 {
358            debug!("Cleaned up {} expired entries", removed);
359        }
360
361        removed
362    }
363
364    /// Get all keys in the cache (that haven't expired)
365    pub async fn keys(&self) -> Vec<K> {
366        let entries = self.entries.read().await;
367        entries
368            .iter()
369            .filter(|(_, entry)| !entry.is_expired())
370            .map(|(key, _)| key.clone())
371            .collect()
372    }
373
374    /// Evict the oldest entry (LRU eviction)
375    async fn evict_oldest(&self, entries: &mut HashMap<K, CacheEntry<V>>) {
376        if let Some(oldest_key) = entries
377            .iter()
378            .min_by_key(|(_, entry)| entry.last_accessed)
379            .map(|(key, _)| key.clone())
380        {
381            if let Some(entry) = entries.remove(&oldest_key) {
382                // Estimate bytes (simplified - just use size of value)
383                let estimated_bytes = std::mem::size_of_val(&entry.value) as u64;
384                self.stats.record_eviction(estimated_bytes);
385                debug!("Evicted oldest entry");
386            }
387        }
388    }
389
390    /// Start the background cleanup task
391    fn start_cleanup_task(
392        entries: Arc<RwLock<HashMap<K, CacheEntry<V>>>>,
393        stats: Arc<CacheStats>,
394        interval_duration: Duration,
395        _max_entries: usize,
396    ) -> JoinHandle<()> {
397        tokio::spawn(async move {
398            let mut ticker = interval(TokioDuration::from_secs(interval_duration.as_secs()));
399
400            loop {
401                ticker.tick().await;
402
403                let mut entries_guard = entries.write().await;
404                let before_count = entries_guard.len();
405
406                // Remove expired entries
407                entries_guard.retain(|_key, entry| !entry.is_expired());
408                let expired_count = before_count - entries_guard.len();
409                for _ in 0..expired_count {
410                    stats.record_ttl_expiration();
411                }
412
413                let removed = before_count - entries_guard.len();
414                stats.update_entry_count(entries_guard.len());
415                stats.record_cleanup();
416
417                if removed > 0 {
418                    debug!("Background cleanup removed {} expired entries", removed);
419                }
420
421                drop(entries_guard);
422            }
423        })
424    }
425
426    /// Stop the background cleanup task
427    pub fn stop_cleanup(&mut self) {
428        if let Some(task) = self.cleanup_task.take() {
429            task.abort();
430            info!("Background cleanup task stopped");
431        }
432    }
433}
434
435impl<K, V> Drop for AdaptiveTTLCache<K, V> {
436    fn drop(&mut self) {
437        if let Some(task) = self.cleanup_task.take() {
438            task.abort();
439        }
440    }
441}
442
443#[cfg(test)]
444#[path = "adaptive_ttl_tests.rs"]
445mod tests;