// do_memory_storage_redb/cache/adaptive/mod.rs
//! Adaptive LRU cache with intelligent TTL adjustment
//!
//! This module provides an intelligent caching layer that dynamically adjusts
//! TTL (Time-To-Live) based on access patterns:
//! - "Hot" items (frequently accessed) get extended TTL
//! - "Cold" items (rarely accessed) get reduced TTL
//! - Min/max TTL bounds prevent extreme values
//! - Exponential backoff smooths TTL adjustments
//!
//! ## Algorithm
//!
//! 1. Track access count per entry in a sliding window
//! 2. On each access, update TTL based on access frequency:
//!    - If access_count > hot_threshold: Increase TTL (max: max_ttl)
//!    - If access_count < cold_threshold: Decrease TTL (min: min_ttl)
//! 3. Use exponential backoff: new_ttl = current_ttl * (1 ± adaptation_rate)

mod entry;
mod state;
mod types;

pub use types::{AdaptiveCacheConfig, AdaptiveCacheMetrics};

use self::entry::AdaptiveCacheEntry;
use self::state::AdaptiveCacheState;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::time::{Duration as TokioDuration, interval};
use tracing::{debug, info};
use uuid::Uuid;
34
/// Adaptive LRU cache with intelligent TTL adjustment
///
/// This cache monitors access patterns and adjusts TTL dynamically:
/// - Frequently accessed items get longer TTL
/// - Rarely accessed items get shorter TTL
/// - Reduces memory waste on cold items
/// - Improves hit rate for hot items
///
/// ## Thread Safety
///
/// This cache is designed for single-threaded async access. All public methods
/// acquire the write lock internally. For higher concurrency, consider using
/// fine-grained locking strategies.
pub struct AdaptiveCache<V: Clone + Send + Sync + 'static> {
    // Tuning parameters (TTL bounds, hot/cold thresholds, capacity);
    // immutable after construction.
    config: AdaptiveCacheConfig,
    // Entries, LRU queue, and metrics — shared with the background
    // cleanup task via Arc, guarded by an async RwLock.
    state: Arc<RwLock<AdaptiveCacheState<V>>>,
    // Handle to the periodic cleanup task; `None` when background
    // cleanup is disabled. Aborted on drop.
    cleanup_task: Option<JoinHandle<()>>,
}
53
54impl<V: Clone + Send + Sync + 'static> AdaptiveCache<V> {
55    /// Create a new adaptive cache with default configuration
56    pub fn new(config: AdaptiveCacheConfig) -> Self {
57        let state = Arc::new(RwLock::new(AdaptiveCacheState::<V>::new()));
58
59        let cleanup_task = if config.enable_background_cleanup && config.cleanup_interval_secs > 0 {
60            Some(Self::start_cleanup_task(
61                Arc::clone(&state),
62                config.cleanup_interval_secs,
63            ))
64        } else {
65            None
66        };
67
68        info!(
69            "Initialized adaptive cache: default_ttl={}s, min_ttl={}s, max_ttl={}s, hot={}, cold={}",
70            config.default_ttl.as_secs(),
71            config.min_ttl.as_secs(),
72            config.max_ttl.as_secs(),
73            config.hot_threshold,
74            config.cold_threshold
75        );
76
77        Self {
78            config,
79            state,
80            cleanup_task,
81        }
82    }
83
84    /// Record a cache access (hit or miss)
85    ///
86    /// # Arguments
87    ///
88    /// * `id` - Unique identifier for the cache entry
89    /// * `hit` - Whether this was a cache hit (true) or miss (false)
90    /// * `value` - Value to store (only used on cache miss)
91    ///
92    /// # Returns
93    ///
94    /// `true` if the entry was found and is valid, `false` otherwise
95    pub async fn record_access(&self, id: Uuid, hit: bool, value: Option<V>) -> bool {
96        let now = Instant::now();
97        let mut state = self.state.write().await;
98
99        if hit {
100            // Cache hit: update access time and move to back of LRU queue
101            if let Some(entry) = state.entries.get_mut(&id) {
102                // Check if expired
103                if entry.is_expired(now) {
104                    debug!("Cache entry expired on access: {}", id);
105                    state.metrics.base.expirations += 1;
106                    state.metrics.base.misses += 1;
107
108                    // Remove expired entry
109                    state.remove_entry(&id);
110                    state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
111                    return false;
112                }
113
114                // Record access and update TTL
115                entry.record_access(now, &self.config);
116
117                // Move to back of LRU queue (most recently used)
118                state.lru_queue.retain(|&qid| qid != id);
119                state.lru_queue.push_back(id);
120
121                state.metrics.base.hits += 1;
122                state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
123                true
124            } else {
125                // Entry not found - treat as miss
126                state.metrics.base.misses += 1;
127                state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
128                false
129            }
130        } else {
131            // Cache miss: add new entry
132            state.metrics.base.misses += 1;
133
134            if let Some(v) = value {
135                // Check if we need to evict
136                if state.entries.len() >= self.config.max_size {
137                    // Evict oldest entry (front of queue)
138                    if let Some(oldest_id) = state.lru_queue.pop_front() {
139                        state.entries.remove(&oldest_id);
140                        state.metrics.base.evictions += 1;
141                        debug!("Evicted LRU entry: {}", oldest_id);
142                    }
143                }
144
145                // Add new entry
146                let entry = AdaptiveCacheEntry::new(v, self.config.default_ttl);
147                state.entries.insert(id, entry);
148                state.lru_queue.push_back(id);
149            }
150
151            state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
152            false
153        }
154    }
155
156    /// Get the current value for an entry without recording access
157    pub async fn get(&self, id: Uuid) -> Option<V> {
158        let state = self.state.read().await;
159        state.entries.get(&id).map(|entry| entry.value.clone())
160    }
161
162    /// Get the current value and record access
163    pub async fn get_and_record(&self, id: Uuid) -> Option<V> {
164        let now = Instant::now();
165        let mut state = self.state.write().await;
166
167        if let Some(entry) = state.entries.get_mut(&id) {
168            if !entry.is_expired(now) {
169                // Clone value first before any further borrows
170                let value = entry.value.clone();
171                entry.record_access(now, &self.config);
172                state.metrics.base.hits += 1;
173                state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
174                return Some(value);
175            }
176        }
177
178        state.metrics.base.misses += 1;
179        state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
180        None
181    }
182
183    /// Remove an entry from the cache
184    pub async fn remove(&self, id: Uuid) {
185        let mut state = self.state.write().await;
186        state.remove_entry(&id);
187        state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
188    }
189
190    /// Check if an entry exists and is not expired
191    pub async fn contains(&self, id: Uuid) -> bool {
192        let now = Instant::now();
193        let state = self.state.read().await;
194        if let Some(entry) = state.entries.get(&id) {
195            !entry.is_expired(now)
196        } else {
197            false
198        }
199    }
200
201    /// Get access count for an entry
202    pub async fn access_count(&self, id: Uuid) -> Option<usize> {
203        let state = self.state.read().await;
204        state.entries.get(&id).map(|entry| entry.access_count())
205    }
206
207    /// Get current TTL for an entry
208    pub async fn ttl(&self, id: Uuid) -> Option<Duration> {
209        let state = self.state.read().await;
210        state.entries.get(&id).map(|entry| entry.ttl())
211    }
212
213    /// Get current cache metrics
214    pub async fn get_metrics(&self) -> AdaptiveCacheMetrics {
215        let state = self.state.read().await;
216        state.metrics.clone()
217    }
218
219    /// Clear all entries from cache
220    pub async fn clear(&self) {
221        let mut state = self.state.write().await;
222        state.clear();
223    }
224
225    /// Manually cleanup expired entries
226    pub async fn cleanup_expired(&self) -> usize {
227        let now = Instant::now();
228        let mut state = self.state.write().await;
229        let mut expired_ids = Vec::new();
230
231        // Find expired entries
232        for (id, entry) in &state.entries {
233            let ttl_secs = entry.ttl_seconds.load(Ordering::SeqCst);
234            let created_at = entry.created_at;
235            let expires_at = created_at + Duration::from_secs(ttl_secs);
236            let is_expired = now >= expires_at;
237            if is_expired {
238                expired_ids.push(*id);
239            }
240        }
241
242        // Remove them
243        let count = expired_ids.len();
244        for id in expired_ids {
245            state.remove_entry(&id);
246            state.metrics.base.expirations += 1;
247        }
248
249        state.update_metrics(self.config.hot_threshold, self.config.cold_threshold);
250
251        if count > 0 {
252            debug!("Cleaned up {} expired cache entries", count);
253        }
254
255        count
256    }
257
258    /// Get the number of hot items
259    pub async fn hot_count(&self) -> usize {
260        let state = self.state.read().await;
261        state.metrics.hot_item_count
262    }
263
264    /// Get the number of cold items
265    pub async fn cold_count(&self) -> usize {
266        let state = self.state.read().await;
267        state.metrics.cold_item_count
268    }
269
270    /// Get cache size (number of entries)
271    pub async fn len(&self) -> usize {
272        let state = self.state.read().await;
273        state.entries.len()
274    }
275
276    /// Check if cache is empty
277    pub async fn is_empty(&self) -> bool {
278        self.len().await == 0
279    }
280
281    /// Start background cleanup task
282    fn start_cleanup_task(
283        state: Arc<RwLock<AdaptiveCacheState<V>>>,
284        interval_secs: u64,
285    ) -> JoinHandle<()> {
286        tokio::spawn(async move {
287            let mut ticker = interval(TokioDuration::from_secs(interval_secs));
288            loop {
289                ticker.tick().await;
290
291                let now = Instant::now();
292                let mut state_guard = state.write().await;
293                let mut expired_ids = Vec::new();
294
295                // Find expired entries
296                for (id, entry) in &state_guard.entries {
297                    let ttl_secs = entry.ttl_seconds.load(Ordering::SeqCst);
298                    let expires_at = entry.created_at + Duration::from_secs(ttl_secs);
299                    if now >= expires_at {
300                        expired_ids.push(*id);
301                    }
302                }
303
304                // Remove them
305                let count = expired_ids.len();
306                for id in expired_ids {
307                    state_guard.remove_entry(&id);
308                    state_guard.metrics.base.expirations += 1;
309                }
310
311                state_guard.update_metrics(10, 2); // Use defaults for metrics
312                drop(state_guard);
313
314                if count > 0 {
315                    debug!("Background cleanup removed {} expired entries", count);
316                }
317            }
318        })
319    }
320
321    /// Stop the background cleanup task
322    pub fn stop_cleanup(&mut self) {
323        if let Some(task) = self.cleanup_task.take() {
324            task.abort();
325        }
326    }
327}
328
/// Abort the background cleanup task when the cache is dropped so the
/// spawned task does not keep the shared state alive indefinitely.
impl<V: Clone + Send + Sync + 'static> Drop for AdaptiveCache<V> {
    fn drop(&mut self) {
        // stop_cleanup is idempotent: it takes and aborts the JoinHandle.
        self.stop_cleanup();
    }
}
334
335/// Conversion from AdaptiveCacheConfig to standard CacheConfig
336impl From<AdaptiveCacheConfig> for super::super::CacheConfig {
337    fn from(config: AdaptiveCacheConfig) -> Self {
338        Self {
339            max_size: 1000,
340            default_ttl_secs: config.default_ttl.as_secs(),
341            cleanup_interval_secs: config.cleanup_interval_secs,
342            enable_background_cleanup: config.enable_background_cleanup,
343        }
344    }
345}