Skip to main content

do_memory_storage_turso/cache/
query_cache.rs

1//! Advanced Query Result Caching with Smart Invalidation
2//!
3//! Provides sophisticated caching for query results with:
4//! - Dependency tracking for smart invalidation
5//! - Configurable TTL per query type
6//! - LRU eviction with size limits
7//! - Cache hit/miss statistics
8//! - Background refresh for hot queries
9//! - Thread-safe concurrent access
10
11use parking_lot::RwLock;
12use std::collections::{HashMap, HashSet, VecDeque};
13use std::sync::Arc;
14use std::time::Instant;
15use tokio::sync::mpsc;
16use tracing::{debug, info, trace};
17
18#[path = "query_cache_types.rs"]
19mod types;
20pub use types::{
21    AdvancedCacheStats, AdvancedQueryCacheConfig, CachedResult, InvalidationMessage, QueryKey,
22    QueryType, TableDependency,
23};
24
25/// Advanced query cache with smart invalidation
26pub struct AdvancedQueryCache {
27    /// Cached results
28    results: Arc<RwLock<HashMap<QueryKey, CachedResult>>>,
29    /// Reverse index: table -> query keys that depend on it
30    dependency_index: Arc<RwLock<HashMap<TableDependency, HashSet<QueryKey>>>>,
31    /// LRU queue for eviction (front = oldest)
32    lru_queue: Arc<RwLock<VecDeque<QueryKey>>>,
33    /// Configuration
34    config: AdvancedQueryCacheConfig,
35    /// Statistics
36    stats: Arc<RwLock<AdvancedCacheStats>>,
37    /// Invalidation channel sender
38    invalidation_tx: mpsc::UnboundedSender<InvalidationMessage>,
39    /// Hot queries set (for background refresh)
40    hot_queries: Arc<RwLock<HashSet<QueryKey>>>,
41}
42
43impl AdvancedQueryCache {
44    /// Create a new advanced query cache
45    pub fn new(
46        config: AdvancedQueryCacheConfig,
47    ) -> (Self, mpsc::UnboundedReceiver<InvalidationMessage>) {
48        let (invalidation_tx, invalidation_rx) = mpsc::unbounded_channel();
49
50        let cache = Self {
51            results: Arc::new(RwLock::new(HashMap::new())),
52            dependency_index: Arc::new(RwLock::new(HashMap::new())),
53            lru_queue: Arc::new(RwLock::new(VecDeque::new())),
54            config,
55            stats: Arc::new(RwLock::new(AdvancedCacheStats::default())),
56            invalidation_tx,
57            hot_queries: Arc::new(RwLock::new(HashSet::new())),
58        };
59
60        (cache, invalidation_rx)
61    }
62
63    /// Create with default configuration
64    pub fn new_with_receiver() -> (Self, mpsc::UnboundedReceiver<InvalidationMessage>) {
65        Self::new(AdvancedQueryCacheConfig::default())
66    }
67
68    /// Get cached result
69    pub fn get(&self, key: &QueryKey) -> Option<Vec<u8>> {
70        let results = self.results.read();
71
72        if let Some(result) = results.get(key) {
73            if result.is_expired() {
74                drop(results);
75                self.handle_expired(key);
76                return None;
77            }
78
79            result.record_access();
80
81            // Check if this is becoming a hot query
82            if result.access_count() >= self.config.hot_threshold {
83                let mut hot = self.hot_queries.write();
84                hot.insert(key.clone());
85            }
86
87            // Update LRU queue
88            self.update_lru(key.clone());
89
90            // Update stats
91            self.stats.write().hits += 1;
92
93            trace!("Cache hit for query key: {:?}", key);
94            Some(result.data.clone())
95        } else {
96            self.stats.write().misses += 1;
97            trace!("Cache miss for query key: {:?}", key);
98            None
99        }
100    }
101
102    /// Store result in cache
103    pub fn put(&self, key: QueryKey, data: Vec<u8>, dependencies: Vec<TableDependency>) {
104        let ttl = self.config.ttl_for_type(key.query_type);
105
106        // Evict if at capacity
107        self.evict_if_needed();
108
109        // Build dependency index
110        if self.config.enable_dependency_tracking {
111            let mut index = self.dependency_index.write();
112            for dep in &dependencies {
113                index.entry(dep.clone()).or_default().insert(key.clone());
114            }
115        }
116
117        // Store result
118        let result = CachedResult::new(data, ttl, dependencies, key.query_type);
119
120        let mut results = self.results.write();
121        results.insert(key.clone(), result);
122        drop(results);
123
124        // Update LRU queue
125        self.lru_queue.write().push_back(key);
126
127        // Update stats
128        self.stats.write().current_size = self.results.read().len();
129
130        debug!("Cached query result with TTL: {:?}", ttl);
131    }
132
133    /// Invalidate cache entries by table dependency
134    pub fn invalidate_by_table(&self, table: &TableDependency) {
135        if !self.config.enable_dependency_tracking {
136            return;
137        }
138
139        let keys_to_invalidate: Vec<QueryKey> = {
140            let index = self.dependency_index.read();
141            index
142                .get(table)
143                .map(|keys| keys.iter().cloned().collect())
144                .unwrap_or_default()
145        };
146
147        let mut invalidated = 0;
148        for key in keys_to_invalidate {
149            self.remove_entry(&key);
150            invalidated += 1;
151        }
152
153        if invalidated > 0 {
154            self.stats.write().invalidations += invalidated;
155            info!(
156                "Invalidated {} cache entries for table: {:?}",
157                invalidated, table
158            );
159        }
160    }
161
162    /// Invalidate specific query key
163    pub fn invalidate_key(&self, key: &QueryKey) {
164        self.remove_entry(key);
165    }
166
167    /// Clear all cached results
168    pub fn clear(&self) {
169        self.results.write().clear();
170        self.dependency_index.write().clear();
171        self.lru_queue.write().clear();
172        self.hot_queries.write().clear();
173        self.stats.write().current_size = 0;
174
175        info!("Cleared all query cache entries");
176    }
177
178    /// Get cache statistics
179    pub fn stats(&self) -> AdvancedCacheStats {
180        self.stats.read().clone()
181    }
182
183    /// Get hot queries that need background refresh
184    pub fn get_hot_queries_needing_refresh(&self) -> Vec<QueryKey> {
185        let results = self.results.read();
186        let hot = self.hot_queries.read();
187
188        hot.iter()
189            .filter(|key| {
190                results.get(key).is_some_and(|r| {
191                    r.should_refresh(self.config.hot_threshold, self.config.refresh_interval)
192                })
193            })
194            .cloned()
195            .collect()
196    }
197
198    /// Mark a query as refreshed
199    pub fn mark_refreshed(&self, key: &QueryKey) {
200        let mut results = self.results.write();
201        if let Some(result) = results.get_mut(key) {
202            // Reset creation time to extend TTL
203            result.created_at = Instant::now();
204            self.stats.write().refreshes += 1;
205        }
206    }
207
208    #[cfg(test)]
209    pub(crate) fn force_set_created_at(&self, key: &QueryKey, created_at: Instant) {
210        if let Some(result) = self.results.write().get_mut(key) {
211            result.created_at = created_at;
212            *result.last_accessed.write() = created_at;
213        }
214    }
215
216    /// Get the invalidation sender
217    pub fn invalidation_sender(&self) -> mpsc::UnboundedSender<InvalidationMessage> {
218        self.invalidation_tx.clone()
219    }
220
221    /// Handle expired entry
222    fn handle_expired(&self, key: &QueryKey) {
223        self.remove_entry(key);
224        self.stats.write().expirations += 1;
225        trace!("Removed expired cache entry: {:?}", key);
226    }
227
228    /// Remove a cache entry and clean up dependencies
229    fn remove_entry(&self, key: &QueryKey) {
230        let result = self.results.write().remove(key);
231
232        if let Some(result) = result {
233            self.cleanup_dependency_index(key, &result.dependencies);
234        }
235
236        // Remove from LRU queue
237        self.lru_queue.write().retain(|k| k != key);
238
239        // Remove from hot queries
240        self.hot_queries.write().remove(key);
241
242        // Update stats
243        self.stats.write().current_size = self.results.read().len();
244    }
245
246    /// Clean up dependency index for removed entry
247    fn cleanup_dependency_index(&self, key: &QueryKey, dependencies: &[TableDependency]) {
248        if !self.config.enable_dependency_tracking {
249            return;
250        }
251        let mut index = self.dependency_index.write();
252        for dep in dependencies {
253            if let Some(keys) = index.get_mut(dep) {
254                keys.remove(key);
255                if keys.is_empty() {
256                    index.remove(dep);
257                }
258            }
259        }
260    }
261
262    /// Evict oldest entries if at capacity
263    fn evict_if_needed(&self) {
264        let current_size = self.results.read().len();
265
266        if current_size >= self.config.max_queries {
267            let keys_to_evict: Vec<QueryKey> = {
268                let lru = self.lru_queue.read();
269                lru.iter()
270                    .take(current_size - self.config.max_queries + 1)
271                    .cloned()
272                    .collect()
273            };
274
275            for key in keys_to_evict {
276                self.remove_entry(&key);
277                self.stats.write().evictions += 1;
278                debug!("Evicted LRU cache entry: {:?}", key);
279            }
280        }
281    }
282
283    /// Update LRU queue (move accessed key to back)
284    fn update_lru(&self, key: QueryKey) {
285        let mut lru = self.lru_queue.write();
286        lru.retain(|k| k != &key);
287        lru.push_back(key);
288    }
289
290    /// Clear expired entries
291    pub fn clear_expired(&self) -> usize {
292        let expired_keys: Vec<QueryKey> = {
293            let results = self.results.read();
294            results
295                .iter()
296                .filter(|(_, result)| result.is_expired())
297                .map(|(key, _)| key.clone())
298                .collect()
299        };
300
301        let count = expired_keys.len();
302        for key in expired_keys {
303            self.remove_entry(&key);
304        }
305
306        if count > 0 {
307            self.stats.write().expirations += count as u64;
308            debug!("Cleared {} expired cache entries", count);
309        }
310
311        count
312    }
313
314    /// Get cache size
315    pub fn len(&self) -> usize {
316        self.results.read().len()
317    }
318
319    /// Check if cache is empty
320    pub fn is_empty(&self) -> bool {
321        self.len() == 0
322    }
323}
324
325impl Clone for AdvancedQueryCache {
326    fn clone(&self) -> Self {
327        Self {
328            results: Arc::clone(&self.results),
329            dependency_index: Arc::clone(&self.dependency_index),
330            lru_queue: Arc::clone(&self.lru_queue),
331            config: self.config.clone(),
332            stats: Arc::clone(&self.stats),
333            invalidation_tx: self.invalidation_tx.clone(),
334            hot_queries: Arc::clone(&self.hot_queries),
335        }
336    }
337}
338
339#[cfg(test)]
340#[path = "query_cache_tests.rs"]
341mod tests;